Fwd: dataframe null safe joins given a list of columns

Fwd: dataframe null safe joins given a list of columns

Enrico Minack

Hi Devs,

I am forwarding this from the user mailing list. I agree that the <=> version of join(Dataset[_], Seq[String]) would be useful.

Does any PMC consider this useful enough to be added to the Dataset API? I'd be happy to create a PR in that case.

Enrico



-------- Forwarded Message --------
Subject: dataframe null safe joins given a list of columns
Date: Thu, 6 Feb 2020 12:45:11 +0000
From: Marcelo Valle [hidden email]
To: user @spark [hidden email]


I was surprised I couldn't find a way of solving this in Spark, as it must be a very common problem for users, so I decided to ask here.

Consider the code below:

```
val joinColumns = Seq("a", "b")
val df1 = Seq(("a1", "b1", "c1"), ("a2", "b2", "c2"), ("a4", null, "c4")).toDF("a", "b", "c")
val df2 = Seq(("a1", "b1", "d1"), ("a3", "b3", "d3"), ("a4", null, "d4")).toDF("a", "b", "d")
df1.join(df2, joinColumns).show()
```

The output is:

```
+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
| a1| b1| c1| d1|
+---+---+---+---+
```

But I want it to be:

```
+---+-----+---+---+
|  a|    b|  c|  d|
+---+-----+---+---+
| a1|   b1| c1| d1|
| a4| null| c4| d4|
+---+-----+---+---+
```

The join syntax of `df1.join(df2, joinColumns)` has some advantages, as it doesn't create duplicate columns by default. However, it joins with the operator `===`, not the null-safe one `<=>`, so rows whose join columns are null on both sides never match.
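To make the difference between the two operators concrete, here is a minimal sketch that evaluates both on three pairs of values. The object name `NullSafeEqualityDemo`, the method `compare`, and the column names `l`, `r`, `eq` and `null_safe_eq` are made up for illustration; a local `SparkSession` is assumed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Illustrative only: the names below are not part of the Spark API.
object NullSafeEqualityDemo {
  // Evaluates === and <=> on: equal values, value vs null, null vs null.
  def compare(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val pairs = Seq[(String, String)](("x", "x"), ("x", null), (null, null)).toDF("l", "r")
    pairs.select(
      col("l"),
      col("r"),
      (col("l") === col("r")).as("eq"),           // true, null, null
      (col("l") <=> col("r")).as("null_safe_eq")) // true, false, true
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("null-safe-eq").getOrCreate()
    try compare(spark).show() finally spark.stop()
  }
}
```

A join keeps a pair of rows only when its condition evaluates to true, which is why the `null` result of `===` drops the `a4` row while `<=>` would keep it.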

Using the following syntax:

```
df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> df2("b")).show()
```

Would produce:

```
+---+----+---+---+----+---+
|  a|   b|  c|  a|   b|  d|
+---+----+---+---+----+---+
| a1|  b1| c1| a1|  b1| d1|
| a4|null| c4| a4|null| d4|
+---+----+---+---+----+---+
```

So to get the result I really want, I must do:

```
df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> df2("b")).drop(df2("a")).drop(df2("b")).show()
+---+----+---+---+
|  a|   b|  c|  d|
+---+----+---+---+
| a1|  b1| c1| d1|
| a4|null| c4| d4|
+---+----+---+---+
```

Which works, but is really verbose, especially when you have many join columns.
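With many join columns, both the condition and the drops can be generated from the list of names. The following is only a sketch of such a utility, not an existing Spark method: the object `NullSafeJoin` and the functions `nullSafeJoin` and `demo` are hypothetical names, and it relies only on `Dataset.join(right, joinExprs, joinType)` and `Dataset.drop(col)`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper, not part of the Spark API.
object NullSafeJoin {
  // Joins on `columns` using <=> and keeps only the left-hand copy of each join column.
  def nullSafeJoin(left: DataFrame, right: DataFrame, columns: Seq[String],
                   joinType: String = "inner"): DataFrame = {
    require(columns.nonEmpty, "at least one join column is required")
    // Build left(a) <=> right(a) && left(b) <=> right(b) && ...
    val condition = columns.map(c => left(c) <=> right(c)).reduce(_ && _)
    // Drop the right-hand duplicates one by one.
    columns.foldLeft(left.join(right, condition, joinType)) {
      (joined, c) => joined.drop(right(c))
    }
  }

  // Rebuilds the two dataframes from the example above and joins them.
  def demo(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val df1 = Seq[(String, String, String)](
      ("a1", "b1", "c1"), ("a2", "b2", "c2"), ("a4", null, "c4")).toDF("a", "b", "c")
    val df2 = Seq[(String, String, String)](
      ("a1", "b1", "d1"), ("a3", "b3", "d3"), ("a4", null, "d4")).toDF("a", "b", "d")
    nullSafeJoin(df1, df2, Seq("a", "b"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("null-safe-join").getOrCreate()
    try demo(spark).show() finally spark.stop()
  }
}
```

Two caveats with this sketch: for right and full outer joins, dropping the right-hand columns loses the key values of unmatched right rows, and when both inputs derive from the same dataframe the references `left(c)` and `right(c)` may need aliases to stay unambiguous.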

Is there a better way of solving this without needing a utility method? I run into this same problem in every Spark project.



This email is confidential [and may be protected by legal privilege]. If you are not the intended recipient, please do not copy or disclose its content but contact the sender immediately upon receipt.

KTech Services Ltd is registered in England as company number 10704940.

Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE, United Kingdom

Re: [SPARK-30957][SQL] Null-safe variant of Dataset.join(Dataset[_], Seq[String])

Alexandros Biratsis

Hi Enrico and Spark devs,

Since the current plan is not to provide built-in functionality for dropping repeated/redundant columns, I wrote two helper methods as a workaround.

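The 1st method extends the current `drop`, which accepts several column names but only a single Column, to support multiple Column instances:

```
import org.apache.spark.sql.{Column, DataFrame}

implicit class DataframeExt(val df: DataFrame) {
  // Drops every given Column by folding the single-Column drop over the sequence.
  def drop(cols: Seq[Column]): DataFrame = {
    cols.foldLeft(df) {
      (tdf, c) => tdf.drop(c)
    }
  }
}
```
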
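The 2nd is an implicit method that converts a sequence of column names into Column instances, optionally binding them to the parent dataframes:

```
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col

implicit class SeqExt(val cols: Seq[String]) {
  def toCol(dfs: DataFrame*): Seq[Column] = {
    if (dfs.nonEmpty) {
      // Bind every name to each parent dataframe in turn.
      dfs.foldLeft(Seq[Column]()) {
        (acc, df) => acc ++ cols.map { df(_) }
      }
    } else {
      // No parent given: build unbound columns from the names alone.
      cols.map { col(_) }
    }
  }
}
```
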
After adding these two to your library, you can use them as:

```
import implicits._

val dropCols = Seq("c2", "c3")
val joinCols = Seq("c1")

val weatherDf = dfA.join(dfB, joinCols, "inner")
  .join(dfC, joinCols, "inner")
  .join(dfD, joinCols, "inner")
  .drop(dropCols.toCol(dfB, dfC, dfD))
```

Cheers,
Alex

On Wed, Feb 26, 2020 at 10:07 AM Enrico Minack <[hidden email]> wrote:
I have created a Jira issue to track this request: https://issues.apache.org/jira/browse/SPARK-30957

Enrico

On 08.02.20 at 16:56, Enrico Minack wrote:
