Re: Repartitioning Hive tables - Container killed by YARN for exceeding memory limits


Re: Repartitioning Hive tables - Container killed by YARN for exceeding memory limits

Chetan Khatri
Can anyone please guide me with the issue above?


On Wed, Aug 2, 2017 at 6:28 PM, Chetan Khatri <[hidden email]> wrote:
Hello Spark Users,

I am reading from an HBase table and writing to a Hive managed table with partitioning applied on a date column. That works fine, but it generates a large number of files across almost 700 partitions, so I wanted to use repartitioning to reduce file I/O by reducing the number of files inside each partition.

But I ended up with the exception below:

ExecutorLostFailure (executor 11 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 14.0 GB of 14 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 

Driver memory = 4g, executor memory = 12g, num-executors = 8, executor cores = 8

Do you think the settings below can help me overcome the issue above?

spark.default.parallelism=1000
spark.sql.shuffle.partitions=1000

Because the default maximum number of partitions is 1000.
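
Roughly, I mean something like this (just a sketch; the app name and the values are illustrative):

import org.apache.spark.sql.SparkSession

// spark.default.parallelism affects RDD shuffles and must be set before the context starts;
// spark.sql.shuffle.partitions controls DataFrame/SQL shuffles and can also be changed at runtime.
val spark = SparkSession.builder()
  .appName("hive-repartition-sketch")
  .config("spark.default.parallelism", "1000")
  .config("spark.sql.shuffle.partitions", "1000")
  .enableHiveSupport()
  .getOrCreate()

// Per-job adjustment after the session exists:
spark.conf.set("spark.sql.shuffle.partitions", "1000")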




Re: Repartitioning Hive tables - Container killed by YARN for exceeding memory limits

Ryan Blue
Chetan,

When you're writing to a partitioned table, you want to use a shuffle to avoid the situation where each task has to write to every partition. You can do that either by adding a repartition by your table's partition keys, or by adding an order by with the partition keys and then columns you normally use to filter when reading the table. I generally recommend the second approach because it handles skew and prepares the data for more efficient reads.
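
Something like this, as an untested sketch (here `src`, `tgt`, `date_col`, and `filter_col` are placeholder names, and dynamic partition insert is assumed to be enabled on the Hive side):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
val df = spark.table("src")  // placeholder source table

// Approach 1: shuffle on the target table's partition key so each task
// writes to only a handful of Hive partitions (and therefore few files).
df.repartition(col("date_col"))
  .write
  .insertInto("tgt")

// Approach 2: global sort by the partition key plus the columns commonly
// used in read filters; the range shuffle behind orderBy spreads skewed keys
// across tasks and clusters the data for more efficient reads.
df.orderBy(col("date_col"), col("filter_col"))
  .write
  .insertInto("tgt")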

If that doesn't help, then you should look at your memory settings. When you're getting killed by YARN, you should consider setting `spark.shuffle.io.preferDirectBufs=false` so you use less off-heap memory that the JVM doesn't account for. That is usually an easier fix than increasing the memory overhead. Also, when you set executor memory, always revisit spark.memory.fraction to ensure the memory you're adding is used where it is needed. If your memory fraction is the default 60%, then 60% of the memory will be used for Spark execution, not reserved for whatever is consuming it and causing the OOM. (If Spark's memory is too low, you'll see other problems like spilling too much to disk.)
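
As a starting point, a sketch only (the values shown are defaults or arbitrary, not tuned recommendations; in practice these usually go to spark-submit as --conf so they take effect before executors are requested):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  // Keep Netty shuffle buffers on-heap so the JVM accounts for them.
  .set("spark.shuffle.io.preferDirectBufs", "false")
  // 0.6 is the default; revisit it whenever executor memory changes.
  .set("spark.memory.fraction", "0.6")
  // Extra off-heap headroom for the YARN container (MB), if still needed.
  .set("spark.yarn.executor.memoryOverhead", "2048")

val spark = SparkSession.builder()
  .config(conf)
  .enableHiveSupport()
  .getOrCreate()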

rb

--
Ryan Blue
Software Engineer
Netflix

Re: Repartitioning Hive tables - Container killed by YARN for exceeding memory limits

Chetan Khatri
Ryan, 
Thank you for the reply.

For 2 TB of data, what should the value of spark.yarn.executor.memoryOverhead be?

With regard to this, I see the Spark issue https://issues.apache.org/jira/browse/SPARK-18787, but I am not sure whether it works on Spark 2.0.1.

Can you elaborate more on the spark.memory.fraction setting?

Number of partitions = 674
Cluster: 455 GB total memory, VCores: 288, Nodes: 17
Memory config given/tried: executor memory = 16g, num-executors = 10, executor cores = 6, driver memory = 4g

spark.default.parallelism=1000
spark.sql.shuffle.partitions=1000
spark.yarn.executor.memoryOverhead=2048
spark.shuffle.io.preferDirectBufs=false

Re: Repartitioning Hive tables - Container killed by YARN for exceeding memory limits

Holden Karau
The memory overhead is based less on the total amount of data and more on what you end up doing with the data (e.g. if you're doing a lot of off-heap processing or using Python, you need to increase it). Honestly, most people find this number for their job "experimentally" (e.g. they try a few different things).

--
Cell : 425-233-8271

Re: Repartitioning Hive tables - Container killed by YARN for exceeding memory limits

Chetan Khatri
Thanks, Holden!

