Oozie Coordinator Properties and Configuration Management

Oozie is great!  It lets you run a workflow with jobs from the various tools in the Hadoop suite (MapReduce, Hive, Pig, etc.), and it is a great way to link a complex sequence of steps into a cohesive, easy-to-manage workflow.  It really is an awesome tool.  Coordinators are a feature of Oozie that let you schedule jobs based upon either time or the presence of a file.  However, Oozie Coordinator properties and configuration management leaves something to be desired. 
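For context, a time-based Coordinator definition is just a small XML file.  A minimal sketch might look like the following (the name, dates, frequency, and workflow path here are placeholders for this example):

<coordinator-app name="my-coord" frequency="${coord:days(1)}"
    start="2014-01-01T00:00Z" end="2014-12-31T00:00Z" timezone="UTC"
    xmlns="uri:oozie:coordinator:0.2">
    <action>
        <workflow>
            <app-path>${nameNode}/user/admin/test/coord/workflow</app-path>
        </workflow>
    </action>
</coordinator-app>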


One of the big shortcomings of Oozie Coordinator properties and configuration management is that, with the default tools, it is necessary to manage the configuration.properties and configuration.xml files in different places, which is very inconvenient.  The configuration.properties file needs to be on local disk storage while the configuration.xml needs to be on HDFS.  Upon reflection, it would be so much easier to have both the properties and the XML configuration in the same directory on HDFS. 
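For illustration, the properties file stored on HDFS next to the coordinator XML might look something like this (the host names, ports, paths, and values are assumptions for this example; oozie.coord.application.path is the property Oozie uses to locate the coordinator definition):

# mytest.properties - kept in the same HDFS directory as the coordinator XML
nameNode=hdfs://cdh-dev:8020
jobTracker=cdh-dev:8021
queueName=default
oozie.coord.application.path=${nameNode}/user/admin/test/coord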

Fortunately, this is something that is very doable.  All that is really needed is to write a simple Java application that uses the Hadoop libraries to read the properties file and the Oozie client libraries to submit the Coordinator job.  The solution will use three classes:

  1. A main client class to read the command line parameters, read the properties file, and submit the job
  2. A class to handle the Oozie details
  3. A class to handle the HDFS details

The complete source code is available on GitHub at https://github.com/bobfreitas/Oozie-Job-Runner

 

Main Client Class

The main client class is provided below.  It expects to receive three parameters:

  • URL of the Oozie server
  • Fully qualified path to the properties file
  • Type of operation (run or submit)

First, the class will retrieve the three parameters from the command-line arguments.  Next, using the Oozie server URL, it will create a CoordinatorOozieService object, which is the class that will handle the Oozie details (more on this class later).  It will then create an HDFSAccessor object, which is the class that will handle the HDFS details (details to follow). 

An important thing to consider is that the final artifact will need to run as a Java application on the Hadoop cluster.  As you can see, the class is just regular Java; there is nothing MapReduce about it.  It is not commonly realized that you can still run any plain Java application on a Hadoop cluster.  A lot of people are under the misconception that everything needs to be a MapReduce job, but that is just not true.  This will be a simple Java application that happens to interact with Oozie and HDFS, but it is not a MapReduce job. 

package oozie;

import java.util.Properties;

public class CoordinatorClient {

    public static void main(String[] args) throws Exception {
        
        if (args.length < 3) {
            throw new Exception("Invalid parameters");
        }
        
        String url = args[0];
        CoordinatorOozieService coordinator = new CoordinatorOozieService(url);
        
        String propsFile = args[1];
        HDFSAccessor hdfsAccessor = new HDFSAccessor();
        Properties props = hdfsAccessor.readPropsFile(propsFile);
        
        String type = args[2];
        
        try {
            if (type.equals("submit")) {
                String jobId = coordinator.submitJob(props);
                System.out.println("Coordinator job submitted with id: " + jobId);
            }
            else if (type.equals("run")) {
                String jobId = coordinator.runJob(props);
                System.out.println("Coordinator job run with id: " + jobId);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

Oozie Related Class

The CoordinatorOozieService class will be used to abstract away the details of the interaction with Oozie.  This class is where the Oozie client library will be used.  The constructor receives the URL of the Oozie server and creates a connection to Oozie, which is modeled with the OozieClient object.  The OozieClient object will then be used to do the work.  The methods submitJob() and runJob() both take the set of job properties as a parameter, which will be the properties read out of the properties file stored on HDFS (explained below).  As can be seen in the code, the Properties object is added directly to the Oozie configuration object via the client API. 

package oozie;

import java.io.IOException;
import java.util.Properties;

import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;

public class CoordinatorOozieService {

    OozieClient oozieClient = null;  

    public CoordinatorOozieService(String url){
        oozieClient = new OozieClient(url);
    }
    
    public String submitJob(Properties workflowProperties) throws OozieClientException, IOException{
        Properties conf = oozieClient.createConfiguration(); 
        conf.putAll(workflowProperties); 
        return oozieClient.submit(conf); 
    }
    
    public String runJob(Properties workflowProperties) throws OozieClientException, IOException{ 
        Properties conf = oozieClient.createConfiguration(); 
        conf.putAll(workflowProperties); 
        return oozieClient.run(conf);
    }
}

 

HDFS Related Class

The HDFSAccessor class is used to read the properties file from HDFS and put those properties into a Properties object, which can then be used when submitting the Coordinator job.  It assumes the default location for the Hadoop configuration files, /etc/hadoop.  It then uses the fully qualified path to the properties file to open the file and convert its contents into a Properties object.  It really is that easy. 

package oozie;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSAccessor {

    private Configuration conf;

    public HDFSAccessor() {
        conf = new Configuration();
        String path = "/etc/hadoop";
        Path hadoopConfig = new Path(path + "/conf/core-site.xml");
        conf.addResource(hadoopConfig);
        conf.addResource(new Path(path + "/conf/hdfs-site.xml"));
        conf.addResource(new Path(path + "/conf/mapred-site.xml"));
    }
    
    public FileSystem getFileSystem() throws IOException {
        String fsURI = conf.get("fs.default.name");
        return FileSystem.get(URI.create(fsURI), conf);
    }
    
    public Properties readPropsFile(String filePath) throws Exception {
        Path pt = new Path(filePath);
        FileSystem fs = getFileSystem();
        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
        try {
            Properties props = new Properties();
            props.load(br);
            return props;
        }
        finally {
            br.close();
        }
    }
}

As you can see for yourself, the Java application is actually quite simple.  There is no complicated logic, and each of the classes is fairly concise. 

 

Building the Artifact

However, since the artifact will need to run in Hadoop, it will be necessary to ensure that the Oozie client library and its dependencies are included in the final distributable artifact.  There are a number of ways to do this, but the most convenient is to use a Maven assembly.  The Maven pom file and assembly file are provided below.  Note: this assumes a Cloudera CDH 4.5 installation; if a different configuration is being used, the appropriate adjustments will need to be made.  As can be seen, the Hadoop libraries have the provided scope, since those libraries will already be available on the cluster.  The Oozie client library may or may not be present, so it has the default scope and will be included in the assembly, which takes its class files and puts them in the final jar file. 

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>oozie</groupId>
    <artifactId>job-runner</artifactId>
    <version>1.0.0</version>
    <name>job-runner</name>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.0.0-cdh4.5.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>2.0.0-mr1-cdh4.5.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.0.0-cdh4.5.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.oozie</groupId>
            <artifactId>oozie-client</artifactId>
            <version>3.3.2-cdh4.5.0</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <finalName>${project.name}-${project.version}</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/assembly.xml</descriptor>
                    </descriptors>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

 

assembly.xml

<?xml version="1.0"?>
<assembly
    xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    
    <id>bundle</id>
    <formats>
        <format>jar</format>
    </formats>

    <includeBaseDirectory>false</includeBaseDirectory>

    <fileSets>
        <fileSet>
            <outputDirectory>/</outputDirectory>
            <directory>target/classes</directory>
        </fileSet>
    </fileSets>

    <dependencySets>
        <dependencySet>
            <outputDirectory>/</outputDirectory>
            <unpack>true</unpack>
            <useProjectArtifact>true</useProjectArtifact>
            <includes>
                <include>org.apache.oozie:oozie-client</include>
                <include>com.googlecode.json-simple:json-simple</include>
            </includes>
        </dependencySet>
    </dependencySets>

</assembly>
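With the descriptor saved as src/main/assembly/assembly.xml (the path referenced in the pom above), a standard Maven build should produce the bundled jar.  Given the finalName and the assembly id, the output should look something like:

mvn clean package
# expected artifact: target/job-runner-1.0.0-bundle.jar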

 

Running the Application

The final piece of this solution is to make it easy to use the application to submit Oozie Coordinators, and how that is done will depend upon how you prefer to interact with the cluster.  The two main methods are the command line and Hue.  If the command line is the preferred method, then you could execute a command something like:

hadoop jar ./job-runner-1.0.0-bundle.jar oozie.CoordinatorClient http://cdh-dev:11000/oozie hdfs://cdh-dev:8020/user/admin/test/coord/mytest.properties run

 

If, on the other hand, Hue is the preferred method, then you will want to create a shared workflow and use that to submit the coordinators.  It just needs to be a single Java action node that takes the fully qualified path to the properties file as a parameter.  A sample workflow.xml is provided below:

<workflow-app name="Submit_Coordinator" xmlns="uri:oozie:workflow:0.4">
    <start to="Submit_Job"/>
    <action name="Submit_Job">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <main-class>oozie.CoordinatorClient</main-class>
            <arg>http://cdh-nn01:11000/oozie</arg>
            <arg>${nameNode}${FQ_PROPS}</arg>
            <arg>run</arg>
            <file>/user/admin/libs/job-runner-1.0.0-bundle.jar#job-runner-1.0.0-bundle.jar</file>
        </java>
        <ok to="end"/>
        <error to="kill"/>
    </action>
    <kill name="kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>