Will .count() always trigger an evaluation of each row?


Will .count() always trigger an evaluation of each row?

Nicholas Chammas

Especially during development, people often use .count() or .persist().count() to force evaluation of all rows — exposing any problems, e.g. due to bad data — and to load data into cache to speed up subsequent operations.
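For example, the pattern typically looks something like this (a Scala sketch, spark-shell style; "somefile" is just a placeholder path):

val df = spark.read.parquet("somefile")
df.persist()   // lazy: only marks the DataFrame for caching, nothing is computed yet
df.count()     // action: today this evaluates every row and populates the cache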

But as the optimizer gets smarter, I’m guessing it will eventually learn that it doesn’t have to do all that work to give the correct count. (This blog post suggests that something like this is already happening.) This will change Spark’s practical behavior while technically preserving semantics.

What will people need to do then to force evaluation or caching?

Nick


Re: Will .count() always trigger an evaluation of each row?

Garren Staubli
Hi Nick,

Calling .cache().count() will force an evaluation of every row so that the Dataset is then cached in memory. Calling .cache() on its own is lazily evaluated, but the following .count() forces an evaluation of the entire Dataset.
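A minimal sketch of that behaviour (spark-shell style, assuming df is an existing DataFrame):

df.cache()    // lazy: no job runs, nothing is materialized yet
df.count()    // runs a job over every partition and fills the cache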

Garren

Re: Will .count() always trigger an evaluation of each row?

Sean Owen
In reply to this post by Nicholas Chammas
I think the right answer is "don't do that" but if you really had to you could trigger a Dataset operation that does nothing per partition. I presume that would be more reliable because the whole partition has to be computed to make it available in practice. Or, go so far as to loop over every element.
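A rough sketch of that idea in Scala (assuming df is an existing DataFrame; the explicit Iterator[Row] type is only there to select the Scala overload of foreachPartition):

import org.apache.spark.sql.Row

df.foreachPartition { rows: Iterator[Row] =>
  rows.foreach(_ => ())   // consume every element so each partition is fully computed
}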



Re: Will .count() always trigger an evaluation of each row?

Matei Zaharia
Count is different on DataFrames and Datasets from RDDs. On RDDs, it always evaluates everything, but on DataFrame/Dataset, it turns into the equivalent of "select count(*) from ..." in SQL, which can be done without scanning the data for some data formats (e.g. Parquet). On the other hand though, caching a DataFrame / Dataset does require everything to be cached.
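Roughly, the difference looks like this (a sketch, spark-shell style; "somefile" is just a placeholder):

val df = spark.read.parquet("somefile")
df.count()      // DataFrame count: planned as an aggregate, which for Parquet may be
                // answered largely from file metadata rather than a full row scan
df.rdd.count()  // RDD count: always evaluates every row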

Matei



RE: Will .count() always trigger an evaluation of each row?

assaf.mendelson

Actually, when I did a simple test on Parquet (spark.read.parquet("somefile").cache().count()) the UI showed me that the entire file is cached. Is this just a fluke?

In any case, I believe the question is still valid: how do we make sure a DataFrame is cached?

Consider, for example, a case where we read from a remote host (which is costly) and we want to make sure the original read is done at a specific time (when the network is less crowded).

I for one have used .count() till now, but if this is not guaranteed to cache, then how would I do that? Of course I could always save the DataFrame to disk, but that would cost a lot more in performance than I would like…

As for doing a mapPartitions over the Dataset, wouldn't that convert each row to the case class? That could also be heavy.

Maybe cache should take an eager parameter, false by default, so we could call .cache(true) to force materialization (similar to what we have with checkpoint).
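For comparison, a sketch of the checkpoint behaviour that proposal mirrors (Dataset.checkpoint takes an eager flag as of Spark 2.1; the checkpoint directory below is just a placeholder):

spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  // placeholder directory
val materialized = df.checkpoint(eager = true)  // computes and writes out the full Dataset now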

Assaf.

 



Re: Will .count() always trigger an evaluation of each row?

Jörn Franke
I think your example relates to scheduling, e.g. it makes sense to use Oozie or a similar tool to fetch the data at specific points in time.

I am also not a big fan of caching everything. In a multi-user cluster with a lot of applications you waste a lot of resources, making everybody less efficient.


RE: Will .count() always trigger an evaluation of each row?

assaf.mendelson

I am not saying you should cache everything, just that it is a valid use case.

 

 



Re: Will .count() always trigger an evaluation of each row?

Ryan Blue
I think it is a great idea to have a way to force execution to build a cached dataset.

The use case for this that we see the most is to build broadcast tables. Right now, there's a 5-minute timeout to build a broadcast table. That's plenty of time if the data is sitting in a table, but we see a lot of users that have a dataframe with a complicated query plan that they know is small enough to broadcast. If that query plan is several stages, it can cause the job to fail because of the timeout. I usually recommend caching/persisting the content and then running the broadcast join to avoid the timeout.

I realize that the right solution is to get rid of the timeout when building a broadcast table, but forcing materialization is useful for things like this. I'd like to see a legitimate way to do it, since people currently rely on count.
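A sketch of that workaround (all of the names here, small, large and the join key, are hypothetical):

import org.apache.spark.sql.functions.broadcast

val materialized = small.persist()
materialized.count()   // force the multi-stage plan to run before the broadcast,
                       // so it is not squeezed into the 5-minute broadcast timeout
val joined = large.join(broadcast(materialized), Seq("id"))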

rb



--
Ryan Blue
Software Engineer
Netflix

Re: Will .count() always trigger an evaluation of each row?

zero323
Technically speaking it is still possible to:


df.createOrReplaceTempView("df")  // register the DataFrame as a temporary view
spark.sql("CACHE TABLE df")       // CACHE TABLE is eager, so this materializes the cache
spark.table("df")                 // read back the now-cached data