Fwd: [SparkSQL] Project using NamedExpression

Aviral Agarwal
Hi guys,

I want to transform Rows using NamedExpressions.

Below is the code snippet that I am using:


def apply(dataFrame: DataFrame, selectExpressions: java.util.List[String]): RDD[UnsafeRow] = {
  val exprArray = selectExpressions.map(s =>
    Column(SqlParser.parseExpression(s)).named
  )

  val inputSchema = dataFrame.logicalPlan.output

  val transformedRDD = dataFrame.mapPartitions(
    iter => {
      val project = UnsafeProjection.create(exprArray, inputSchema)
      iter.map { row =>
        project(InternalRow.fromSeq(row.toSeq))
      }
    })

  transformedRDD
}

The problem is that the expression is unevaluable:

Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: 'a
        at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.genCode(Expression.scala:233)
        at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.genCode(unresolved.scala:53)
        at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:106)
        at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:102)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.catalyst.expressions.Expression.gen(Expression.scala:102)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:464)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:464)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenContext.generateExpressions(CodeGenerator.scala:464)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:281)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:324)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:317)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:32)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:635)
        at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:125)
        at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:135)
        at org.apache.spark.sql.ScalaTransform$$anonfun$3.apply(ScalaTransform.scala:31)
        at org.apache.spark.sql.ScalaTransform$$anonfun$3.apply(ScalaTransform.scala:30)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


This might be because the Expression is unresolved.

Any help would be appreciated.

Thanks and Regards,
Aviral Agarwal

Re: Fwd: [SparkSQL] Project using NamedExpression

Liang-Chi Hsieh

Hi,

You need to resolve the expressions before passing them to UnsafeProjection.create.
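A minimal sketch of what that could look like, assuming the Spark 1.6-era APIs used in the snippet above. Note that SQLContext.analyzer is not public API, so accessing it this way is an assumption for illustration only, and the untested cast at the end assumes the analyzer leaves a Project on top:

```scala
// Sketch only (untested): resolve the parsed expressions by wrapping them
// in a Project over the DataFrame's logical plan, running the analyzer on
// it, and reading the now-resolved expressions back out of the plan.
import org.apache.spark.sql.{Column, DataFrame, SqlParser}
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.Project

def resolveSelectExprs(df: DataFrame, exprs: Seq[String]): Seq[NamedExpression] = {
  val parsed = exprs.map(s => Column(SqlParser.parseExpression(s)).named)
  // Let the analyzer resolve attribute references against the child plan.
  // SQLContext.analyzer is package-private, so real code would need to
  // live under org.apache.spark.sql or reach it some other way.
  val analyzed = df.sqlContext.analyzer.execute(Project(parsed, df.logicalPlan))
  // Assumes the top node is still a Project after analysis.
  analyzed.asInstanceOf[Project].projectList
}
```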


Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/

Re: Fwd: [SparkSQL] Project using NamedExpression

Aviral Agarwal
Hi,
Can you please point me to how to resolve the expression?
I was looking into LogicalPlan.resolveExpressions(), which takes a partial function, but I am not sure how to use it.

Thanks,
Aviral Agarwal



Re: Fwd: [SparkSQL] Project using NamedExpression

Aviral Agarwal
Hi,
Can anyone please point me to the right class(es) where I can start digging in?
That would be super helpful too.

Thanks,
Aviral Agarwal




Re: Fwd: [SparkSQL] Project using NamedExpression

Liang-Chi Hsieh
In reply to this post by Aviral Agarwal

I am not sure why you want to transform rows in the dataframe using mapPartitions like that.

If you want to project the rows with some expressions, you can use an API like selectExpr and let Spark SQL resolve the expressions. To resolve expressions manually, you need to (at least) deal with a resolver and transform the expressions recursively with the LogicalPlan.resolve API.
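For the supported route, the whole mapPartitions/UnsafeProjection dance collapses to something like this (the column expressions are made-up placeholders):

```scala
// Sketch: let Spark SQL parse, resolve, and optimize the projection.
// "a" and "b + 1 AS c" stand in for whatever select expressions you have.
val projected = dataFrame.selectExpr("a", "b + 1 AS c")

// If an RDD of rows is still needed afterwards:
val rowRdd = projected.rdd
```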


Re: Fwd: [SparkSQL] Project using NamedExpression

Aviral Agarwal
Hi,
I made some progress by binding the expressions to a LogicalPlan and then analyzing the plan.
The problem now is the unique IDs that are assigned to every expression.
def apply(dataFrame: DataFrame, selectExpressions: java.util.List[String]): RDD[InternalRow] = {
  val schema = dataFrame.schema

  val exprArray = selectExpressions.map(s =>
    Column(SqlParser.parseExpression(s)).named
  )

  val projectLogicalPlan = Project(exprArray, dataFrame.logicalPlan)
  val analyzedLogicalPlan = ContextWrapper.getSqlContext.analyzer.execute(projectLogicalPlan)

  val transformedRDD = dataFrame.mapPartitions(
    iter => {
      val project = UnsafeProjection.create(analyzedLogicalPlan.expressions, schema.toAttributes,
        subexpressionEliminationEnabled = false)
      iter.map { row =>
        project(InternalRow.fromSeq(row.toSeq))
      }
    })

  transformedRDD
}

The error I get is when creating the UnsafeProjection.
It seems that the unique ID assigned to each column differs between analyzedLogicalPlan.expressions and schema.toAttributes, which causes an error when binding the columns.


org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: EMP_NUM#3
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:86)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:85)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:249)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:85)
        at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$$anonfun$4.apply(Projection.scala:146)
        at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$$anonfun$4.apply(Projection.scala:146)
        at scala.collection.immutable.Stream.map(Stream.scala:376)
        at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:146)
        at org.apache.spark.sql.ScalaTransform$$anonfun$2.apply(ScalaTransform.scala:49)
        at org.apache.spark.sql.ScalaTransform$$anonfun$2.apply(ScalaTransform.scala:48)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
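Untested against this code, but one way to avoid the mismatch might be to take the input attributes from the same analyzed plan that produced the expressions, instead of from schema.toAttributes, so both sides carry the same exprIds when BindReferences runs:

```scala
// Sketch (untested): bind against the analyzed plan's child output so the
// AttributeReferences inside the expressions and the input attributes
// share the same exprIds. Uses the same ContextWrapper/analyzer access
// as the snippet above, which is itself an assumption.
val analyzedLogicalPlan =
  ContextWrapper.getSqlContext.analyzer.execute(
    Project(exprArray, dataFrame.logicalPlan))

// The child of the analyzed Project is the resolved input relation;
// its output attributes are what the expressions actually reference.
val inputAttrs = analyzedLogicalPlan.children.head.output

val project = UnsafeProjection.create(analyzedLogicalPlan.expressions, inputAttrs)
```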


Thanks and Regards,
Aviral Agarwal


On Tue, Mar 28, 2017 at 2:13 PM, Liang-Chi Hsieh <[hidden email]> wrote:

I am not sure why you want to transform rows in the dataframe using
mapPartitions like that.

If you want to project the rows with some expressions, you can use the API
like selectExpr and let Spark SQL to resolve expressions. To resolve
expressions manually, you need to (at least) deal with a resolver, and
transform the expressions recursively with LogicalPlan.resolve API.


Aviral Agarwal wrote
> Hi ,
> Can you please point me on how to resolve the expression ?
> I was looking into LogicalPlan.Resolve expression() that takes a Partial
> Function but I am not sure how to use that.
>
> Thanks,
> Aviral Agarwal
>
> On Mar 24, 2017 09:20, "Liang-Chi Hsieh" &lt;

> viirya@

> &gt; wrote:
>
>
> Hi,
>
> You need to resolve the expressions before passing into creating
> UnsafeProjection.
>
>
>
> Aviral Agarwal wrote
>> Hi guys,
>>
>> I want to transform a Row using a NamedExpression.
>>
>> Below is the code snippet that I am using:
>>
>>
>> def apply(dataFrame: DataFrame, selectExpressions:
>> java.util.List[String]): RDD[UnsafeRow] = {
>>
>>     val exprArray = selectExpressions.map(s =>
>>       Column(SqlParser.parseExpression(s)).named
>>     )
>>
>>     val inputSchema = dataFrame.logicalPlan.output
>>
>>     val transformedRDD = dataFrame.mapPartitions(
>>       iter => {
>>         val project = UnsafeProjection.create(exprArray,inputSchema)
>>         iter.map{
>>           row =>
>>             project(InternalRow.fromSeq(row.toSeq))
>>         }
>>     })
>>
>>     transformedRDD
>>   }
>>
>>
>> The problem is that expression becomes unevaluable :
>>
>> Caused by: java.lang.UnsupportedOperationException: Cannot evaluate
>> expression: 'a
>> [stack trace snipped]
>>
>>
>> This might be because the Expression is unresolved.
>>
>> Any help would be appreciated.
>>
>> Thanks and Regards,
>> Aviral Agarwal
>
>
>
>
>
> -----
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> --
> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Fwd-SparkSQL-Project-using-NamedExpression-tp21224p21230.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscribe@.apache




-----
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/
--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Fwd-SparkSQL-Project-using-NamedExpression-tp21224p21248.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

