|
|
Hi all,
I've been prototyping an implementation of the DataSource V2 writer for the MongoDB Spark Connector and I have a couple of questions about how it's intended to be used with database systems. According to the Javadoc for DataWriter.commit():
"this method should still "hide" the written data and ask the DataSourceWriter at driver side to do the final commit via WriterCommitMessage"
Although MongoDB now has transactions, it doesn't have a way to "hide" data once it has been written. So as soon as the DataWriter has committed, the data has been inserted/updated in the collection and is discoverable, thereby breaking the documented contract.
I was wondering how other database systems plan to implement this API and meet the contract as described in the Javadoc?
Many thanks
Ross
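To make the question concrete, here is a minimal sketch of a task-side DataWriter for a store like MongoDB. This is not the actual connector code: the commit-message class, the field names, and the row-to-BSON conversion are made up for illustration.

import java.util
import com.mongodb.client.MongoCollection
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
import org.bson.Document

// What each task sends back to the driver from its local commit().
case class MongoCommitMessage(partitionId: Int, count: Long) extends WriterCommitMessage

class MongoDataWriter(partitionId: Int, collection: MongoCollection[Document])
    extends DataWriter[Row] {

  private val buffer = new util.ArrayList[Document]()

  override def write(record: Row): Unit = {
    // Buffer locally; nothing is visible to readers yet.
    // (Proper Row-to-BSON conversion elided.)
    buffer.add(new Document("value", record.toString))
  }

  override def commit(): WriterCommitMessage = {
    // This is where the contract breaks down for MongoDB: as soon as we insert
    // here, the documents are discoverable in the collection, even though the
    // Javadoc asks us to keep them "hidden" until the driver-side commit.
    if (!buffer.isEmpty) collection.insertMany(buffer)
    MongoCommitMessage(partitionId, buffer.size().toLong)
  }

  override def abort(): Unit = buffer.clear()
}

The driver-side DataSourceWriter.commit(...) only runs once every task has returned its WriterCommitMessage, but by that point the inserts above are already visible.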
|
|
Typically people do it via transactions, or staging tables.
|
|
Ross, I think the intent is to create a single transaction on the driver, write as part of it in each task, and then commit the transaction once the tasks complete. Is that possible in your implementation?
I think that part of this is made more difficult by not having a clear starting point for a write, which we are fixing in the redesign of the v2 API. That will have a method that creates a Write to track the operation. That can create your transaction when it is created and commit the transaction when commit is called on it.
rb
-- Ryan Blue, Software Engineer, Netflix
|
|
I don't think the problem is just whether we have a starting point for a write. As a matter of fact, there's always a starting point for a write, whether it is explicit or implicit.
This is a pretty big challenge in general for data sources -- for the vast majority of data stores, the boundary of a transaction is per client. That is, you can't have two clients doing writes and coordinating a single transaction. That's certainly the case for almost all relational databases. Spark, on the other hand, will have multiple clients (consider each task a client) writing to the same underlying data store.
|
|
I think I mentioned on the Design Doc that with the Cassandra connector we have similar issues. There is no "transaction" or "staging table" capable of really doing what the API requires.
|
|
This is a pretty big challenge in general for data sources -- for the vast majority of data stores, the boundary of a transaction is per client. That is, you can't have two clients doing writes and coordinating a single transaction. That's certainly the case for almost all relational databases. Spark, on the other hand, will have multiple clients (consider each task a client) writing to the same underlying data store.
DB>> Perhaps we can explore the two-phase commit protocol (aka XA) for this? Not sure how easy it is to implement, though :-)
|
|
In some cases the implementations may be OK with eventual consistency (and do not care whether the output is written out atomically).
XA can be one option for data sources that support it and require atomicity, but I am not sure how one would implement it with the current API.
Maybe we need to discuss improvements at the DataSource V2 API level (e.g. individual tasks would "prepare" for commit, and once the driver receives "prepared" from all the tasks, a "commit" would be invoked at each of the individual tasks). Right now the responsibility for the final "commit" is with the driver, and it may not always be possible for the driver to take over the transactions started by the tasks.
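To make that proposal a bit more concrete, here is a purely hypothetical sketch of what such task-side hooks could look like. Nothing like this exists in the current DSv2 API and every name below is invented:

import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

// Hypothetical task-side two-phase hooks; not part of any real Spark API.
trait TwoPhaseDataWriter[T] {
  def write(record: T): Unit

  // Phase 1: stage everything durably and "vote"; data must stay invisible.
  def prepare(): WriterCommitMessage

  // Phase 2: only invoked after the driver has received "prepared" from every
  // task; makes the staged data visible.
  def commit(message: WriterCommitMessage): Unit

  // Invoked if any task failed to prepare, or the driver decided to abort.
  def abort(message: WriterCommitMessage): Unit
}

The driver would still decide the outcome, but the visibility change would happen in the tasks rather than in a single driver-side commit.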
|
|
I guess we are all aware of the limitation of the contract on the DSv2 writer. Actually, it can be achieved only with the HDFS sink (or other filesystem-based sinks); other external storages are normally not feasible, because there's no way to couple a transaction with multiple clients, and the coordinator can't take over transactions from the writers to do the final commit.
XA is also not trivial to get right with the current execution model: Spark doesn't require writer tasks to run at the same time, but to achieve 2PC they would have to run until the end of the transaction (closing the client before the transaction ends normally means aborting the transaction). Spark would also have to integrate 2PC with its checkpointing mechanism to guarantee completeness of the batch, and it might require different integration for continuous mode.
Jungtaek Lim (HeartSaVioR)
|
|
I don't think two-phase commit would work here at all.
1. It'd require massive changes to Spark.
2. Unless the underlying data source can provide an API to coordinate commits (and few data sources I know of provide something like that), 2PC wouldn't work in the presence of network partitioning. You can't defy the laws of physics.
Really, the most common and simple way I've seen this done is through staging tables and a final transaction to move data from the staging table to the final table.
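As a rough sketch of that pattern over JDBC (the table names are illustrative; per-task staging tables work just as well): tasks append only to the staging table, and the driver-side commit exposes everything in one transaction.

import java.sql.DriverManager

// Each task appends only to job_staging; readers of final_table never see
// those rows until this driver-side commit runs.
def driverSideCommit(jdbcUrl: String): Unit = {
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    conn.setAutoCommit(false)
    val stmt = conn.createStatement()
    // One transaction: expose the staged rows, then clear the staging table.
    stmt.executeUpdate("INSERT INTO final_table SELECT * FROM job_staging")
    stmt.executeUpdate("DELETE FROM job_staging")
    stmt.close()
    conn.commit()
  } catch {
    case e: Exception =>
      conn.rollback() // nothing ever became visible in final_table
      throw e
  } finally {
    conn.close()
  }
}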
|
|
Thanks a lot Reynold and Jungtaek Lim. It definitely helped me understand this better.
|
|
When network partitioning happens it is pretty OK for me to see 2PC not working, since we are dealing with a global transaction. Recovery would be a hard thing to get right, though. I completely agree it would require massive changes to Spark.
What I couldn't find for the underlying storages is a way to move data from a staging table to the final table transactionally. I'm not fully sure, but as far as I'm aware many storages would not support moving data, and even for the HDFS sink it is not strictly done in a transactional way, since we move multiple files with multiple operations. If the coordinator crashes it leaves a partial write, and the writers and coordinator need to ensure it will not end up duplicated.
Ryan replied to me that Iceberg and HBase MVCC timestamps can enable us to implement "commit" (his reply didn't hit the dev mailing list though), but I'm not an expert in either of those and I still couldn't see how it would deal with various crash cases.
|
|
Well, in almost all relational databases you can move data in a transactional way. That's what transactions are for.
For just straight HDFS, the move is a pretty fast operation, so while it is not completely transactional, the window of potential failure is pretty short for appends. For writers at the partition level it is fine because it is just renaming a directory, which is atomic.
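For reference, the directory-rename commit described above looks roughly like this with the Hadoop FileSystem API (the paths are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Tasks write all of their files under `staging`; the commit is a single
// directory rename, which HDFS performs as one atomic metadata operation.
def commitPartition(staging: String, finalDir: String): Unit = {
  val fs = FileSystem.get(new Configuration())
  if (!fs.rename(new Path(staging), new Path(finalDir))) {
    throw new RuntimeException(s"Failed to commit $staging to $finalDir")
  }
}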
|
|
Ah yes. I have been thinking about NoSQL stores, since the output of a Spark workload may not be suitable for an RDBMS (in terms of scale and performance). For an RDBMS it would essentially work (via INSERT ... SELECT).
I agree the window of potential failure is pretty short for the HDFS case. I was just thinking about it theoretically, since this is a kind of contract. It wouldn't hurt most cases in production.
|
|
>Well almost all relational databases you can move data in a transactional way. That's what transactions are for.
It would work, but I suspect in most cases it would involve moving data from temporary tables to the final tables.
Right now there's no mechanism to let the individual tasks commit in a two-phase manner (not sure if the CommitCoordinator might help). If such an API were provided, the sources could use it as they wish (e.g. use the XA support provided by MySQL to implement it in a more efficient way than the driver moving data from temp tables to destination tables).
Definitely there are complexities involved, but I am not sure network partitioning comes into play here, since the driver can act as the coordinator and can run in HA mode. And regarding the issue that Jungtaek brought up, 2PC doesn't require tasks to be running at the same time; we would need a mechanism to take down tasks after they have prepared and bring them up again during the commit phase.
Most of the sources would not need any of the above and would just need a way to support idempotent writes, and like Ryan suggested we can enable this (if there are gaps in the current APIs).
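As a sketch of what "idempotent writes" can mean in practice (PostgreSQL-style upsert syntax over JDBC; the table, columns, and key scheme are made up): derive a deterministic key per record so a retried task overwrites its own earlier rows instead of duplicating them.

import java.sql.Connection

// The key is derived from (partitionId, row index), so a retried task writes
// the same keys again and the upsert replaces rather than duplicates.
def writeIdempotently(conn: Connection, partitionId: Int,
                      rows: Iterator[(Long, String)]): Unit = {
  val ps = conn.prepareStatement(
    "INSERT INTO final_table (row_key, payload) VALUES (?, ?) " +
    "ON CONFLICT (row_key) DO UPDATE SET payload = EXCLUDED.payload")
  for ((index, payload) <- rows) {
    ps.setString(1, s"$partitionId-$index") // same key on every retry
    ps.setString(2, payload)
    ps.executeUpdate()
  }
  ps.close()
}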
|
|
Sorry, I accidentally sent a reply to just Jungtaek.
The current commit structure works for any table where you can stage data in place and commit in a combined operation. Iceberg does this by writing data files in place and committing them in an atomic update to the table metadata. You can also implement this in HBase using the timestamp or version field for MVCC. All readers ignore data newer than version V, writers create records with version V+1, then the current version is updated atomically from V to V+1.
It actually *doesn't* work for Hive tables because table state is tracked in the file system, unless you use a pattern where you write whole partitions and swap the old partition set for the new partition set atomically to commit.
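To illustrate the shape of that version-flip commit (this is not how Iceberg or HBase actually implement it; it is just the idea expressed over JDBC with made-up table names):

import java.sql.Connection

// Tasks insert rows tagged with the pending version V+1. Readers always filter
// on the committed version (e.g. WHERE d.version <= v.current_version), so the
// new rows stay invisible until the driver flips the pointer in one statement.
def taskWrite(conn: Connection, pendingVersion: Long, payload: String): Unit = {
  val ps = conn.prepareStatement(
    "INSERT INTO data_table (version, payload) VALUES (?, ?)")
  ps.setLong(1, pendingVersion)
  ps.setString(2, payload)
  ps.executeUpdate()
  ps.close()
}

def driverCommit(conn: Connection, newVersion: Long): Unit = {
  val ps = conn.prepareStatement("UPDATE table_version SET current_version = ?")
  ps.setLong(1, newVersion) // single atomic flip from V to V + 1
  ps.executeUpdate()
  ps.close()
}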
I agree the potential failure is pretty short for HDFS case. I just thought about it theoretically since this is a kind of contract. It wouldn't hurt most of cases in production.
Well almost all relational databases you can move data in a transactional way. That’s what transactions are for.
For just straight HDFS, the move is a pretty fast operation so while it is not completely transactional, the window of potential failure is pretty short for appends. For writers at the partition level it is fine because it is just renaming directory, which is atomic. When network partitioning happens it is pretty OK for me to see 2PC not working, cause we deal with global transaction. Recovery should be hard thing to get it correctly though. I completely agree it would require massive changes to Spark.
What I couldn't find for the underlying storages is a way to move data from a staging table to the final table transactionally. I'm not fully sure, but as far as I'm aware many storages do not support moving data, and even the HDFS sink is not strictly transactional, since we move multiple files with multiple operations. If the coordinator crashes it leaves a partial write, and the writers and coordinator need to ensure the data will not end up duplicated.
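To make the multi-operation point concrete, a filesystem-style commit looks roughly like this (Hadoop FileSystem API; the staging/final layout is made up for illustration) -- each file is published with a separate rename, so a crash mid-loop leaves a partial set of files visible:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Publish a job's output by renaming each staged file into the final directory.
// Each rename is atomic on HDFS, but the loop as a whole is not: a crash part-way
// through leaves some files visible in finalDir and some still in stagingDir.
def publish(stagingDir: String, finalDir: String): Unit = {
  val fs = FileSystem.get(new Configuration())
  val staged = fs.listStatus(new Path(stagingDir))
  staged.foreach { status =>
    fs.rename(status.getPath, new Path(finalDir, status.getPath.getName))
  }
  fs.delete(new Path(stagingDir), true)
}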
Ryan replied to me that Iceberg and HBase MVCC timestamps can enable us to implement "commit" (his reply didn't hit the dev mailing list, though), but I'm not an expert in either of the two, and I still can't see how they would handle the various crash cases.
I don't think two phase commit would work here at all.
1. It'd require massive changes to Spark.
2. Unless the underlying data source can provide an API to coordinate commits (and few data sources I know of provide something like that), 2PC wouldn't work in the presence of network partitioning. You can't defy the laws of physics.
Really, the most common and simplest way I've seen this working is through staging tables and a final transaction to move data from the staging table to the final table.
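As a rough JDBC sketch of that staging-table pattern (the table names and the taskWrite/driverCommit split are illustrative, not from any existing connector): tasks append to a staging table, and the driver publishes everything in a single database transaction once all tasks have succeeded.

import java.sql.DriverManager

// Task side: append this task's rows to the staging table.
def taskWrite(url: String, rows: Seq[(Int, String)]): Unit = {
  val conn = DriverManager.getConnection(url)
  try {
    val ps = conn.prepareStatement("INSERT INTO target_staging (id, payload) VALUES (?, ?)")
    rows.foreach { case (id, payload) =>
      ps.setInt(1, id)
      ps.setString(2, payload)
      ps.executeUpdate()
    }
  } finally conn.close()
}

// Driver side: once every task has reported success, move the staged rows into
// the final table and clean up, inside one transaction.
def driverCommit(url: String): Unit = {
  val conn = DriverManager.getConnection(url)
  try {
    conn.setAutoCommit(false)
    val st = conn.createStatement()
    st.executeUpdate("INSERT INTO target SELECT * FROM target_staging")
    st.executeUpdate("DELETE FROM target_staging")
    conn.commit()
  } finally conn.close()
}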
I guess we are all aware of the limitations of the contract on the DSv2 writer. In practice it can be achieved only with the HDFS sink (or other filesystem-based sinks); for other external storage it is normally not feasible, because there is no way to couple a transaction across multiple clients, and the coordinator can't take over transactions from the writers to do the final commit.
XA is also not a trivial thing to get right with the current execution model: Spark doesn't require writer tasks to run at the same time, but to achieve 2PC they would have to stay up until the end of the transaction (closing the client before the transaction ends normally means aborting the transaction). Spark would also have to integrate 2PC with its checkpointing mechanism to guarantee completeness of a batch. And it might require a different integration for continuous mode.
Jungtaek Lim (HeartSaVioR)
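For reference, the XA mechanics being discussed look roughly like this over JDBC (a sketch only -- the SimpleXid class, the table name, and the assumption that a fresh connection to the same resource manager can commit a branch prepared elsewhere are all illustrative and driver-dependent):

import javax.sql.XADataSource
import javax.transaction.xa.{XAResource, Xid}

// Minimal Xid for illustration; a real connector would derive these from the
// query id / epoch / partition id so prepared branches can be identified later.
case class SimpleXid(gtrid: Array[Byte], bqual: Array[Byte]) extends Xid {
  def getFormatId(): Int = 1
  def getGlobalTransactionId(): Array[Byte] = gtrid
  def getBranchQualifier(): Array[Byte] = bqual
}

// Task side: write inside an XA branch and prepare it, but do not commit.
// This is exactly the sticking point above: someone still has to issue
// commit(xid) later, even if this executor is gone by then.
def taskPrepare(ds: XADataSource, xid: Xid, rows: Seq[String]): Unit = {
  val xaConn = ds.getXAConnection()
  val xa: XAResource = xaConn.getXAResource
  val conn = xaConn.getConnection
  xa.start(xid, XAResource.TMNOFLAGS)
  val ps = conn.prepareStatement("INSERT INTO target (payload) VALUES (?)")
  rows.foreach { r => ps.setString(1, r); ps.executeUpdate() }
  xa.end(xid, XAResource.TMSUCCESS)
  xa.prepare(xid) // phase 1: data is durable but not yet visible
}

// Coordinator side (phase 2), run only after all partitions have prepared.
def coordinatorCommit(ds: XADataSource, xids: Seq[Xid]): Unit = {
  val xa = ds.getXAConnection().getXAResource
  xids.foreach(xid => xa.commit(xid, false))
}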
In some cases the implementations may be OK with eventual consistency (and do not care whether the output is written out atomically).
XA can be one option for data sources that support it and require atomicity, but I am not sure how one would implement it with the current API.
Maybe we need to discuss improvements at the DataSource V2 API level (e.g. individual tasks would "prepare" for the commit, and once the driver receives "prepared" from all the tasks, a "commit" would be invoked on each of the individual tasks). Right now the responsibility for the final "commit" is with the driver, and it may not always be possible for the driver to take over the transactions started by the tasks.
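Purely as an illustration of that proposal (this is not the existing DSv2 API, just a hypothetical two-phase variant of it):

// Hypothetical two-phase version of the v2 writer contract -- not real Spark API.
trait TwoPhaseDataWriter[T] {
  def write(record: T): Unit
  // Phase 1, on the task: stage the data and promise it can later be committed.
  def prepare(): PreparedMessage
  def abort(): Unit
}

trait TwoPhaseWriteSupport[T] {
  def createWriter(partitionId: Int, attemptNumber: Int): TwoPhaseDataWriter[T]
  // Phase 2, driven from the driver once *all* tasks have returned a PreparedMessage.
  // The hard part discussed in this thread: finishing commits that were prepared by
  // executors that may no longer exist.
  def commitAll(prepared: Seq[PreparedMessage]): Unit
  def abortAll(prepared: Seq[PreparedMessage]): Unit
}

// Opaque token returned by prepare(), e.g. an XA Xid or a staging-table name.
case class PreparedMessage(token: String)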
DB>> Perhaps we can explore two-phase commit protocol (aka XA) for this? Not sure how easy it is to implement this though :-)
Regards, Dilip Biswal
I'm not sure if it was Sqoop or another similar system, but I've heard about at least one Hadoop-to-RDBMS system working by writing to temporary staging tables in the database. The commit then runs a big INSERT INTO that selects from all the temporary tables and cleans up afterwards by deleting them. That way, the database can optimize the real insert by copying the underlying files from one table into the other.
On Mon, Sep 10, 2018, Arun Mahadevan wrote:
> Well almost all relational databases you can move data in a transactional way. That's what transactions are for.
It would work, but I suspect in most cases it would involve moving data from temporary tables to the final tables.
Right now there are no mechanisms to let the individual tasks commit in a two-phase manner (not sure if the CommitCoordinator might help). If such an API were provided, the sources could use it as they wish (e.g. use the XA support provided by MySQL to implement it more efficiently than the driver moving data from temp tables to destination tables).
Definitely there are complexities involved, but I am not sure the network-partitioning concern comes into play here, since the driver can act as the coordinator and can run in HA mode. And regarding the issue that Jungtaek brought up, 2PC doesn't require the tasks to be running at the same time; we need a mechanism to take down tasks after they have prepared and bring up tasks again during the commit phase.
Most of the sources would not need any of the above and just need a way to support idempotent writes, and like Ryan suggested, we can enable this (if there are gaps in the current APIs).
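A small sketch of the idempotent-write idea, using PostgreSQL-style INSERT ... ON CONFLICT over JDBC (the table name and key columns are made up): replaying a batch for a retried task just rewrites the same rows, so duplicates never appear.

import java.sql.DriverManager

// Rows are keyed on (batch_id, record_key); re-running the write for the same
// batch updates existing rows instead of inserting duplicates.
def idempotentWrite(url: String, batchId: Long, rows: Seq[(String, String)]): Unit = {
  val conn = DriverManager.getConnection(url)
  try {
    val ps = conn.prepareStatement(
      "INSERT INTO target (batch_id, record_key, payload) VALUES (?, ?, ?) " +
      "ON CONFLICT (batch_id, record_key) DO UPDATE SET payload = EXCLUDED.payload")
    rows.foreach { case (key, payload) =>
      ps.setLong(1, batchId)
      ps.setString(2, key)
      ps.setString(3, payload)
      ps.executeUpdate()
    }
  } finally conn.close()
}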
> And regarding the issue that Jungtaek brought up, 2PC doesn't require the tasks to be running at the same time; we need a mechanism to take down tasks after they have prepared and bring up tasks again during the commit phase.
I guess we have already gotten into too much detail here, but if it is based on a client transaction, Spark must assign the "commit" task to the executor where that task finished its "prepare", and if it loses that executor it is not feasible to force the commit. Staging would have to come into play for that.
We should also have a mechanism for "recovery": Spark needs to ensure it finalizes the "commit", even in case of failures, before starting a new batch.
So not an easy thing to integrate correctly.
Regardless of the API, to use Spark to write data atomically, it requires:
1. Writing data distributedly, with a central coordinator at the Spark driver.
2. The distributed writers are not guaranteed to run at the same time. (This can be relaxed if we can extend the barrier scheduling feature.)
3. The new data is visible if and only if all distributed writers succeed.
According to these requirements, I think using a staging table is the most common way, and maybe the only way. I'm not sure how 2PC can help; we don't want users to read partial data, so we need a final step to commit all the data together.
For RDBMS data sources, I think a simple solution is to ask users to coalesce the input RDD/DataFrame into one partition; then we don't need to care about multi-client transactions. Or use a staging table like Ryan described before.
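On the user-facing side, the single-partition workaround is just a repartition before the write (the connection details below are placeholders); with one partition there is a single writer task, so a single client-side transaction at the database covers the whole write:

import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("single-partition-write").getOrCreate()
val df = spark.range(0, 1000).toDF("id")

val props = new Properties()
props.setProperty("user", "spark")       // placeholder credentials
props.setProperty("password", "secret")

// One partition => one writer task => one JDBC client doing the insert.
df.coalesce(1)
  .write
  .mode(SaveMode.Append)
  .jdbc("jdbc:postgresql://db-host:5432/warehouse", "target", props)

Obviously this gives up all write parallelism, which is why it is only a workaround.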
Why are atomic operations a requirement? I feel like doubling the amount of writes (with staging tables) is probably a tradeoff that the end user should make.
IMHO that's up to how strict we would like to be about "exactly-once".
Some say it is exactly-once when the output is eventually exactly-once, whereas others say there should be no side effects, e.g. a consumer shouldn't see a partial write. I guess 2PC is the former, since some partitions can commit earlier while other partitions fail to commit for some time. That said, there may be a couple of alternatives to the contract Spark provides/requires, and I'd like to see how the Spark community wants to deal with them. Would we want to disallow alternatives, like "replay + deduplicate write (per batch/partition)", which ensures "eventually" exactly-once but cannot ensure the contract?
Btw, unless achieving exactly-once is light enough for a given sink, I think the sink should provide both at-least-once (also optimized for that semantic) and exactly-once, and let end users pick one.
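If a sink did expose both modes, the choice could be surfaced as an ordinary write option; the sink name "my-sink" and the "semantics" option below are hypothetical, with Spark's built-in rate source standing in for real input:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("semantics-option").getOrCreate()
val stream = spark.readStream.format("rate").load()   // built-in test source

// Hypothetical connector that reads a "semantics" option and switches between an
// optimized at-least-once path and a staging/version-based exactly-once path.
stream.writeStream
  .format("my-sink")
  .option("semantics", "at-least-once")                // or "exactly-once"
  .option("checkpointLocation", "/tmp/checkpoints/my-sink")
  .start()

How the exactly-once path would actually honor the commit contract is, of course, exactly the open question in this thread.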