Weird ClassCastException when using generics from Java

Stephen Coy
Hi there,

This will be a little long so please bear with me. There is a buildable example available at https://github.com/sfcoy/sfcoy-spark-cce-test.

Say I have the following three tables:

Machines
Id,MachineType
100001,A
100002,B
200003,B
200004,A
200005,B
Bolts
MachineType,Description
A,20 x M5
A,30 x M5
B,"2"" x 1/4"""
B,"2"" x 1/2"""
B,"2"" x 3/8"""
A,40 x M6
A,50 x M10
B,"1"" x 1/8"""
Nuts
MachineType,Description
A,M5
A,M6
B,"1/4"""
B,"1/8"""
B,"3/8"""

The objective is to create lists of Machines by Id, with all of their bolts and nuts listed on the same line:

100001, 20 x M5, 30 x M5, 40 x M6,50 x M10,M5,M6

The output is further categorised by the first 5 digits of the machine id, although that seems immaterial to this problem.
In practice I’m dealing with ~70 million machines with a couple of hundred thousand types - therefore Spark!

The code to do this looks like:

final Dataset<Machine> machineRecords = sparkSession
  .read()
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("src/main/data/machines.csv")
  .as(Encoders.bean(Machine.class))
  .persist();

final int workerCount = sparkContext.defaultParallelism();

// Gather a List<Nut> per machine type
final JavaPairRDD<String, List<Nut>> nutsByMachine = sparkSession
  .read()
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("src/main/data/nuts.csv")
  .as(Encoders.bean(Nut.class))
  .toJavaRDD()
  .mapToPair(nut -> new Tuple2<>(nut.getMachineType(), nut))
  .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
  .combineByKey(SparkCCETest::createListAndCombine, SparkCCETest::mergeValues, SparkCCETest::mergeCombiners)
  .persist(StorageLevel.MEMORY_AND_DISK());

// Gather a List<Bolt> per machine type
final JavaPairRDD<String, List<Bolt>> boltsByMachine = sparkSession
  .read()
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("src/main/data/bolts.csv")
  .as(Encoders.bean(Bolt.class))
  .toJavaRDD()
  .mapToPair(bolt -> new Tuple2<>(bolt.getMachineType(), bolt))
  .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
  .combineByKey(SparkCCETest::createListAndCombine, SparkCCETest::mergeValues, SparkCCETest::mergeCombiners)
  .persist(StorageLevel.MEMORY_AND_DISK());

// Join each machine with its nuts and bolts, then group by export file
machineRecords
  .toJavaRDD()
  .mapToPair(machine -> new Tuple2<>(machine.getMachineType(), machine))
  .join(nutsByMachine)
  .join(boltsByMachine)
  .map(Tuple2::_2)
  .map(tuples -> new Tuple3<>(tuples._1._1, tuples._1._2, tuples._2))
  .mapToPair(machineWithNutsBolts -> new Tuple2<>(exportFileFor(machineWithNutsBolts._1()), machineWithNutsBolts))
  .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
  .foreachPartition(machineIterator -> { // <- line 77
      ///...
  });

static String exportFileFor(Machine machine) {
    return machine.getId().substring(0, 5);
}

static <T> List<T> createListAndCombine(T v) {
    List<T> c = new ArrayList<>();
    c.add(v);
    return c;
}

static <T> List<T> mergeValues(List<T> c, T v) {
    c.add(v);
    return c;
}

static <T> List<T> mergeCombiners(List<T> c1, List<T> c2) {
    c1.addAll(c2);
    return c1;
}

Running this yields a ClassCastException:

20/05/22 14:05:31 WARN BlockManager: Putting block rdd_47_1 failed due to exception java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut.
20/05/22 14:05:31 WARN BlockManager: Putting block rdd_47_2 failed due to exception java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut.
20/05/22 14:05:31 WARN BlockManager: Block rdd_47_2 could not be removed as it was not found on disk or in memory
20/05/22 14:05:31 WARN BlockManager: Block rdd_47_1 could not be removed as it was not found on disk or in memory
20/05/22 14:05:31 ERROR Executor: Exception in task 2.0 in stage 9.0 (TID 13)
java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
...
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/05/22 14:05:31 ERROR Executor: Exception in task 1.0 in stage 9.0 (TID 12)
java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
...
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1979)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1967)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1966)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1966)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:946)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:946)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:946)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2196)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2145)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2134)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:748)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2095)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2160)
at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:994)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:992)
at org.apache.spark.api.java.JavaRDDLike.foreachPartition(JavaRDDLike.scala:219)
at org.apache.spark.api.java.JavaRDDLike.foreachPartition$(JavaRDDLike.scala:218)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45)
at org.example.SparkCCETest.main(SparkCCETest.java:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1$adapted(ExternalAppendOnlyMap.scala:151)
at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:164)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$combineByKeyWithClassTag$3(PairRDDFunctions.scala:92)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:362)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1306)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1233)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1297)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1121)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
at org.apache.spark.rdd.CoGroupedRDD.$anonfun$compute$2(CoGroupedRDD.scala:140)
at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:455)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:458)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Anyway, to cut a long story short, it occurred to me while creating this reproducer to replace those generic methods at the bottom of the code with explicitly typed versions.

This made the problem go away.

This seems like a workaround, but does anyone think this could be a bug?

Thanks,

Steve C

P.S. I’m relatively new to Apache Spark, so if anyone thinks I’m going about this the wrong way I would be pleased to hear any better ideas.





Re: Weird ClassCastException when using generics from Java

Sean Owen-2
I don't immediately see what the issue could be - try .count()-ing the individual RDDs to narrow it down?
What code change made it work?
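For example, a minimal sketch of that narrowing step, using the variable names from the original post — forcing each aggregated RDD independently should show which one first triggers the exception:

nutsByMachine.count();  // materialises only the nuts aggregation
boltsByMachine.count(); // materialises only the bolts aggregation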

Also I think this could probably be a few lines of SQL with an aggregate, collect_list(), and joins.
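For what it's worth, a rough, untested sketch of that approach with the Java Dataset API might look like the following. It assumes machines, nuts and bolts are the raw Datasets read from the three CSV files, with the column names taken from the headers shown above:

import static org.apache.spark.sql.functions.collect_list;

// one row per machine type, carrying all descriptions as an array column
Dataset<Row> nutsByType = nuts.groupBy("MachineType")
    .agg(collect_list("Description").as("nutDescriptions"));
Dataset<Row> boltsByType = bolts.groupBy("MachineType")
    .agg(collect_list("Description").as("boltDescriptions"));

// join the aggregated lists back onto the machines, keyed by machine type
Dataset<Row> result = machines
    .join(boltsByType, "MachineType")
    .join(nutsByType, "MachineType");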

On Thu, May 21, 2020 at 11:27 PM Stephen Coy <[hidden email]> wrote:

> [original message quoted in full; trimmed]

Re: Weird ClassCastException when using generics from Java

Stephen Coy
Hi Sean,

The fix was to provide explicitly typed versions of the three generic methods at the bottom of the code:

i.e.

static <T> List<T> createListAndCombine(T v) {
    List<T> c = new ArrayList<>();
    c.add(v);
    return c;
}


becomes

static List<Nut> createListAndCombine(Nut v) {
    List<Nut> c = new ArrayList<>();
    c.add(v);
    return c;
}

static List<Bolt> createListAndCombine(Bolt v) {
    List<Bolt> c = new ArrayList<>();
    c.add(v);
    return c;
}

etc.
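For completeness, the other two helpers presumably get the same treatment. One wrinkle worth noting: mergeCombiners(List<Nut>, List<Nut>) and mergeCombiners(List<Bolt>, List<Bolt>) would erase to the same signature, so the typed variants need distinct names (or separate classes). A sketch of the Nut side, with an assumed name for the combiner merge:

static List<Nut> mergeValues(List<Nut> c, Nut v) {
    c.add(v);
    return c;
}

// renamed: the Nut and Bolt variants of mergeCombiners would otherwise
// clash after type erasure (both would take two raw Lists)
static List<Nut> mergeNutCombiners(List<Nut> c1, List<Nut> c2) {
    c1.addAll(c2);
    return c1;
}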

I forgot to mention that this issue appears with both Java 8 / Spark 2.4.5 and Java 11 / Spark 3.0.0-rc2.

Thanks for the SQL tip. I will investigate further.

Cheers,

Steve C

On 22 May 2020, at 11:22 pm, Sean Owen <[hidden email]> wrote:

[Sean Owen's reply and the original message quoted in full; trimmed]