pyspark DataFrameWriter ignores customized settings?


pyspark DataFrameWriter ignores customized settings?

chhsiao1981
Hi all,

I am using spark-2.2.1-bin-hadoop2.7 in standalone mode
(Python 3.5.2 on Ubuntu 16.04).
I am trying to write a DataFrame to HDFS with a customized block size, but it fails.
However, the corresponding RDD writes with the customized block size successfully.

Could you help me figure out the issue?

Best regards,
Hsiao


 
The following is the test code:
(dfs.namenode.fs-limits.min-block-size has been set to 131072 in HDFS.)
 
 
##########
# init
##########
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
 
import hdfs
from hdfs import InsecureClient
import os
 
import numpy as np
import pandas as pd
import logging
 
os.environ['SPARK_HOME'] = '/opt/spark-2.2.1-bin-hadoop2.7'
 
block_size = 512 * 1024
 
conf = (SparkConf()
        .setAppName("myapp")
        .setMaster("spark://spark1:7077")
        .set("spark.cores.max", 20)
        .set("spark.executor.cores", 10)
        .set("spark.executor.memory", "10g")
        .set("spark.hadoop.dfs.blocksize", str(block_size))
        .set("spark.hadoop.dfs.block.size", str(block_size)))
 
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext._jsc.hadoopConfiguration().setInt("dfs.blocksize", block_size)
spark.sparkContext._jsc.hadoopConfiguration().setInt("dfs.block.size", block_size)
 
##########
# main
##########

# create DataFrame
df_txt = spark.createDataFrame([{'temp': "hello"}, {'temp': "world"}, {'temp': "!"}])

# save using DataFrameWriter, resulting in a 128MB block size
df_txt.write.mode('overwrite').format('parquet').save('hdfs://spark1/tmp/temp_with_df')

# save using RDD, resulting in a 512KB block size
client = InsecureClient('http://spark1:50070')
client.delete('/tmp/temp_with_rrd', recursive=True)
df_txt.rdd.saveAsTextFile('hdfs://spark1/tmp/temp_with_rrd')
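
A quick way to check which block size HDFS actually recorded for the written files (a sketch using the same InsecureClient as above; 'blockSize' is the standard WebHDFS FileStatus field, in bytes):

# Sketch: print the HDFS block size of every file in the two output directories.
for path in ('/tmp/temp_with_df', '/tmp/temp_with_rrd'):
    for name, status in client.list(path, status=True):
        if status['type'] == 'FILE':
            print(path, name, status['blockSize'])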


Re: pyspark DataFrameWriter ignores customized settings?

chhsiao1981
Hi all,

It looks like a Parquet-specific issue.

I can successfully write with a 512KB block size
if I use df.write.csv() or df.write.text().
(The CSV write succeeds once I put hadoop-lzo-0.4.15-cdh5.13.0.jar
into the jars dir.)

sample code:


block_size = 512 * 1024

conf = (SparkConf()
        .setAppName("myapp")
        .setMaster("spark://spark1:7077")
        .set("spark.cores.max", 20)
        .set("spark.executor.cores", 10)
        .set("spark.executor.memory", "10g")
        .set("spark.hadoop.dfs.blocksize", str(block_size))
        .set("spark.hadoop.dfs.block.size", str(block_size))
        .set("spark.hadoop.dfs.namenode.fs-limits.min-block-size", str(131072)))

sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# create DataFrame
df_txt = spark.createDataFrame([{'temp': "hello"}, {'temp': "world"}, {'temp': "!"}])

# save using DataFrameWriter, resulting in a 128MB block size
df_txt.write.mode('overwrite').format('parquet').save('hdfs://spark1/tmp/temp_with_df')

# save using DataFrameWriter.csv, resulting in a 512KB block size
df_txt.write.mode('overwrite').csv('hdfs://spark1/tmp/temp_with_df_csv')

# save using DataFrameWriter.text, resulting in a 512KB block size
df_txt.write.mode('overwrite').text('hdfs://spark1/tmp/temp_with_df_text')

# save using RDD, resulting in a 512KB block size
client = InsecureClient('http://spark1:50070')
client.delete('/tmp/temp_with_rrd', recursive=True)
df_txt.rdd.saveAsTextFile('hdfs://spark1/tmp/temp_with_rrd')





Re: pyspark DataFrameWriter ignores customized settings?

chhsiao1981
Hi all,

I found the answer at the following link:

https://forums.databricks.com/questions/918/how-to-set-size-of-parquet-output-files.html

I can successfully set the Parquet block size with spark.hadoop.parquet.block.size.

The following is the sample code:

# init
block_size = 512 * 1024

conf = (SparkConf()
        .setAppName("myapp")
        .setMaster("spark://spark1:7077")
        .set("spark.cores.max", 20)
        .set("spark.executor.cores", 10)
        .set("spark.executor.memory", "10g")
        .set("spark.hadoop.parquet.block.size", str(block_size))
        .set("spark.hadoop.dfs.blocksize", str(block_size))
        .set("spark.hadoop.dfs.block.size", str(block_size))
        .set("spark.hadoop.dfs.namenode.fs-limits.min-block-size", str(131072)))

sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# create DataFrame
df_txt = spark.createDataFrame([{'temp': "hello"}, {'temp': "world"}, {'temp': "!"}])

# save using DataFrameWriter, resulting in a 512KB block size
df_txt.write.mode('overwrite').format('parquet').save('hdfs://spark1/tmp/temp_with_df')
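
If the SparkSession already exists, the same property can presumably also be applied at runtime through the Hadoop configuration, mirroring the dfs.blocksize calls in the first post (a sketch following that pattern, not a separately verified result):

# Sketch: set Parquet's row group size on an existing session, using the same
# hadoopConfiguration() pattern applied to dfs.blocksize earlier in the thread.
spark.sparkContext._jsc.hadoopConfiguration().setInt("parquet.block.size", block_size)
df_txt.write.mode('overwrite').parquet('hdfs://spark1/tmp/temp_with_df')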







Re: pyspark DataFrameWriter ignores customized settings?

Ryan Blue
To clarify what's going on here: dfs.blocksize and dfs.block.size set the HDFS block size (the spark.hadoop. prefix adds them to the Hadoop configuration). The Parquet "block size" is more accurately called the "row group size", but is set using the unfortunately-named property parquet.block.size. I think the reason this was hard to answer is that the HDFS block size was being set correctly all along; the value was really intended for Parquet's row group size.

HDFS doesn't know anything about Parquet's row group size and still splits its blocks on the HDFS block size. Parquet tries to fit a whole number of row groups into the HDFS blocks and will pad if necessary to avoid reading a row group from multiple blocks. The Parquet community's recommendation for row group size is to use a size that is large (tens of megabytes at a minimum) and that divides the HDFS block size (to fit a whole number of row groups). The default, 128MB row groups, is reasonable for all of the datasets that I've tuned, but some use cases have opted for smaller values (16 or 32MB) to increase parallelism.
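
As a concrete illustration of that sizing advice (the numbers below are only an example, not figures from the thread): a 32MB row group divides a 128MB HDFS block exactly, so four row groups fit per block and no padding is needed.

from pyspark import SparkConf

# Hypothetical sizing sketch: choose a row group size that divides the HDFS
# block size so a whole number of row groups fits into each block.
hdfs_block_size = 128 * 1024 * 1024   # dfs.blocksize (HDFS block size)
row_group_size = 32 * 1024 * 1024     # parquet.block.size (Parquet row group size)
assert hdfs_block_size % row_group_size == 0   # 128MB / 32MB = 4 row groups per block

conf = (SparkConf()
        .setAppName("myapp")
        .set("spark.hadoop.dfs.blocksize", str(hdfs_block_size))
        .set("spark.hadoop.parquet.block.size", str(row_group_size)))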

rb





--
Ryan Blue
Software Engineer
Netflix