BUG: spark.readStream .schema(staticSchema) not receiving schema information

Zahid Rahman
Hi,
version: spark-3.0.0-preview2-bin-hadoop2.7

As you can see from the code below:

STEP 1: I create a static DataFrame which holds all the information needed to read the data source (csv files).

STEP 2: Then I create a variable called staticSchema, assigning it the schema of the original static DataFrame.

STEP 3: Then I create another variable, streamingDataFrame, via spark.readStream, and pass the staticSchema object to the .schema(...) function, which I take to hold all the information about the csv files, including the .load(path), etc.

So when I create streamingDataFrame and pass it .schema(staticSchema), the variable streamingDataFrame should have all the information. I should only have to call .option("maxFilesPerTrigger", 1), and not .format("csv").option("header", "true").load("/data/retail-data/by-day/*.csv"). Otherwise, what is the point of passing .schema(staticSchema) to streamingDataFrame?

You can replicate it using the complete code below.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, desc, window}

object RetailData {

  def main(args: Array[String]): Unit = {

    // create spark session
    val spark = SparkSession.builder()
      .master("spark://192.168.0.38:7077")
      .appName("Retail Data")
      .getOrCreate()

    // set spark runtime configuration
    spark.conf.set("spark.sql.shuffle.partitions", "5")
    spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")

    // create a static data frame, inferring the schema from the csv files
    val staticDataFrame = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("/data/retail-data/by-day/*.csv")

    staticDataFrame.createOrReplaceTempView("retail_data")
    val staticSchema = staticDataFrame.schema

    // batch query: daily total cost per customer
    staticDataFrame
      .selectExpr("CustomerId", "UnitPrice * Quantity as total_cost", "InvoiceDate")
      .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")
      .sort(desc("sum(total_cost)"))
      .show(2)

    // streaming reader: reuses the inferred schema but still needs
    // its own format, options and path
    val streamingDataFrame = spark.readStream
      .schema(staticSchema)
      .format("csv")
      .option("maxFilesPerTrigger", 1)
      .option("header", "true")
      .load("/data/retail-data/by-day/*.csv")

    println(streamingDataFrame.isStreaming)

    // lazy transformation; a streaming action is needed to start execution
    val purchaseByCustomerPerHour = streamingDataFrame
      .selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
      .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")

    // streaming action: write each trigger's result to the console
    purchaseByCustomerPerHour.writeStream
      .format("console")
      .queryName("customer_purchases")
      .outputMode("complete")
      .start()
      .awaitTermination() // keep main alive until the query terminates

  } // main

} // object

¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}

Re: BUG: spark.readStream .schema(staticSchema) not receiving schema information

RussS
This is probably more of a question for the user support list, but I believe I understand the issue.

Schema inside of Spark refers to the structure of the output rows. For example, the schema for a particular DataFrame could be (User: Int, Password: String): two columns, the first named User of type Int and the second named Password of type String.
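
In Spark that structure is a StructType. A minimal sketch of the two-column example above (the names are purely illustrative):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// A schema is only column names and types; it says nothing about
// where the data lives or how it is parsed.
val userSchema = StructType(Seq(
  StructField("User", IntegerType, nullable = true),
  StructField("Password", StringType, nullable = true)
))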

When you pass the schema from one reader to another, you are only copying this structure, not all of the other options associated with the DataFrame.
This is usually useful when you are reading from sources with different options but data that needs to be read into the same structure.

The other properties such as "format" and "options" exist independently of the schema. This is helpful if, for example, I were reading from both MySQL and a comma-separated file. While the schema is the same, options like "inferSchema" do not apply to both MySQL and CSV, and "format" actually picks whether to use "jdbc" or "csv", so copying that wouldn't be helpful either.
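
For example, the same structure could drive two readers with completely different formats and options. A sketch, where the JDBC URL, table name, and paths are hypothetical:

// Each reader keeps its own format, options, and path;
// the schema object carries none of them.
val usersFromDb = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://db.example.com/app") // hypothetical URL
  .option("dbtable", "users")                       // hypothetical table
  .load()

val usersFromCsv = spark.read
  .schema(usersFromDb.schema) // copies only the column structure
  .format("csv")
  .option("header", "true")   // csv-specific; does not exist for jdbc
  .load("/data/users/*.csv")  // hypothetical path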

I hope this clears things up,
Russ

On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman <[hidden email]> wrote:

Re: BUG: spark.readStream .schema(staticSchema) not receiving schema information

Zahid Rahman
Very kind of you.

On Sat, 28 Mar 2020, 15:24 Russell Spitzer, <[hidden email]> wrote:

Re: BUG: spark.readStream .schema(staticSchema) not receiving schema information

Zahid Rahman
So the schema is limited to holding only the DEFINITION of the schema, i.e., as you say, the columns: first column User: Int, second column Password: String.

Not the location of the source, i.e. a csv file with or without a header, or SQL DB tables.
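
Printing the schema from the code above makes that visible; for example:

// staticSchema is a StructType: column names and types only.
// No path, header option, or csv format appears in the output.
println(staticSchema.treeString)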

I am pleased that for once I am wrong about it being another bug; it was a design decision that adds flexibility.

On Sat, 28 Mar 2020, 15:24 Russell Spitzer, <[hidden email]> wrote: