Chain MapReduce jobs in Java
tags: MapReduce
Below is sample driver code that chains the jobs: each iteration's merged output becomes the cluster-centers file for the next iteration, shipped to the tasks via the distributed cache.
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver that chains KMeans MapReduce iterations.
 *
 * <p>Each iteration ships the current cluster-centers file to the tasks via
 * the distributed cache (symlinked as {@code cluster_centers}), runs one
 * MapReduce pass, merges the per-reducer part files into a single file, and
 * feeds that file to the next iteration.
 *
 * <p>Arguments: {@code <input> <outputBase> <numIterations> <initialCentersFile>}
 */
public class KMeans extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        System.out.println(Arrays.toString(args));
        // Fail fast with a usage message instead of an ArrayIndexOutOfBounds.
        if (args.length < 4) {
            System.err.println(
                "Usage: KMeans <input> <outputBase> <numIterations> <initialCentersFile>");
            return 2;
        }
        int nIter = Integer.parseInt(args[2]);
        Path pInput = new Path(args[0]);
        // Merged per-iteration center files are written next to the initial one.
        Path dirCC = (new Path(args[3])).getParent();
        String fClusterCenters = args[3];

        for (int i = 1; i <= nIter; ++i) {
            Job job = Job.getInstance(getConf(), "KMeans");
            job.setJarByClass(KMeans.class);
            // Tells mapper/reducer which iteration they are in; the centers
            // file format differs between the first and later iterations.
            job.getConfiguration().setInt("kmeans.iteration.id", i);
            // The "#cluster_centers" fragment is the symlink name the mapper
            // opens in setup().
            job.addCacheFile(new URI(fClusterCenters + "#cluster_centers"));
            job.setMapperClass(KMeansMapper.class);
            // NOTE(review): reusing the reducer as combiner is only correct if
            // it emits partial sums/counts rather than final averages — confirm
            // against KMeansReducer's implementation.
            job.setCombinerClass(KMeansReducer.class);
            job.setReducerClass(KMeansReducer.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(DoubleArrayWritable.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            Path pOutput = new Path(args[1] + "_" + i);
            FileInputFormat.addInputPath(job, pInput);
            FileOutputFormat.setOutputPath(job, pOutput);

            FileSystem fs = FileSystem.get(job.getConfiguration());
            // A leftover output directory from a previous run would make the
            // job fail at submission time; remove it first.
            if (fs.exists(pOutput))
                fs.delete(pOutput, true);

            if (!job.waitForCompletion(true))
                return 1;

            // Merge the per-reducer part files into one centers file so the
            // next iteration can ship a single file through the cache.
            Path pMergedOutput = new Path(dirCC, "cluster_centers_" + i);
            if (fs.exists(pMergedOutput))
                fs.delete(pMergedOutput, false);
            FileUtil.copyMerge(fs, pOutput, fs, pMergedOutput, false,
                    job.getConfiguration(), "");
            fClusterCenters = pMergedOutput.toString();
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Arrays.toString(args));
        int res = ToolRunner.run(new Configuration(), new KMeans(), args);
        System.exit(res);
    }

    // ...
}
Then, in the mapper class, we load the current cluster centers from the distributed-cache file in setup():
/**
 * Mapper for one KMeans iteration.
 *
 * <p>In {@link #setup} it loads the current cluster centers from the
 * distributed-cache file the driver symlinked as {@code cluster_centers},
 * one center per line, whitespace-separated coordinates.
 */
public static class KMeansMapper
        extends Mapper<LongWritable, Text, IntWritable, DoubleArrayWritable> {

    private IntWritable mapkey = new IntWritable();
    private DoubleArrayWritable mapvalue = new DoubleArrayWritable();
    // Parsed centers: one double[] of coordinates per cluster.
    private List<double[]> clusterCenters = new ArrayList<double[]>();

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException, IllegalArgumentException
    {
        Configuration conf = context.getConfiguration();
        int iterId = conf.getInt("kmeans.iteration.id", 0);
        // From iteration 2 on the centers file is merged reducer output whose
        // lines carry 3 leading non-coordinate tokens before the coordinates
        // — TODO confirm against the reducer's output format.
        int offs = (iterId > 1) ? 3 : 0;
        // try-with-resources: the original leaked the reader on every path,
        // including when a parse exception escaped the loop.
        try (BufferedReader r =
                new BufferedReader(new FileReader("./cluster_centers"))) {
            String line;
            while ((line = r.readLine()) != null) {
                // Skip blank lines (e.g. trailing newline in the merged file)
                // instead of failing inside Double.parseDouble.
                if (line.trim().isEmpty())
                    continue;
                String[] coord = line.split("\\s+");
                double[] a = new double[coord.length - offs];
                for (int i = 0; i < coord.length - offs; ++i)
                    a[i] = Double.parseDouble(coord[i + offs]);
                clusterCenters.add(a);
            }
        }
        if (clusterCenters.isEmpty())
            throw new IllegalArgumentException("Zero input cluster centers");
    }

    // ...
}