Using Oozie to Process Daily Logs

At Edmunds we are working to move our existing data warehouse system to a new system based on Hadoop and Netezza. Our data warehouse team focused on delivering ad impression data from DoubleClick DART as the first production deliverable. Last week, we achieved a major milestone: DART data is now being delivered on a nightly basis through our new Hadoop/Netezza infrastructure. Once in Hadoop, the data is scrubbed and dumped into files that mirror our Netezza table structure. To handle the DART processing, we wrote a fair amount of code to deal with daily log rotation and things of that nature.

Toward the end of the project we started using Oozie to coordinate our processing workflows. We chose Oozie because we wanted a system that would allow us to schedule jobs, coordinate workflows, and give us better visibility into what is running when.

Recently I took on the task of processing some of the logs we generate internally. These internal logs are rotated out daily. Given the functionality provided by Oozie, we thought it would be great to remove our code that handles log rotation, file names, and date calculations and let Oozie do the work. As powerful as Oozie is for handling date-based processing, getting it to work was another story.

This post describes the configuration I used to get our jobs running using Oozie. I will also go into some of the mistakes I made so that others can save time and effort using Oozie.

Background

My goal was to process files that are delivered to Hadoop every night. The files are delivered to HDFS in a set of directories organized by date and server name. All the files have the same name. The directory format is shown below:

/log_archive/prod/2011/06/06/prodserver/app_server_events.log
/log_archive/prod/2011/06/06/prodserver2/app_server_events.log

I have a simple mapper (no reducer) that takes the input files and converts them to a format that our data loaders can handle. Based on the directory format, I set the mapper configuration’s mapred.input.dir to:

/log_archive/prod/2011/06/06/*/app_server_events.log

I wanted to maintain the date-based directory structure for the output files, but since I was dealing with files from all the servers I set mapred.output.dir to:

/log_archive/prod/2011/06/06/app_server_events_processed.log

To complete the project, after all processing is done, I need to deliver the processed file to an NFS mounted filesystem.
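
The conversion itself is not the interesting part of this post, but for context, a minimal sketch of the kind of map-only job I am describing might look like the following. The real ServerEventMapJob does more than this, and the class name and field handling below are made up for illustration:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Sketch of a map-only conversion job. TextInputFormat hands each line to the
 * mapper keyed by its byte offset; the mapper re-emits the fields the loaders
 * need as Text/Text pairs, matching the output key/value classes configured
 * in the workflow later in this post.
 */
public class ServerEventMapperSketch extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

    public void map(LongWritable offset, Text line,
                    OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // Hypothetical record layout: tab-delimited, first field is an id.
        String[] fields = line.toString().split("\t");
        if (fields.length < 2) {
            reporter.incrCounter("ServerEvents", "MALFORMED", 1);
            return;
        }
        output.collect(new Text(fields[0]), new Text(fields[1]));
    }
}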

Using hard-coded values

My first stab at getting the above process working was to hard code everything. I figured that once I had it working I could work back to a more dynamic system with calculated file names based on run dates. In addition, to keep things simple, I didn’t worry about how I was going to get the data out of Hadoop and onto the mount where my user needed it; I just left the processed files in HDFS.

In order to allow Oozie to run my process I had to get the mapper jar and its dependencies into HDFS. To do this I created a directory in the Oozie user’s home directory. The directory structure is shown in listing 1 below.

+-/user/oozie/server-events-workflow/
  +-coordinator.xml
  +-workflow.xml
  +-lib/
    +-*.jar

Listing 1 – Workflow job directory structure

Since I don’t want to run things from the Oozie user’s home directory, prior to moving to production I will move the directory structure to something more generic like /apps/workflow/WORKFLOW-NAME. For now, the structure in listing 1 works. I could move the coordinator outside of the workflow, but for this example I do not have to coordinate multiple workflows, so I chose to place the coordinator and workflow files together. To package up my code and dependencies I relied on Maven and the Maven Assembly plugin’s fileSet and dependencySet directives to assemble the Oozie artifacts into a directory and zip file for easy deployment.

The coordinator.xml and workflow.xml files have almost everything hardcoded. I knew that in the future I would want more dynamic, date-based file names, so I chose to move the variable definitions for file and directory names into the coordinator.xml file. In addition, I moved some standard location definitions such as the workflow path, name node, start date, and end date to a separate properties file that is passed into the Oozie job at run time. Putting the configuration into an external text file allowed me to move from my local environment to our integration and production servers easily.

Listing 2 is a sample of the command-line script used to execute the workflow. Note the use of the -config flag to pass in the properties file that supplies the job configuration.

#!/bin/bash

#########################################
# Run for Dart Workflow Job once        #
#########################################

abspath=`dirname "$(cd "${0%/*}" 2>/dev/null; echo "$PWD"/"${0##*/}")"`
echo $abspath

whoami=`whoami`

if [ "x${whoami}" != "xoozie" ] ; then
	echo "ERROR: This script should only be run by the oozie user"
	exit 127
fi

oozie job -oozie http://localhost:11000/oozie -config $abspath/coordinator-manual.properties -run

Listing 2 – Shell script to run an Oozie job

The properties file defines the common values that are passed in as variables to the coordinator.xml. To help with testing I created two properties files: one sets the start and end dates so that I can run the Oozie job manually, and the other lets me run the job on a daily basis.

oozie.coord.application.path=hdfs://namenode:8020/user/oozie/apps/server-events-workflow
workflowApplicationPath=hdfs://namenode:8020/user/oozie/apps/server-events-workflow
jobTracker=jobtracker:8021
nameNode=hdfs://namenode:8020
maxErrors=1000
startDate=2011-06-01T01:00Z
endDate=2011-06-01T01:00Z

Listing 3 – Sample manual-run properties file
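
As an aside, the same submission can be done through Oozie’s Java client API (the oozie-client jar) instead of the CLI. A minimal sketch using the server URL from listing 2 and the values from listing 3, with the end date pushed a day past the start date for reasons that will come up shortly:

import java.util.Properties;

import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;

public class SubmitCoordinatorSketch {
    public static void main(String[] args) throws OozieClientException {
        // Same Oozie server the CLI in listing 2 talks to.
        OozieClient client = new OozieClient("http://localhost:11000/oozie");

        // Equivalent of the -config properties file in listing 3.
        Properties conf = client.createConfiguration();
        conf.setProperty("oozie.coord.application.path",
            "hdfs://namenode:8020/user/oozie/apps/server-events-workflow");
        conf.setProperty("workflowApplicationPath",
            "hdfs://namenode:8020/user/oozie/apps/server-events-workflow");
        conf.setProperty("jobTracker", "jobtracker:8021");
        conf.setProperty("nameNode", "hdfs://namenode:8020");
        conf.setProperty("maxErrors", "1000");
        conf.setProperty("startDate", "2011-06-01T01:00Z");
        conf.setProperty("endDate", "2011-06-02T01:00Z");

        // Submit and start the coordinator, like "oozie job ... -run".
        String jobId = client.run(conf);
        System.out.println("Started coordinator job " + jobId);
    }
}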

The coordinator.xml will be passed the start and end dates as well as name node and other configuration items. The coordinator sets the input and output parameters that will be passed into the workflow.xml definition.

<coordinator-app name="server-event-processor" frequency="${coord:days(1)}" start="${startDate}" end="${endDate}" timezone="America/Los_Angeles" xmlns="uri:oozie:coordinator:0.1">
    <action>
        <workflow>
            <app-path>${workflowApplicationPath}</app-path>
            <configuration>
                <property>
                    <name>jobTracker</name>
                    <value>${jobTracker}</value>
                </property>
                <property>
                    <name>nameNode</name>
                    <value>${nameNode}</value>
                </property>
                <property>
                    <name>queueName</name>
                    <value>default</value>
                </property>
                <property>
                    <name>maxErrors</name>
                    <value>${maxErrors}</value>
                </property>
                <property>
                    <name>inputs</name>
                    <value>${nameNode}/user/oozie/log_archive/prod/2011/06/06/*/server_events.log</value>
                </property>
                <property>
                    <name>outputs</name>
                    <value>${nameNode}/user/oozie/log_archive/prod/2011/06/06/server_events_processed.log</value>
                </property>
                <property>
                    <name>oozie.wf.workflow.notification.url</name>
                    <value>http://eventpublisher:8080/hadoop-event-publisher-webapp/service/oozieNotification/updateWorkflow?jobId=$jobId&amp;status=$status</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>

Listing 4 – Hard coded coordinator.xml

Since the process is a simple file conversion and does not yet include the local file move, the workflow file in listing 5 has a single action defined. The action specifies only a mapper, no reducer, and sets the number of reduce tasks to zero. The mapred queue name is set to the queueName property supplied by the coordinator.xml file, and the mapred.input.dir and mapred.output.dir properties are set to the inputs and outputs properties defined there as well. Note that I use the prepare directive to delete any output from previous runs, which makes debugging and testing easier.

<workflow-app xmlns="uri:oozie:workflow:0.1" name="server-events-informatica-wf">
    <start to="ServerInformaticaProcessor"/>
    <action name="ServerInformaticaProcessor">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${outputs}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>com.edmunds.dwh.processor.serverevent.ServerEventMapJob$ServerEventMapClass</value>
                </property>
                <property>
                    <name>mapred.speculative.execution</name>
                    <value>false</value>
                </property>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.input.format.class</name>
                    <value>org.apache.hadoop.mapred.TextInputFormat</value>
                </property>
                <property>
                    <name>mapred.output.format.class</name>
                    <value>org.apache.hadoop.mapred.TextOutputFormat</value>
                </property>
                <property>
                    <name>mapred.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.output.value.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.reduce.tasks</name>
                    <value>0</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputs}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputs}</value>
                </property>
                <property>
                    <name>informaticaSupport</name>
                    <value>true</value>
                </property>
                <property>
                    <name>validate</name>
                    <value>false</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail-server-informatica"/>
    </action>
    <kill name="fail-server-informatica">
        <message>Informatica Processor failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

Listing 5 – workflow.xml file with properties supplied by coordinator.xml and cleanup prior to execution of workflow

At this point I had a coordinator and workflow definition that would execute my mapper against a single hard-coded file name. I ran into a problem when I first tried to run everything: my coordinator ran and exited with a status of SUCCEEDED, yet my workflow was never executed. For the longest time I could not figure out why. While I was looking at my coordinator properties file, I noticed the start and end date values shown in listing 6 below.

oozie.coord.application.path=hdfs://namenode:8020/user/oozie/apps/server-events-workflow
workflowApplicationPath=hdfs://namenode:8020/user/oozie/apps/server-events-workflow
jobTracker=jobtracker:8021
nameNode=hdfs://namenode:8020
maxErrors=1000
startDate=2011-06-01T01:00Z
endDate=2011-06-01T00:00Z

Listing 6 – Properties file

Since I was trying to run the workflow manually, I had just copied a start and end date from some other file. My blind copying created a problem: Oozie diligently noticed that the end date was before the start date, so it exited without ever materializing an action. Once I updated the end date to be after the start date, everything worked fine.
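
To keep myself from tripping over this again, a cheap guard is to sanity-check the dates before submitting the job. A minimal sketch, assuming the same property names and UTC timestamp format used in my properties files:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.TimeZone;

public final class CoordinatorDateCheck {

    /** Parses an Oozie-style UTC timestamp such as 2011-06-01T01:00Z. */
    static Date parseOozieDate(String value) throws ParseException {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'");
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format.parse(value);
    }

    /** Fails fast if endDate is not strictly after startDate. */
    static void checkDates(Properties jobProps) throws ParseException {
        Date start = parseOozieDate(jobProps.getProperty("startDate"));
        Date end = parseOozieDate(jobProps.getProperty("endDate"));
        if (!end.after(start)) {
            throw new IllegalArgumentException("endDate " + jobProps.getProperty("endDate")
                + " must be after startDate " + jobProps.getProperty("startDate"));
        }
    }
}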

I now had two problems remaining: 1) I needed to get the output from HDFS to a specific mount point and 2) I needed to run this job daily and update the path for inputs and outputs accordingly. As the file move seemed the easier of the two problems, I decided to start with getting data out of HDFS onto a local file system.

Moving files from HDFS to local file system

I spent a bunch of time poring over the Oozie documentation to see if there was a built-in function to move mapred output from HDFS to a local file system. While Oozie does have an fs action, it only works with HDFS. That restriction meant I had to figure out my own way of doing the file copy.

Oozie has support for Java actions. A Java action is run as a single mapper that executes the class’s main() method, and it supports passing arguments and setting Java options. The Java action gave me a hook by which I could create a simple Java class to do the move for me. The next thing I needed to figure out was how to take the multiple file outputs from my mapper and copy them to a single file on a non-HDFS filesystem.

I knew there was a method, moveToLocalFile(), on the Hadoop FileSystem class. However, that method only appears to work for a single file. I searched through the Hadoop JavaDoc and found a FileUtil class with a static method called copyMerge(). The copyMerge() method takes a source file system, source path, destination file system, destination path, and a flag to delete the source path on success, and it sounded like exactly what I needed. So, I created a simple class that is constructed with a source and destination file system and contains a single method: move(). The move() method takes a source and destination path. If the source path is a directory it calls copyMerge() with the delete flag set to true; if the source path is a plain file it calls moveToLocalFile() on the source file system object.

The Oozie Java action expects a main() method. So, I created a simple main() method that creates a configuration and two file systems, then calls move() on the paths it builds from the passed-in file names. The code is shown in listing 7 below. Granted, it could use some nicer error handling and logging, but for now it’s a good start.

package com.edmunds.dwh.processor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

/**
 * This class moves a file or directory from HDFS to a local file or directory.
 * Copyright (C) 2011 Edmunds.com
 * <p/>
 * Date: 6/9/11:10:27 AM
 *
 * @author phannon
 */
public final class HdfsToLocalFileMove {
    private final FileSystem sourceFileSystem;
    private final FileSystem destFileSystem;
    private final Configuration conf;

    public HdfsToLocalFileMove(Configuration conf, FileSystem sourceFileSystem, FileSystem destFileSystem) throws IOException {
        this.sourceFileSystem = sourceFileSystem;
        this.destFileSystem = destFileSystem;
        this.conf = conf;
    }

    public void move(Path source, Path destination) throws IOException {
        // Remove any previous output at the destination so the copy is repeatable.
        if (destFileSystem.exists(destination)) {
            destFileSystem.delete(destination, true);
        }
        if (sourceFileSystem.getFileStatus(source).isDir()) {
            // Mapper output is a directory of part files; merge them into a single
            // destination file and delete the source directory on success.
            FileUtil.copyMerge(sourceFileSystem, source, destFileSystem, destination, true, conf, null);
        } else {
            // A plain file can simply be moved to the local file system.
            sourceFileSystem.moveToLocalFile(source, destination);
        }
    }

    public static void main(String[] args) throws IOException {
        // args[0] = source path in HDFS, args[1] = destination path (a file:// URI).
        Path source = new Path(args[0]);
        Path dest = new Path(args[1]);
        Configuration conf = new Configuration();
        FileSystem fsSource = source.getFileSystem(conf);
        FileSystem fsDest = FileSystem.getLocal(conf);
        HdfsToLocalFileMove move = new HdfsToLocalFileMove(conf, fsSource, fsDest);
        move.move(source, dest);
    }
}

Listing 7 – HDFS File Move Class

Now that I had a Java class that could be called by the Oozie Java action, I had to wire the action into the workflow (I’ll skip the details of rebuilding and recopying the libraries and configuration files into HDFS).

I wanted to make sure that when I moved away from hard-coded paths my workflow.xml file wouldn’t need to change. I also wanted to be able to pass in the local directory to store the output in whenever I run the job. To get that flexibility I added the directory name to my properties file and built the output path in the coordinator.xml file. The changes to each file are outlined below in listings 8 and 9.

localDir=file:///misc/phannon

Listing 8 – New property in the property file.

Note that the path uses normal URI syntax. The file:// scheme is important; leave it off and your file will end up in HDFS.
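
This comes down to how Hadoop resolves a Path: if the URI has a scheme, the matching file system is used; otherwise the path is resolved against the cluster’s default file system, which is HDFS. A quick sketch of the difference (the paths are just illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SchemeDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // With the file:// scheme the path resolves to the local file system.
        Path withScheme = new Path("file:///misc/phannon/server_events_processed.log");
        FileSystem localFs = withScheme.getFileSystem(conf);
        System.out.println(localFs.getUri());   // file:///

        // Without a scheme the path is resolved against the default file
        // system (fs.default.name), which on the cluster is HDFS.
        Path bare = new Path("/misc/phannon/server_events_processed.log");
        FileSystem defaultFs = bare.getFileSystem(conf);
        System.out.println(defaultFs.getUri()); // e.g. hdfs://namenode:8020
    }
}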

<coordinator-app name="server-event-processor" frequency="${coord:days(1)}" start="${startDate}" end="${endDate}"
                 timezone="America/Los_Angeles"
                 xmlns="uri:oozie:coordinator:0.1">
    <action>
        <workflow>
            <app-path>${workflowApplicationPath}</app-path>
            <configuration>
            ...SNIP...
                <property>
                    <name>localDest</name>
                    <value>${localDir}/server_events_processed_2011_06_06.log</value>
                </property>
            ...SNIP...
            </configuration>
        </workflow>
    </action>
</coordinator-app>

Listing 9 – Exposing new property for the workflow

The additional property in listing 9 gives my workflow.xml file access to a parameter called ${localDest}, which is used to configure the Java file move action. The new workflow is shown below in listing 10; the new file move action is the move-to-local action near the bottom. In order for the file move action to be called, I updated the ok tag on the file processor action to point to the move-to-local action, so the workflow continues to the file move on success of the file processor action.

<workflow-app xmlns="uri:oozie:workflow:0.1" name="server-events-informatica-wf">
    <start to="ServerInformaticaProcessor"/>

    <action name="ServerInformaticaProcessor">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${outputs}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>com.edmunds.dwh.processor.serverevent.ServerEventMapJob$ServerEventMapClass</value>
                </property>
                <property>
                    <name>mapred.speculative.execution</name>
                    <value>false</value>
                </property>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.input.format.class</name>
                    <value>org.apache.hadoop.mapred.TextInputFormat</value>
                </property>
                <property>
                    <name>mapred.output.format.class</name>
                    <value>org.apache.hadoop.mapred.TextOutputFormat</value>
                </property>
                <property>
                    <name>mapred.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.output.value.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.reduce.tasks</name>
                    <value>0</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputs}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputs}</value>
                </property>
                <property>
                    <name>informaticaSupport</name>
                    <value>true</value>
                </property>
                <property>
                    <name>validate</name>
                    <value>false</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="move-to-local"/>
        <error to="fail-server-informatica"/>
    </action>
    <action name="move-to-local">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <main-class>com.edmunds.dwh.processor.HdfsToLocalFileMove</main-class>
            <arg>${outputs}</arg>
            <arg>${localDest}</arg>
            <capture-output/>
        </java>
        <ok to="end"/>
        <error to="fail-move-to-local"/>
    </action>
    <kill name="fail-server-informatica">
        <message>Informatica Processor failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <kill name="fail-move-to-local">
        <message>Copy to local failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

Listing 10 – workflow.xml with file move action added

At this point I had a hard-coded coordinator and workflow configuration that translates a file from one format to another and moves the output from HDFS to a local file system. This left me with one final problem: I needed to run the job daily and update the input and output paths accordingly. Luckily, Oozie provides all the functionality I needed to solve the problem.

Moving to dynamically calculated file names

The input files I receive are sorted into directories based on year, month, and day. I wanted the output files to be stored with the year, month, and day appended to the file name. To get Oozie to generate the file names, I first had to use the dataset tags to specify the file format and then use input and output events to specify the specific instances of the dataset files that I needed to read or generate. I had a bit of a hard time with this part. Oozie does provide documentation, but how it all worked didn’t really sink in until I had played with the settings for a while.

The dataset specification was easy enough. I simply added the lines in listing 11 to the top of my coordinator.xml file.

   <datasets>
        <dataset name="logs" frequency="${coord:days(1)}"
                 initial-instance="2011-01-01T24:00Z" timezone="America/Los_Angeles">
            <uri-template>${nameNode}/user/oozie/log_archive/prod/${YEAR}/${MONTH}/${DAY}</uri-template>
            <done-flag></done-flag>
        </dataset>
        <dataset name="processed" frequency="${coord:days(1)}"
                 initial-instance="2011-01-01T24:00Z" timezone="America/Los_Angeles">
            <uri-template>${nameNode}/user/oozie/log_archive/prod/${YEAR}/${MONTH}/${DAY}/server_events_processed.log</uri-template>
        </dataset>
        <dataset name="localDir" frequency="${coord:days(1)}"
                 initial-instance="2011-01-01T24:00Z" timezone="America/Los_Angeles">
            <uri-template>${localDir}/${environment}/${YEAR}/${MONTH}/${DAY}/server_events_processed.log</uri-template>
        </dataset>
    </datasets>

Listing 11 – Data set definitions for input and output uri templates

Then I created the input and output event blocks right underneath the dataset declarations.

    <input-events>
        <data-in name="input" dataset="logs">
            <instance>${coord:current(0)}</instance>
        </data-in>
    </input-events>
    <output-events>
        <data-out name="output" dataset="processed">
            <instance>${coord:current(0)}</instance>
        </data-out>
        <data-out name="localOutput" dataset="localDir">
            <instance>${coord:current(0)}</instance>
        </data-out>
    </output-events>

Listing 12 – Event block definitions

To pass these to the workflow.xml I then updated the properties for the workflow in the coordinator.xml.

                <property>
                    <name>inputs</name>
                    <value>${coord:dataIn('input')}/*/server_events.log</value>
                </property>
                <property>
                    <name>outputs</name>
                    <value>${coord:dataOut('output')}</value>
                </property>
                <property>
                    <name>localDest</name>
                    <value>${coord:dataOut('localOutput')}</value>
                </property>

Listing 13 – Updated properties for calculated file names

The complete xml file is shown in listing 14 below.

<coordinator-app name="server-event-processor" frequency="${coord:days(1)}" start="${startDate}" end="${endDate}"
                 timezone="America/Los_Angeles"
                 xmlns="uri:oozie:coordinator:0.1">
    <datasets>
        <dataset name="logs" frequency="${coord:days(1)}"
                 initial-instance="2011-01-01T24:00Z" timezone="America/Los_Angeles">
            <uri-template>${nameNode}/user/oozie/log_archive/prod/${YEAR}/${MONTH}/${DAY}</uri-template>
            <done-flag></done-flag>
        </dataset>
        <dataset name="processed" frequency="${coord:days(1)}"
                 initial-instance="2011-01-01T24:00Z" timezone="America/Los_Angeles">
            <uri-template>${nameNode}/user/oozie/log_archive/prod/${YEAR}/${MONTH}/${DAY}/server_events_processed.log</uri-template>
        </dataset>
        <dataset name="localDir" frequency="${coord:days(1)}"
                 initial-instance="2011-01-01T24:00Z" timezone="America/Los_Angeles">
            <uri-template>${localDir}/${environment}/${YEAR}/${MONTH}/${DAY}/server_events_processed.log</uri-template>
        </dataset>
    </datasets>
    <input-events>
        <data-in name="input" dataset="logs">
            <instance>${coord:current(0)}</instance>
        </data-in>
    </input-events>
    <output-events>
        <data-out name="output" dataset="processed">
            <instance>${coord:current(0)}</instance>
        </data-out>
        <data-out name="localOutput" dataset="localDir">
            <instance>${coord:current(0)}</instance>
        </data-out>
    </output-events>
    <action>
        <workflow>
            <app-path>${workflowApplicationPath}</app-path>
            <configuration>
                <property>
                    <name>jobTracker</name>
                    <value>${jobTracker}</value>
                </property>
                <property>
                    <name>nameNode</name>
                    <value>${nameNode}</value>
                </property>
                <property>
                    <name>queueName</name>
                    <value>default</value>
                </property>
                <property>
                    <name>maxErrors</name>
                    <value>${maxErrors}</value>
                </property>
                <property>
                    <name>inputs</name>
                    <value>${coord:dataIn('input')}/*/server_events.log</value>
                </property>
                <property>
                    <name>outputs</name>
                    <value>${coord:dataOut('output')}</value>
                </property>
                <property>
                    <name>localDest</name>
                    <value>${coord:dataOut('localOutput')}</value>
                </property>
                <property>
                    <name>oozie.wf.workflow.notification.url</name>
                    <value>http://eventpublisher:8080/hadoop-event-publisher-webapp/service/oozieNotification/updateWorkflow?jobId=$jobId&amp;status=$status</value>
                </property>

            </configuration>
        </workflow>
    </action>
</coordinator-app>

Listing 14 – Complete coordinator.xml

Since I had already set up the workflow.xml with variables, I didn’t have to make any changes to the workflow file. The dataset block has some useful markers like DAY, MONTH, and YEAR to help with URI construction.

Notice I did not put the /*/server_events.log in the dataset because I do not think it made sense in the dataset definition; it made more sense in the workflow configuration. I might revisit that decision if wildcards are supported in the dataset block, but from the documentation I cannot tell. I was hoping that the DAY, MONTH, and YEAR markers could be used in other blocks within the coordinator file. However, once I figured out that the input-events block selects an actual instance of the URI pattern, I understood that the DAY, MONTH, YEAR EL is more like a macro than a variable that can be used elsewhere. If I wanted to access date information I could have also used the ${coord:current(int)} EL function. In my next revision of the mapper I will use the current EL to validate that the log data is within the date range the log file is supposed to contain.
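
I haven’t written that yet, but as a rough sketch, the idea is to have the coordinator pass the expected date down into the job configuration as a property and have the mapper count (or reject) records that fall outside it. The property name expectedLogDate and the record layout below are made up for illustration:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/** Sketch of date validation inside the mapper. */
public class DateValidatingMapperSketch extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

    private String expectedLogDate = "";

    public void configure(JobConf job) {
        // Hypothetical property, e.g. "2011-06-06", set by the workflow action.
        expectedLogDate = job.get("expectedLogDate", "");
    }

    public void map(LongWritable offset, Text line,
                    OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        String record = line.toString();
        // Assume each record starts with an ISO date; count anything outside
        // the expected day instead of silently passing it through.
        if (expectedLogDate.length() > 0 && !record.startsWith(expectedLogDate)) {
            reporter.incrCounter("ServerEvents", "OUT_OF_RANGE", 1);
            return;
        }
        output.collect(new Text(expectedLogDate), line);
    }
}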

The first time I ran this code the coordinator job went into a WAITING state and I had no idea why. After staring at the logs I noticed that every few seconds Oozie was checking for the existence of my files. At first I didn’t notice, but Oozie was actually looking for a specific file called “_SUCCESS” in my source directory. I remembered seeing something about that in the documentation but hadn’t paid attention. Looking back at the documentation I realized that the “_SUCCESS” file is the default done flag that lets Oozie know the source data is ready. Since our system does not produce such a flag, I had to add the empty done-flag element you see in the coordinator dataset definition.
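
If you control the process that lands the logs in HDFS, the alternative is to have that process drop the default _SUCCESS flag after the last file is written and leave the done-flag element out of the dataset. A minimal sketch of what the ingest side might do (the directory path is just the example from earlier in the post; our ingest does not actually do this today):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MarkDirectoryReady {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path logDir = new Path("hdfs://namenode:8020/user/oozie/log_archive/prod/2011/06/06");
        FileSystem fs = logDir.getFileSystem(conf);

        // Create an empty _SUCCESS marker; once the flag exists Oozie's
        // dataset polling considers this day's directory available.
        fs.create(new Path(logDir, "_SUCCESS")).close();
    }
}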

Next, I missed the now obvious fact that my properties file had to change to a more realistic start and stop time so that the coordinator could correctly calculate today, yesterday, etc. Our logs are rotated at midnight and my test set was for 06/06/2011, so I set my start time to 06/07/2011 01:00 and my stop time to 06/07/2011 02:00. I figured that once my start date was set I could use current(-1) to give me the previous day. However, current does not appear to work that way, or at least not with the frequency setting I was using. The documentation states:

The ${coord:days(int n)} EL function returns the number of minutes for ‘n’ complete days starting with the day of the specified nominal time for which the computation is being done.

I guess that makes sense? So, I set my frequency to ${coord:days(1)} because I wanted the process to run once every day.

Next, I had to feed a specific instance of my dataset in as the input event trigger. Input events are covered in the Oozie coordinator documentation, which describes a special EL function called coord:current. The documentation states:

${coord:current(int n)} represents the nth dataset instance for a synchronous dataset, relative to the coordinator action creation (materialization) time. The coordinator action creation (materialization) time is computed based on the coordinator job start time and its frequency. The nth dataset instance is computed based on the dataset’s initial-instance datetime, its frequency and the (current) coordinator action creation (materialization) time.

n can be a negative integer, zero or a positive integer.

I figured that ${coord:current(-1)} for a start date of 06/07/2011 01:00 would resolve to 06/06/2011. It does not. However, ${coord:current(0)} does. I think if I had used a start date of 06/07/2011 24:00 it would work that way. Go figure.

Conclusion

Once I figured all this out I ran my script one final (for that day) time. Everything worked! I expected a bit more of a rush; it didn’t feel like I had accomplished much for the time spent. Hence this blog post, so that others don’t spend a day working through all this stuff. In the end, reading through the configurations, it all basically makes sense:

  • You have to write custom code to move stuff out of HDFS
  • Start and stop dates and times matter
  • You can have multiple output events for different actions
  • If you want to process logs from 06/06/2011, set the start date to 06/07/2011 and use current(0) (unless, I guess, you set the start date to 06/07/2011 24:00)
  • Oozie allowed me to replace a bunch of custom code with some fairly simple XML

5 Responses to Using Oozie to Process Daily Logs

  1. Rob says:

    Well done Paddy! Thanks for publishing your experiences with Oozie.
    Rob

  2. Harshal Vora says:

    How do you make sure that the logs are already present in HDFS before you start processing?
    I see that you use an empty done-flag, but there might be cases where the logs from some of your production servers are not transferred into HDFS due to some error on one of the production servers.
    Do you have any checking and alerting system for this?

  3. Bryan Quinn says:

    Harshal, you can configure SLA events http://yahoo.github.com/oozie/releases/2.3.0/CoordinatorFunctionalSpec.html#a12._SLA_Handling
    but this just updates the Oozie SLA_EVENTS database table.
    It’s up to your own alerting system to process them from there.

  4. Kishore says:

    On a multi-node cluster, how do you specify which node will run the Java action (considering the fact that Java actions are run as map-reduce jobs on the cluster)? This becomes important because you are moving your file to the local file system, which could mean you end up writing to local file systems on different machines in the cluster at different times.
