[SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected


invkrh
I am playing with Spark 2.0.
What I tried to test is:

Create a UDF in which there is a non-serializable object.
What I expected is that when this UDF is called while materializing the DataFrame where it is used in "select", a Task not serializable exception would be thrown.
Whether the exception is thrown also depends on which "action" is called on that DataFrame.

Here is the code to reproduce the problem:

============
import org.apache.spark.sql.SparkSession

object DataFrameSerDeTest extends App {

  class A(val value: Int) // It is not serializable: A does not extend Serializable

  def run(): Unit = {
    val spark = SparkSession
      .builder()
      .appName("DataFrameSerDeTest")
      .master("local[*]")
      .getOrCreate()

    import org.apache.spark.sql.functions.udf
    import spark.implicits._

    val notSer = new A(2)
    val add = udf {
      (a: Int) => a + notSer.value // the closure captures the non-serializable notSer
    }
    val df = spark.createDataFrame(Seq(
      (1, 2),
      (2, 2),
      (3, 2),
      (4, 2)
    )).toDF("key", "value")
      .select($"key", add($"value").as("added"))

    df.show() // Should fail because the UDF closure captures a non-serializable object, but it works

    df.filter($"key" === 2).show() // Fails as expected: org.apache.spark.SparkException: Task not serializable
  }

  run()
}
============

Also, I tried collect(), count(), first(), and limit(). All of them worked without serialization exceptions.
It seems only filter() triggers the exception. (Feature or bug?)
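
For reference, the failure can be reproduced outside Spark with a minimal, self-contained sketch (the ClosureSerCheck name below is mine, not from the thread): Spark serializes task closures with plain Java serialization by default, so writing the closure to an ObjectOutputStream fails the same way whenever it captures `notSer`:

============
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureSerCheck extends App {
  class A(val value: Int) // not Serializable, as in the snippet above

  def run(): Unit = {
    val notSer = new A(2)
    val f = (a: Int) => a + notSer.value // the closure captures the local notSer

    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      println("closure serialized fine")
    } catch {
      case e: NotSerializableException =>
        println(s"the same failure Spark wraps in Task not serializable: $e")
    }
  }

  run()
}
============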

Any ideas? Or did I just mess things up?
Any help is highly appreciated.

--
Hao Ren

Data Engineer @ leboncoin

Paris, France

Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

invkrh
Yes, it is.
You can define a UDF like that.
Basically, it's a UDF of type Int => Int whose closure contains a non-serializable object.
The latter should cause a Task not serializable exception.

Hao

On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar <[hidden email]> wrote:
Hello Hao Ren,

Doesn't the code...

val add = udf {
  (a: Int) => a + notSer.value
}

...mean a UDF function of type Int => Int?

Thanks,
Muthu

--
Hao Ren

Data Engineer @ leboncoin

Paris, France

RE: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

Simon Scott

But does the “notSer” object have to be serialized?

The object is immutable by the definition of A, so the only thing that needs to be serialized is the (immutable) Int value? And Ints are serializable?

Just thinking out loud.

Simon Scott

Research Developer @ viavisolutions.com
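
A sketch of that idea against the original snippet (hypothetical, untested; the name `captured` is mine): copy the Int into a local val before defining the UDF, so the closure captures only the primitive and never references A:

====
val notSer = new A(2)
val captured = notSer.value // copy the Int out; this is all the closure needs
val add = udf {
  (a: Int) => a + captured // captures only a serializable Int
}
====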

 


Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

rxin
That is unfortunately how the Scala compiler captures (and defines) closures. Nothing is really final in the JVM: you can always use reflection or Unsafe to modify the value of fields.
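
To illustrate the point (a sketch, not part of the original reply): even the val in `class A(val value: Int)` can be rewritten at runtime through reflection, which is why the compiler cannot treat `notSer.value` as a constant and capture just the Int; it captures `notSer` itself:

====
class A(val value: Int)

val a = new A(2)
// Scala backs the val with a private field, also named "value".
val field = classOf[A].getDeclaredField("value")
field.setAccessible(true)
field.setInt(a, 42) // mutate the "immutable" field
println(a.value)    // 42
====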


Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

invkrh
@Reynold

Some questions to make things clear:

1. As nothing is really final in the JVM, is the code generated during the execution of `df.show()` different from that of `df.filter($"key" === 2).show()` in my snippet?

2. When `df.show()` is executed, it seems that the `notSer` object is not serialized (since there is no exception); instead, the Int value in it is serialized. Is this correct?
To me, this behavior is counterintuitive.
The analogous problem would be an `RDD.map` whose closure contains `notSer.value`. For example:
====
rdd.map {
  case (key, value) => value + notSer.value
}.count()
====
It should throw a "Task not serializable" exception. But for a DataFrame, that is not the case, because of reflection or Unsafe.

3. I am wondering whether this "feature" of the Scala compiler makes the DataFrame API unpredictable. Any roadmap on this?
As a user, I cannot expect that a `filter` call before `show` crashes while a simple `show` on the original df works.

The workaround I can imagine is just to cache and materialize `df` with `df.cache.count()`, and then call `df.filter(...).show()`.
It should work, just a little bit tedious.
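
Spelled out against the snippet above (a sketch of that workaround; whether it really avoids the exception is exactly what I would want to confirm):

====
df.cache()                      // mark df for caching
df.count()                      // materialize it: the UDF runs here
df.filter($"key" === 2).show()  // later actions read the cached rows
====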



--
Hao Ren

Data Engineer @ leboncoin

Paris, France

Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

rxin
The show() case was the result of an optimization that short-circuits any real Spark computation when the input is a local collection and the result is simply the first few rows. That's why it completed without serializing anything.

It is somewhat inconsistent. One way to eliminate the inconsistency is to always serialize the query plan, even for local execution. We did that back in the day for the RDD code path, and we can do similar things for the SQL code path. However, serialization is not free, and it will slow down execution by a small percentage.
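
One way to see the two code paths side by side (a sketch; the exact plan text varies by Spark version): compare the physical plans. The plain query can be answered by a local scan collected on the driver, while the filtered one runs as regular tasks, which is where closure serialization kicks in:

====
df.explain()                       // e.g. a LocalTableScan-based plan, served on the driver
df.filter($"key" === 2).explain()  // a plan with a Filter, executed as tasks
====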


