spark.readStream.schema(??)

Zahid Rahman
version: spark-3.0.0-preview2-bin-hadoop2.7

The syntax checker rejects the following argument, which is what I am supposed to enter:
.schema(staticSchema)
However, when I provide the argument used in the code below (staticDataFrame.schema), it works, but I don't think that is correct.
What is the correct argument in this case?

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window,desc,col}

object RetailData {

  def main(args: Array[String]): Unit = {

    // create Spark session
    val spark = SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail Data").getOrCreate()
    // set Spark runtime configuration
    spark.conf.set("spark.sql.shuffle.partitions", "5")

    // create a static DataFrame
    val staticDataFrame = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("/data/retail-data/by-day/*.csv")

    staticDataFrame.createOrReplaceTempView("retail_data")
    val staticFrame = staticDataFrame.schema

    staticDataFrame
      .selectExpr(
        "CustomerId", "UnitPrice * Quantity as total_cost", "InvoiceDate")
      .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")
      .sort(desc("sum(total_cost)"))
      .show(2)

    // create a streaming DataFrame over the same CSV files
    val streamingDataFrame = spark.readStream
      .schema(staticDataFrame.schema)
      .format("csv")
      .option("header", "true")
      .option("maxFilesPerTrigger", 1)
      .load("/data/retail-data/by-day/*.csv")

    println(streamingDataFrame.isStreaming)

  } // main

} // object
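The error is consistent with the code as posted: the schema is bound to `val staticFrame`, so no identifier named `staticSchema` is in scope when `.schema(staticSchema)` is compiled. `DataStreamReader.schema` takes a `StructType`, so passing `staticDataFrame.schema` directly is equivalent to passing a value that holds it. Below is a minimal sketch, assuming the Spark 3.x Scala API and CSV files with a header row; the names `StaticSchemaStream` and `streamWithStaticSchema` are hypothetical, and the master URL is left to `spark-submit`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

object StaticSchemaStream {

  // Reads the files once as a static DataFrame, binds the inferred schema to a
  // value actually named staticSchema, and reuses it for the streaming reader.
  def streamWithStaticSchema(spark: SparkSession, path: String): DataFrame = {
    val staticDataFrame = spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(path)

    // The identifier passed to .schema(...) must be declared and in scope.
    val staticSchema: StructType = staticDataFrame.schema

    // File stream sources do not infer schemas by default, so the schema is
    // supplied up front. The format and header options mirror the static read;
    // without .format("csv") the default source (Parquet) would be used.
    spark.readStream
      .schema(staticSchema)
      .format("csv")
      .option("header", "true")
      .option("maxFilesPerTrigger", 1)
      .load(path)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Retail Data").getOrCreate()
    // Path taken from the original post.
    val streamingDataFrame = streamWithStaticSchema(spark, "/data/retail-data/by-day/*.csv")
    println(streamingDataFrame.isStreaming)
  }
}
```

Declaring the schema once and reusing it keeps the static and streaming DataFrames column-compatible, so the same aggregation can later be applied to either.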






¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}

Re: spark.readStream.schema(??)

Zahid Rahman
I found another bug.



On Sat, 28 Mar 2020 at 02:12, Zahid Rahman <[hidden email]> wrote:

I have sorted the error anyway because I am the best there is.
It is downhill for me from here.

Nobody is using this email list for anything anyway. The list is as dead as a dodo,
probably because of people like you.

That is exactly what this list is for.
It is not just for me to test your buggy software and report the bugs free of charge
without getting anything in return.
In other words, free consultancy for you, because you know the software
after spending years of your life on it, while I am going to master it in weeks.

Have we eaten something that disagrees with us today?
Do you have a sore throat?
Maybe a little temperature?




On Sat, 28 Mar 2020 at 02:03, Sean Owen <[hidden email]> wrote:
(this isn't an email list for user support)

On Fri, Mar 27, 2020 at 8:32 PM Zahid Rahman <[hidden email]> wrote: