Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?


Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?

Thakrar, Jayesh

Because these are not exposed in the public API, it is not possible (or at least very difficult) to create custom structured streaming sources.

 

Consequently, one has to create streaming sources in packages under org.apache.spark.sql.

 

Any pointers or info is greatly appreciated.


Re: Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?

Ryan Blue
Jayesh,

We're working on a new API for building sources, DataSourceV2. That API allows you to produce UnsafeRow and we are very likely going to change that to InternalRow (SPARK-23325). There's an experimental version in the latest 2.3.0 release if you'd like to try it out.
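
For the streaming (micro-batch) case, the rough shape of the 2.3.0 experimental API is below. This is an untested sketch from memory - the placeholder names (MyV2StreamSource, MyV2Offset, etc.) are mine, and the package layout of the v2 streaming interfaces has been shifting between releases, so please check the 2.3.0 javadocs for the exact imports and signatures:

import java.util.Optional

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Entry point: Spark instantiates this via readStream.format(...) and asks it for a reader.
class MyV2StreamSource extends DataSourceV2 with MicroBatchReadSupport {
  override def createMicroBatchReader(
      schema: Optional[StructType],
      checkpointLocation: String,
      options: DataSourceOptions): MicroBatchReader = new MyV2StreamReader
}

// One offset per micro-batch; here just a wall-clock timestamp.
case class MyV2Offset(value: Long) extends Offset {
  override def json(): String = value.toString
}

class MyV2StreamReader extends MicroBatchReader {
  private var startTime: Long = 0L
  private var endTime: Long = System.currentTimeMillis()

  override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
    startTime = if (start.isPresent) start.get.asInstanceOf[MyV2Offset].value else 0L
    endTime = if (end.isPresent) end.get.asInstanceOf[MyV2Offset].value else System.currentTimeMillis()
  }
  override def getStartOffset(): Offset = MyV2Offset(startTime)
  override def getEndOffset(): Offset = MyV2Offset(endTime)
  override def deserializeOffset(json: String): Offset = MyV2Offset(json.toLong)
  override def commit(end: Offset): Unit = {}
  override def stop(): Unit = {}

  override def readSchema(): StructType = StructType(Seq(StructField("value", StringType)))

  // One factory per partition; each factory creates a DataReader that produces plain Rows,
  // so the source never has to touch InternalRow or internalCreateDataFrame.
  override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] =
    java.util.Collections.singletonList(new MyV2ReaderFactory(endTime))
}

class MyV2ReaderFactory(batchTime: Long) extends DataReaderFactory[Row] {
  override def createDataReader(): DataReader[Row] = new DataReader[Row] {
    private var emitted = false
    override def next(): Boolean = !emitted
    override def get(): Row = { emitted = true; Row(s"row for batch $batchTime") }
    override def close(): Unit = {}
  }
}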


rb





--
Ryan Blue
Software Engineer
Netflix

Re: Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?

Thakrar, Jayesh

Hi Ryan,

 

Thanks for the quick reply - I like the Iceberg approach, will keep an eye on it.

 

So creating a custom batch/non-streaming data source is not difficult.

The issue I have is with creating a streaming data source.

 

Similar to a batch source, you need to implement a simple trait - org.apache.spark.sql.execution.streaming.Source (example below).

The "getBatch" method is expected to return a DataFrame, and that DataFrame needs to have its "isStreaming" attribute set to true.

However, that flag is not exposed during DataFrame creation, and the only way to set it is to put your package/class under org.apache.spark.sql.
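
For reference, the method in question looks roughly like this in SQLContext (paraphrased from the Spark 2.3 source from memory, so please double-check the exact signature there):

// private[sql], so only code compiled into a package under org.apache.spark.sql can call it
private[sql] def internalCreateDataFrame(
    catalystRows: RDD[InternalRow],
    schema: StructType,
    isStreaming: Boolean = false): DataFrame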

 

As I write this, I think having my code on GitHub will make it easier to illustrate.

See this Spark JIRA comment, which illustrates the same problem for a Spark-packaged streaming source:

 

https://issues.apache.org/jira/browse/SPARK-21765?focusedCommentId=16142919&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16142919

 

 

// Note: this class has to live in a package under org.apache.spark.sql, otherwise
// sqlContext.internalCreateDataFrame (which is private[sql]) is not accessible.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

class MyDataStreamSource(sqlContext: SQLContext,
                         override val schema: StructType,
                         numPartitions: Int,
                         numRowsPerPartition: Int)
  extends Source {

  // Use the current wall-clock time as the offset of the next micro-batch.
  override def getOffset: Option[Offset] =
    Some(new MyDataStreamOffset(offset = System.currentTimeMillis()))

  override def commit(end: Offset): Unit = {}

  override def stop(): Unit = {}

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    val batchStreamTime = System.currentTimeMillis() // end.asInstanceOf[MyDataStreamOffset].value

    // MyDataStreamRDD (defined elsewhere in my code) generates the rows for this batch.
    val rdd: RDD[Row] =
      new MyDataStreamRDD(sqlContext.sparkContext, batchStreamTime, numPartitions, numRowsPerPartition)

    // Convert each Row (single string column) to an InternalRow and build a streaming
    // DataFrame. internalCreateDataFrame with isStreaming = true is the non-public part.
    val internalRows = rdd.map(row => InternalRow(UTF8String.fromString(row.get(0).asInstanceOf[String])))
    sqlContext.internalCreateDataFrame(internalRows, schema, isStreaming = true)
  }
}
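
For completeness, the source is handed to Structured Streaming through a provider; a trimmed-down sketch of that wiring is below (MyDataStreamProvider, the option names, and the default schema/partition values are just placeholders from my code):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Like MyDataStreamSource, this lives under org.apache.spark.sql so the private[sql]
// pieces stay accessible.
class MyDataStreamProvider extends StreamSourceProvider with DataSourceRegister {

  private val defaultSchema = StructType(Seq(StructField("value", StringType)))

  override def shortName(): String = "mydatastream"

  // Structured Streaming asks for the schema before creating the source.
  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    (shortName(), schema.getOrElse(defaultSchema))

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source =
    new MyDataStreamSource(
      sqlContext,
      schema.getOrElse(defaultSchema),
      numPartitions = parameters.getOrElse("numPartitions", "2").toInt,
      numRowsPerPartition = parameters.getOrElse("numRowsPerPartition", "10").toInt)
}

A query then reads from it with spark.readStream.format(classOf[MyDataStreamProvider].getName).load() (the short name only resolves if the provider is also listed in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister).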

 

 

 



Re: Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?

cloud0fan
org.apache.spark.sql.execution.streaming.Source is for internal use only. The official streaming data source API is the Data Source V2 API. You can take a look at the Spark built-in streaming data sources as examples. Note: Data Source V2 is still experimental, so you may need to update your code for a new Spark release :)




Re: Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?

Thakrar, Jayesh

Thanks Wenchen - yes, I did refer to the Spark built-in sources as mentioned earlier, and I have been using the Kafka streaming source as a reference example.

The built-in ones work and use internalCreateDataFrame - and that's where I got the idea of using that method to set "isStreaming" to true.

But I should confess that I don't know the source code very well, so I would appreciate any other pointers/examples you can share.

 


 


Re: Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?

cloud0fan
You can look at the PRs that migrate the built-in streaming data sources to the V2 API: https://github.com/apache/spark/pulls?utf8=%E2%9C%93&q=is%3Apr+migrate+in%3Atitle+is%3Aclosed+


 



Re: Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?

Thakrar, Jayesh

Thank you Wenchen - that was very helpful!

 

So apparently the Kafka data source is still using the older (non-V2) API, hence the use of internalCreateDataFrame and isStreaming.

The PRs you pointed me to show the migration from the old approach to the V2 approach, as you mentioned, and the V2 approach requires the data sources to produce Row data, which is what I was looking for.

And yes, I understand that the API is still in flux and subject to change :)

 

Thanks again to both you and Ryan!

 

Jayesh

 
