Re: Issue creating row with java.util.Map type


Richard Xin
Try:
Row newRow = RowFactory.create(row.getString(0), row.getString(1), row.getMap(2));
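
For context: in Spark 2.x, the external (JVM-side) type RowEncoder expects for a MapType column is scala.collection.Map, which is exactly what row.getMap(i) returns; new HashMap<>() and row.getJavaMap(i) hand it a java.util.Map instead, hence the "not a valid external type" error. (createDataFrame presumably goes through a different converter that does accept Java maps, which would explain why building the Dataset from the List succeeds.) A minimal sketch, reusing out and rowEncoder from the snippet quoted below; the freq map and what gets put into it are hypothetical stand-ins:

import java.util.HashMap;

import scala.collection.JavaConverters;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

// Reuse the incoming map: getMap(2) already yields a scala.collection.Map,
// the external type the encoder expects for map<string,int>.
out.map((MapFunction<Row, Row>) row ->
        RowFactory.create(row.getString(0), row.getString(1), row.getMap(2)),
    rowEncoder).show();

// To build a fresh map on the Java side, convert it to a Scala map before
// putting it into the Row. (freq is a hypothetical per-row map.)
out.map((MapFunction<Row, Row>) row -> {
    HashMap<String, Integer> freq = new HashMap<>();
    freq.put(row.getString(1), 1);
    return RowFactory.create(row.getString(0), row.getString(1),
            JavaConverters.mapAsScalaMapConverter(freq).asScala());
}, rowEncoder).show();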


On Friday, January 27, 2017 10:52 AM, Ankur Srivastava <[hidden email]> wrote:


+ DEV Mailing List

On Thu, Jan 26, 2017 at 5:12 PM, Ankur Srivastava <[hidden email]> wrote:
Hi,

I am trying to map over a Dataset whose rows have a map attribute. When I create a Row containing the map attribute inside the map function, I get cast errors. I can reproduce the issue with the sample code below. The surprising thing is that, with the same schema, I am able to create a Dataset from a List of rows.

I am on Spark 2.0 and Scala 2.11.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public static void main(String[] args) {
    StructType schema = new StructType()
            .add("src", DataTypes.StringType)
            .add("dst", DataTypes.StringType)
            .add("freq", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType));

    List<Row> inputData = new ArrayList<>();
    inputData.add(RowFactory.create("1", "2", new HashMap<>()));

    SparkSession sparkSession = SparkSession
            .builder()
            .appName("IPCountFilterTest")
            .master("local")
            .getOrCreate();

    // Creating the Dataset from the List of rows works with this schema.
    Dataset<Row> out = sparkSession.createDataFrame(inputData, schema);
    out.show();

    Encoder<Row> rowEncoder = RowEncoder.apply(schema);
    // But creating a Row with a java.util.Map inside map() fails at runtime.
    out.map((MapFunction<Row, Row>) row -> {
        Row newRow = RowFactory.create(row.getString(0), row.getString(1), new HashMap<String, Integer>());
        //Row newRow = RowFactory.create(row.getString(0), row.getString(1), row.getJavaMap(2));
        return newRow;
    }, rowEncoder).show();
}
Below is the error:

17/01/26 17:05:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map<string,int>
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/01/26 17:05:30 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map<string,int>


Thanks
Ankur