Pig Keeps Temp Files

Did you know that Pig keeps temp files between script submissions?  Yes, unfortunately it does.  This becomes a problem when you run Pig from inside a web application.  Internally, Pig creates a number of temp files but does not actively clean up after itself.  It relies on File.deleteOnExit(), which does not delete the temp files until the JVM terminates.  Of course, this is not a problem when the script is run from Grunt, because Grunt's JVM terminates after each submission.  However, when Pig is run from inside a web application, the JVM does not terminate, and very quickly you have a big problem: Pig keeps temp files.  Soon your /tmp directory is cluttered with job-specific Jar files and working directories.
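
To make the failure mode concrete, below is a minimal sketch (my own, not from the Pig code base) of why File.deleteOnExit() is a poor fit for a long-running JVM.

import java.io.File;
import java.io.IOException;

// Each file is only registered for deletion at JVM shutdown, so in
// a JVM that never shuts down the files simply accumulate on disk.
public class DeleteOnExitDemo {

    public static void main(String[] args) throws IOException {
        for (int i = 0; i < 5; i++) {
            File tmp = File.createTempFile("pigjob", ".jar");
            tmp.deleteOnExit();
            // still on disk, and there is no public API to
            // un-register it short of terminating the JVM
            System.out.println(tmp.getAbsolutePath());
        }
        // in a servlet container this point is never reached, so
        // none of the files above are ever removed
    }
}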

This behavior was last seen in Pig 0.11.1.  Hopefully it will be fixed in a future release, but in the meantime, what is someone to do when Pig keeps temp files?  I considered some possibilities.  The first was to spawn an entirely new process, with its own JVM, to run my Pig scripts from the web application.  I decided against this because it seemed like a heavyweight solution and would have required changing the entire processing model, not really a good idea.  I considered becoming a Pig contributor and fixing it in the source code, but my boss axed that idea because he could not be guaranteed a solution when he needed it.  So instead I turned to AOP, which is a common approach for inserting functionality into third-party libraries.  The thinking was that if I could use AOP to expose the information about the temp files and then delete those files after the Pig script completed, it would at least minimize the problem.  It would not eliminate it, because the entries would still exist in the LinkedHashSet in the DeleteOnExitHook class used by File.deleteOnExit(), but at least the /tmp directory would get cleaned up.
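
For what it is worth, those leftover DeleteOnExitHook entries can in principle be scrubbed as well, though only by reflection against JVM internals.  The sketch below is my own, is not part of the solution described in the rest of this post, and assumes OpenJDK's java.io.DeleteOnExitHook keeps its pending paths in a private static set named "files"; on JVMs where that does not hold, it silently does nothing.

import java.lang.reflect.Field;
import java.util.Set;

public final class DeleteOnExitScrubber {

    // Best-effort removal of a path from the set backing
    // File.deleteOnExit(). Fragile: relies on OpenJDK internals.
    @SuppressWarnings("unchecked")
    public static void forget(String path) {
        try {
            Class<?> hook = Class.forName("java.io.DeleteOnExitHook");
            Field filesField = hook.getDeclaredField("files");
            filesField.setAccessible(true);
            Set<String> files = (Set<String>) filesField.get(null);
            // DeleteOnExitHook synchronizes on its class, so do the same
            synchronized (hook) {
                files.remove(path);
            }
        } catch (Exception e) {
            // leave the entry alone; it is only a small memory leak
        }
    }
}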

Here is what I did to deal with the problem, at least until it can be fixed properly.  The first thing I needed to do was find the place where the temp files are created and then inject a pointcut into the appropriate method.  I had to use AspectJ because I am in one of those shops where Spring is a dirty word, go figure.  Anyway, I downloaded the Pig source code and traced through it.  I found what I was looking for in the JobControlCompiler.getJob() method.  This is where the temporary Jar file that will be submitted with the job gets created.  getJob() returns the fully populated Job object before it has been submitted.  It is important to capture the information before the job is submitted, because once it is submitted the Jar file is copied over to the .staging directory in HDFS.  The other important piece of information is the value of the “pig.schematuple.local.dir” property, which is the path of the temporary directory that will be created for the job.  This can be retrieved from the Configuration object in the usual way.

The Aspect

Below is the Aspect that I created, which is woven into the Pig Jar to produce a new AOP-enhanced Pig Jar.  I chose a post-compile-time weave, as opposed to a load-time weave, because it made more sense to do the weave once and then reuse the new Pig Jar.

package com.freitas.pig.aop;

import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;

import com.freitas.utils.PigJobSingleton;

public aspect PigWeaveAspect {

    // fires on every call to JobControlCompiler.getJob(), which is
    // where Pig builds the job-specific temp Jar
    pointcut jobCatch() : call(* JobControlCompiler.getJob(..));

    // after getJob() returns, record the temp Jar and the
    // schema-tuple scratch directory so they can be deleted once
    // the script completes
    after() returning(Job job) : jobCatch() {

        if (job != null) {
            String jarFilePath = job.getJobConf().get("mapred.jar");
            String schemaDirPath = job.getJobConf().get(
                "pig.schematuple.local.dir");
            String jobName = job.getJobName();
            PigJobSingleton pigJobSingleton =
                PigJobSingleton.getInstance();
            // either property may be unset, so guard against nulls
            if (jarFilePath != null) {
                pigJobSingleton.addTempFile(jobName, jarFilePath);
            }
            if (schemaDirPath != null) {
                pigJobSingleton.addTempFile(jobName, schemaDirPath);
            }
        }

    }
}

The Singleton

The observant reader will have noticed the PigJobSingleton object, which has not been explained yet.  It is just the classic Singleton pattern.  The idea is simple: create a singleton object that holds the set of temporary files for each Pig script execution.  Note: a Singleton was used here only because my current shop is not using a Dependency Injection framework.  If you are using DI, I would recommend using that instead.

Next, I needed to consider what lookup key to use.  A Pig script can be composed of more than one Hadoop job, so the JobID would not work well as a key; this was further compounded by the fact that the temp file information needed to be captured before the job was submitted, when the JobID would not even be available yet.  Instead, it was necessary to key everything by the job group name.

However, for this to work, each job group name must be unique.  By default, Pig takes the name of the script file and prepends the string “PigLatin:” to form the job group name (e.g., PigLatin:myPigScript).  So I needed a way to generate a unique script name before submitting; this was done by simply appending a monotonically increasing number to the current name, see the method PigJobSingleton.getUniqueJobName().

The general flow of using this will be:

  1. Before submitting, create a unique name for the Pig script, retain it, and then submit the script
  2. The Aspect is called each time a new Hadoop job is created; it creates a hashmap entry in the PigJobSingleton for the job group and adds all the temp files, stored as a list of file paths
  3. At the completion of the Pig script, the PigJobSingleton is consulted to retrieve the list of temp files
  4. Iterate through the list and delete each file/directory
  5. Remove the entry from the PigJobSingleton hashmap

package com.freitas.utils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PigJobSingleton {

    // Pig prepends this prefix to the script name to form the
    // job group name, so our keys must carry it too
    public static final String PIG_JOB_PREFIX = "PigLatin:";

    private final Map<String, List<String>> jobMap =
        new ConcurrentHashMap<String, List<String>>();
    private int nextJobId = 1;

    // initialization-on-demand holder idiom: the JVM guarantees
    // lazy, thread-safe creation of the instance
    private static class SingletonHolder {
        static final PigJobSingleton instance = new PigJobSingleton();
    }

    private PigJobSingleton() {
        // prevent outside instantiation
    }

    public static PigJobSingleton getInstance() {
        return SingletonHolder.instance;
    }

    // synchronized so the check-then-act on jobMap is atomic when
    // several jobs are created concurrently
    public synchronized void addTempFile(String jobName, String fqFileName) {
        String prefixJobName = checkForPrefix(jobName);
        List<String> currList = jobMap.get(prefixJobName);
        if (currList == null) {
            // first temp file recorded for this job group
            currList = new ArrayList<String>();
            jobMap.put(prefixJobName, currList);
        }
        currList.add(fqFileName);
    }

    public List<String> getTempFileList(String jobName) {
        List<String> currList = jobMap.get(checkForPrefix(jobName));
        if (currList == null) {
            return Collections.emptyList();
        }
        return currList;
    }

    public void deleteJobEntry(String jobName) {
        jobMap.remove(checkForPrefix(jobName));
    }

    // turns myPigScript.pig into myPigScript_001.pig,
    // myPigScript_002.pig, and so on
    public synchronized String getUniqueJobName(String scriptName) {
        String[] nameParts = scriptName.split("\\.");
        String jobNameCore = nameParts.length > 0 ? nameParts[0] : scriptName;
        StringBuilder sb = new StringBuilder();
        sb.append(jobNameCore);
        sb.append("_");
        sb.append(String.format("%03d", nextJobId++));
        sb.append(".pig");
        return sb.toString();
    }

    private String checkForPrefix(String jobName) {
        // make sure the job name starts with the Pig prefix
        if (jobName.startsWith(PIG_JOB_PREFIX)) {
            return jobName;
        }
        return PIG_JOB_PREFIX + jobName;
    }
}
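
Here is a quick, hypothetical usage example (the Jar path is made up) showing how the pieces fit together; in practice the addTempFile() call is made by the Aspect, not by hand.

PigJobSingleton singleton = PigJobSingleton.getInstance();

// "myPigScript.pig" becomes something like "myPigScript_001.pig"
String uniqueName = singleton.getUniqueJobName("myPigScript.pig");

// the Aspect records the temp files as each Hadoop job is created
singleton.addTempFile(uniqueName, "/tmp/Job5443945204298447538.jar");

// after the script completes, fetch, delete, and forget them
List<String> tempFiles = singleton.getTempFileList(uniqueName);
singleton.deleteJobEntry(uniqueName);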

Weaving the Jar

The command-line execution of the AspectJ weave looked something like what is shown below.  It used a project named PigWeave, took the original Pig Jar as input, and created a new AOP-enhanced Pig Jar.

ajc -inpath ./PigWeave/orig/pig-0.11.0-cdh4.3.0-withouthadoop.jar \
    -sourceroots ./PigWeave/src/com/freitas/pig/aop \
    -outjar ./PigWeave/target/aop-pig-0.11.0-cdh4.3.0-withouthadoop.jar \
    -Xlintfile ./PigWeave/Xlint.properties

The Xlint.properties file was needed because there are a large number of type references in the Pig Jar that cannot be resolved during the weave, and the default setting for cantFindType is error, so it was necessary to downgrade it to a warning.

cantFindType = warning

Putting it All Together

Finally, in the web application, I put all the pieces together by adding some code before and after the Pig script execution to reference the PigJobSingleton, retrieve the set of temp files, delete each one, and finally remove the hashmap entry.

String uniqueScript = customUniqueScript(new File(scriptFileName));
String[] args = {"-Dpig.logfile=" + logFileName, "-stop_on_failure", uniqueScript};
PigStats pigStats = PigRunner.run(args, jobListener);

PigJobSingleton pigJobSingleton = PigJobSingleton.getInstance();
List<String> tempFilesToDelete = pigJobSingleton.getTempFileList(uniqueScript);
for (String tempFile : tempFilesToDelete) {
    // FileUtils.deleteQuietly (commons-io, which Hadoop already
    // depends on) handles both plain files and non-empty directories
    // such as the schema-tuple scratch directory, which a bare
    // File.delete() would not remove
    FileUtils.deleteQuietly(new File(tempFile));
}
pigJobSingleton.deleteJobEntry(uniqueScript);
new File(uniqueScript).delete();

// copies the source script to a new file with a unique name, so that
// the job group name Pig derives from it is unique as well
private String customUniqueScript(File sourceFile) throws IOException {
    PigJobSingleton pigJobSingleton = PigJobSingleton.getInstance();
    String scriptFileName = pigJobSingleton.getUniqueJobName(sourceFile.getName());
    PrintWriter pw = new PrintWriter(new FileWriter(scriptFileName));
    BufferedReader in = new BufferedReader(new InputStreamReader(
        new FileInputStream(sourceFile)));
    try {
        String inLine;
        while ((inLine = in.readLine()) != null) {
            pw.print(inLine + "\n");
        }
    } finally {
        pw.close();
        in.close();
    }
    return scriptFileName;
}

Conclusion

Although this does not completely solve all the problems created when Pig keeps temp files, it does help a great deal.  More importantly, it is something you can do immediately and have some control over.