[Performance] Spark DataFrame is slow with wide data. Polynomial complexity on the number of columns is observed. Why?


makatun
It is well known that wide tables are not the most efficient way to organize
data. However, sometimes we have to deal with extremely wide tables
featuring thousands of columns, for example when loading data from legacy
systems.

*We have investigated how the number of columns affects the duration of
Spark jobs.*

Two basic Spark (2.3.1) jobs are used for testing. The two jobs use different
approaches to instantiate a DataFrame. Each reads a .csv file into a
DataFrame and performs a count. Each job is repeated with input files having
a different number of columns, and the execution time is measured. 16 files
with 100 to 20,000 columns are used. The files are generated so that their
size (rows * columns) is constant (200,000 cells, approx. 2 MB). This means
that files with more columns have fewer rows. Each job is repeated 7 times
for each file to obtain better statistics.
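The file setup described above can be sketched in Scala. This is an illustrative generator, not the authors' script. WideCsvGenerator, the col1..colN header names, and the placeholder cell values are all hypothetical. With the default of 200,000 cells, 100 columns gives 2,000 data rows and 20,000 columns gives 10.

```scala
// Hypothetical generator: CSV text whose size (rows * columns) is held
// approximately constant at `totalCells`, so wider files have fewer rows.
object WideCsvGenerator {
  def csvText(nCols: Int, totalCells: Int = 200000): String = {
    require(nCols > 0 && nCols <= totalCells, "need 0 < nCols <= totalCells")
    val nRows = totalCells / nCols // e.g. 100 columns -> 2,000 rows
    val sb = new StringBuilder
    // Header line: col1,col2,...,colN
    sb.append((1 to nCols).map(c => s"col$c").mkString(",")).append('\n')
    // Data lines: placeholder string cells v<row>_<col>
    for (r <- 0 until nRows)
      sb.append((0 until nCols).map(c => s"v${r}_$c").mkString(",")).append('\n')
    sb.toString
  }
}
```

Writing one file per column count reproduces the constant-size series, for example with `Files.write(Paths.get(s"cols$n.csv"), WideCsvGenerator.csvText(n).getBytes("UTF-8"))`.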

The results of the measurements are shown in the figure
job_duration_VS_number_of_columns.jpg:
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/job_duration_VS_number_of_columns.jpg>
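To put a number on "polynomial" versus "linear", one option is to fit the exponent k in t ~ c * n^k to the measured (columns, duration) pairs. The fit is a least-squares line on log-log axes. A k close to 1 indicates linear growth, and a k close to 2 indicates quadratic growth. This sketch uses a hypothetical ComplexityFit helper and is not part of the original analysis.

```scala
// Hypothetical helper: estimates k in t ~ c * n^k from (n, t) measurements.
object ComplexityFit {
  // Least-squares slope of log(t) against log(n).
  def exponent(points: Seq[(Double, Double)]): Double = {
    require(points.size >= 2, "need at least two measurements")
    val xs = points.map { case (n, _) => math.log(n) }
    val ys = points.map { case (_, t) => math.log(t) }
    val mx = xs.sum / xs.size
    val my = ys.sum / ys.size
    val cov = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
    val varX = xs.map(x => (x - mx) * (x - mx)).sum
    require(varX > 0, "need at least two distinct column counts")
    cov / varX
  }
}
```

Feeding it the median duration per file gives one slope per approach. If the log-log plot is clearly curved, a single power law does not describe the data well and the slope should be read with care.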
The two approaches show significantly different complexity of DataFrame
construction:

*1. spark.read.format()*: similar results for
     a. csv and parquet formats (parquet created from the same csv): .format(<csv/parquet>)
     b. schema-on-read on/off: .option("inferSchema", <true/false>)
     c. a provided schema loaded from a file (stored from a previous run): .schema(<schema>)
Polynomial complexity in the number of columns is observed.

import org.apache.spark.sql.SparkSession

// Get SparkSession
val spark = SparkSession
  .builder
  .appName(s"TestSparkReadFormat${runNo}")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///C:/temp") // on Windows.
  .config("spark.debug.maxToStringFields", 20000)
  .getOrCreate()

// Read data  
val df = spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "false")
  .option("header", "true")
  .load(inputPath)

// Count rows and columns
val nRows = df.count()
val nColumns = df.columns.length
spark.stop()


*2. spark.createDataFrame(rows, schema)*: where rows and schema are
constructed by splitting the lines of a text file.
Linear complexity in the number of columns is observed.

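import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Get SparkSession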
val spark = SparkSession
  .builder
  .appName(s"TestSparkCreateDataFrame${runNo}")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///C:/temp") // on Windows.
  .config("spark.debug.maxToStringFields", 20000)
  .getOrCreate()

// load file
val sc = spark.sparkContext
val lines = sc.textFile(inputPath)

//create schema from headers
val headers = lines.first
val fs = headers.split(",").map(f => StructField(f, StringType))
val schema = StructType(fs)

// read data
val noheaders = lines.filter(_ != headers)
val rows = noheaders.map(_.split(",")).map(a => Row.fromSeq(a))

// create Data Frame
val df: DataFrame = spark.createDataFrame(rows, schema)

// count rows and columns
val nRows = df.count()
val nColumns = df.columns.length
spark.stop()

A similar polynomial dependence on the total number of columns in a
DataFrame is also observed in more complex test jobs. Those jobs perform
the following transformations on a fixed number of columns:
• Filter
• GroupBy
• Sum
• withColumn

What could be the reason for the polynomial dependence of the job duration
on the number of columns? *What is an efficient way to handle wide data
using Spark?*



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: [Performance] Spark DataFrame is slow with wide data. Polynomial complexity on the number of columns is observed. Why?

antonkulaga
I have the same problem with gene expression data
(GTEx_Analysis_2016-01-15_v7_RNASeQCv1.1.8_gene_tpm.gct.gz from gtex_analysis_v7/rna_seq_data),
where I have tens of thousands of genes as columns. No idea why Spark is
slooooooooooooooooooooooow there.




Re: [Performance] Spark DataFrame is slow with wide data. Polynomial complexity on the number of columns is observed. Why?

0xF0F0F0
In reply to this post by makatun
This (and the related JIRA tickets) might shed some light on the problem:

http://apache-spark-developers-list.1001551.n3.nabble.com/SQL-ML-Pipeline-performance-regression-between-1-6-and-2-x-td20803.html



Re: [Performance] Spark DataFrame is slow with wide data. Polynomial complexity on the number of columns is observed. Why?

Steve Loughran
In reply to this post by makatun
CSV with schema inference is a full read of the data, so that could be one of the problems. Do it at most once, print out the schema and use it from then on during ingest, and use something else for persistence.
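Here is a minimal sketch of this advice, assuming Spark 2.x and local file paths. A single one-off run infers the schema and stores it as JSON with StructType.json. Later runs reload it with DataType.fromJson and pass it to .schema(...), so no inference pass happens. SchemaOnce and its methods are hypothetical names.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{DataType, StructType}

object SchemaOnce {
  // One-off run: infer the schema (a full pass over the data) and save it as JSON.
  def inferAndSave(spark: SparkSession, csvPath: String, schemaFile: Path): StructType = {
    val schema = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(csvPath)
      .schema
    Files.write(schemaFile, schema.json.getBytes(StandardCharsets.UTF_8))
    schema
  }

  // Every later run: load the stored schema instead of inferring it again.
  def readWithStoredSchema(spark: SparkSession, csvPath: String, schemaFile: Path): DataFrame = {
    val json = new String(Files.readAllBytes(schemaFile), StandardCharsets.UTF_8)
    val schema = DataType.fromJson(json).asInstanceOf[StructType]
    spark.read.option("header", "true").schema(schema).csv(csvPath)
  }
}
```

For persistence, the ingested DataFrame could then be written in a columnar format, for example with `df.write.parquet(outPath)`, where outPath is a placeholder.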

On 6 Aug 2018, at 05:44, makatun <[hidden email]> wrote:

> a. csv and parquet formats (parquet created from the same csv): .format(<csv/parquet>)
> b. schema-on-read on/off: .option(inferSchema=<true/false>)


Re: [Performance] Spark DataFrame is slow with wide data. Polynomial complexity on the number of columns is observed. Why?

makatun
Steve, thank you for your response.
We have tested spark.read with various options. The difference in
performance is very small. In particular, schema inference has virtually no
effect in the tested case (the test files have just a few rows). Moreover,
the complexity of spark.read remains polynomial in the number of columns in
all the considered cases. In contrast, spark.createDataFrame(data, schema) is
linear and faster by a large factor. *What could be the reason for such a
dramatic difference in performance?*

Please find the plot with our measurements below. The code is exactly the
same as in the initial post. The only thing that changed was the
additional spark.read settings:
- read.format("csv").option("inferSchema", "false")
- read.format("csv").option("inferSchema", "true")
- read.format("csv").schema(schema), where the schema is provided from a prepared
JSON file
- read.parquet, which reads a parquet file (including its schema) prepared from the
same CSVs
- createDataFrame(data, schema), where data is parsed into rows from the CSV and
the schema is constructed from its header

<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/spark_read_complexity_on_columns.jpg>




Re: [Performance] Spark DataFrame is slow with wide data. Polynomial complexity on the number of columns is observed. Why?

makatun
In reply to this post by 0xF0F0F0
Following the discussion and recommendations in the link you provided, we ran
tests with constraint propagation disabled, using the following option:
spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)
The resulting measurements are shown in the plot:
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/disable_constraint_propagation_comparison.jpg>
Unfortunately, this makes no difference for the described test
jobs. As we understand it, this is because our test jobs do not include any
iterations or complex transformations (they just read a file and count).
For the same reason, it is not clear how checkpointing or caching could be
applied here.
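For completeness, the option above needs `import org.apache.spark.sql.internal.SQLConf`, which is an internal Spark API. The same flag can also be set through its string key, spark.sql.constraintPropagation.enabled, either at runtime or when the session is built. This sketch uses a hypothetical ConstraintPropagation helper.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

object ConstraintPropagation {
  // Runtime switch, exactly as in the post (SQLConf is internal API).
  def disable(spark: SparkSession): Unit =
    spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)

  // Same flag through its string key, set while building the session.
  def sessionWithoutPropagation(master: String = "local[*]"): SparkSession =
    SparkSession.builder()
      .master(master)
      .config("spark.sql.constraintPropagation.enabled", "false")
      .getOrCreate()
}
```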

The transformation plan is trivial, and it does not seem to be changed by the
Catalyst optimizer at the different optimization stages. Here is the output of
DataFrame.explain(true):

== Parsed Logical Plan ==
Relation[key#10,id#11,version#12,Date#13,value#14,address#15,col7_date#16,col8_float#17,col9_boolean#18,col10_string#19,col11_int#20,col12_date#21,col13_float#22,col14_boolean#23,col15_string#24,col16_int#25,col17_date#26,col18_float#27,col19_boolean#28,col20_string#29,col21_int#30,col22_date#31,col23_float#32,col24_boolean#33,col25_string#34,col26_int#35,col27_date#36,col28_float#37,col29_boolean#38,col30_string#39,col31_int#40,col32_date#41,col33_float#42,col34_boolean#43,col35_string#44,col36_int#45,col37_date#46,col38_float#47,col39_boolean#48,col40_string#49,col41_int#50,col42_date#51,col43_float#52,col44_boolean#53,col45_string#54,col46_int#55,col47_date#56,col48_float#57,col49_boolean#58,col50_string#59,col51_int#60,col52_date#61,col53_float#62,col54_boolean#63,col55_string#64,col56_int#65,col57_date#66,col58_float#67,col59_boolean#68,col60_string#69,col61_int#70,col62_date#71,col63_float#72,col64_boolean#73,col65_string#74,col66_int#75,col67_date#76,col68_float#77,col69_boolean#78,col70_string#79,col71_int#80,col72_date#81,col73_float#82,col74_boolean#83,col75_string#84,col76_int#85,col77_date#86,col78_float#87,col79_boolean#88,col80_string#89,col81_int#90,col82_date#91,col83_float#92,col84_boolean#93,col85_string#94,col86_int#95,col87_date#96,col88_float#97,col89_boolean#98,col90_string#99,col91_int#100,col92_date#101,col93_float#102,col94_boolean#103,col95_string#104,col96_int#105,col97_date#106,col98_float#107,col99_boolean#108,col100_string#109]
csv

== Analyzed Logical Plan ==
key: string, id: string, version: string, Date: string, value: string,
address: string, col7_date: string, col8_float: string, col9_boolean:
string, col10_string: string, col11_int: string, col12_date: string,
col13_float: string, col14_boolean: string, col15_string: string, col16_int:
string, col17_date: string, col18_float: string, col19_boolean: string,
col20_string: string, col21_int: string, col22_date: string, col23_float:
string, col24_boolean: string, col25_string: string, col26_int: string,
col27_date: string, col28_float: string, col29_boolean: string,
col30_string: string, col31_int: string, col32_date: string, col33_float:
string, col34_boolean: string, col35_string: string, col36_int: string,
col37_date: string, col38_float: string, col39_boolean: string,
col40_string: string, col41_int: string, col42_date: string, col43_float:
string, col44_boolean: string, col45_string: string, col46_int: string,
col47_date: string, col48_float: string, col49_boolean: string,
col50_string: string, col51_int: string, col52_date: string, col53_float:
string, col54_boolean: string, col55_string: string, col56_int: string,
col57_date: string, col58_float: string, col59_boolean: string,
col60_string: string, col61_int: string, col62_date: string, col63_float:
string, col64_boolean: string, col65_string: string, col66_int: string,
col67_date: string, col68_float: string, col69_boolean: string,
col70_string: string, col71_int: string, col72_date: string, col73_float:
string, col74_boolean: string, col75_string: string, col76_int: string,
col77_date: string, col78_float: string, col79_boolean: string,
col80_string: string, col81_int: string, col82_date: string, col83_float:
string, col84_boolean: string, col85_string: string, col86_int: string,
col87_date: string, col88_float: string, col89_boolean: string,
col90_string: string, col91_int: string, col92_date: string, col93_float:
string, col94_boolean: string, col95_string: string, col96_int: string,
col97_date: string, col98_float: string, col99_boolean: string,
col100_string: string
Relation[key#10,id#11,version#12,Date#13,value#14,address#15,col7_date#16,col8_float#17,col9_boolean#18,col10_string#19,col11_int#20,col12_date#21,col13_float#22,col14_boolean#23,col15_string#24,col16_int#25,col17_date#26,col18_float#27,col19_boolean#28,col20_string#29,col21_int#30,col22_date#31,col23_float#32,col24_boolean#33,col25_string#34,col26_int#35,col27_date#36,col28_float#37,col29_boolean#38,col30_string#39,col31_int#40,col32_date#41,col33_float#42,col34_boolean#43,col35_string#44,col36_int#45,col37_date#46,col38_float#47,col39_boolean#48,col40_string#49,col41_int#50,col42_date#51,col43_float#52,col44_boolean#53,col45_string#54,col46_int#55,col47_date#56,col48_float#57,col49_boolean#58,col50_string#59,col51_int#60,col52_date#61,col53_float#62,col54_boolean#63,col55_string#64,col56_int#65,col57_date#66,col58_float#67,col59_boolean#68,col60_string#69,col61_int#70,col62_date#71,col63_float#72,col64_boolean#73,col65_string#74,col66_int#75,col67_date#76,col68_float#77,col69_boolean#78,col70_string#79,col71_int#80,col72_date#81,col73_float#82,col74_boolean#83,col75_string#84,col76_int#85,col77_date#86,col78_float#87,col79_boolean#88,col80_string#89,col81_int#90,col82_date#91,col83_float#92,col84_boolean#93,col85_string#94,col86_int#95,col87_date#96,col88_float#97,col89_boolean#98,col90_string#99,col91_int#100,col92_date#101,col93_float#102,col94_boolean#103,col95_string#104,col96_int#105,col97_date#106,col98_float#107,col99_boolean#108,col100_string#109]
csv

== Optimized Logical Plan ==
Relation[key#10,id#11,version#12,Date#13,value#14,address#15,col7_date#16,col8_float#17,col9_boolean#18,col10_string#19,col11_int#20,col12_date#21,col13_float#22,col14_boolean#23,col15_string#24,col16_int#25,col17_date#26,col18_float#27,col19_boolean#28,col20_string#29,col21_int#30,col22_date#31,col23_float#32,col24_boolean#33,col25_string#34,col26_int#35,col27_date#36,col28_float#37,col29_boolean#38,col30_string#39,col31_int#40,col32_date#41,col33_float#42,col34_boolean#43,col35_string#44,col36_int#45,col37_date#46,col38_float#47,col39_boolean#48,col40_string#49,col41_int#50,col42_date#51,col43_float#52,col44_boolean#53,col45_string#54,col46_int#55,col47_date#56,col48_float#57,col49_boolean#58,col50_string#59,col51_int#60,col52_date#61,col53_float#62,col54_boolean#63,col55_string#64,col56_int#65,col57_date#66,col58_float#67,col59_boolean#68,col60_string#69,col61_int#70,col62_date#71,col63_float#72,col64_boolean#73,col65_string#74,col66_int#75,col67_date#76,col68_float#77,col69_boolean#78,col70_string#79,col71_int#80,col72_date#81,col73_float#82,col74_boolean#83,col75_string#84,col76_int#85,col77_date#86,col78_float#87,col79_boolean#88,col80_string#89,col81_int#90,col82_date#91,col83_float#92,col84_boolean#93,col85_string#94,col86_int#95,col87_date#96,col88_float#97,col89_boolean#98,col90_string#99,col91_int#100,col92_date#101,col93_float#102,col94_boolean#103,col95_string#104,col96_int#105,col97_date#106,col98_float#107,col99_boolean#108,col100_string#109]
csv

== Physical Plan ==
*(1) FileScan csv
[key#10,id#11,version#12,Date#13,value#14,address#15,col7_date#16,col8_float#17,col9_boolean#18,col10_string#19,col11_int#20,col12_date#21,col13_float#22,col14_boolean#23,col15_string#24,col16_int#25,col17_date#26,col18_float#27,col19_boolean#28,col20_string#29,col21_int#30,col22_date#31,col23_float#32,col24_boolean#33,col25_string#34,col26_int#35,col27_date#36,col28_float#37,col29_boolean#38,col30_string#39,col31_int#40,col32_date#41,col33_float#42,col34_boolean#43,col35_string#44,col36_int#45,col37_date#46,col38_float#47,col39_boolean#48,col40_string#49,col41_int#50,col42_date#51,col43_float#52,col44_boolean#53,col45_string#54,col46_int#55,col47_date#56,col48_float#57,col49_boolean#58,col50_string#59,col51_int#60,col52_date#61,col53_float#62,col54_boolean#63,col55_string#64,col56_int#65,col57_date#66,col58_float#67,col59_boolean#68,col60_string#69,col61_int#70,col62_date#71,col63_float#72,col64_boolean#73,col65_string#74,col66_int#75,col67_date#76,col68_float#77,col69_boolean#78,col70_string#79,col71_int#80,col72_date#81,col73_float#82,col74_boolean#83,col75_string#84,col76_int#85,col77_date#86,col78_float#87,col79_boolean#88,col80_string#89,col81_int#90,col82_date#91,col83_float#92,col84_boolean#93,col85_string#94,col86_int#95,col87_date#96,col88_float#97,col89_boolean#98,col90_string#99,col91_int#100,col92_date#101,col93_float#102,col94_boolean#103,col95_string#104,col96_int#105,col97_date#106,col98_float#107,col99_boolean#108,col100_string#109]
Batched: false, Format: CSV, Location:
InMemoryFileIndex[file:/F:/20kColumns/var_cols/comma_records200000_narrow_columns100Xrows2000.csv],
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct<key:string,id:string,version:string,Date:string,value:string,address:string,col7_date:stri...

However, according to our observations, most of the time is spent in the
driver; the computation itself takes little time. Here is the timeline:
[image: timeline.png]

The Java VisualVM sampler shows that most of the time is spent in Catalyst:
about 94% of the time in the method
catalyst.plans.logical.LogicalPlan.resolve() and 36% in
catalyst.plans.logical.LogicalPlan$$anonfun$resolve$2.apply().
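The profile above points at name resolution. The toy model below is plain Scala, not Catalyst code, and Attr, ResolveModel and its methods are hypothetical. It shows why a linear scan over the plan's attributes for every lookup makes resolving all n columns cost on the order of n^2 comparisons. An index built once keeps the cost linear. This is the kind of change later discussed in this thread for LogicalPlan.resolve (SPARK-16406).

```scala
// Toy model, not Catalyst code: resolving every column name against a
// plan's output attributes, by linear scan vs. through a prebuilt index.
object ResolveModel {
  final case class Attr(name: String, exprId: Int)

  // Linear scan per lookup. When names arrive in attribute order, name i
  // costs i+1 comparisons, so all n names cost n(n+1)/2: quadratic.
  def resolveAllLinear(attrs: IndexedSeq[Attr], names: Seq[String]): (Vector[Attr], Long) = {
    var comparisons = 0L
    val resolved = names.toVector.map { n =>
      attrs.find { a => comparisons += 1; a.name.equalsIgnoreCase(n) }
        .getOrElse(sys.error(s"cannot resolve $n"))
    }
    (resolved, comparisons)
  }

  // Case-insensitive index built once in O(n); each lookup is then O(1)
  // expected. Ambiguous (duplicate) names are not handled in this toy.
  def resolveAllIndexed(attrs: IndexedSeq[Attr], names: Seq[String]): Vector[Attr] = {
    val index: Map[String, Attr] = attrs.map(a => a.name.toLowerCase -> a).toMap
    names.toVector.map(n => index.getOrElse(n.toLowerCase, sys.error(s"cannot resolve $n")))
  }
}
```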


SUMMARY:
Based on the above discussion, it is unclear why the time to instantiate a
DataFrame has a polynomial dependence on the number of columns (the same was
also observed for basic transformations). Could this be done in linear time,
as in the second test job with spark.createDataFrame(data, schema)? In both
cases the schema (all columns are strings), content, and output are exactly
the same. Could this polynomial dependence be caused by Catalyst, a bug in
Spark core, code generation, or the JVM setup? Or are we missing something?




Re: [Performance] Spark DataFrame is slow with wide data. Polynomial complexity on the number of columns is observed. Why?

makatun
Here are the images missing in the previous mail. My apologies.
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/timeline.png>
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/readFormat_visualVM_Sampler.jpg>




Re: [Performance] Spark DataFrame is slow with wide data. Polynomial complexity on the number of columns is observed. Why?

Marco Gaido
Hi Makatun,

I think your problem has been solved in https://issues.apache.org/jira/browse/SPARK-16406, which is going to be in Spark 2.4.
Please try the current master, where the problem should have disappeared.

Thanks,
Marco

2018-08-09 12:56 GMT+02:00 makatun <[hidden email]>:
> Here are the images missing in the previous mail. My apologies.
> <http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/timeline.png>
> <http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/readFormat_visualVM_Sampler.jpg>




Re: [Performance] Spark DataFrame is slow with wide data. Polynomial complexity on the number of columns is observed. Why?

antonkulaga
Is it not going to be backported to 2.3.2? I am totally blocked by this issue
in one of my projects.




Re: [Performance] Spark DataFrame is slow with wide data. Polynomial complexity on the number of columns is observed. Why?

makatun
In reply to this post by Marco Gaido
Hi Marco,
many thanks for pointing out the related Spark commit. According to the
description, it introduces indexed (instead of linear) search over columns
in LogicalPlan.resolve(...).
We have run the tests on the current Spark master branch and would
like to share the results. There is some good news (see 1) and some not so good
news (see 2 and 3).

RESULTS OF TESTING CURRENT MASTER:

1. First of all, the performance of the test jobs described in the initial
post has improved dramatically. In the new version the duration is linear in
the number of columns (observed up to 40K columns). Please see the plot
below:
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/spark-read-count_2-3_VS_2-4.png>
Similar results were observed for the transformations filter, groupBy,
sum, withColumn, and drop. This is a huge performance improvement, which is
critical for those working with wide tables, e.g. in machine learning or when
importing data from legacy systems. Many thanks to the authors of this
commit.

2. When caching is added to the test jobs (.cache() right before the .count()),
the job duration increases and becomes polynomial in the number of
columns. For a better comparison, the plot below shows the effect of caching
in both Spark 2.3.1 and 2.4.0-SNAPSHOT.
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/spark-cache_2-3_VS_2-4.png>
Spark 2.4.0-SNAPSHOT completes the jobs faster than 2.3.1. However, the
reason for the polynomial complexity of caching in the number of columns is not clear.

3. We have also run tests with more complex transformations. Compared
to the initial test jobs, the following transformation is added:
df.schema.fields.foldLeft(df) { // iterate over initial columns
  case (accDf: DataFrame, attr: StructField) =>
    accDf
      .withColumn(s"${attr.name}_isNotNull", df.col(attr.name).isNotNull) // add new column
      .drop(attr.name) // remove initial column
}.count()

It iterates over the initial columns. For each column it adds a new boolean
column indicating whether the value in the initial column is not null, and then
drops the initial column.
The measured job duration vs. the number of columns is shown in the plot below.
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/spark-loop-over-columns_2-3_VS_2-4.png>
The duration of such jobs has increased significantly compared to Spark
2.3.1. Again, it is polynomial in the number of columns.

CONSIDERED OPTIMIZATIONS:
a) Disabling constraint propagation
<spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)>
decreases the duration by 15%, but does not solve the underlying problem.

b) Checkpointing after every 100 columns may decrease the time (by up to 40%
in our experiments). It prunes the lineage and therefore simplifies the work
for the Catalyst optimizer. However, it comes at a high cost: the executors
have to scan all the rows at each checkpoint. In many situations (e.g. more
than 100K rows per executor, or narrow tables with fewer than 100 columns)
checkpointing increases the overall duration. Even in the ideal case of just
a few rows, the speed-up from checkpointing is still not enough to handle
many tens of thousands of columns.
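Option b) can be sketched as follows. The sketch assumes Spark 2.1 or later, where Dataset.checkpoint() exists and is eager by default. It also assumes a checkpoint directory has been set beforehand with `spark.sparkContext.setCheckpointDir(...)`. CheckpointEvery and its parameters are hypothetical. Unlike the loop above, the sketch refers to columns with `functions.col(name)` rather than `df.col(name)`, so each reference resolves against the current, possibly checkpointed, DataFrame.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object CheckpointEvery {
  // The per-column isNotNull loop, truncating the lineage with an eager
  // checkpoint after every `every` processed columns.
  def isNotNullWithCheckpoints(df: DataFrame, every: Int = 100): DataFrame =
    df.columns.zipWithIndex.foldLeft(df) { case (acc, (name, i)) =>
      val next = acc
        .withColumn(s"${name}_isNotNull", col(name).isNotNull) // add new column
        .drop(name)                                            // remove initial column
      if ((i + 1) % every == 0) next.checkpoint() else next
    }
}
```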

CONCLUSION:
The new improvement in the upcoming Spark 2.4 introduces indexed search over
columns in LogicalPlan.resolve(...). It results in a great performance
improvement for basic transformations. However, some transformations are
still problematic for wide data. In particular, .cache() shows polynomial
complexity in the number of columns, and the duration of jobs featuring
iteration over columns has increased compared to the current Spark 2.3.1.
There may still be parts of the code where the search over columns remains
linear. A discussion of further possible optimizations is very welcome.








Re: [Performance] Spark DataFrame is slow with wide data. Polynomial complexity on the number of columns is observed. Why?

antonkulaga
makatun, did you try to test something more complex, like dataframe.describe
or PCA?




Re: [Performance] Spark DataFrame is slow with wide data. Polynomial complexity on the number of columns is observed. Why?

Manu Zhang
Hi Makatun,

For 2, I guess `cache` will break up the logical plan and force it to be analyzed.
For 3, I have a similar observation here https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015. Each `withColumn` forces the logical plan to be analyzed, which is not free. There is `RuleExecutor.dumpTimeSpent`, which prints the analysis time, and turning on DEBUG logging will also give you much more information.
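Here is a sketch of the RuleExecutor.dumpTimeSpent suggestion. It assumes Spark 2.x, where the method lives on the RuleExecutor companion object in the internal org.apache.spark.sql.catalyst.rules package. The counters are global to the JVM and cumulative, so the report also covers earlier queries. AnalysisTiming is a hypothetical name.

```scala
import org.apache.spark.sql.catalyst.rules.RuleExecutor

object AnalysisTiming {
  // Runs `job`, then returns Catalyst's per-rule timing report.
  // The report is cumulative for the whole JVM, not just for `job`.
  def withRuleTimings(job: => Unit): String = {
    job
    RuleExecutor.dumpTimeSpent()
  }
}
```

Printing the report after building a wide DataFrame with many withColumn calls may show which analyzer rules dominate.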

Thanks,
Manu Zhang

On Mon, Aug 20, 2018 at 10:25 PM antonkulaga <[hidden email]> wrote:
> makatun, did you try to test something more complex, like dataframe.describe
> or PCA?




Re: [Performance] Spark DataFrame is slow with wide data. Polynomial complexity on the number of columns is observed. Why?

makatun
Manu,
thank you very much for your response.

1. Your post helps to further optimize Spark jobs for wide data.
(https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015)
The suggested code change:

df.select(df.columns.map { col =>
  df(col).isNotNull
}: _*)

provides much better performance than the previous approach (where we
use the .withColumn method and loop over the initial columns). The difference
becomes astonishing when using the current Spark master (2.4.0-SNAPSHOT).
Please see the results of our measurements in the plot below.
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/loop-withColumn_VS_select-map-2.png>
The combination of the recent improvement in the Catalyst optimizer and more
efficient code makes a game-changing difference: the job duration becomes
linear in the number of columns. The test jobs are able to process 40K
columns in less than 20 seconds. In contrast, before the optimizations (see
the code from the previous posts) the jobs were not able to process more
than 1600 columns (which took minutes).
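Here is a variant of the select-based approach that also keeps the `<name>_isNotNull` column names of the earlier withColumn loop, via `.as(...)`. In this sketch, SelectIsNotNull is a hypothetical name. Column names containing dots would need backtick quoting.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object SelectIsNotNull {
  // One projection over all columns instead of N chained withColumn/drop
  // calls, so the analyzer deals with a single new Project node.
  def isNotNullAll(df: DataFrame): DataFrame =
    df.select(df.columns.map(c => col(c).isNotNull.as(s"${c}_isNotNull")): _*)
}
```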

2.  CACHING

Manu Zhang wrote
>>For 2, I guess `cache` will break up the logical plan and force it be
>>analyzed.

According to this explanation, caching does not break up the logical plan:

https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md#cache-and-checkpoint

The structure of our test jobs with caching (see our previous posts with
the code and results) is very basic:
.read csv -> .cache -> .count
compared to:
.read csv -> .count
Adding caching increases the job duration significantly. This is
especially critical in Spark 2.4.0-SNAPSHOT, where the job duration is
linear in the number of columns without caching but becomes polynomial
when caching is added. The csv files used for testing are approx. 2 MB, so it
should not be a problem to accommodate them in memory. As far as we
understand, this is not the expected behavior of caching.
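A small wall-clock harness is enough to compare read -> count against read -> cache -> count under identical conditions. This sketch assumes the 7 repetitions used in this thread. JobTimer, durationsMs and median are hypothetical names.

```scala
// Hypothetical timing harness: repeats a job and summarizes wall-clock time.
object JobTimer {
  // Runs `job` `runs` times and returns each run's duration in milliseconds.
  def durationsMs(runs: Int)(job: => Unit): Seq[Double] =
    (1 to runs).map { _ =>
      val t0 = System.nanoTime()
      job
      (System.nanoTime() - t0) / 1e6
    }

  // Median is less sensitive than the mean to a slow first (warm-up) run.
  def median(xs: Seq[Double]): Double = {
    require(xs.nonEmpty, "no measurements")
    val s = xs.sorted
    val m = s.size / 2
    if (s.size % 2 == 1) s(m) else (s(m - 1) + s(m)) / 2
  }
}
```

For example, compare `durationsMs(7) { spark.read.option("header", "true").csv(path).count() }` with a job body such as `{ val d = spark.read.option("header", "true").csv(path).cache(); d.count(); d.unpersist() }`, where spark and path are placeholders. Unpersisting matters because Spark reuses cached data for semantically identical plans, which would otherwise make later runs look faster.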

3.

antonkulaga  wrote
>> did you try to test something more complex, like dataframe.describe or
>> PCA?

Anton Kulaga,
we use dataframe.describe mostly for debugging purposes. Its execution
takes additional time, but we did not measure it because it is typically
not included in production jobs.
We also did not test PCA transformations. It would be very interesting if
you could share your observations/measurements for those.

CONCLUSION:
- Using .withColumn has a high cost in the Catalyst optimizer. An alternative
approach using .select with a mapping over the columns reduces the job duration
dramatically and enables processing tables with tens of thousands of
columns.
- It would be interesting to further investigate how the complexity of
caching depends on the number of columns.


