Spark performance tuning demystified

I think, perhaps, a good place to start with this is with an older article that runs down all of these parts together [1].

To start with, we want to be sure we know what resources are available to us, example cluster :

– 1 c4.8xlarge master
– 4 c4.8xlarge instances
– each with 36 CPU cores
– and each with 60 GB RAM.

First, The master will not count towards the calculation since it should be managing and not running jobs like a worker would.

Before doing anything else, we need to look at YARN, which allocates resources to the cluster. There are some things to consider:

1) To figure out what resources YARN has available, there is a chart for emr-5.11.0 showing that [2].

2) By default (for c4.8xlarge in the chart), YARN has 53248 MB (52 GB) max and 32 MB min available to it.

3) The max is supposed to be cleanly divisible by the minimum (1664 in this case), since memory incremented by the minimum number.

4) YARN is basically set for each instance it is running on (Total RAM to allocate to YARN to use).

5) Be sure to leave at least 1-2GB RAM and 1 vCPU for each instance’s O/S and other applications to run too. The default amount of RAM (52 GB out of 60 GB RAM on the instance) seems to cover this, but this will lave us with 35 (36-1) vCPUs per instance off the top.

7) If a Spark container runs over the amount of memory YARN has allocated, YARN will kill the container and retry to run it again.

When it comes to executors it may be worth considering the following:

1) This is set for the entire cluster. it doesn’t have to use all resources

2) Likely, you are going to want more than 1 executor per node, since parallelism is usually the goal. This can be experimented with to manually size a job, again it doesn’t even have to use all resources.

3) If we were to use the article [1] as an example, we may want to try an do 3 executors per node to start with (Not too few or too many), so –num-executors 11 [3 executors per node x 4 nodes = 12 executors, -1 for application master running = 11].
 
There are some big things to know about executor cores too, of course:

1) Although it doesn’t explicitly say it, I have noticed problems working with other customers, when total executors exceeds the instance’s vCPU / core count.

2) It is suggested to put aside at least 1 executor core per node to ensure there are resources left over to run O/S, App Master and the like.

3) Executor cores are being assigned per executor (discussed just above). So this isn’t total per node or for the cluster, but rather depending on the resources available divided by the number of executors wanted per node.

4) In the article’s example this works out to –executor-cores= 11 [36 cores/ vCPU per instance – 1 for AM/overhead = 35 cores. 35 cores/ 3 executors per node = 11 executor cores]

Finally, looking at the executor-memory, we also want to be sure of few things:

1) This is assigned like the executor cores (finally, something sized similarly). This is per executor and thus is not the total amount of memory on a node, or set in YARN, but rather depending on the resources available divided by the number of executors again (Like executor-cores)

2) Sometimes, some off-heap memory and other things can make an executor/container larger than the memory set. Leave room so YARN doesn’t kill it due to using more memory than is provided.

3) In the article’s example this works out to –executor-memory 16G (16384 MB) [52 GB provided to YARN / 3 executors = 17 GB RAM max. 17 GB-1 GB for extra overhead = 16GB (16384 MB)]
 
So, putting the above together as an example, we could use 3 executors per node with: –num-executors 11 –executor-cores 11 –executor-memory 16G

That’s the first part. As for the rest, let’s cover the cluster mode(s) [4] and the driver.

1) When we use –deploy-mode client, we tell YARN to deploy to an ‘external node’, but since this is being run from the master, it is straining the master for the entire cluster. Client mode is best if you have spark-shell on another instance in the same network and can submit a job from there to the cluster but use the instance’s resources for the driver (taking the load off the cluster) or when using it interactively to test since the application master is on the master instance and its logs will be there).

2) We probably want to use YARN as our deployment mode [5]. In particular, we should be looking at yarn-cluster. This means each container’s App master is deployed in the cluster and not just on one device (That is also managing the whole cluster). The resources running are spread out, but the logs are too for the application masters). There’s another good explanation on this available [6].
 
3) Remember, if you over-subscribe the driver memory and cores, there’s less for the rest of the cluster to use for actual work. Most often, the driver memory is set to 1-2GB; 20GB would take more than half the RAM and 4 cores would take most of the cores in our earlier example. I t may be worth leaving these as their defaults (not using the options) for now and then conservatively raising them if the logs inform you to do so.
 
I suspect you already have it, but as there are articles from all over in here, this is the EMR document with the options for emr-5.11.0.

Also, if this works and you want to tune further, you can see about increasing resources available to YARN (Like using perhaps 58 GB of RAM instead of 52 GB) and test those too.

When working withing Zeppelin to change the settings, it should allow changes by adding them in: Menu (top right) -> Interpreter -> (Add under the Spark section) as noted here [8]. The settings above with the “–” in-front are for doing a Spark submit. In Zeppelin they would be be the 3 like options you mentioned:
spark.executor.instances (–num-executors)
spark.executor.cores (–executor-cores)
spark.executor.memory (–executor-memory) (set using a G at the end for GB, like 16G )

Also while I was going through the attachment that you send I see that the performance is also hit by job doing spark speculation. S3 being a object if it does speculation then this is going to slow down the performance. [9]

So
a. https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-taskschedulerimpl-speculative-execution.html(https://spark.apache.org/docs/latest/configuration.html) ensure spark.speculation os set to false (which should be default, but may allow the setting to work)
b. Insert oiverwrite + append +s3= bad, maybe writer to HDFS and do a s3distcp to s3 afterwards which will be much faster [10]

Resources:

[1] http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

[2] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hadoop-task-config.html#emr-hadoop-task-jvm

[3] http://spark.apache.org/docs/latest/submitting-applications.html

[4] http://spark.apache.org/docs/latest/cluster-overview.html

[5] http://spark.apache.org/docs/latest/running-on-yarn.html

[6] https://www.cloudera.com/documentation/enterprise/5-5-x/topics/cdh_ig_running_spark_on_yarn.html

[7] http://docs.aws.amazon.com/emr/latest/ReleaseGuide/latest/emr-spark-configure.html

[8] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-zeppelin.html#zeppelin-considerations

[9] http://agrajmangal.in/blog/big-data/spark-parquet-s3/

[10] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/UsingEMR_s3distcp.html

Using INSERT OVERWRITE requires listing the contents of the Amazon S3 bucket or folder. This is an expensive operation. If possible, manually prune the path instead of having Hive list and delete the existing objects. [1]

Amazon S3 is the most popular input and output source for Amazon EMR. A common mistake is to treat Amazon S3 as you would a typical file system. [2]
There are differences between Amazon S3 and a file system that you need to take into account when running your cluster.
* If an internal error occurs in Amazon S3, your application needs to handle this gracefully and re-try the operation.
* If calls to Amazon S3 take too long to return, your application may need to reduce the frequency at which it calls Amazon S3.
* Listing all the objects in an Amazon S3 bucket is an expensive call. Your application should minimize the number of times it does this.
There are several ways you can improve how your cluster interacts with Amazon S3.
* Use S3DistCp to move objects in and out of Amazon S3. S3DistCp implements error handling, retries and back-offs to match the requirements of Amazon S3. For more information, see Distributed Copy Using S3DistCp.
* Design your application with eventual consistency in mind. Use HDFS for intermediate data storage while the cluster is running and Amazon S3 only to input the initial data and output the final results.
* If your clusters will commit 200 or more transactions per second to Amazon S3, contact support to prepare your bucket for greater transactions per second and consider using the key partition strategies described in Amazon S3 Performance Tips & Tricks.
* Set the Hadoop configuration setting io.file.buffer.size to 65536. This causes Hadoop to spend less time seeking through Amazon S3 objects.
* Consider disabling Hadoop’s speculative execution feature if your cluster is experiencing Amazon S3 concurrency issues. You do this through the mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution configuration settings. This is also useful when you are troubleshooting a slow cluster.

References:
[1] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot-error-hive.html#emr-troubleshoot-error-hive-3
[2] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot-errors-io.html#emr-troubleshoot-errors-io-1

 

Need to learn more about aws big data (demystified)?

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s