A DataFrame cache bug

A DataFrame cache bug

tgbaggio
Hi All,

I found a strange bug related to reading data from an updated path combined with the cache operation.
Please consider the following code:

import org.apache.spark.sql.DataFrame

def f(data: DataFrame): DataFrame = {
  val df = data.filter("id>10")
  df.cache
  df.count
  df
}

f(spark.range(100).toDF()).count  // outputs 89, which is correct
f(spark.range(1000).toDF()).count // outputs 989, which is correct

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
val df = spark.read.parquet(dir)
df.count // outputs 100, which is correct
f(df).count // outputs 89, which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
val df1 = spark.read.parquet(dir)
df1.count // outputs 1000, which is correct; in fact every operation except df1.filter("id>10") returns the correct result
f(df1).count // outputs 89, which is incorrect

In fact, when we call df1.filter("id>10"), Spark nevertheless uses the old cached DataFrame.
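
A brute-force workaround sketch, assuming it is acceptable to drop the cached data entirely (dfFresh below is an illustrative name, not from the code above):

// Rough sketch (not verified here): drop the stale cache entry so the old
// cached plan cannot be reused, then re-read the updated path.
spark.catalog.clearCache()             // or call unpersist() on the previously cached DataFrame
val dfFresh = spark.read.parquet(dir)
f(dfFresh).count                       // expected to recompute from the new files and return 989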

Any idea? Thanks a lot

Cheers
Gen

Re: A DataFrame cache bug

tgbaggio
Hi All,

I may have found a related issue on JIRA:

https://issues.apache.org/jira/browse/SPARK-15678

This issue is closed; maybe we should reopen it.

Thanks 

Cheers
Gen


Re: A DataFrame cache bug

Kazuaki Ishizaki
Hi,
Thank you for pointing out the JIRA.
I think that this JIRA suggests inserting "spark.catalog.refreshByPath(dir)", which invalidates and refreshes the cached data (and associated metadata) for any Dataset that contains the given path.

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
val df = spark.read.parquet(dir)
df.count // outputs 100, which is correct
f(df).count // outputs 89, which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)  // insert a NEW statement
val df1 = spark.read.parquet(dir)
df1.count // outputs 1000, which is correct
f(df1).count // with the refresh inserted above, this now returns the correct result (989)

Regards,
Kazuaki Ishizaki




Re: A DataFrame cache bug

tgbaggio
Hi Kazuaki Ishizaki

Thanks a lot for your help. It works. However, an even stranger bug appears, as follows:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

def f(path: String, spark: SparkSession): DataFrame = {
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)
  val df = data.filter("id>10")
  df.cache
  println(df.count)
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
f(dir, spark).count // outputs 88, which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
f(dir, spark).count // outputs 88, which is incorrect

If we move refreshByPath into f(), just before spark.read, the whole code works fine.
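
A sketch of that working variant (identical to f above except for the added refresh; fWithRefresh is only an illustrative name):

def fWithRefresh(path: String, spark: SparkSession): DataFrame = {
  // Refresh cached data and metadata for this path before reading it, so
  // entries cached from the previous contents of the directory are invalidated.
  spark.catalog.refreshByPath(path)
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)
  val df = data.filter("id>10")
  df.cache
  println(df.count)
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}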

Any idea? Thanks a lot

Cheers
Gen



Re: A DataFrame cache bug

tgbaggio
Hi, the example that I provided is not very clear, so I have added a clearer example to the JIRA.

Thanks

Cheers
Gen


Re: A DataFrame cache bug

Liang-Chi Hsieh


Hi Gen,

I submitted a PR to fix the issue of refreshByPath: https://github.com/apache/spark/pull/17064

Thanks.


Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/