Friday, 10 January 2014
MapReduce - Framework Related
• Map
• Partitioning
• Sorting
• Shuffling
• Reduce
• What is a “failed task”? A task can fail due to an exception, a machine failure, etc. A failed task is re-attempted (by default up to 4 attempts; the limit is configurable, see the sketch below).
• What is a “killed task”? If a task fails 4 times, the task is killed and the entire job fails. A duplicate task launched as part of speculative execution is also marked as killed.
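If your cluster is flaky you may want more (or fewer) retries before the job is declared failed. Below is a minimal driver sketch, assuming the Hadoop 1.x property names mapred.map.max.attempts and mapred.reduce.max.attempts; the class name and the value 6 are purely illustrative.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class RetryConfigDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Default is 4 attempts per task; 6 is only an example value.
        conf.setInt("mapred.map.max.attempts", 6);
        conf.setInt("mapred.reduce.max.attempts", 6);
        Job job = new Job(conf, "retry-demo");
        // ... set jar, mapper, reducer, input and output paths as usual ...
    }
}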
Input Split : the portion or chunk of data on which a mapper operates. Typically it is equal to one block of data (dfs.block.size).
Each mapper works on exactly one input split.
Consider a block size of 64 MB and 1 GB of data: the number of splits is 16. If the block size stays the same but the split size is 128 MB, the number of splits is 8, i.e. 2 HDFS blocks make up one split and one mapper runs on 128 MB of data.
How do we control the input split size? Generally the input split equals the block size (64 MB), but you may want a mapper to work on only 32 MB or 128 MB of data.
Three properties control the split size:
• mapred.min.split.size (default 1)
• mapred.max.split.size (default Long.MAX_VALUE)
• dfs.block.size (default 64 MB)
Simple formula for the split size: max(minSplitSize, min(maxSplitSize, blockSize)).
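As an illustration (a minimal driver sketch, not from the original post; the 128 MB target and the class name are assumptions), this is how the properties above can be set so that one split covers two 64 MB blocks:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SplitSizeDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // splitSize = max(minSplitSize, min(maxSplitSize, blockSize))
        // Raising the minimum to 128 MB gives 128 MB splits on 64 MB blocks.
        conf.setLong("mapred.min.split.size", 128L * 1024 * 1024);
        // Lowering mapred.max.split.size below the block size (e.g. 32 MB)
        // would instead give splits smaller than a block.
        Job job = new Job(conf, "split-size-demo");
        // ... remaining job setup (jar, mapper, reducer, paths) as usual ...
    }
}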
We know that a task can be a mapper task or a reducer task.
A map task executes on an input split, and
a reduce task executes on the intermediate output generated by the map tasks, as shown in the figure below.
1. The mapper is the first phase of a MapReduce job.
2. A mapper works on 1 split of data, typically 1 block. (By default 1 split is 1 block; if your split size is not the same as your block size, an overhead is added to your MapReduce processing to divide the data that is present in HDFS (internally stored as blocks) into splits. This overhead is not there when your split equals the block. You may then ask why we have the choice of adjusting the split size at all and how the split size affects M/R performance; I will cover this topic in a separate future post.)
3. The MapReduce framework ensures that a map task runs close to the data, implementing data localization.
4. Several map tasks run in parallel on different machines, each working on a different portion of the data.
5. A mapper reads data from one split in the form of (in_key, in_value) pairs and emits output in the form of (out_key, out_value) pairs.
6. A mapper can emit any number of output (key, value) pairs, depending on your mapper logic, i.e. 0, 1 or n.
7. Mapper output is created in the local file system of the particular data node where the mapper is running, instead of in HDFS, because mapper output is temporary in nature and replication is not required.
How does the mapper read data from a split?
When writing a mapper function you do not need to deal with how data is read from the split. This is taken care of by a MapReduce framework class called RecordReader. You just tell the framework what your data looks like, i.e. whether it is TextInputFormat, SequenceFileInputFormat, NLineInputFormat, etc., in your driver code. The RecordReader is responsible for providing (in_key, in_value) pairs to your mapper.
8. The RecordReader calls the map function present in your mapper for each record in your split (see the sketch below).
a. An input split consists of records; for each record in the input split, the map function is called.
b. Each record is sent as a key-value pair to the map function.
c. When we write the map function, we keep only one record in mind.
d. The mapper does not keep state about how many records it has processed or how many records will appear.
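To make this concrete, here is a minimal mapper sketch (an illustration, not code from the original post): with TextInputFormat, the framework's RecordReader hands each line of the split to map() as a (byte offset, line text) pair, and the mapper is free to emit 0, 1 or n output pairs per call.
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Receives (LongWritable byte offset, Text line) from TextInputFormat's RecordReader
// and emits (line, length of line) for every record in the split.
public class LineLengthMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    public void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        context.write(line, new LongWritable(line.getLength()));
    }
}
In the driver you would declare the format with job.setInputFormatClass(TextInputFormat.class); the mapper itself never opens the file.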
Reducer :
1. The reducer runs only after all mapper tasks have completed.
2. After the mapper phase, all the intermediate values for a given intermediate key are grouped together to form a list.
3. The reducer operates on a key and its list of values.
4. When writing a reducer, keep ONLY one key and its list of values in mind.
5. The reducer operates ONLY on one key and its list of values at a time.
6. For better load balancing you should have more than one reducer; the user can increase or decrease the number of reducers (see the sketch below).
7. The reducer does not work on data localization: the intermediate data generated by your mappers is sent to wherever the reducer logic is running, i.e. the data is moved to where your logic is.
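A minimal driver sketch (an illustration; the value 4 and the class name are assumptions) showing how the number of reducers is set:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReducerCountDemo {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "reducer-count-demo");
        // One part-r-NNNNN output file is produced per reducer; the default is 1.
        job.setNumReduceTasks(4);
        // ... remaining job setup (jar, mapper, reducer, paths) as usual ...
    }
}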
Now that we understand some basics of MapReduce, see my post MapReduce - Basic Programming for how to write a mapper, reducer and driver class.
http://hadoopmapred.blogspot.in/2014/01/mapreduce-basic-programming.html
MapReduce Multiple Inputs - Multiple Outputs
How do we read multiple files and execute different mapper logic for each? Here is the code for the same.
Example : We have 2 files. Find the sum of salaries gender-wise.
File1:emp1.txt
101,f,3000
102,m,4000
103,f,5000
104,m,5000
105,m,9000
File2:emp2.txt
201,aaaa,m,10000,11
202,b,m,30000,11
203,c,f,6000,14
204,dd,f,90000,14
205,ee,m,10000,13
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class MultipleFiles {
public static class Map1 extends Mapper<LongWritable, Text, Text, IntWritable>
{
public void map(LongWritable k, Text v, Context con) throws IOException, InterruptedException
{
String line=v.toString();
String[] words=line.split(",");
String sex=words[1];
int sal=Integer.parseInt(words[2]);
con.write(new Text(sex), new IntWritable(sal));
}
}
public static class Map2 extends Mapper<LongWritable, Text, Text, IntWritable>
{
public void map(LongWritable k, Text v, Context con) throws IOException, InterruptedException
{
String line=v.toString();
String[] words=line.split(",");
String sex=words[2];
int sal=Integer.parseInt(words[3]);
con.write(new Text(sex), new IntWritable(sal));
}
}
public static class Red extends Reducer<Text, IntWritable, Text, IntWritable>
{
public void reduce(Text sex, Iterable<IntWritable> salaries, Context con) throws IOException, InterruptedException
{
int tot=0;
for(IntWritable sal:salaries)
{
tot+=sal.get();
}
con.write(sex, new IntWritable(tot));
}
}
public static void main(String[] args) throws Exception
{
Configuration c=new Configuration();
String[] files=new GenericOptionsParser(c,args).getRemainingArgs();
Path p1=new Path(files[0]);
Path p2=new Path(files[1]);
Path p3=new Path(files[2]);
Job j = new Job(c,"multiple");
j.setJarByClass(MultipleFiles.class);
// The mapper class for each input path is set below via MultipleInputs.addInputPath
j.setReducerClass(Red.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(IntWritable.class);
MultipleInputs.addInputPath(j, p1, TextInputFormat.class, Map1.class);
MultipleInputs.addInputPath(j,p2, TextInputFormat.class, Map2.class);
FileOutputFormat.setOutputPath(j, p3);
System.exit(j.waitForCompletion(true) ? 0:1);
}
}
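To run this job (a hedged example; the jar name multiplefiles.jar and the paths are only placeholders): hadoop jar multiplefiles.jar MultipleFiles /input/emp1.txt /input/emp2.txt /output/genderwise. With the sample files above, the output contains one total per gender: f 104000 and m 68000.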
==========================================================================================================================
When we run a MapReduce job: hadoop jar [jar name] [driver class name] [input path] [output folder path]
3 kinds of files will be created in the output directory:
1. _SUCCESS - a zero-byte file that tells us the job succeeded.
2. part-r-00000 files, where 00000 is the starting sequence number of the generated outputs (one file per reducer).
3. Logs - the job logs are stored here.
Now, to write data to different files specified by the user, see the example below.
I have a file that contains Order Details, Member Details, Item Details, Tax Details and Tender Details.
Based on a column value in this file, the job should write its outputs to different files.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
public class keyCount {
public static class MapClass extends Mapper<Object, Text, Text, Text> {
private Text k = new Text();
private Text val = new Text();
int iOrdSeq = 1;
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
System.out.println("value is 1:::::" + value);
System.out.println("Key1:" + value.toString().substring(12, 14));
String key1 = value.toString().substring(12, 14);
/*
 * WC-TYPE : 10 -- OrderFile ; WC-TYPE : 28 -- MemberFile ;
 * WC-TYPE : 31 -- ItemFile ; WC-TYPE : 37 -- TaxFile ;
 * WC-TYPE : 44 -- TenderFile
 */
if ("10".equals(key1)) {
String strOrdClubNo = value.toString().substring(5, 9);
String strOrdCountryCode = "US";
String strOrdDate = value.toString().substring(15, 21);
String strOrdTime = value.toString().substring(21, 25);
String strOrdOrderID = value.toString().substring(36, 43);
String strOrdChannelCode = "7";
StringBuffer strVal = new StringBuffer();
strVal.append(strOrdClubNo + "|");
strVal.append(strOrdCountryCode + "|");
strVal.append(strOrdDate + "|");
strVal.append(strOrdTime + "|");
strVal.append(iOrdSeq + "|");
strVal.append(strOrdOrderID + "|");
strVal.append(strOrdChannelCode );
String strVal1 = strVal.toString();
k.set(key1);
val.set(strVal1);
System.out.println("Key and Key Hashcode : " + k.toString()
+ "\t" + k.hashCode());
System.out.println("Partition Number : "
+ (k.hashCode() & Integer.MAX_VALUE) % 5);
}
else if ("28".equals(key1)) {
String strOrdClubNo = value.toString().substring(5, 9);
String strOrdCountryCode = "US";
String strOrdDate = value.toString().substring(15, 21);
String strOrdTime = value.toString().substring(21, 25);
int strOrdSeq = 1;
String strOrdOrderID = value.toString().substring(36, 43);
String strOrdChannelCode = "7";
StringBuffer strVal = new StringBuffer();
strVal.append(strOrdClubNo + ",");
strVal.append(strOrdCountryCode + ",");
strVal.append(strOrdDate + ",");
strVal.append(strOrdTime + ",");
strVal.append(strOrdSeq + ",");
strVal.append(strOrdOrderID + ",");
strVal.append(strOrdChannelCode + ",");
String strVal1 = strVal.toString();
k.set(key1);
val.set(strVal1);
System.out.println("Key and Key Hashcode : " + k.toString()
+ "\t" + k.hashCode());
System.out.println("Partition Number : "
+ (k.hashCode() & Integer.MAX_VALUE) % 5);
} else if ("31".equals(key1)) {
String strOrdClubNo = value.toString().substring(5, 9);
String strOrdCountryCode = "US";
String strOrdDate = value.toString().substring(15, 21);
String strOrdTime = value.toString().substring(21, 25);
int strOrdSeq = 1;
String strOrdOrderID = value.toString().substring(36, 43);
String strOrdChannelCode = "7";
StringBuffer strVal = new StringBuffer();
strVal.append(strOrdClubNo + ",");
strVal.append(strOrdCountryCode + ",");
strVal.append(strOrdDate + ",");
strVal.append(strOrdTime + ",");
strVal.append(strOrdSeq + ",");
strVal.append(strOrdOrderID + ",");
strVal.append(strOrdChannelCode + ",");
String strVal1 = strVal.toString();
k.set(key1);
val.set(strVal1);
System.out.println("Key and Key Hashcode : " + k.toString()
+ "\t" + k.hashCode());
System.out.println("Partition Number : "
+ (k.hashCode() & Integer.MAX_VALUE) % 5);
} else if ("37".equals(key1)) {
String strOrdClubNo = value.toString().substring(5, 9);
String strOrdCountryCode = "US";
String strOrdDate = value.toString().substring(15, 21);
String strOrdTime = value.toString().substring(21, 25);
int strOrdSeq = 1;
String strOrdOrderID = value.toString().substring(36, 43);
String strOrdChannelCode = "7";
StringBuffer strVal = new StringBuffer();
strVal.append(strOrdClubNo + ",");
strVal.append(strOrdCountryCode + ",");
strVal.append(strOrdDate + ",");
strVal.append(strOrdTime + ",");
strVal.append(strOrdSeq + ",");
strVal.append(strOrdOrderID + ",");
strVal.append(strOrdChannelCode + ",");
String strVal1 = strVal.toString();
k.set(key1);
val.set(strVal1);
System.out.println("Key and Key Hashcode : " + k.toString()
+ "\t" + k.hashCode());
System.out.println("Partition Number : "
+ (k.hashCode() & Integer.MAX_VALUE) % 5);
} else if ("44".equals(key1)) {
String strOrdClubNo = value.toString().substring(5, 9);
String strOrdCountryCode = "US";
String strOrdDate = value.toString().substring(15, 21);
String strOrdTime = value.toString().substring(21, 25);
int strOrdSeq = 1;
String strOrdOrderID = value.toString().substring(36, 43);
String strOrdChannelCode = "7";
StringBuffer strVal = new StringBuffer();
strVal.append(strOrdClubNo + ",");
strVal.append(strOrdCountryCode + ",");
strVal.append(strOrdDate + ",");
strVal.append(strOrdTime + ",");
strVal.append(strOrdSeq + ",");
strVal.append(strOrdOrderID + ",");
strVal.append(strOrdChannelCode + ",");
String strVal1 = strVal.toString();
k.set(key1);
val.set(strVal1);
System.out.println("Key and Key Hashcode : " + k.toString()
+ "\t" + k.hashCode());
System.out.println("Partition Number : "
+ (k.hashCode() & Integer.MAX_VALUE) % 5);
}
context.write(k, val);
context.setStatus(context.getCurrentKey().toString());
iOrdSeq++;
}
}
public static class ReducerClass extends Reducer<Text, Text, NullWritable, Text> {
private MultipleOutputs<NullWritable, Text> mos;
public void setup(Context context) throws IOException,
InterruptedException {
mos = new MultipleOutputs<NullWritable, Text>(context);
}
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
System.out.println("REDUCER Start");
for (Text value : values) {
String key1 = key.toString();
if (key1.equalsIgnoreCase("10")) {
mos.write("ORDER", NullWritable.get(), value);
} else if (key1.equalsIgnoreCase("28")) {
mos.write("MEMBER", NullWritable.get(), value);
} else if (key1.equalsIgnoreCase("31")) {
mos.write("ITEM", NullWritable.get(), value);
} else if (key1.equalsIgnoreCase("37")) {
mos.write("TAX", NullWritable.get(), value);
} else if (key1.equalsIgnoreCase("44")) {
mos.write("TENDER", key1, value);
}
}
}
public void cleanup(Context context) throws IOException {
try {
mos.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount
System.exit(2);
}
Job job = new Job(conf, "SAMSTLogFileGeneration");
job.setJarByClass(keyCount.class);
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(ReducerClass.class);
//job.setOutputValueClass(Text.class);
MultipleOutputs.addNamedOutput(job, "ORDER", TextOutputFormat.class,
NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "MEMBER", TextOutputFormat.class,
NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "ITEM", TextOutputFormat.class,
NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "TAX", TextOutputFormat.class,
NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "TENDER", TextOutputFormat.class,
Text.class, Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}


