
Error handling in Hadoop MapReduce

Based on the documentation, there are a few ways error handling is performed in MapReduce. Below are a few:
a. Custom counters using an enum - increment it for every failed record.
b. Log the error and analyze it later.
Counters give the number of failed records. However, to get the identifier of the failed record (maybe its unique key), the details of the exception that occurred, and the node on which the error occurred, we need to perform centralized log analysis, and there are many nodes running. Logstash is one tool that is available.
Apart from these, are there any other ways to handle error scenarios without manual intervention? Any tools, references, and best practices are welcome.
I think the same techniques apply to any distributed application, with minor changes.

A few questions to ask when working with error handling:
  1. Should the job be stopped if an error occurs during data validation? Most big data use cases can tolerate a few bad records. But if your use case requires every record to be good, you should take that decision and move on to the steps below.
    Sometimes it is better to let the job run by skipping the bad records and, in parallel, capture the issues (errors) using the techniques below, then rectify and adjust as you move along.
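    A minimal skip-and-continue sketch, assuming a text input where each line should parse to an integer temperature (the output key/value types here are only illustrative):
    
    // Inside the map method: skip records that fail validation instead of
    // letting the exception kill the task attempt.
    try {
        int temperature = Integer.parseInt(value.toString().trim());
        context.write(new Text("temperature"), new IntWritable(temperature));
    } catch (NumberFormatException e) {
        // Bad record: log enough detail to trace it later, then move on.
        System.err.println("Skipping malformed record: " + value);
    }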
  2. You may want to tolerate errors, but only a limited number of times. The number of times a task attempt can fail before the entire job is stopped is controlled by the properties below:
    For map tasks: mapreduce.map.maxattempts
    For reduce tasks: mapreduce.reduce.maxattempts
    The default is 4.
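    For example, the attempt limits can be raised in the job driver. This is a sketch; the value 6 and the job name are arbitrary:
    
    Configuration conf = new Configuration();
    // Allow up to 6 attempts per task before the task, and hence the job, fails.
    conf.setInt("mapreduce.map.maxattempts", 6);
    conf.setInt("mapreduce.reduce.maxattempts", 6);
    Job job = Job.getInstance(conf, "error-tolerant-job");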
  3. Handling malformed data.
    So we decided to handle the malformed data. First, define the condition under which a record is considered bad. You can use counters to quickly get the number of bad records.
    In the Mapper class:
    enum Temperature { OVER_10 }
    
    Inside the map method:
    // parse the record into a numeric value
    int temperature = Integer.parseInt(value.toString().trim());
    if (temperature > 10) {
        System.err.println("Temperature over 10 degrees for input: " + value);
        context.setStatus("Detected possibly corrupt record: see logs.");
        context.getCounter(Temperature.OVER_10).increment(1);
    }
    
    With the above method, all records get processed, and the counter is incremented for each bad record. You can see the counter value at the end of the job in the job statistics, through the web UI, or from the shell command:
    $ mapred job -counter <job_id> '${fully_qualified_class_name}' ${enum_name}
    $ mapred job -counter job_1444655904448_17959 'com.YourMapper$Temperature' OVER_10
    
    Once you know the impact of the problem, i.e. the number of bad records, we need to know why they are bad. For this, we need to go to the logs and search for the error messages.
    YARN provides log aggregation: it combines all the logs for an application and stores them in HDFS. They can be retrieved using
    yarn logs -applicationId <application ID>
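    Putting the pieces together, a complete mapper might look like the sketch below. The class name, the MALFORMED counter, and the assumption that each input line carries a plain integer temperature are illustrative, not taken from an actual job.
    
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class TemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        enum Temperature { OVER_10, MALFORMED }
    
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            int temperature;
            try {
                // A record that cannot be parsed is counted and skipped, not fatal.
                temperature = Integer.parseInt(value.toString().trim());
            } catch (NumberFormatException e) {
                System.err.println("Malformed record at offset " + key + ": " + value);
                context.getCounter(Temperature.MALFORMED).increment(1);
                return;
            }
            if (temperature > 10) {
                System.err.println("Temperature over 10 degrees for input: " + value);
                context.setStatus("Detected possibly corrupt record: see logs.");
                context.getCounter(Temperature.OVER_10).increment(1);
            }
            context.write(new Text("temperature"), new IntWritable(temperature));
        }
    }
    
    The counter values then show up in the job statistics as described above, and the System.err output lands in the task logs that yarn logs -applicationId retrieves, so the failing record and the node it was processed on can be traced without logging into individual machines.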
