
Custom SerDe

Steps involved

We start by implementing the SerDe interface and setting up the internal state variables needed by other methods.



public class ColumnarMapSerDe implements SerDe {

  private List<String> columnNames;
  private ObjectInspector objectInspector;
  private Map<String, String> rowMap;

  // additional state used below (declarations assumed from their usage)
  private final List<String> rowFields = new ArrayList<String>();
  private final SerDeStats stats = new SerDeStats();
  private long deserializedByteCount = 0L;



The initialize method is called when a table is created. It is responsible for verifying that the table definition is compatible with the underlying serialization and deserialization mechanism. In this case, we only support strings.
@Override
  public void initialize(
    Configuration conf,
    Properties tableProperties)
  throws SerDeException {

    final List<TypeInfo> columnTypes =
      TypeInfoUtils.getTypeInfosFromTypeString(
      tableProperties.getProperty(LIST_COLUMN_TYPES));

    // verify types: this SerDe only supports string columns
    for (TypeInfo type : columnTypes) {
      if (!type.getCategory().equals(PRIMITIVE) ||
          !((PrimitiveTypeInfo) type).getPrimitiveCategory().equals(STRING)) {
        throw new SerDeException("Only string columns are supported");
      }
    }

    // assumed remainder: store the column names, build the row
    // ObjectInspector, and create the reusable rowMap container
  }

The serialize method is called whenever a row should be written to HDFS. It is responsible for using the object inspector to extract the columnar data from the row and converting that data to a Hadoop Writable object:
 @Override
  public Writable serialize(
    Object obj,
    ObjectInspector objectInspector)
  throws SerDeException {

    StringBuilder builder = new StringBuilder();

    StructObjectInspector structOI =
      (StructObjectInspector) objectInspector;

    List<? extends StructField> structFields =
      structOI.getAllStructFieldRefs();

    // assumed completion: write each column as name\002value,
    // joining successive entries with \001
    for (int i = 0; i < structFields.size(); i++) {
      if (i > 0) builder.append('\001');
      StructField field = structFields.get(i);
      StringObjectInspector fieldOI =
        (StringObjectInspector) field.getFieldObjectInspector();
      builder.append(columnNames.get(i)).append('\002');
      builder.append(fieldOI.getPrimitiveJavaObject(
        structOI.getStructFieldData(obj, field)));
    }
    return new Text(builder.toString());
  }

Similar to the serialize method, the deserialize method is responsible for converting a Hadoop Writable object back into columnar form. In this case, the input is a Text object containing our serialized map, which we break into keys and values and return as a List of Strings.
@Override
  public Object deserialize(Writable writable)
  throws SerDeException {

    // reset internal state
    rowMap.clear();
    rowFields.clear();

    Text text = (Text) writable;
    String content = text.toString();
    deserializedByteCount += text.getLength();

    // assumed completion: split entries on \001 and key/value pairs on \002
    for (String entry : content.split("\001")) {
      String[] keyValue = entry.split("\002", 2);
      rowMap.put(keyValue[0], keyValue.length > 1 ? keyValue[1] : null);
    }
    // return the values in column order so they match the ObjectInspector
    for (String columnName : columnNames) {
      rowFields.add(rowMap.get(columnName));
    }
    return rowFields;
  }

The next two methods are used by Hive to describe the types used by this SerDe. Object inspectors describe how Hive should interpret the tabular data and the serialized class describes how Hadoop should serialize the data to HDFS.

@Override
  public ObjectInspector getObjectInspector()
  throws SerDeException {
    return objectInspector;
  }

  @Override
  public Class<? extends Writable> getSerializedClass() {
    return Text.class;
  }
Finally, implementations can optionally record and report statistics about the data they are serializing and deserializing:
@Override
  public SerDeStats getSerDeStats() {
    stats.setRawDataSize(deserializedByteCount);
    return stats;
  }
}
This class depends on the Hadoop and Hive libraries, which can be found in $HADOOP_HOME/ and $HIVE_HOME/lib/. Compile this class and package it into a standard JAR file. You can use your preferred Java development environment or simply use the following commands:
$ javac \
  -cp ${HADOOP_HOME}/'*':${HIVE_HOME}/lib/'*' \
  *.java

$ mkdir -p com/leeriggins/hive/

$ mv *.class com/leeriggins/hive/

$ jar cf udf.jar com
We can now use the SerDe to define a new table. The first step is dynamically adding our new jar file to the Hive classpath:
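A minimal version of this step, assuming the udf.jar built above sits in the current working directory, might look like:

hive> ADD JAR udf.jar;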

Now, we can create our tables and copy the data from the input table to the output table:
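For example (the table and column names here are only illustrative), the output table can name the SerDe class directly and then be filled from an existing table:

hive> CREATE TABLE columnar_map_table (key1 STRING, key2 STRING)
    > ROW FORMAT SERDE 'com.leeriggins.hive.ColumnarMapSerDe';

hive> INSERT OVERWRITE TABLE columnar_map_table
    > SELECT key1, key2 FROM source_table;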

Theory behind it 
The central part of this example is our implementation of the ColumnarMapSerDe class, which implements the SerDe interface. Our class describes how to transform a Hadoop record into the columns of a Hive table.

Object inspectors

The two main types involved with serialization and deserialization are Writable and ObjectInspector. Writable is a Hadoop type used to represent keys and values in Map/Reduce jobs. An ObjectInspector is a Hive type containing the necessary logic for converting between the various Hive representations of data and the more standard Java and Hadoop types.
There are multiple ObjectInspector implementations for each of the primitive and complex Hive types. These come in lazy and non-lazy variants and can be backed by Hadoop Writable objects or by standard Java classes. Generally, using the lazy versions or the versions backed by Writable objects can be more efficient; however, using these object inspectors efficiently is more complicated than using the standard Java object inspectors.
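For example, both string flavors are available from the primitive factory; a SerDe that returns the Writable-backed inspector must hand Hive Text objects, while the Java-backed inspector expects plain String values (a small sketch, not from the original listing):

// Java-backed inspector: field values are plain java.lang.String objects
StringObjectInspector javaStringOI =
  PrimitiveObjectInspectorFactory.javaStringObjectInspector;

// Writable-backed inspector: field values are org.apache.hadoop.io.Text objects
StringObjectInspector writableStringOI =
  PrimitiveObjectInspectorFactory.writableStringObjectInspector;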
The complex Hive types all contain nested object inspectors describing the types of their contents. Primitive Hive types are all represented by subtypes of PrimitiveObjectInspector.
Object inspectors should never be created directly; instead, Hive provides the ObjectInspectorFactory and PrimitiveObjectInspectorFactory classes that may be used to create instances.
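For instance, the single StructObjectInspector used by this SerDe could be assembled from these factories roughly as follows (a sketch that assumes columnNames has already been read from the table properties):

// one string inspector per column, all backed by plain Java Strings
List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
for (int i = 0; i < columnNames.size(); i++) {
  fieldInspectors.add(
    PrimitiveObjectInspectorFactory.javaStringObjectInspector);
}

// a struct whose field names are the column names of the table
objectInspector = ObjectInspectorFactory
  .getStandardStructObjectInspector(columnNames, fieldInspectors);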

Initialization

First, we need to initialize our SerDe. Hive strongly encourages reusing objects to reduce the need for garbage collection. Therefore, we will precompute as much as possible on initialization and store this information in instance variables.
In the initialize method, Hive will pass us the Hadoop configuration and table properties so we can configure our internal state for the specific table. We can get the names and types of each of the columns from the table properties. Since we are modeling a map of strings to strings, we will throw an exception if any of the columns are not strings. Any other SerDe properties, such as the input.regex used by the RegexSerDe, are also available from this table properties object.
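For instance, the column names themselves can be read from the same properties object (a sketch; LIST_COLUMNS is the standard serde constant for the comma-separated list of column names):

// the "columns" table property holds the comma-separated column names
String columnNameProperty = tableProperties.getProperty(LIST_COLUMNS);
columnNames = Arrays.asList(columnNameProperty.split(","));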
For this SerDe, we want to store the column names so we can later extract the appropriate values from each row. We also want to create the ObjectInspector that describes this table. We will return a single StructObjectInspector whose fields correspond to the columns of our table. Each field should be a string, so we will use StringObjectInspectors.
Finally, we will initialize the instance variables that we will use during serialization and deserialization. Hive asks for the Writable class and ObjectInspector used by our SerDe by calling getSerializedClass and getObjectInspector.

Serialization

When serializing a row, we are given an object containing that row's data plus the ObjectInspector necessary to read the data from the object. Since this object represents an entire row, the object inspector will be a StructObjectInspector whose fields correspond to the columns of the table. The StructObjectInspector also contains a nested ObjectInspector for each field; in this case, each will be a StringObjectInspector.
So, our serialize method needs to use each of these nested object inspectors to read each field, combine the data with the name of the column that we read at initialization time, and build a map string. In this case, we have hard-coded the \001 and \002 delimiters; however, these could also have been determined at initialization time from table properties.
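As a concrete illustration (the column names and values are invented, and the key/value versus entry placement of the two delimiters follows the serialize sketch above), a row with name = alice and city = nyc would be written to HDFS as the single Text value:

name\002alice\001city\002nyc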

Deserialization

The deserialize method reverses the serialization process. In this case, we want to convert the Writable object passed to us into a row. Deserialization is somewhat simpler as we can just return a List of Strings. Hive will use the ObjectInspector we return from getObjectInspector to convert this value into whatever internal representation it may decide to use.
Because we returned the Text class from the getSerializedClass method, we know the Writable passed to us will be a Text. We can convert it to a string, split it on our delimiters, and extract the values corresponding to our column names.
The deserialize method has one additional side effect: it increments the count of bytes read during deserialization. This count is wrapped in a SerDeStats object and returned to the framework from the getSerDeStats method. Tracking this information is optional; a SerDe may simply always return zero for the amount of deserialized data.
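A SerDe that does not collect statistics could, for example, simply return a fresh SerDeStats object, whose counters default to zero (a sketch, not part of the original class):

@Override
public SerDeStats getSerDeStats() {
  // no statistics collected; rawDataSize and rowCount default to 0
  return new SerDeStats();
}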
