Question about Expression Encoders


Question about Expression Encoders

Mark Hamilton

Dear Spark Developers,

In our team's Spark library we use ExpressionEncoders to automatically generate Spark SQL types from Scala case classes:

https://github.com/Azure/mmlspark/blob/master/src/main/scala/com/microsoft/ml/spark/core/schema/SparkBindings.scala

However, it seems that in 3.0 the encoders' ability to convert objects to and from rows and internal rows has been removed. Is there any guidance on how to get similar behavior in 3.0? Thanks for your help!
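For reference, the 2.x pattern we depend on looks roughly like this (a simplified sketch with a stand-in case class, not our exact code; toRow/fromRow are the calls that are gone in 3.0):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Example(a: Int, s: String)

// Spark 2.x: the encoder itself converts in both directions.
val enc = ExpressionEncoder[Example]().resolveAndBind()
val ir: InternalRow = enc.toRow(Example(1, "x")) // object -> InternalRow
val back: Example = enc.fromRow(ir)              // InternalRow -> object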

 

Best,

Mark


Re: Question about Expression Encoders

Takeshi Yamamuro
Hi,

Have you tried it like this?

------
{ r: InternalRow => enc1.fromRow(r) }

===>

{ r: InternalRow =>
  val fromRow = enc1.createDeserializer()
  fromRow(r)
}
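
If you also need the other direction, toRow is replaced in the same way by createSerializer(). A self-contained sketch against the 3.0 API (Example is just a stand-in class):

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Example(a: Int)

// Spark 3.0: the encoder hands out reusable converter functions
// instead of exposing toRow/fromRow directly.
val enc = ExpressionEncoder[Example]().resolveAndBind()
val toRow = enc.createSerializer()     // Example -> InternalRow
val fromRow = enc.createDeserializer() // InternalRow -> Example
assert(fromRow(toRow(Example(1))) == Example(1))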


Bests,
Takeshi

--
---
Takeshi Yamamuro

Re: Question about Expression Encoders

Robert Berke
Hi everyone

Thanks, Takeshi. I ran into the same issue as Mark with my row-to-case-class converter:
def rowToCaseClass[C <: Product : TypeTag](r: Row)(implicit encs: (ExpressionEncoder[Row], ExpressionEncoder[C])): C = {
  val ir = encs._1.toRow(r) // Row -> InternalRow (Spark 2.x API)
  encs._2.fromRow(ir)       // InternalRow -> C (Spark 2.x API)
}
So in Spark 3.0 I did:
def rowToCaseClass[C <: Product : TypeTag](r: Row)(implicit enc: ExpressionEncoder[C]): C = enc.createDeserializer()(InternalRow(r))

Unfortunately, this yields the error below when calling:

val r: Row = Row(1.0,2000L)
case class C(d: Double, l: Long)
implicit val enc: ExpressionEncoder[C] = Encoders.product[C].asInstanceOf[ExpressionEncoder[C]].resolveAndBind()

rowToCaseClass[C](r)


Cheers, Robert 


Caused by: java.lang.RuntimeException: Error while decoding: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to java.lang.Double
newInstance(class line58c8cc5e7d9841e5b85d34e76c692e7d31.$read$$iw$$iw$$iw$$iw$$iw$$iw$C)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:201)
  at io.beprop.spark.common.package$.rowToCaseClass(package.scala:132)
  at line58c8cc5e7d9841e5b85d34e76c692e7d35.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-503321:1)
  at line58c8cc5e7d9841e5b85d34e76c692e7d35.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-503321:50)
  at line58c8cc5e7d9841e5b85d34e76c692e7d35.$read$$iw$$iw$$iw$$iw.<init>(command-503321:52)
  at line58c8cc5e7d9841e5b85d34e76c692e7d35.$read$$iw$$iw$$iw.<init>(command-503321:54)
  at line58c8cc5e7d9841e5b85d34e76c692e7d35.$read$$iw$$iw.<init>(command-503321:56)
  at line58c8cc5e7d9841e5b85d34e76c692e7d35.$read$$iw.<init>(command-503321:58)
  at line58c8cc5e7d9841e5b85d34e76c692e7d35.$read.<init>(command-503321:60)
  at line58c8cc5e7d9841e5b85d34e76c692e7d35.$read$.<init>(command-503321:64)
  at line58c8cc5e7d9841e5b85d34e76c692e7d35.$read$.<clinit>(command-503321)
  at line58c8cc5e7d9841e5b85d34e76c692e7d35.$eval$.$print$lzycompute(<notebook>:7)
  at line58c8cc5e7d9841e5b85d34e76c692e7d35.$eval$.$print(<notebook>:6)
  at line58c8cc5e7d9841e5b85d34e76c692e7d35.$eval.$print(<notebook>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
  at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
  at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
  at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
  at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)
  at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
  at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:600)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:570)
  at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:219)
  at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:204)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:769)
  at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:722)
  at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:204)
  at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$10(DriverLocal.scala:431)
  at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:237)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
  at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:232)
  at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:229)
  at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:48)
  at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:274)
  at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:267)
  at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:48)
  at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:408)
  at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:653)
  at scala.util.Try$.apply(Try.scala:213)
  at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:645)
  at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:486)
  at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:598)
  at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391)
  at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
  at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to java.lang.Double
  at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:116)
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getDouble(rows.scala:44)
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getDouble$(rows.scala:44)
  at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getDouble(rows.scala:195)


 


Re: Question about Expression Encoders

Robert Berke
Works perfectly! Thanks, Herman.

On Tue, Aug 25, 2020 at 12:03 PM, Herman van Hovell <[hidden email]> wrote:
Hi Robert,

Your Spark 3.0 code is missing the encoder that converts the Row into an InternalRow: InternalRow(r) just wraps the whole Row as the single field of a new InternalRow, so the deserializer's first accessor (getDouble) finds a GenericRow where it expects a Double, which is exactly the ClassCastException in your trace. Your code should look like this:

def rowToCaseClass[C <: Product : TypeTag](r: Row)(implicit encs: (ExpressionEncoder[Row], ExpressionEncoder[C])): C = {
  val serializer = encs._1.createSerializer()     // Row -> InternalRow
  val deserializer = encs._2.createDeserializer() // InternalRow -> C
  deserializer(serializer(r))
}

I would recommend you create the serializer and deserializer pair once per thread.
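
For the failing example upthread, wiring up the implicit encoder pair might then look like this (an untested sketch; it builds the Row encoder from the case class's schema via RowEncoder):

import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}

case class C(d: Double, l: Long)

// Resolve both encoders once, then pass them as the implicit pair.
val encC = Encoders.product[C].asInstanceOf[ExpressionEncoder[C]].resolveAndBind()
implicit val encs: (ExpressionEncoder[Row], ExpressionEncoder[C]) =
  (RowEncoder(encC.schema).resolveAndBind(), encC)

rowToCaseClass[C](Row(1.0, 2000L)) // C(1.0,2000)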

Kind regards,
Herman
