Friday, 10 January 2014

MapReduce Multiple Inputs - Multiple Outputs

How do we read multiple input files and apply different mapper logic to each one? Here is the code for that.

Example: We have 2 files with different column layouts. Find the sum of salaries for each gender across both files.

File1:emp1.txt

101,f,3000

102,m,4000

103,f,5000

104,m,5000

105,m,9000

File2:emp2.txt

201,aaaa,m,10000,11

202,b,m,30000,11

203,c,f,6000,14

204,dd,f,90000,14

205,ee,m,10000,13

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MultipleFiles {
public static class Map1 extends Mapper {
public void map(LongWritable k, Text v, Context con) throws IOException, InterruptedException
{
String line=v.toString();
String[] words=line.split(",");
String sex=words[1];
int sal=Integer.parseInt(words[2]);
con.write(new Text(sex), new IntWritable(sal));
}
}
public static class Map2 extends Mapper
{
public void map(LongWritable k, Text v, Context con) throws IOException, InterruptedException
{
String line=v.toString();
String[] words=line.split(",");
String sex=words[2];
int sal=Integer.parseInt(words[3]);
con.write(new Text(sex), new IntWritable(sal));
}
}
public static class Red extends Reducer
{
public void reduce(Text sex, Iterable salaries, Context con) throws IOException , InterruptedException
{
int tot=0;
for(IntWritable sal:salaries)
{
tot+=sal.get();
}
con.write(sex, new IntWritable(tot));
}
}
public static void main(String[] args) throws Exception
{
Configuration c=new Configuration();
String[] files=new GenericOptionsParser(c,args).getRemainingArgs();
Path p1=new Path(files[0]);
Path p2=new Path(files[1]);
Path p3=new Path(files[2]);
Job j = new Job(c,"multiple");
j.setJarByClass(MultipleFiles.class);
j.setMapperClass(Map1.class);
j.setMapperClass(Map2.class);
j.setReducerClass(Red.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(IntWritable.class);
MultipleInputs.addInputPath(j, p1, TextInputFormat.class, Map1.class);
MultipleInputs.addInputPath(j,p2, TextInputFormat.class, Map2.class);
FileOutputFormat.setOutputPath(j, p3);
System.exit(j.waitForCompletion(true) ? 0:1);
}

}

==========================================================================================================================

A MapReduce job is run with the command: hadoop jar [jar name] [driver class name] [input path] [output folder path]

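Three kinds of entries will be created in the output directory:

1. _SUCCESS file - an empty (0 KB) marker file indicating that the job completed successfully.

2. part-r-00000 files - the reducer output files; "r" marks reducer output, and the number (starting at 00000) is the sequence number of the output partition.

3. _logs - the job logs are stored here.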

To write data to different output files chosen by the user, see the example below.

I have a single file that contains Order Details, Member Details, Item Details, Tax Details and Tender Details.

Based on a column value in each record of this file, the output should be written to different files.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;

public class keyCount {

public static class MapClass extends Mapper {

private Text k = new Text();
private Text val = new Text();

int iOrdSeq = 1;

public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {

System.out.println("value is 1:::::" + value);
System.out.println("Key1:" + value.toString().substring(12, 14));
String key1 = value.toString().substring(12, 14);
/*
* WC-TYPE : 10 -- OrderFile ; WC-TYPE : 28 -- MmemberFile ; WC-TYPE
* :31 -- ItemFile ; WC-TYPE : 37 -- TaxFile ; WC-TYPE : 44 --
* TenderFile
*/
if ("10".equals(key1)) {

String strOrdClubNo = value.toString().substring(5, 9);
String strOrdCountryCode = "US";
String strOrdDate = value.toString().substring(15, 21);
String strOrdTime = value.toString().substring(21, 25);

String strOrdOrderID = value.toString().substring(36, 43);
String strOrdChannelCode = "7";

StringBuffer strVal = new StringBuffer();
strVal.append(strOrdClubNo + "|");
strVal.append(strOrdCountryCode + "|");
strVal.append(strOrdDate + "|");
strVal.append(strOrdTime + "|");
strVal.append(iOrdSeq + "|");
strVal.append(strOrdOrderID + "|");
strVal.append(strOrdChannelCode );
String strVal1 = strVal.toString();
k.set(key1);
val.set(strVal1);
System.out.println("Key and Key Hashcode : " + k.toString()
+ "\t" + k.hashCode());
System.out.println("Partition Number : "
+ (k.hashCode() & Integer.MAX_VALUE) % 5);

}

else if ("28".equals(key1)) {

String strOrdClubNo = value.toString().substring(5, 9);
String strOrdCountryCode = "US";
String strOrdDate = value.toString().substring(15, 21);
String strOrdTime = value.toString().substring(21, 25);
int strOrdSeq = 1;
String strOrdOrderID = value.toString().substring(36, 43);
String strOrdChannelCode = "7";

StringBuffer strVal = new StringBuffer();
strVal.append(strOrdClubNo + ",");
strVal.append(strOrdCountryCode + ",");
strVal.append(strOrdDate + ",");
strVal.append(strOrdTime + ",");
strVal.append(strOrdSeq + ",");
strVal.append(strOrdOrderID + ",");
strVal.append(strOrdChannelCode + ",");
String strVal1 = strVal.toString();
k.set(key1);
val.set(strVal1);

System.out.println("Key and Key Hashcode : " + k.toString()
+ "\t" + k.hashCode());
System.out.println("Partition Number : "
+ (k.hashCode() & Integer.MAX_VALUE) % 5);

} else if ("31".equals(key1)) {
String strOrdClubNo = value.toString().substring(5, 9);
String strOrdCountryCode = "US";
String strOrdDate = value.toString().substring(15, 21);
String strOrdTime = value.toString().substring(21, 25);
int strOrdSeq = 1;
String strOrdOrderID = value.toString().substring(36, 43);
String strOrdChannelCode = "7";

StringBuffer strVal = new StringBuffer();
strVal.append(strOrdClubNo + ",");
strVal.append(strOrdCountryCode + ",");
strVal.append(strOrdDate + ",");
strVal.append(strOrdTime + ",");
strVal.append(strOrdSeq + ",");
strVal.append(strOrdOrderID + ",");
strVal.append(strOrdChannelCode + ",");
String strVal1 = strVal.toString();
k.set(key1);
val.set(strVal1);

System.out.println("Key and Key Hashcode : " + k.toString()
+ "\t" + k.hashCode());
System.out.println("Partition Number : "
+ (k.hashCode() & Integer.MAX_VALUE) % 5);

} else if ("37".equals(key1)) {
String strOrdClubNo = value.toString().substring(5, 9);
String strOrdCountryCode = "US";
String strOrdDate = value.toString().substring(15, 21);
String strOrdTime = value.toString().substring(21, 25);
int strOrdSeq = 1;
String strOrdOrderID = value.toString().substring(36, 43);
String strOrdChannelCode = "7";

StringBuffer strVal = new StringBuffer();
strVal.append(strOrdClubNo + ",");
strVal.append(strOrdCountryCode + ",");
strVal.append(strOrdDate + ",");
strVal.append(strOrdTime + ",");
strVal.append(strOrdSeq + ",");
strVal.append(strOrdOrderID + ",");
strVal.append(strOrdChannelCode + ",");
String strVal1 = strVal.toString();
k.set(key1);
val.set(strVal1);

System.out.println("Key and Key Hashcode : " + k.toString()
+ "\t" + k.hashCode());
System.out.println("Partition Number : "
+ (k.hashCode() & Integer.MAX_VALUE) % 5);

} else if ("44".equals(key1)) {
String strOrdClubNo = value.toString().substring(5, 9);
String strOrdCountryCode = "US";
String strOrdDate = value.toString().substring(15, 21);
String strOrdTime = value.toString().substring(21, 25);
int strOrdSeq = 1;
String strOrdOrderID = value.toString().substring(36, 43);
String strOrdChannelCode = "7";

StringBuffer strVal = new StringBuffer();
strVal.append(strOrdClubNo + ",");
strVal.append(strOrdCountryCode + ",");
strVal.append(strOrdDate + ",");
strVal.append(strOrdTime + ",");
strVal.append(strOrdSeq + ",");
strVal.append(strOrdOrderID + ",");
strVal.append(strOrdChannelCode + ",");
String strVal1 = strVal.toString();
k.set(key1);
val.set(strVal1);

System.out.println("Key and Key Hashcode : " + k.toString()
+ "\t" + k.hashCode());
System.out.println("Partition Number : "
+ (k.hashCode() & Integer.MAX_VALUE) % 5);

}

context.write(k, val);
context.setStatus(context.getCurrentKey().toString());
iOrdSeq++;
}

}

public static class ReducerClass extends Reducer {

private MultipleOutputs mos;

public void setup(Context context) throws IOException,
InterruptedException {
mos = new MultipleOutputs(context);
}

public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {

System.out.println("REDUCER Start");

for (Text value : values) {
String key1 = key.toString();


if (key1.equalsIgnoreCase("10")) {
mos.write("ORDER", NullWritable.get(), value);

} else if (key1.equalsIgnoreCase("28")) {
mos.write("MEMBER", NullWritable.get(), value);
} else if (key1.equalsIgnoreCase("31")) {
mos.write("ITEM", NullWritable.get(), value);
} else if (key1.equalsIgnoreCase("37")) {
mos.write("TAX", NullWritable.get(), value);
} else if (key1.equalsIgnoreCase("44")) {
mos.write("TENDER", key1, value);
}

}

}

public void cleanup(Context context) throws IOException {
try {
mos.close();
} catch (InterruptedException e) {
e.printStackTrace();
}

}

}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();

if (otherArgs.length != 2) {
System.err.println("Usage: wordcount ");
System.exit(2);
}
Job job = new Job(conf, "SAMSTLogFileGeneration");
job.setJarByClass(keyCount.class);
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(ReducerClass.class);
//job.setOutputValueClass(Text.class);

MultipleOutputs.addNamedOutput(job, "ORDER", TextOutputFormat.class,
NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "MEMBER", TextOutputFormat.class,
NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "ITEM", TextOutputFormat.class,
NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "TAX", TextOutputFormat.class,
NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "TENDER", TextOutputFormat.class,
Text.class, Text.class);

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

1 comment:

  1. Really nice blog post.provided a helpful information.I hope that you will post more updates like this Big data hadoop online Course India


    ReplyDelete