How to work with Maximize Resource Allocation and spark dynamic Allocation [ aws emr spark ]

Spark resource tuning is essentially a case of fitting the number of executors we want to assign per job to the available resources in the cluster. In this case, we need to look at the EMR cluster from an “overall” perspective. Let us assume your Core and Task nodes are 40 (20+20) instances of type m3.xlarge.
As mentioned before, each m3.xlarge instance has 4 vCPUs and 15GiB of RAM available for use [1]. Since we are going to leave 1 vCPU per node free as per the initial issue, this gives us total cluster vCPU resources of:
40 x 3 vCPUs = 120vCPUs

For m3.xlarge instance types, the maximum amount of RAM available on each node is controlled by the property “yarn.nodemanager.resource.memory-mb” in the /etc/hadoop/conf/yarn-site.xml file on the master node. It is set by default to 11520 (even with maximizeResourceAllocation enabled). This leaves some RAM free on each node for other tasks such as system processes. Across the cluster this gives:
(40 x 11520MB RAM) / 1024 (to convert to GiB) = 450GiB RAM*

* Note: this does not mean that it is OK to divide this 450GiB by 40 and to set each executor's memory to that value. The YARN limit is a hard maximum, and each executor also requires some memoryOverhead to account for things like VM overheads, interned strings, other native overheads, etc. [2]. Typically, the actual maximum memory set by --executor-memory is 80% of this value.
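The capacity arithmetic above can be reproduced in a few lines of Python. This is only a sketch: the constant names are illustrative, and the 80% figure is the rule of thumb from the note, not a YARN or Spark setting.

```python
NODES = 40                     # 20 core + 20 task m3.xlarge instances
VCPUS_PER_NODE = 4
RESERVED_VCPUS_PER_NODE = 1    # left free on every node
YARN_NODE_MEMORY_MB = 11520    # yarn.nodemanager.resource.memory-mb on m3.xlarge

usable_vcpus = NODES * (VCPUS_PER_NODE - RESERVED_VCPUS_PER_NODE)
cluster_memory_mb = NODES * YARN_NODE_MEMORY_MB
cluster_memory_gib = cluster_memory_mb / 1024

# Rule of thumb from the note: roughly 80% goes to --executor-memory,
# the remainder is kept for memoryOverhead.
executor_memory_budget_mb = cluster_memory_mb * 80 // 100
executor_memory_budget_gib = executor_memory_budget_mb / 1024

print(usable_vcpus)                 # 120
print(cluster_memory_gib)           # 450.0
print(executor_memory_budget_gib)   # 360.0
```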

In this case, we can assign almost any combination that fits into these parameters. For example, for m3.xlarge instance types we could assign a safe maximum of 3 cores and 9GiB of RAM per executor on each node, giving us a maximum of 40 executors. Then we could simply divide the executors equally across the number of jobs. Of course, as you either increase your node count or move to a larger instance type, the available resources and maxExecutors per job would go up. As such, if you do decide to create a cluster with a larger instance type, you can adjust the values in this reply accordingly.

Using this as a simple formula, we could have many examples such as (using your previously provided settings as a basis):
1) 1 Spark-submit Job -> --executor-cores 3 --executor-memory 9g --conf spark.dynamicAllocation.maxExecutors=40 --conf spark.yarn.executor.memoryOverhead=2048

2) 2 Spark-submit Jobs -> --executor-cores 1 --executor-memory 4500m --conf spark.dynamicAllocation.maxExecutors=80 --conf spark.yarn.executor.memoryOverhead=1024
(--executor-cores are 1 here as 3/2 = 1.5 and decimals are rounded down)

3) 3 Spark-submit Jobs -> --executor-cores 1 --executor-memory 3000m --conf spark.dynamicAllocation.maxExecutors=120 --conf spark.yarn.executor.memoryOverhead=768
etc
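The three combinations above follow the same pattern, which can be sketched in Python. This is an illustration only: the helper name per_job_settings is made up here, and the memory figures are simply the ones listed above, checked against the 11520MB YARN limit per node.

```python
YARN_NODE_MEMORY_MB = 11520   # m3.xlarge default quoted earlier
USABLE_CORES_PER_NODE = 3     # 4 vCPUs minus the 1 left free
NODES = 40

def per_job_settings(jobs):
    """Return (executor_cores, max_executors) for `jobs` concurrent jobs."""
    executor_cores = USABLE_CORES_PER_NODE // jobs   # decimals are rounded down
    max_executors = NODES * jobs                     # value used in the list above
    return executor_cores, max_executors

# (jobs, executor memory in MB, memoryOverhead in MB), taken from the list above
examples = [(1, 9 * 1024, 2048), (2, 4500, 1024), (3, 3000, 768)]

for jobs, memory_mb, overhead_mb in examples:
    cores, max_executors = per_job_settings(jobs)
    per_node_mb = (memory_mb + overhead_mb) * jobs
    fits = per_node_mb <= YARN_NODE_MEMORY_MB
    print(jobs, cores, max_executors, per_node_mb, fits)
# 1 3 40 11264 True
# 2 1 80 11048 True
# 3 1 120 11304 True
```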

Let's double the instance size and see what happens...

On an m3.2xlarge with double the resources per node we would have 40 x 8 vCPUs and 40 x 30GiB RAM. Again, leaving 1 vCPU free per node (8-1=7 vCPUs per node) and trusting that YARN limits the max used RAM to 23GiB (23552MiB) or so, our new values would look like:
1) 1 Spark-submit Job -> --executor-cores 7 --executor-memory 19g --conf spark.dynamicAllocation.maxExecutors=40 --conf spark.yarn.executor.memoryOverhead=4096

(19456MiB + 4096MiB) * 1 overall Job = 23552MiB => A perfect fit!
(23552 * 0.8 = 18841.6MiB, so we can probably round this up to 19GiB of RAM or so without issue. The majority of the memory discrepancy between 23 and 19 (4GiB) can be set in the memoryOverhead parameter.)

2) 2 Spark-submit Jobs -> --executor-cores 3 --executor-memory 9g --conf spark.dynamicAllocation.maxExecutors=80 --conf spark.yarn.executor.memoryOverhead=2560

(9216MiB + 2560MiB) * 2 Jobs to run = 23552MiB => A perfect fit!
(--executor-cores are 3 here as 7/2 = 3.5 and decimals are rounded down)

3) 3 Spark-submit Jobs -> --executor-cores 2 --executor-memory 6g --conf spark.dynamicAllocation.maxExecutors=120 --conf spark.yarn.executor.memoryOverhead=1536
(--executor-cores are 2 here as 7/3 = 2.333 and decimals are rounded down.)
(6144MiB + 1536MiB) * 3 Jobs = 23040MiB => Almost a perfect fit. 512MiB is left unused.
etc

Please note that the memory settings are not linear or related to one another. They simply must fit into the maximum size allowed by yarn. In the above case for the m3.2xlarge, I used the default 23GiB as the maximum value allowed per node.

So for the example:
“3) 3 Spark-submit Jobs -> --executor-cores 2 --executor-memory 6g --conf spark.dynamicAllocation.maxExecutors=120 --conf spark.yarn.executor.memoryOverhead=1536”
We have 3 jobs with a maximum executor memory size of 6GiB each. This is 18GiB that fits into the 23GiB. The remaining 5GiB we allocate as memoryOverhead, with an equal share per job. This means that we set this value to 1536MiB per job, which uses the remaining RAM except for 512MiB.

As you can see above, the calculation per node is:
(executor-memory + memoryOverhead) * number of concurrent jobs = value that must be <= YARN threshold.
(6144MiB + 1536MiB) * 3 = 23040MiB, which is 512MiB less than the YARN threshold value of 23552MiB (23GiB).
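The fit rule above can be written as a small check. The following Python sketch uses a made-up helper name (unused_mib) and takes the 23552MiB threshold from the m3.2xlarge discussion as an assumption; on a real cluster, read the value from yarn-site.xml.

```python
YARN_NODE_MEMORY_MIB = 23552   # 23GiB, the m3.2xlarge figure assumed above

def unused_mib(executor_memory_mib, overhead_mib, concurrent_jobs,
               yarn_limit_mib=YARN_NODE_MEMORY_MIB):
    """Return the MiB left unused per node; raise if the settings do not fit."""
    used = (executor_memory_mib + overhead_mib) * concurrent_jobs
    if used > yarn_limit_mib:
        raise ValueError(
            f"{used}MiB exceeds the YARN limit of {yarn_limit_mib}MiB")
    return yarn_limit_mib - used

print(unused_mib(19 * 1024, 4096, 1))   # 0
print(unused_mib(9 * 1024, 2560, 2))    # 0
print(unused_mib(6 * 1024, 1536, 3))    # 512
```

A combination that overshoots, such as 3 jobs at 7GiB plus 1536MiB of overhead each, raises a ValueError instead of returning a negative number.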

These are rough, basic settings that are only appropriate for jobs that work best with a lower number of larger executors. Some Spark jobs run better with a higher number of smaller executors. This depends completely on the type of job that you are running, and as such it is unique to each Spark application run on the cluster. With these types of jobs, you would be better off further subdividing the values above so that more executors fit into the resources available to each job. Luckily, you can use Spark dynamicAllocation to allow Spark to calculate this for you and to allocate spare executors where available per job [3].

There is scope for more complex Spark tunings, which you can read more information about in links [4], [5], [6] and [7] below.

----------
Question: “Is it right to say the setting maximizeResourceAllocation should be used in all cases, or if not, what are the reasons not to use it?”
----------
A:

Using the dynamicAllocation settings (which are set to true by default on EMR 4.4 and later) we can better determine the overall resources that we have and how many executors we want to give to each job. The examples below use the 40 m3.xlarge nodes as the cluster's total resources reference.

For example, leaving the cluster as-is with only the original job per cluster, we would naturally want to allocate all of our resources to this one job. If the job runs better using large executors, we could do this by creating the cluster with the maximizeResourceAllocation property set to true, which automatically tunes the cluster's Spark defaults to use the most resources available [8]. Your existing settings of “--executor-memory 9g --executor-cores 3 --conf spark.yarn.executor.memoryOverhead=2048” would be sufficient for this task, though not actually necessary if the maximizeResourceAllocation option is set. You can see the default settings that the maximizeResourceAllocation option sets in the /etc/spark/conf/spark-defaults.conf file on your master node.
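Since maximizeResourceAllocation is supplied as cluster configuration at creation time, it may help to see what that configuration looks like. The Python sketch below only builds and prints the JSON for the “spark” classification; how you pass it to EMR (console, CLI or SDK) is up to you.

```python
import json

# EMR configuration object enabling maximizeResourceAllocation.
configurations = [
    {
        "Classification": "spark",
        "Properties": {"maximizeResourceAllocation": "true"},
    }
]

print(json.dumps(configurations, indent=2))
```

The printed JSON can be saved to a file and supplied when the cluster is created, for example through the --configurations option of aws emr create-cluster.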

----------
Example 1: 1 Job, smaller number of larger executors:
----------
For example, using your previous spark-submit job options (tweaked), you could use something like the following. Note that maximizeResourceAllocation is an EMR cluster configuration set when the cluster is created, not a Spark property, so it is not passed to spark-submit:
-----
spark-submit --master yarn --executor-cores 3 \
--conf "spark.executor.extraJavaOptions=$EXECUTOR_OPTS" \
--conf "spark.driver.extraJavaOptions=$DRIVER_OPTS" \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=40 \
--conf spark.dynamicAllocation.initialExecutors=40 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.driver.maxResultSize=10G \
--conf spark.akka.frameSize=1000 \
--conf spark.ui.killEnabled=true \
--jars "/usr/lib/spark/lib/RedshiftJDBC41-1.1.7.1007.jar,/usr/lib/spark/lib/java-dogstatsd-client-2.0.12.jar" \
--packages com.databricks:spark-redshift_2.10:0.5.2 \
--class melidata.analysis.jobs.$JOB /home/hadoop/myJob1.jar
-----

Here, no --executor-memory is given, so each executor gets the memory default that maximizeResourceAllocation wrote to spark-defaults.conf, and Spark dynamicAllocation scales the number of executors for you. We will be running up to 40 individual executors that have 3 vCPUs each. You can read more about the Spark dynamicAllocation options in link [3] below.


However, what if your job runs better with a greater number of smaller executors?

----------
Example 2: 1 Job, greater number of smaller executors:
----------
In this case you would set the dynamicAllocation settings in a way similar to the following, but adjust your memory and vCPU options in a way that allows more executors to be launched on each node. So for twice the executors on our 40 nodes, we need to halve the memory and the vCPUs in order to make the 2 executors “fit” onto each of our nodes.

For example, we could submit our Spark job in a way similar to the following (the initialExecutors value of 20 is discussed in the ** note below):
-----
spark-submit --master yarn --executor-cores 1 --executor-memory 4500m \
--conf spark.yarn.executor.memoryOverhead=1024 \
--conf "spark.executor.extraJavaOptions=$EXECUTOR_OPTS" \
--conf "spark.driver.extraJavaOptions=$DRIVER_OPTS" \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=80 \
--conf spark.dynamicAllocation.initialExecutors=20 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.driver.maxResultSize=10G \
--conf spark.akka.frameSize=1000 \
--conf spark.ui.killEnabled=true \
--jars "/usr/lib/spark/lib/RedshiftJDBC41-1.1.7.1007.jar,/usr/lib/spark/lib/java-dogstatsd-client-2.0.12.jar" \
--packages com.databricks:spark-redshift_2.10:0.5.2 \
--class melidata.analysis.jobs.$JOB /home/hadoop/myJob1.jar
-----

This would allow up to 2 executors per node for the 1 job, each with 1 vCPU and 4.5GiB of memory. The property “spark.dynamicAllocation.maxExecutors=80” can be set because it allows the number of executors to be scaled up to the maximum resource allocation of the cluster: 80 * 4.5GiB = 360GiB (80% of the maximum 450GiB) and 80 * 1 vCPU = 80 vCPUs, which fits within the 120 vCPUs available for the cluster.

** The spark.dynamicAllocation.initialExecutors value of 20 is arbitrary. dynamicAllocation will scale the number of executors as needed during the course of your job.
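As a quick sanity check of the Example 2 totals, the Python sketch below compares the cluster-wide demand of 80 executors with the budgets worked out earlier. The variable names are illustrative, and it assumes executors end up spread evenly across nodes, which YARN does not guarantee.

```python
NODES = 40
USABLE_VCPUS = NODES * 3                       # 1 of 4 vCPUs per node left free
CLUSTER_MEMORY_MB = NODES * 11520              # YARN memory across the cluster
EXECUTOR_MEMORY_BUDGET_MB = CLUSTER_MEMORY_MB * 80 // 100   # 80% rule of thumb

max_executors = 80
executor_cores = 1
executor_memory_mb = 4500

total_cores = max_executors * executor_cores
total_executor_memory_mb = max_executors * executor_memory_mb

print(total_cores, total_cores <= USABLE_VCPUS)
# 80 True
print(total_executor_memory_mb,
      total_executor_memory_mb <= EXECUTOR_MEMORY_BUDGET_MB)
# 360000 True
```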

Now say that we want to run 2 jobs of equal importance over the cluster with the same amount of resources going to both jobs.
----------
Example 3: 2 Jobs, 1 executor per node per job:
----------

In this case, we could use the following settings (similar to the last example, but with a lower maxExecutors value), submitting the command once per job with the matching jar name:
-----
spark-submit --master yarn --executor-cores 1 --executor-memory 4500m \
--conf spark.yarn.executor.memoryOverhead=1024 \
--conf "spark.executor.extraJavaOptions=$EXECUTOR_OPTS" \
--conf "spark.driver.extraJavaOptions=$DRIVER_OPTS" \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=40 \
--conf spark.dynamicAllocation.initialExecutors=20 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.driver.maxResultSize=10G \
--conf spark.akka.frameSize=1000 \
--conf spark.ui.killEnabled=true \
--jars "/usr/lib/spark/lib/RedshiftJDBC41-1.1.7.1007.jar,/usr/lib/spark/lib/java-dogstatsd-client-2.0.12.jar" \
--packages com.databricks:spark-redshift_2.10:0.5.2 \
--class melidata.analysis.jobs.$JOB /home/hadoop/myJob<1 or 2>.jar
-----

The main difference here is the value of “--conf spark.dynamicAllocation.maxExecutors=40”. We need to set this value to 40 because our memory limit is essentially halved, as we are running 2 separate jobs on each node. Hence, the total resources on our cluster effectively halve to 80 vCPUs (half of the raw 40 x 4 = 160 vCPUs) and 225GiB RAM per spark-submit job.
Fitting our 2 jobs into these new limits again gives us 1 vCPU per executor:
80 vCPUs / 40 nodes = 2 vCPUs per job per node. Since we need to leave 1 of the 4 cores free per node, that gives us 4-1=3 vCPUs per node, which divided by 2 jobs is 1.5. We then round down to 1 vCPU per job.
225 * 0.8 (to get 80% of settable memory) = 180GiB per job.
180 / 40 nodes = 4.5GiB per job per node.

Setting spark.dynamicAllocation.maxExecutors to 40 for each job ensures that neither job encroaches on the resources allocated to the other. With maxExecutors limited to 40 for each job, the maximum resources available per job are:
40 x 4500MiB RAM = 180000MiB, or roughly 180GiB per job (dividing by 1000 as an approximation; dividing by 1024 gives about 175.8GiB).
40 x 1 vCPU = 40 vCPUs per job.
2 jobs means that in total roughly 360GiB RAM is used (80% of total RAM, as expected) and 40 vCPUs * 2 jobs is 80 vCPUs, which again fits within the maximum vCPUs for the cluster.
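The per-job figures for Example 3 can be derived step by step. The following Python sketch mirrors the arithmetic above; the constant names are illustrative and the 80% share is the same rule of thumb used throughout.

```python
NODES = 40
VCPUS_PER_NODE = 4
RESERVED_VCPUS = 1            # left free on every node
CLUSTER_MEMORY_GIB = 450      # 40 x 11520MB, converted to GiB
JOBS = 2

# Cores: 3 usable vCPUs per node split across 2 jobs, rounded down.
executor_cores = (VCPUS_PER_NODE - RESERVED_VCPUS) // JOBS

# Memory: half the cluster per job, 80% of which goes to --executor-memory.
memory_per_job_gib = CLUSTER_MEMORY_GIB / JOBS * 80 / 100
executor_memory_gib = memory_per_job_gib / NODES   # 1 executor per node per job

max_executors_per_job = NODES
total_vcpus_used = executor_cores * max_executors_per_job * JOBS

print(executor_cores)         # 1
print(memory_per_job_gib)     # 180.0
print(executor_memory_gib)    # 4.5
print(total_vcpus_used)       # 80
```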

The difference between Example 3 and Example 2 is that in Example 2, all of the resources are being used by 2 executors on each node for the 1 job. In Example 2, your job would have a maximum of 80 executors (40 nodes * 2 executors per node) running on the same 1 job. In Example 3, your 2 jobs would have 40 executors maximum (40 nodes * 1 executor per node) running on each job concurrently.
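Because the two submissions in Example 3 differ only in the jar (and class) they run, the shared flags can be assembled once. The Python sketch below is a hypothetical helper, not part of Spark: it only builds the resource-related arguments from Example 3 (the extraJavaOptions, --jars and --packages flags are left out for brevity), and the class names Job1 and Job2 are placeholders for whatever $JOB expands to.

```python
import shlex

def spark_submit_args(jar, job_class, max_executors=40):
    """Build the resource-related spark-submit arguments used in Example 3."""
    return [
        "spark-submit", "--master", "yarn",
        "--executor-cores", "1",
        "--executor-memory", "4500m",
        "--conf", "spark.yarn.executor.memoryOverhead=1024",
        "--conf", "spark.shuffle.service.enabled=true",
        "--conf", "spark.dynamicAllocation.enabled=true",
        "--conf", "spark.dynamicAllocation.minExecutors=1",
        "--conf", f"spark.dynamicAllocation.maxExecutors={max_executors}",
        "--conf", "spark.dynamicAllocation.initialExecutors=20",
        "--class", job_class,
        jar,
    ]

for n in (1, 2):
    args = spark_submit_args(
        f"/home/hadoop/myJob{n}.jar",
        f"melidata.analysis.jobs.Job{n}",   # placeholder class name
    )
    print(shlex.join(args))
```

Each argument list could then be handed to subprocess.run on the master node; passing a list rather than a single string avoids shell quoting problems.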

Some Notes:
-----------
Note 1:
-----
There are a few points about the above settings that you may have noticed. First, the --executor-cores value was only set to 1. This is because we are only using 3 out of the 4 vCPUs per node, 3/2 = 1.5, and decimal values are not allowed in this parameter. This means that a whole vCPU core is going unused. In Example 2 above, you could simply lower the --executor-memory value to 3GiB and increase the --conf spark.dynamicAllocation.maxExecutors value to 120 to use this core.

This goes back to the point in my last response where I said:
“Please note that with the reduced CPU power per node, it may be worthwhile either increasing your instance count or instance type in order to maintain the speeds that you want. Increasing the instance type (say, for example, to an m3.2xlarge type) also has the benefit of being able to use more of the node's CPU for the executors. The OS only needs a small bit of CPU free, but the Spark tunings only let us specify the CPU to use in terms of cores.

Hence, on an m3.xlarge instance, 1 core left free from the 4 cores is 25% of your CPU resources left free but on an m3.2xlarge, leaving 1 core free from the 8 cores is only 12.5% of your CPU resources left free, and so on.”
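The percentages in the quote come from a one-line calculation, sketched here in Python with an illustrative helper name:

```python
def idle_cpu_percent(vcpus, reserved=1):
    """Share of a node's vCPUs left free when `reserved` cores are held back."""
    return 100 * reserved / vcpus

for instance_type, vcpus in [("m3.xlarge", 4), ("m3.2xlarge", 8)]:
    print(instance_type, idle_cpu_percent(vcpus))
# m3.xlarge 25.0
# m3.2xlarge 12.5
```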

You may need to balance the instance types to the instance count in order to get a “best case” usage for your resource requirements.

Note 2:
-----
You could consider amalgamating both of your jobs into the one .jar file that is being called. While this would likely require more coding and tweaking of your application, doing so would allow the Spark dynamicAllocation process to effectively “load balance” the number of executors across your jobs. Instead of running your 2 jobs concurrently in two separate spark-submit calls, a single spark-submit call using settings similar to Example 1 above would allow the resources from one job to be used by the other job if one finishes earlier than the other. Of course, you have less control over which job gets more exclusive resources than the other, but compared with Example 3 above it may mean that resources left over from the end of Job 1 could be used to help speed up the remainder of Job 2, and vice-versa.

This may or may not be what you want to do, but it may be worth considering to try to make the most from your available resources.

Note 3:
-----
If running the jobs on the same cluster, please note that the job files will be grouped together. This can make debugging issues in your jobs more difficult, as there will be less separation of the job logs. If you want a greater level of job log separation, then I would suggest that you consider running each job on a separate EMR cluster. This will give you an extra cluster to monitor, but it will also give you separate CloudWatch metrics per job, instead of having the 1 set of metrics for both jobs. This will make monitoring your jobs a lot easier, as the metrics per cluster will relate to only one of your jobs. If there is an issue with either job, it will be a lot easier to debug, whereas on a shared cluster it would be a lot harder to see from the CloudWatch metrics which specific job was causing the issue.

==========

Once more, the above information is provided as a guideline only. It is likely that your final settings will need more tweaking before you find the “sweet-spot” for your application. Please feel free to get back to me in case of any additional questions or concerns; I would be happy to help you.

 
