Dynamically Resize EMR

It is well known that AWS EMR is a great option for running Hadoop clusters. There are plenty of reasons to use EMR: it is easy to fire up clusters when you need them, and there is very little DevOps work required. However, there is one big shortcoming: EMR does not support auto scaling the way EC2 does. This is a real barrier, especially when you have a lot of small jobs and one or two massive ones. A typical scenario would be a series of small hourly jobs that run throughout the day, followed by a huge daily job at night.

So how do people usually solve this? You could configure the cluster for the big job, which would be akin to throwing money away, because you would be paying for a lot of unused capacity 24×7. You could run two clusters, one for the hourly jobs and one for the daily job, creating and terminating the daily cluster as needed, but this has a lot of moving parts. You could have a DevOps person manually resize the cluster before the daily job. Finally, you could automate the resizing.

Obviously, the fourth option is the best, but the challenge is that there are no good out-of-the-box tools for it. Of course, one could use the AWS CLI and put together some complicated Bash scripts, but those would be a big pain to maintain. Wouldn't it be so much easier to have a tool that does the resizing work for you, and then wrap that in a simple Bash script? That is exactly what we at TokBox Inc decided to do.

The objective was to create a stand-alone jar that could then be used in the flow of our jobs to dynamically increase and decrease the size of a running cluster. Since other business units needed to be able to perform the resizing, the jar had to be completely self-contained and usable by any approved person. We used the AWS API to submit requests to the running cluster. In the tool, we needed a way to manage the instances, so we created a special instance group for each of the supported instance types. This way, when a request comes in, we know where to check the current number of instances and where to add or remove them. An important consideration was that we needed to wait for the increase or decrease to finish before attempting to run the big job, so the Resource Manager would be able to allocate resources correctly. Of course, this meant we needed to introduce a timeout, since we didn't want to get stuck waiting forever.

Technical Details

The details of the tool turned out to be relatively easy once all the important information was extracted from the AWS documentation, which, as anyone who has had that experience can confirm, is generally an “interesting” exercise. The tool itself consists of only five classes. The source code can be found on GitHub at:

Emr Resizer

The ProcessArgs class performs the command line argument handling, and it is not all that complicated. It uses the jopt-simple library and is more or less self-explanatory. All it does is read in the command line arguments, perform some validation, and return a HashMap with the arguments.
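To give a feel for what that looks like, here is a minimal sketch of the same idea. It deliberately skips jopt-simple and uses plain string handling so it stays self-contained; the option names are the ones passed on the command line in the Bash example later in the post, and the class name is illustrative, not the actual ProcessArgs source.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the argument handling: parse "-key=value" pairs
// into a map and check that the options the resizer needs are present.
// The real tool does this with jopt-simple.
public class ProcessArgsSketch {

    static final String[] REQUIRED = {"cid", "optype", "insttype", "instcnt", "timeout"};

    public static Map<String, String> parse(String[] args) {
        Map<String, String> parsed = new HashMap<>();
        for (String arg : args) {
            // Expect arguments of the form -key=value
            if (!arg.startsWith("-") || !arg.contains("=")) {
                throw new IllegalArgumentException("Bad argument: " + arg);
            }
            int eq = arg.indexOf('=');
            parsed.put(arg.substring(1, eq), arg.substring(eq + 1));
        }
        for (String key : REQUIRED) {
            if (!parsed.containsKey(key)) {
                throw new IllegalArgumentException("Missing argument: -" + key);
            }
        }
        return parsed;
    }
}
```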

The EmrResizer class is where all the action really happens. One of the key pieces is the AmazonElasticMapReduceClient in the AWS API. For whatever reason, Amazon does not provide a great deal of documentation or examples for this client, but that is okay; we are persistent and muscled through it. The first thing to do is to create a connection to the cluster, which is done in the constructor. For the connection, you will need to authenticate yourself; you have some choices on how, more on that in the next section. Assuming you are able to authenticate and establish a connection with the cluster, the tool then starts the main process() path. It checks that the cluster you want to change is actually active, with the describeCluster() service call, then transfers the command line parameters into the RequestInfo object, which is just a POJO that makes carrying the request information around easier.

From here the tool splits off into two branches, one for the increase and one for the decrease; so far, not exactly rocket science. The two branches are pretty similar in what they do:

  1. Check for the instance group and get its ID, keeping in mind each instance type has its own instance group, with the listInstanceGroups() service call
  2. Get the current number of instances in that instance group with the listInstances() service call
  3. Change that number of instances in the group based on the request with the modifyInstanceGroups() service call
  4. Wait for the request to complete by sleeping the thread and periodically checking the number of instances
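Step 4, the wait, is the only subtle part, so here is a minimal sketch of that polling loop. The IntSupplier is a hypothetical stand-in for the listInstances() service call, so the logic is self-contained; the return codes mirror the tool's exit statuses described in the Oozie section below.

```java
import java.util.function.IntSupplier;

// Sketch of the wait-for-resize loop: sleep the thread and periodically
// re-check the instance count until it matches the request or the
// timeout expires. In the real tool the count comes from listInstances().
public class ResizeWaiter {

    /**
     * Polls until the instance count reaches the target or the timeout
     * (in milliseconds) expires. Returns 0 on success and 2 on timeout,
     * mirroring the tool's exit statuses.
     */
    public static int waitForResize(IntSupplier currentCount, int target,
                                    long timeoutMillis, long pollMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (currentCount.getAsInt() == target) {
                return 0; // resize finished
            }
            Thread.sleep(pollMillis);
        }
        return 2; // request was submitted, but we gave up waiting
    }
}
```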

The modeling of the different instance types is done by encoding the required information in an enumeration, InstanceEnum. This was done for the sake of simplicity: it allows the definition to be included in a self-contained artifact, and new instance types can be added with a single line. In truth, supporting many different instance types was not a strong requirement; those were well-defined and a small subset.
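An enum along these lines is enough to map a command line instance type to the instance group it lives in. The names and group-naming scheme here are illustrative assumptions, not the actual InstanceEnum source:

```java
// Illustrative version of the InstanceEnum idea: each supported EC2
// instance type carries the name of its dedicated instance group, so
// adding a type is a one-line change. The group names are assumptions.
public enum InstanceType {
    M3_XLARGE("m3.xlarge", "resize-m3-xlarge"),
    C3_XLARGE("c3.xlarge", "resize-c3-xlarge");

    private final String apiName;
    private final String groupName;

    InstanceType(String apiName, String groupName) {
        this.apiName = apiName;
        this.groupName = groupName;
    }

    public String apiName() { return apiName; }
    public String groupName() { return groupName; }

    /** Looks up the enum constant for an instance type given on the command line. */
    public static InstanceType fromApiName(String name) {
        for (InstanceType t : values()) {
            if (t.apiName.equals(name)) return t;
        }
        throw new IllegalArgumentException("Unsupported instance type: " + name);
    }
}
```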

AWS Authentication

As mentioned above, it will be necessary to authenticate when establishing a connection with AWS. However, there are some considerations for deciding how best to do that. For example, if the tool will be run from a server inside the cluster, then your best option might be to use IAM Roles. The default EMR role will already exist and can be used. This approach is currently supported with the 'iam' command line argument, and if at all possible it is the recommended one.

However, if the tool will be run from an edge node that gives a number of users access to the cluster, such as a place for them to run Hive queries, then you probably don't want to give the edge node such a powerful IAM Role, because it will be difficult to control who does what after that. In this case, it might make more sense to create a barebones L-User account that has only the permissions needed. In addition, you might want to encrypt or otherwise protect both the access and secret keys. This would be a little more complicated and would depend upon the security policy of your organization.

The important point here is that it would be much easier to run the tool from inside the cluster, for example during the processing of an Oozie workflow. In addition, the tool could be exposed in an ad hoc way with a single-node workflow accessed via Hue.

Running in Oozie

If you are a fan of Oozie and choose to run the tool in an Oozie workflow as an action node, then an interesting situation can and will arise. Since Oozie will run jobs on any node in the cluster, it may very well run the decrease step on one of the instances that needs to be removed from the cluster. This creates a deadlock: the decrease cannot complete until the node has been terminated, but the job step cannot complete until the decrease has finished. The solution is to let the tool terminate with the timeout. The tool returns three different status values: 0) everything was okay, 1) there was some kind of error, and 2) the operation was submitted but timed out for some reason. We can then use the status of 2 on a decrease operation to allow the job step to gracefully time out without aborting the entire workflow. Below is an example Bash script to do that.

#!/usr/bin/env bash
if [ ! -f /tmp/${JAR} ]; then
    hadoop fs -get ${NAMENODE}/user/hadoop/lib/${JAR} /tmp/${JAR}
fi
java -jar /tmp/${JAR} -cid=${CID} -optype=${OPERATION} -insttype=${TYPE} -instcnt=${COUNT} -timeout=${TIMEOUT}
CMDSIG=$?
if [[ "$CMDSIG" -eq 0 ]]; then
    echo "Cluster resizing completed successfully"
    exit 0
elif [[ "$CMDSIG" -eq 1 ]]; then
    echo "Cluster resizing had a processing error"
    exit 1
elif [[ "$CMDSIG" -eq 2 && "$OPERATION" == "decrease" ]]; then
    echo "Cluster resizing timed out on decrease, continuing"
    exit 0
else
    echo "Cluster resizing had an error"
    exit 1
fi

One final note on running the tool in Oozie: depending upon your system defaults, it may be necessary to increase the memory given to the Shell Action node by setting the job properties, for example:

oozie.launcher.mapreduce.map.memory.mb = 4096
oozie.launcher.mapreduce.map.java.opts = -Xmx3686m

Future Enhancements

One thing that will be blatantly obvious to the astute reader is that this version of the tool only supports On-Demand instances, and this was intentional. The initial intention was to use spot instances, but the logic got very complicated very quickly. For example, what to do when the bidding goes sky high, or when the availability zone runs out of spot instances? These problems are solvable, but it will take time to sort out the details. Please feel free to fork the project and merge your good work back into it for a more complete solution.

Another good enhancement would be to use properties files for the AWS keys, the instance type information, etc. Just because this was not a major requirement for the initial needs does not mean it's not a good idea. It was simply not included for the sake of getting something done in the time allowed.