[Apache Beam] Custom DataSourceV2 instanciation: parameters passing and Encoders

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

[Apache Beam] Custom DataSourceV2 instanciation: parameters passing and Encoders

Etienne Chauchot
Hi Spark guys,

I'm Etienne Chauchot and I'm a committer on the Apache Beam project.

We have what we call runners. They are pieces of software that translate pipelines written using Beam API into pipelines that use native execution engine API. Currently, the Spark runner uses old RDD / DStream APIs.
I'm writing a new runner that will use structured streaming (but not continuous processing, and also no schema for now).

I am just starting. I'm currently trying to map our sources to yours. I'm targeting new DataSourceV2 API. It maps pretty well with Beam sources but I have a problem with instanciation of the custom source.
I searched for an answer in stack-overflow and user ML with no luck. I guess it is a too specific question:

When visiting Beam DAG I have access to Beam objects such as Source and Reader that I need to map to MicroBatchReader and InputPartitionReader.
As far as I understand, a custom DataSourceV2 is instantiated automatically by spark thanks to sparkSession.readStream().format(providerClassName) or similar code. The problem is that I can only pass options of primitive types + String so I cannot pass the Beam Source to DataSourceV2.
=> Is there a way to do so ?


Also I get as an output a Dataset<Row>. The Row contains an instance of Beam WindowedValue<T>, T is the type parameter of the Source. I do a map on the Dataset to transform it to a Dataset<WindowedValue<T>>. I have a question related to the Encoder: 
=> how to properly create an Encoder for the generic type WindowedValue<T> to use in the map?

Here is the code:

And more specially:

Thanks,

Etienne






Reply | Threaded
Open this post in threaded view
|

Re: [Apache Beam] Custom DataSourceV2 instanciation: parameters passing and Encoders

Etienne Chauchot
Hi everyone,

Does anyone have comments on this question?

CCing user ML

Thanks
Etienne

Le mardi 11 décembre 2018 à 19:02 +0100, Etienne Chauchot a écrit :
Hi Spark guys,

I'm Etienne Chauchot and I'm a committer on the Apache Beam project.

We have what we call runners. They are pieces of software that translate pipelines written using Beam API into pipelines that use native execution engine API. Currently, the Spark runner uses old RDD / DStream APIs.
I'm writing a new runner that will use structured streaming (but not continuous processing, and also no schema for now).

I am just starting. I'm currently trying to map our sources to yours. I'm targeting new DataSourceV2 API. It maps pretty well with Beam sources but I have a problem with instanciation of the custom source.
I searched for an answer in stack-overflow and user ML with no luck. I guess it is a too specific question:

When visiting Beam DAG I have access to Beam objects such as Source and Reader that I need to map to MicroBatchReader and InputPartitionReader.
As far as I understand, a custom DataSourceV2 is instantiated automatically by spark thanks to sparkSession.readStream().format(providerClassName) or similar code. The problem is that I can only pass options of primitive types + String so I cannot pass the Beam Source to DataSourceV2.
=> Is there a way to do so ?


Also I get as an output a Dataset<Row>. The Row contains an instance of Beam WindowedValue<T>, T is the type parameter of the Source. I do a map on the Dataset to transform it to a Dataset<WindowedValue<T>>. I have a question related to the Encoder: 
=> how to properly create an Encoder for the generic type WindowedValue<T> to use in the map?

Here is the code:

And more specially:

Thanks,

Etienne