Suggestion on Join Approach with Spark


Suggestion on Join Approach with Spark

Chetan Khatri
Hello Spark Developers,

I have a question about a Spark join I am working on.

I have a full load of data from an RDBMS stored on HDFS, let's say:

val historyDF = spark.read.parquet("/home/test/transaction-line-item")
and I receive changed (delta) data at a separate HDFS path, let's say:
val deltaDF = spark.read.parquet("/home/test/transaction-line-item-delta")
Now I would like to take the rows from deltaDF, drop only the matching records from historyDF, and write the result to a MySQL table.
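For the MySQL step, a minimal sketch of a JDBC write; the URL, table name, and credentials below are placeholders (not values from this thread), and MySQL Connector/J is assumed to be on the classpath:

import org.apache.spark.sql.SaveMode

deltaDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/testdb")  // placeholder host/database
  .option("dbtable", "transaction_line_item")        // placeholder table name
  .option("user", "dbuser")                          // placeholder credentials
  .option("password", "dbpassword")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .mode(SaveMode.Append)
  .save()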
Once I am done writing to the MySQL table, I would like to overwrite /home/test/transaction-line-item with the merged data. I can't simply overwrite that path in place: because of lazy evaluation and the DAG structure, the job would still be reading from the very path it is overwriting, so I have to write somewhere else first and then write back.
val syncDataDF = historyDF
  .join(deltaDF.select("TRANSACTION_BY_LINE_ID", "sys_change_column"),
    Seq("TRANSACTION_BY_LINE_ID"), "left_outer")
  .filter(deltaDF.col("sys_change_column").isNull) // keep history rows with no matching delta row
  .drop(deltaDF.col("sys_change_column"))

val mergedDataDF = syncDataDF.union(deltaDF)
I believe this can be done with only a join, without the union.
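For reference, the left_outer-plus-null-filter pattern above is equivalent to Spark's built-in anti join. A sketch of the same step using left_anti, assuming deltaDF carries the same columns as historyDF plus sys_change_column:

// Anti join: keep only history rows whose key does not appear in the delta.
val syncDataDF = historyDF.join(
  deltaDF.select("TRANSACTION_BY_LINE_ID"),
  Seq("TRANSACTION_BY_LINE_ID"),
  "left_anti")

// Assumption: sys_change_column is extra relative to historyDF's schema,
// so it is dropped before the union to keep the two schemas aligned.
val mergedDataDF = syncDataDF.union(deltaDF.drop("sys_change_column"))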
Since I can't write mergedDataDF back to the path historyDF is read from, what I am doing is writing to a temp path and then reading from there and writing back. This is a bad idea, and I need a suggestion here:

mergedDataDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item-temp/")
val tempMergedDF = spark.read.parquet("/home/test/transaction-line-item-temp/")
tempMergedDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item")
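One way to avoid writing the data twice is to write once to the temp path and then swap directories with the Hadoop FileSystem API instead of running a second Spark job. A minimal sketch, assuming both paths are on the same HDFS filesystem (note the swap is not atomic: the final path is briefly empty between the delete and the rename):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val tempPath  = new Path("/home/test/transaction-line-item-temp")
val finalPath = new Path("/home/test/transaction-line-item")

// Write the new snapshot once, to the temp location only.
mergedDataDF.write.mode(SaveMode.Overwrite).parquet(tempPath.toString)

// Promote it: drop the old snapshot and move the new one into place.
fs.delete(finalPath, true)     // recursive delete of the old data
fs.rename(tempPath, finalPath) // cheap metadata operation on HDFS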

Please suggest the best approach.

Thanks



Re: Suggestion on Join Approach with Spark

Chetan Khatri
Can anyone help me? I am confused. :(




Re: Suggestion on Join Approach with Spark

Nicholas Chammas
This kind of question is for the User list, or for something like Stack Overflow. It's not on topic here.

The dev list (i.e. this list) is for discussions about the development of Spark itself.




Re: Suggestion on Join Approach with Spark

Chetan Khatri
Hello Nicholas,

I sincerely apologise.

Thanks
