Hadoop Error Handling

Hadoop Error Handling!  How can we make this work for us?  The problem is that when errors happen in the mappers and/or reducers, it is really hard to get meaningful error information back to the client.  The biggest reason is that a job is submitted on one node in the cluster and then gets processed on other remote data nodes.  Sure, errors will be written to the log files of those data nodes, but that does not give you a nice, neat error message that someone can review.

The ability to present users with meaningful error messages is becoming more and more important as the user base expands beyond the group of highly technical Hadoop nerds and into the mainstream, with business analysts, market researchers, etc. using Hadoop.  These less technical users need a different standard of interaction with the cluster.  Generally, they will need easy-to-use graphical interfaces that provide understandable error messages when things go wrong.  Thus effective Hadoop Error Handling will become a necessity.

Everyone who starts using Hadoop quickly discovers the built-in web application as a source of valuable information.  Naturally, the first thought is "hey, I will just get my error information from there".  Unfortunately, it's not that simple.  The APIs to get that information are not exposed.  Of course, one could try a screen-scraping approach, but that has drawbacks and would ultimately not work in every situation.

So how do we approach Hadoop Error Handling?  Well, in my Hadoop experience I have come up with three approaches that have served me well, although I am sure there must be more.  I have a proof-of-concept project checked into GitHub that provides a working example of how each of these three approaches works.

Source Code:

Hadoop Error Handling Project

The three approaches are:

  • Non-fatal errors that need to be tracked
  • Fatal errors that you want to define explicitly and that occur only once
  • Fatal errors that occur during processing and can use a simple message

Hadoop Error Handling for non-fatal errors that need to be tracked can be done using counters.  The basic idea is to create a counter with Context.getCounter() and then increment it with each error.  Finally, on the client side after the job has completed, read the counter to get the total.  For a very simplified example, see below.

In the mapper:

if (some_error_condition){
      context.getCounter(COUNTER_GROUP, COUNTER).increment(1);
}

In the client:

boolean okay = job.waitForCompletion(true);
if (okay){
      Counters counters = job.getCounters();
      Counter bwc = counters.findCounter(COUNTER_GROUP, COUNTER);
      System.out.println("Errors" + bwc.getDisplayName()+":" + bwc.getValue());
}
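
In these snippets, COUNTER_GROUP and COUNTER are just string constants that both the mapper and the client can see; something like this would do (the names here are purely illustrative, not from the actual project):

public static final String COUNTER_GROUP = "ErrorGroup";
public static final String COUNTER = "ErrorCounter";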

For Hadoop Error Handling of fatal errors that you want to explicitly define and that happen once, I have found that using an Enum is an excellent way to go.  I can then pack the Enum with all kinds of additional information, such as an error code, a specific message, an I18N resource bundle key, and most importantly a lookup() function.  Actually, you can add any kind of information that you need.  Once you have the Enum, you can use it with the Hadoop counters, but not in the way they were originally intended.  You can use the counter as the error code reference to the specific error.  It requires that you use a PLACEHOLDER Enum value along with the set of specific error message Enum values.
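
As a rough sketch, such an Enum might look like the following.  The field names are mine, chosen to line up with the snippets below; the actual project on GitHub may differ in the details.

public enum ExceptionTypeEnum {

      PLACEHOLDER(0, "Placeholder", "No error"),
      NULL_POINTER(1, "NullPointerException", "A null value was encountered during processing");

      private final int errorCode;
      private final String name;
      private final String message;

      ExceptionTypeEnum(int errorCode, String name, String message){
            this.errorCode = errorCode;
            this.name = name;
            this.message = message;
      }

      public int getErrorCode(){ return errorCode; }
      public String getName(){ return name; }
      public String getMessage(){ return message; }

      // the lookup() function: map a counter value back to the matching Enum value
      public static ExceptionTypeEnum get(long errorCode){
            for (ExceptionTypeEnum e : values()){
                  if (e.errorCode == errorCode){
                        return e;
                  }
            }
            return PLACEHOLDER;
      }
}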

When you encounter the error condition, you create the counter with the PLACEHOLDER Enum parameter and set its value to the error code of the specific error:

catch (NullPointerException npe){
      npe.printStackTrace();
      // record the specific error code in the PLACEHOLDER counter
      context.getCounter(ExceptionTypeEnum.PLACEHOLDER).setValue(
            ExceptionTypeEnum.NULL_POINTER.getErrorCode());
      // flag the mapper to stop doing real work (see below)
      needToStop++;
}

Then on the client side, you would need to retrieve the counter and then convert the PLACEHOLDER into the actual error message for presentation:

Counters counters = job.getCounters();
Counter bwc = counters.findCounter(ExceptionTypeEnum.PLACEHOLDER);
ExceptionTypeEnum err = ExceptionTypeEnum.get(bwc.getValue());
System.out.println();
System.out.println("Terminal error, " + err.getName());
System.out.println("With message: " + err.getMessage());

The only real drawback of this approach is that you need to ensure that you do not terminate the mapper when the error occurs.  It needs to be allowed to run to completion, because the counters of a failed task are discarded and the value would be zeroed out.
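
A minimal sketch of how that might look in the mapper, assuming a needToStop field like the one incremented in the catch block above (the LongWritable/Text key and value types are just placeholders):

// field on the mapper class, bumped in the catch block shown earlier
private int needToStop = 0;

@Override
protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
      if (needToStop > 0){
            // a fatal error was already recorded; skip the remaining records
            // instead of throwing, so the task completes and the counter survives
            return;
      }
      // ... normal record processing, with the try/catch from above ...
}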

The final Hadoop Error Handling approach is to reach into the data returned by the TaskTracker and process it.  The challenge here is that the data is not readily available, and there will be some duplication of the error messages.  You will need to use the old MR API to get the report.  First, take your JobID and downgrade it to the old API object.  Then create a JobClient instance.  With the JobClient, you can get a RunningJob instance, which gives you status information for the entire job.  Next, you will want to get the specific error messages from the TaskTrackers.  You can use the JobClient.getMapTaskReports() and getReduceTaskReports() methods to get all the error messages from all of the failed tasks.  This is where you will need to de-duplicate the error messages to ensure there is only one of each.

boolean okay = job.waitForCompletion(true);
JobID jobId = job.getJobID();
// downgrade the new-API JobID to the old mapred API object
org.apache.hadoop.mapred.JobID oldJobId = org.apache.hadoop.mapred.JobID.downgrade(jobId);
// the JobClient needs a JobConf; reuse the job's configuration
JobClient jobClient = new JobClient(new JobConf(job.getConfiguration()));
RunningJob runningJob = jobClient.getJob(oldJobId);
// status information for the entire job
JobStatus jobStatus = runningJob.getJobStatus();
TaskReport[] mapReports = jobClient.getMapTaskReports(oldJobId);
Map<String,String> errMap = new HashMap<String,String>();
for (TaskReport report : mapReports){
      TIPStatus status = report.getCurrentStatus();
      if (status.compareTo(TIPStatus.COMPLETE) == 0){
            // not interested in tasks that completed
            continue;
      }
      for (String err : report.getDiagnostics()){
            // only need the first line, can get the rest from the logs
            // and don't want to collect duplicates
            String[] errArr = err.split("\n");
            errMap.put(errArr[0], errArr[0]);
      }
}
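
From there, presenting the de-duplicated messages to the user is just a matter of walking the map (a Set<String> would serve equally well as the collection here):

for (String err : errMap.values()){
      System.out.println("Task error: " + err);
}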

