Correlated subqueries in the DataFrame API

Correlated subqueries in the DataFrame API

Nicholas Chammas
I just submitted SPARK-23945, but wanted to double-check here to make sure I didn't miss something fundamental.

Correlated subqueries are tracked at a high level in SPARK-18455, but it's not clear to me whether they are "design-appropriate" for the DataFrame API.

Are correlated subqueries a thing we can expect to have in the DataFrame API?

Nick

Re: Correlated subqueries in the DataFrame API

Ryan Blue

Nick, thanks for raising this.

It looks useful to have something in the DataFrame API that behaves like sub-queries, but I'm not sure that passing a DataFrame works. Making every method accept a DataFrame that may contain matching data puts a lot of work on the API, which would then have to accept a DataFrame all over the place.

What about exposing transforms that make it easy to coerce data to what a method needs? Instead of passing a DataFrame, you'd pass df.toSet to isin:

val subQ = spark.sql("select distinct filter_col from source")
val df = table.filter($"col".isin(subQ.toSet))

That also distinguishes between a sub-query and a correlated sub-query that uses values from the outer query. We would still need to come up with syntax for the correlated case, unless there’s a proposal already.
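
For concreteness, the correlated case looks roughly like this in SQL (a sketch only; the customers and orders tables are made up). The inner query references the outer row, so its result cannot be pre-computed into a set the way the uncorrelated one can:

spark.sql("""
    SELECT *
    FROM customers c
    WHERE EXISTS (
        SELECT 1 FROM orders o
        WHERE o.customer_id = c.id  -- correlation: references the outer row
    )
""")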

rb


--
Ryan Blue
Software Engineer
Netflix
Re: Correlated subqueries in the DataFrame API

Reynold Xin (rxin)
Perhaps we can just have a function that turns a DataFrame into a Column? That would work for both the correlated and uncorrelated cases, although in the correlated case we'd need to turn off eager analysis (otherwise there is no way to construct a valid DataFrame).
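
As a sketch of what that might look like (hypothetical only: no such function exists in the API today, and the name as_column is made up for illustration):

# Hypothetical: as_column would turn a single-column DataFrame into a
# Column that can be used wherever the expression API expects one.
subq = spark.table("source")  # placeholder single-column source
filtered = table.filter(table['col'].isin(as_column(subq.select('filter_col'))))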



Re: Correlated subqueries in the DataFrame API

Nicholas Chammas

> What about exposing transforms that make it easy to coerce data to what a method needs? Instead of passing a DataFrame, you'd pass df.toSet to isin

Assuming toSet returns a local collection, wouldn't that have the problem of not being able to handle extremely large result sets? In contrast, I believe SQL's IN is implemented in such a way that the inner query it references never needs to be collected locally. Did I understand your suggestion correctly?
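
To make the concern concrete, here is roughly what the collect-based approach looks like in PySpark today (a sketch; the table and column names mirror Ryan's example):

from pyspark.sql.functions import col

# Every distinct value is pulled to the driver before filtering, which
# is exactly the scaling problem for very large inner-query results.
values = [row.filter_col for row in
          spark.table("source").select("filter_col").distinct().collect()]
df = spark.table("table").filter(col("col").isin(values))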

I think having .isin() accept a Column makes more sense, since that matches SQL's semantics and would hopefully also preserve the distributed nature of the operation.

For example, I believe in most cases we’d want this

(table1
    .where(
        table1['name'].isin(
            table2.select('name')
            # table2['name']  # per Reynold's suggestion
        )))

and this

(table1
    .join(table2, on='name')
    .select(table1['*']))

to compile down to the same physical plan. No?
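
One caveat worth noting: the plain inner join above can duplicate rows of table1 when table2 contains repeated names, so the closer match for IN semantics is a left semi join, which is typically what SQL plans IN (<subquery>) to. The two plans can be compared with explain():

# A left semi join keeps each row of table1 that has at least one match
# in table2, without duplicating it (the same semantics as IN).
semi = table1.join(table2.select('name'), on='name', how='leftsemi')
semi.explain()  # inspect the physical plan for comparison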

Nick

