(no subject)

Driesprong, Fokko
Folks,

A while ago I opened a PR that adds the possibility to merge a custom data type (UDT) into a native data type. The need for this is new and comes with the introduction of Delta.

For some background: I have a Dataset with fields of type XMLGregorianCalendarType. I don't care about this type and would like to convert it to a standard data type, mainly because any other job that reads the data back needs the custom data type to be registered, which is not possible from the SQL API. The trick is that I override jsonValue so that the information about the custom data type is lost. In that case, you have to make sure that the values are serialized as a normal timestamp.
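A minimal sketch of the value conversion such a UDT has to perform so that its data is "serialized as the normal timestamp". It assumes that Spark stores TimestampType internally as a Long counting microseconds since the epoch, and it deliberately avoids Spark's own UserDefinedType class: the object name XmlCalendarCodec and its methods are hypothetical, standing in for the serialize/deserialize methods of a real UDT whose sqlType would be TimestampType. Only JDK classes from javax.xml.datatype are used.

```scala
import java.util.{GregorianCalendar, TimeZone}
import javax.xml.datatype.{DatatypeFactory, XMLGregorianCalendar}

// Hypothetical helper holding the value conversions that a UDT over
// XMLGregorianCalendar would perform in serialize/deserialize, so that
// the stored values look exactly like a native timestamp column.
object XmlCalendarCodec {
  private val factory: DatatypeFactory = DatatypeFactory.newInstance()

  // XMLGregorianCalendar -> microseconds since 1970-01-01T00:00:00Z,
  // assumed to be the internal representation of Spark's TimestampType.
  // Anything below millisecond precision is lost in toGregorianCalendar.
  def serialize(cal: XMLGregorianCalendar): Long =
    cal.toGregorianCalendar().getTimeInMillis() * 1000L

  // Microseconds since the epoch -> XMLGregorianCalendar in UTC.
  def deserialize(micros: Long): XMLGregorianCalendar = {
    val gc = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
    gc.setTimeInMillis(java.lang.Math.floorDiv(micros, 1000L))
    factory.newXMLGregorianCalendar(gc)
  }
}
```

For example, serializing the calendar parsed from "2019-12-20T16:00:00Z" yields 1576857600000000L. Combined with a jsonValue override that emits the JSON of the underlying timestamp type, both the stored schema and the stored values then look like a plain timestamp column.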

Before Delta, appending to the table went fine because schema compatibility was not checked on write. With Delta, things are different: when writing, it checks whether the two schemas can be merged:
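The write-time check can be pictured with a toy model of schema merging. This is not Delta's code (its SchemaUtils.mergeSchemas is internal); the types DType, Field, StructT, CustomT and the object ToyMerge are hypothetical stand-ins that only reproduce the behaviour visible in the error: fields are matched by name, and two same-named fields with different non-struct types are rejected.

```scala
// Toy stand-ins for Spark data types; these are not Spark's classes.
sealed trait DType
case object TimestampT extends DType
final case class StructT(fields: Seq[Field]) extends DType
// Stands in for a UDT such as CustomXMLGregorianCalendarType.
final case class CustomT(name: String) extends DType

final case class Field(name: String, dataType: DType)

object ToyMerge {
  // Merge the existing table schema with the schema of the incoming data.
  // Left holds the error message, Right the merged schema.
  def mergeSchemas(table: StructT, data: StructT): Either[String, StructT] = {
    val dataByName = data.fields.map(f => f.name -> f).toMap
    // Merge each existing table column with the same-named incoming column.
    val merged: Seq[Either[String, Field]] = table.fields.map { tf =>
      dataByName.get(tf.name) match {
        case None => Right(tf)
        case Some(df) =>
          mergeTypes(tf.dataType, df.dataType) match {
            case Right(t) => Right(Field(tf.name, t))
            case Left(e)  => Left(s"Failed to merge fields '${tf.name}' and '${df.name}'. $e")
          }
      }
    }
    // In this toy model, columns that only exist in the data are appended.
    val tableNames = table.fields.map(_.name).toSet
    val extra = data.fields.filterNot(f => tableNames.contains(f.name))
    merged.collectFirst { case Left(e) => e } match {
      case Some(e) => Left(e)
      case None    => Right(StructT(merged.collect { case Right(f) => f } ++ extra))
    }
  }

  // Structs are merged recursively; any other pair must be identical.
  def mergeTypes(a: DType, b: DType): Either[String, DType] = (a, b) match {
    case (sa: StructT, sb: StructT) => mergeSchemas(sa, sb)
    case _ if a == b                => Right(a)
    case _                          => Left(s"Failed to merge incompatible data types $a and $b")
  }
}
```

Merging a table with an EventTimestamp: TimestampT column against data whose EventTimestamp is a CustomT yields a Left starting with "Failed to merge fields 'EventTimestamp' and 'EventTimestamp'.", mirroring the exception below.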

OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0
Warning: Ignoring non-spark config property: eventLog.rolloverIntervalSeconds=3600
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to merge fields 'EventTimestamp' and 'EventTimestamp'. Failed to merge incompatible data types TimestampType and org.apache.spark.sql.types.CustomXMLGregorianCalendarType@6334178e;;
at com.databricks.sql.transaction.tahoe.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:685)
at com.databricks.sql.transaction.tahoe.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:674)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at com.databricks.sql.transaction.tahoe.schema.SchemaUtils$.com$databricks$sql$transaction$tahoe$schema$SchemaUtils$$merge$1(SchemaUtils.scala:674)
at com.databricks.sql.transaction.tahoe.schema.SchemaUtils$.mergeSchemas(SchemaUtils.scala:750)
at com.databricks.sql.transaction.tahoe.schema.ImplicitMetadataOperation$class.updateMetadata(ImplicitMetadataOperation.scala:63)
at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.updateMetadata(WriteIntoDelta.scala:50)
at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.write(WriteIntoDelta.scala:90)
at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand$$anonfun$run$2.apply(CreateDeltaTableCommand.scala:119)
at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand$$anonfun$run$2.apply(CreateDeltaTableCommand.scala:93)
at com.databricks.logging.UsageLogging$$anonfun$recordOperation$1.apply(UsageLogging.scala:405)
at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:235)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:230)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:18)
at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:272)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:18)
at com.databricks.logging.UsageLogging$class.recordOperation(UsageLogging.scala:386)
at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:18)
at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:55)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:98)
at com.databricks.spark.util.UsageLogger$class.recordOperation(UsageLogger.scala:67)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:67)
at com.databricks.spark.util.UsageLogging$class.recordOperation(UsageLogger.scala:342)
at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.recordOperation(CreateDeltaTableCommand.scala:45)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:108)
at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.recordDeltaOperation(CreateDeltaTableCommand.scala:45)
at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.run(CreateDeltaTableCommand.scala:93)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:134)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:187)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:183)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:115)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:115)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:508)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:483)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:430)
at com.ahold.IngestFild$.writeUnmanagedTable(IngestFild.scala:49)
at com.ahold.IngestFild$.ingestFild(IngestFild.scala:69)
at com.ahold.IngestFild$.main(IngestFild.scala:31)
at com.ahold.IngestFild.main(IngestFild.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Currently, the merge function does not support UDTs, so I've extended its rules. The PR has been quiet for the last few weeks. Is this something we can merge into Spark? I would like to get your opinion on this.
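A minimal sketch of what an extended merge rule for UDTs could look like, again in a toy model rather than Spark's classes. The names DType, Udt and UdtAwareMerge are hypothetical, only the top-level type rule is modelled (no nested structs), and returning the native type as the merge result is an assumption made here for illustration, not necessarily what the PR does.

```scala
// Toy model (not Spark's classes) of a type-merge rule extended for UDTs.
sealed trait DType
case object TimestampT extends DType
case object StringT extends DType
// A user-defined type whose values are stored as its underlying sqlType.
final case class Udt(name: String, sqlType: DType) extends DType

object UdtAwareMerge {
  private def incompatible(a: DType, b: DType): Either[String, DType] =
    Left(s"Failed to merge incompatible data types $a and $b")

  def mergeTypes(a: DType, b: DType): Either[String, DType] = (a, b) match {
    case _ if a == b => Right(a)
    // Added rules: a UDT merges with a native type when the UDT's sqlType
    // merges with it; the result is the native type (assumption).
    case (u: Udt, n) if !n.isInstanceOf[Udt] =>
      mergeTypes(u.sqlType, n) match {
        case Left(_) => incompatible(a, b)
        case ok      => ok
      }
    case (n, u: Udt) if !n.isInstanceOf[Udt] =>
      mergeTypes(n, u.sqlType) match {
        case Left(_) => incompatible(a, b)
        case ok      => ok
      }
    case _ => incompatible(a, b)
  }
}
```

With this rule, a UDT backed by TimestampT merges with TimestampT in either order and produces TimestampT, while merging it with StringT still fails with the original error message.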

 Cheers, Fokko

Re:

Driesprong, Fokko
Does anyone have an opinion on this? Here is a link to the PR: https://github.com/apache/spark/pull/26644

Cheers, Fokko


On Fri, 20 Dec 2019 at 16:00, Driesprong, Fokko <[hidden email]> wrote:
[...]