DataSourceV2 : Transactional Write support

DataSourceV2 : Transactional Write support

Shiv Prashant Sood
All,

I understand that DataSourceV2 supports transactional writes, and I wanted to implement that in the JDBC DataSourceV2 connector (PR#25211).

I don't see how this is feasible for a JDBC-based connector. The framework suggests that each executor sends a commit message to the driver, and that the actual commit should only be done by the driver after it has received all commit confirmations. This will not work for JDBC, as commits have to happen on the JDBC Connection, which is maintained by the executors, and a JDBC Connection is not serializable, so it cannot be sent to the driver.
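To illustrate, here is roughly the shape of the conflict (a sketch only: the interfaces loosely follow the Spark 2.4 DataSourceV2 writer API, and all JDBC-specific names are hypothetical):

```scala
import java.sql.{Connection, DriverManager}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// Hypothetical executor-side writer: rows accumulate in a per-task
// JDBC transaction that lives only on the executor.
case class JdbcCommitMessage(partitionId: Int, rowsWritten: Long)
  extends WriterCommitMessage

class JdbcDataWriter(url: String, partitionId: Int)
    extends DataWriter[InternalRow] {

  private val conn: Connection = DriverManager.getConnection(url)
  conn.setAutoCommit(false) // hold this task's inserts in an open transaction
  private var rows = 0L

  override def write(record: InternalRow): Unit = {
    // INSERT the row via a PreparedStatement on `conn` (omitted)
    rows += 1
  }

  // Runs on the EXECUTOR. Only the returned message travels to the driver;
  // `conn` cannot. So either we commit here, making the data visible before
  // the driver has heard from every other task, or we leave the transaction
  // open and the driver-side commit(messages) has no way to reach it.
  override def commit(): WriterCommitMessage = {
    conn.commit()
    conn.close()
    JdbcCommitMessage(partitionId, rows)
  }

  override def abort(): Unit = {
    conn.rollback()
    conn.close()
  }
}
```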

Am I right in thinking that this cannot be supported for JDBC? My goal is to either fully write or fully roll back the DataFrame write operation.

Thanks in advance for your help.

Regards,
Shiv

Re: DataSourceV2 : Transactional Write support

Matt Cheah

Can we check whether the latest staging APIs work for the JDBC use case of a single transactional write? See https://github.com/apache/spark/pull/24798/files#diff-c9d2f9c9d20452939b7c28ebdae0503dR53
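For JDBC, one could imagine backing those staging APIs with a hidden side table that is swapped in at commit time. A rough sketch (all names hypothetical, and whether the final swap is truly atomic depends on the database's DDL semantics):

```scala
import java.sql.Connection

// Hypothetical staged JDBC table: executors write into a hidden staging
// table; nothing is visible until a single driver-side commit.
class JdbcStagedTable(conn: Connection, targetName: String, stagingName: String) {

  // Driver-side, once every task has reported success. The two statements
  // are only atomic on databases with transactional DDL or a
  // single-statement rename/swap.
  def commitStagedChanges(): Unit = {
    val st = conn.createStatement()
    try {
      st.execute(s"DROP TABLE IF EXISTS $targetName")
      st.execute(s"ALTER TABLE $stagingName RENAME TO $targetName")
    } finally st.close()
  }

  // On failure, discard the staging table; the target is untouched.
  def abortStagedChanges(): Unit = {
    val st = conn.createStatement()
    try st.execute(s"DROP TABLE IF EXISTS $stagingName")
    finally st.close()
  }
}
```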

 

But we should also acknowledge that transactions in the more traditional RDBMS sense tend to have pretty specific semantics that we don't support in the V2 API. For example, one cannot commit multiple write operations in a single transaction right now. Supporting that would require changes to the DDL and a pretty substantial change to the design of Spark SQL more broadly.

 

-Matt Cheah

Re: DataSourceV2 : Transactional Write support

Jungtaek Lim
In reply to this post by Shiv Prashant Sood
I asked a similar question about end-to-end exactly-once with Kafka, and you're correct: distributed transactions are not supported. Introducing distributed transactions via something like two-phase commit would require huge changes to the Spark codebase, and the feedback was not positive.

What you could try instead is intermediate output: insert into a temporary table from the executors, then move the inserted records to the final table from the driver (the move must be atomic).
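A minimal sketch of the driver-side move, assuming the executors have already filled the temporary table (all table names hypothetical; the DROP stays atomic only on databases with transactional DDL):

```scala
import java.sql.DriverManager

// Both statements commit together or not at all, so readers of final_table
// see either all of the job's rows or none of them.
def moveToFinalTable(jdbcUrl: String): Unit = {
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    conn.setAutoCommit(false)
    val st = conn.createStatement()
    st.execute("INSERT INTO final_table SELECT * FROM temp_result")
    st.execute("DROP TABLE temp_result")
    conn.commit()
  } catch {
    case e: Exception =>
      conn.rollback()
      throw e
  } finally {
    conn.close()
  }
}
```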

Thanks,
Jungtaek Lim (HeartSaVioR)


Re: DataSourceV2 : Transactional Write support

Ryan Blue
> What you could try instead is intermediate output: insert into a temporary table from the executors, then move the inserted records to the final table from the driver (the move must be atomic)

I think this is the approach that other systems (maybe Sqoop?) have taken. Insert into independent temporary tables, which can be done quickly. Then, for the final commit operation, union the temporary tables and insert into the final table. In a lot of cases, JDBC databases can do that quickly as well, because the data is already on disk and just needs to be added to the final table.
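Sketched out, the final commit could be one statement over the per-task tables (names hypothetical):

```scala
import java.sql.Connection

// Union the per-task temp tables and insert the result into the final
// table inside one transaction, then drop the temp tables.
def commitTaskTables(conn: Connection, numTasks: Int): Unit = {
  val union = (0 until numTasks)
    .map(i => s"SELECT * FROM temp_task_$i")
    .mkString(" UNION ALL ")
  conn.setAutoCommit(false)
  val st = conn.createStatement()
  try {
    st.execute(s"INSERT INTO final_table $union")
    (0 until numTasks).foreach(i => st.execute(s"DROP TABLE temp_task_$i"))
    conn.commit()
  } finally st.close()
}
```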

--
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 : Transactional Write support

Shiv Prashant Sood
Thanks all for the clarification.

Regards,
Shiv


Re: DataSourceV2 : Transactional Write support

Wenchen Fan (cloud0fan)
I agree with the temp table approach. One idea: maybe we only need one temp table, and each task writes to it. At the end, we read the data from the temp table and write it to the target table. AFAIK JDBC databases handle concurrent writes to a table very well, and that's better than creating thousands of temp tables for one write job (assuming the input RDD has thousands of partitions).
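The staging step could even reuse the built-in JDBC writer, since per-task commits into an invisible temp table are harmless; the driver then performs the same atomic INSERT ... SELECT / DROP move sketched earlier in the thread. A sketch (temp table name hypothetical):

```scala
import java.util.Properties

import org.apache.spark.sql.DataFrame

// All tasks append concurrently into ONE shared temp table. Each task
// commits its own inserts, which is fine: temp_result is not visible to
// readers of the target table until the driver moves the rows over.
def stageToTempTable(df: DataFrame, url: String, props: Properties): Unit =
  df.write.mode("append").jdbc(url, "temp_result", props)
```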


Re: DataSourceV2 : Transactional Write support

Matt Cheah

There might be some help from the staging table catalog as well.

 

-Matt Cheah

Re: DataSourceV2 : Transactional Write support

Reynold Xin (rxin)
We can also just write using one partition, which will be sufficient for most use cases. 
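For example (a sketch; df, jdbcUrl, connectionProperties, and the table name are placeholders):

```scala
// Collapse to one partition so a single task -- and a single JDBC
// connection -- performs the whole write. The built-in writer wraps each
// partition's inserts in one transaction where the database supports it,
// so with one partition the entire DataFrame commits or rolls back
// together. Throughput is limited to that one task.
df.coalesce(1)
  .write
  .mode("append")
  .jdbc(jdbcUrl, "target_table", connectionProperties)
```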
