Anatomy of a Command Builder with Example – Cloudera Kite Morphlines


In the last post we looked at the internals of a Cloudera Kite Morphlines configuration file, also known as a morphline. In this post we are going to explore the actual code that does all the work in the background. It makes no difference whether you are using a built-in command (bundled with the Cloudera Kite Morphlines SDK) or writing your own custom command: the basic structure and semantics of all commands are the same.

Every command in Cloudera Kite Morphlines is created by a command builder, a class that implements the

org.kitesdk.morphline.api.CommandBuilder

interface. This interface declares two methods, and your CommandBuilder implementation must provide both of them:

Collection<String> getNames();
Command build(Config config, Command parent, Command child, MorphlineContext context);

Let’s look at these two methods in detail:

  1. getNames(): this is where you give your command builder a name, so that the command can be referenced from the configuration file. Command names and command builders are tightly coupled. During initialization of the first command (Pipe; see the previous post), all command builders are loaded through the importCommands statement in the configuration file. If you recall the configuration file from our previous post, we listed two packages there:
    importCommands : ["org.kitesdk.**","com.techidiocy.custom.commands.**"]

    While the context is being initialized, it scans all of these packages and finds every command builder. During the scan it registers each command builder under the command name(s) returned by its getNames() method. You can see the complete code in the importCommandBuilders() method of the org.kitesdk.morphline.api.MorphlineContext class.

    Below is the snippet from this method that registers each command name:

    for (String builderName : builder.getNames()) {
      LOG.debug("Importing command: {} from class: {}", builderName, builderClass.getName());
      if (builderName.contains(".")) {
        LOG.warn("Command name should not contain a period character: " + builderName);
      }
      commandBuilders.put(builderName, builderClass);
    }

    Note: a command builder can be registered under multiple names, one for each name returned by its getNames() implementation.

  2. build(…): the signature is shown above. Almost always, this method contains a single line of code, and its sole responsibility is to return the Command instance. That Command overrides the doProcess() method, which is the most important actor in the whole process. Recall from the previous post how doProcess() runs the whole show and controls the chain of invocation.
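To make the registration step and the role of build(…) more concrete, here is a small, self-contained sketch that mimics the registration loop above in plain Java. The Command and CommandBuilder interfaces, the map-based record and config, UpperCaseBuilder and the register() helper are all hypothetical, simplified stand-ins, not Kite's real API. Also note that this sketch stores builder instances for simplicity, whereas the real loop stores the builder class. The point is only to show one builder registered under two names, with a build() method that does nothing except construct its command.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

public class RegistrySketch {

    // Hypothetical, simplified stand-ins for Kite's Command and CommandBuilder interfaces.
    interface Command {
        boolean process(Map<String, Object> record);
    }

    interface CommandBuilder {
        Collection<String> getNames();
        Command build(Map<String, Object> config);
    }

    // A builder registered under two names; build() is a one-liner returning the command.
    static class UpperCaseBuilder implements CommandBuilder {
        @Override
        public Collection<String> getNames() {
            return Arrays.asList("upperCase", "toUpper");
        }

        @Override
        public Command build(Map<String, Object> config) {
            String field = (String) config.get("field");
            // The returned command upper-cases one field of the record and reports success.
            return record -> {
                record.put(field, record.get(field).toString().toUpperCase());
                return true;
            };
        }
    }

    // Mirrors the registration loop: one registry entry per name returned by getNames().
    static void register(Map<String, CommandBuilder> registry, CommandBuilder builder) {
        for (String name : builder.getNames()) {
            if (name.contains(".")) {
                System.err.println("Command name should not contain a period character: " + name);
            }
            registry.put(name, builder);
        }
    }

    public static void main(String[] args) {
        Map<String, CommandBuilder> registry = new LinkedHashMap<>();
        register(registry, new UpperCaseBuilder());
        System.out.println(registry.keySet()); // [upperCase, toUpper]

        Map<String, Object> config = new LinkedHashMap<>();
        config.put("field", "message");
        Command command = registry.get("toUpper").build(config);

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("message", "hello");
        command.process(record);
        System.out.println(record.get("message")); // HELLO
    }
}
```

Either name ("upperCase" or "toUpper") resolves to the same builder, which is exactly what the note about multiple names means in practice.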

Enough theory; let’s look at a couple of command builders to get a feel for them. First we will look at

org.kitesdk.morphline.stdio.ReadLineBuilder

which ships with the Cloudera Kite Morphlines SDK. Below is a snippet of the class.

public final class ReadLineBuilder implements CommandBuilder {

  @Override
  public Collection<String> getNames() {
    return Collections.singletonList("readLine");
  }

  @Override
  public Command build(Config config, Command parent, Command child, MorphlineContext context) {
    return new ReadLine(this, config, parent, child, context);
  }

  // Nested class ReadLine goes here
}

ReadLine is a nested class that overrides the doProcess() method, which, as we know, is where the actual business logic resides.
The complete implementation is available in the Kite SDK source code.
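To illustrate how a doProcess() override drives the chain of invocation, here is a minimal plain-Java sketch. SimpleCommand is a hypothetical stand-in for Kite's AbstractCommand, and SplitLines is only loosely modelled on readLine (the real command reads lines from an input stream in the attachment body, not from a string field). What the sketch shows is the pattern: a command does its work, then hands each resulting record to its child and stops as soon as a child reports failure.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChainSketch {

    // Hypothetical stand-in for AbstractCommand: each command holds its child
    // and does its work in doProcess().
    static abstract class SimpleCommand {
        private final SimpleCommand child;

        SimpleCommand(SimpleCommand child) {
            this.child = child;
        }

        SimpleCommand getChild() {
            return child;
        }

        final boolean process(Map<String, Object> record) {
            return doProcess(record);
        }

        // Default behaviour: pass the record unchanged to the next command.
        protected boolean doProcess(Map<String, Object> record) {
            return child.process(record);
        }
    }

    // Loosely modelled on readLine: emits one new record per line of the "body" field.
    static class SplitLines extends SimpleCommand {
        SplitLines(SimpleCommand child) {
            super(child);
        }

        @Override
        protected boolean doProcess(Map<String, Object> record) {
            for (String line : record.get("body").toString().split("\n")) {
                Map<String, Object> out = new LinkedHashMap<>();
                out.put("message", line);
                if (!getChild().process(out)) {
                    return false; // stop the chain as soon as a child fails
                }
            }
            return true;
        }
    }

    // Last command in the chain: collects whatever reaches it.
    static class Collector extends SimpleCommand {
        final List<Map<String, Object>> records = new ArrayList<>();

        Collector() {
            super(null);
        }

        @Override
        protected boolean doProcess(Map<String, Object> record) {
            records.add(record);
            return true;
        }
    }

    public static void main(String[] args) {
        Collector collector = new Collector();
        SimpleCommand pipeline = new SplitLines(collector);

        Map<String, Object> input = new LinkedHashMap<>();
        input.put("body", "first\nsecond");
        pipeline.process(input);

        System.out.println(collector.records); // [{message=first}, {message=second}]
    }
}
```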

Now let’s look at a custom command builder I wrote to generate an Avro schema file on the fly from an input Java object. Below is the complete code for this command builder.

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.CommandBuilder;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.MorphlineRuntimeException;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.AbstractCommand;
import org.kitesdk.morphline.base.Fields;

import com.typesafe.config.Config;

// FileData and FieldSpecification are custom classes from the author's own project (not shown here).

public class GenerateAvroSchemaFileBuilder implements CommandBuilder {

	@Override
	public Collection<String> getNames() {
		return Collections.singletonList("generateAvroSchemaFile");
	}

	@Override
	public Command build(Config config, Command parent, Command child, MorphlineContext context) {
		return new BuildAvroSchemaFile(this, config, parent, child, context);
	}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//Nested class: generates an Avro schema file from the input (Java object) passed on by the previous command //////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

	private static final class BuildAvroSchemaFile extends AbstractCommand {

		private final String avroSchemaFileLocation;

		protected BuildAvroSchemaFile(CommandBuilder builder, Config config, Command parent, Command child, MorphlineContext context) {
			super(builder, config, parent, child, context);
			// Output directory for schema.avsc; this getString() variant fails if the parameter is missing
			this.avroSchemaFileLocation = getConfigs().getString(config, "avroSchemaFileLocation");
		}

		@Override
		public boolean doProcess(Record record) {
			FileData fileData = (FileData) record.get(Fields.ATTACHMENT_BODY).get(0);
			Map<String, List<FieldSpecification>> properties = fileData.getRecords().get(0).getProperties();
			String fileLocation = generateSchemaFile(avroSchemaFileLocation, properties);
			// replace the message field with the path of the generated schema file
			record.removeAll(Fields.MESSAGE);
			record.put(Fields.MESSAGE, fileLocation);
			// pass the record to the next command in the chain (LoadHiveTable in this morphline)
			if (!getChild().process(record)) {
				System.out.println("Failed during execution of LoadHiveTable command");
				return false;
			}
			return true;
		}

		private String generateSchemaFile(String outputDirectory, Map<String, List<FieldSpecification>> properties) {
			File schemaFile = new File(outputDirectory, "schema.avsc");
			// try-with-resources flushes and closes the writer even if writing fails
			try (Writer writer = new FileWriter(schemaFile)) {
				if (!properties.isEmpty()) {
					writer.write("{\n");
					writer.write("\"type\":\"record\",\n");
					writer.write("\"name\":\"DynamicSchema\",\n");
					writer.write("\"namespace\":\"com.techidiocy.examples.data\",\n");
					writer.write("\"doc\":\"A dynamically generated schema.\",\n");
					writer.write("\"fields\":[\n");
					int counter = 1;
					for (Entry<String, List<FieldSpecification>> entry : properties.entrySet()) {
						writer.write("{\n");
						// field name comes from the first FieldSpecification of each map entry
						writer.write("\"name\":\"" + entry.getValue().get(0).getFieldName().toLowerCase() + "\",\n");
						// every field is typed as "string"; customize as required
						writer.write("\"type\":\"string\"\n");
						writer.write("}");
						if (counter != properties.size()) {
							writer.write(",\n");
						}
						counter++;
					}
					writer.write("]\n");
					writer.write("}\n");
				}
			} catch (IOException e) {
				throw new MorphlineRuntimeException("Failed while writing the Avro schema file " + schemaFile, e);
			}
			return schemaFile.getPath();
		}
	}
}

Here is the corresponding command invocation from the configuration file:

     {
       generateAvroSchemaFile
        {
           avroSchemaFileLocation : "/home/cloudera/workspace/avroschema/"
        }
     }

There is nothing special about the above command builder. It expects the output directory for the Avro schema file as an input argument from the configuration file. It then reads a Java map passed on by the previous command and writes one field per map entry into the schema file, with the data type “string” (this can be customized as required).
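Because the schema text is assembled by hand, it helps to see exactly what it looks like. The sketch below pulls the string-building part of generateSchemaFile() into a plain-JDK method so it can run without Kite or the author's classes. buildSchema() and its List<String> parameter are hypothetical simplifications standing in for the FieldSpecification map; the JSON layout mirrors the writes in the command above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class SchemaTextSketch {

    // Hypothetical helper: builds the same JSON text as generateSchemaFile(), but from
    // plain field names and into a String. Every field is typed "string", as in the command.
    static String buildSchema(List<String> fieldNames) {
        if (fieldNames.isEmpty()) {
            return ""; // the command writes nothing for an empty map
        }
        List<String> fields = new ArrayList<>();
        for (String name : fieldNames) {
            fields.add("{\n\"name\":\"" + name.toLowerCase(Locale.ROOT) + "\",\n\"type\":\"string\"\n}");
        }
        return "{\n"
                + "\"type\":\"record\",\n"
                + "\"name\":\"DynamicSchema\",\n"
                + "\"namespace\":\"com.techidiocy.examples.data\",\n"
                + "\"doc\":\"A dynamically generated schema.\",\n"
                + "\"fields\":[\n"
                + String.join(",\n", fields) // comma between fields, none after the last
                + "]\n"
                + "}\n";
    }

    public static void main(String[] args) {
        System.out.print(buildSchema(Arrays.asList("FirstName", "City")));
    }
}
```

For the two sample names, this prints a record schema with the fields "firstname" and "city", both of type "string".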

The idea behind this command builder is to avoid writing an Avro schema file by hand again and again for differently structured objects. The generated schema file is used by the next command, which loads this data into a Hive table in Parquet format.

This is a simple use case for a command builder; the Cloudera Kite Morphlines SDK lets us break any complex transformation into a series of small commands. Another advantage of custom command builders, as I mentioned in my previous post, is reusability: once I have this command builder (GenerateAvroSchemaFileBuilder), I can use it in any number of configuration files.

In the last three posts (including this one), I have tried to summarize what I have learned so far about the Kite SDK. I am currently exploring the other modules of Cloudera Kite Morphlines and will keep posting my experiences and findings here.

Long way to go :)

Suggestions, corrections and questions are most welcome.

Disclaimer : All the logos and images used above belong to their respective owners.


Saurabh Jain

A developer working on enterprise applications, distributed systems, Hadoop and big data. This blog is about my experience working mostly on Java technologies, NoSQL, Git, Maven and the Hadoop ecosystem.