Friday, 10 January 2014

MapReduce - Framework Related

MapReduce: a framework for processing data residing on HDFS. It distributes the tasks (map/reduce) across several machines and typically consists of 5 phases:
• Map
• Partitioning
• Sorting
• Shuffling
• Reduce
MapReduce Terminology:

What is a job? The complete execution of mappers and reducers over the entire data set.

What is a task? A single unit of execution (a map task or a reduce task): the execution of map or reduce over a portion of the data, typically one block.

What is a task attempt? One instance of an attempt to execute a task (map or reduce). If an attempt fails while working on a particular portion of data, another attempt runs on that same portion of data, possibly on the same machine. If a task fails 4 times (the default limit), the task is marked as failed and the entire job fails. The framework tries to make sure that at least one attempt of the task runs on a different machine.
• How many attempts can run on one portion of data? At most 4 by default. If “speculative execution” is ON, additional attempts may run.

What is a “failed task”? A task can fail because of an exception, a machine failure, etc. A failed task is re-attempted (up to 4 attempts in total).

What is a “killed task”? If a task fails 4 times, the entire job fails and its remaining tasks are killed. A duplicate attempt that runs as part of speculative execution is also marked as killed once the other attempt finishes first.
Input Split:

The portion or chunk of data on which a mapper operates. It is typically equal to one block of data (dfs.block.size).

Each mapper works on only one input split.

Suppose your block size is 64 MB and your data is 1 GB; then the number of splits is 16. If the block size stays the same and your split size is 128 MB, then the number of splits is 8, i.e. 2 HDFS blocks form one split and each mapper runs on 128 MB of data.

How do you control the input split size? Generally an input split is equal to the block size (64 MB), but you can change the split size if you want each mapper to work on, say, only 32 MB or as much as 128 MB of data.

We can control the split size using 3 properties:
mapred.min.split.size (default 1)
mapred.max.split.size (default Long.MAX_VALUE)
dfs.block.size (default 64 MB)
Simple formula for the split size: max(minSplitSize, min(maxSplitSize, blockSize)).
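The formula above can be tried out with a small plain-Java sketch. It has no Hadoop dependency; the class and method names (SplitSizeSketch, computeSplitSize, approxNumSplits) are made up for illustration, and the split count is only an approximation (a simple ceiling division) of what the framework would compute for a single file.

```java
// Illustrative sketch only: plain Java, not the Hadoop implementation.
class SplitSizeSketch {

    static final long MB = 1024L * 1024L;

    // Split size formula from the text: max(minSplitSize, min(maxSplitSize, blockSize)).
    static long computeSplitSize(long minSplitSize, long maxSplitSize, long blockSize) {
        return Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
    }

    // Approximate number of splits for one file: ceiling of fileSize / splitSize.
    static long approxNumSplits(long fileSize, long splitSize) {
        return (fileSize + splitSize - 1) / splitSize;
    }

    public static void main(String[] args) {
        long blockSize = 64 * MB;   // dfs.block.size
        long fileSize = 1024 * MB;  // 1 GB of input

        // Defaults: min = 1, max = Long.MAX_VALUE, so the split equals the block.
        long defaults = computeSplitSize(1L, Long.MAX_VALUE, blockSize);
        System.out.println(defaults / MB + " MB -> " + approxNumSplits(fileSize, defaults) + " splits");

        // Raising the minimum above the block size gives larger splits.
        long larger = computeSplitSize(128 * MB, Long.MAX_VALUE, blockSize);
        System.out.println(larger / MB + " MB -> " + approxNumSplits(fileSize, larger) + " splits");

        // Lowering the maximum below the block size gives smaller splits.
        long smaller = computeSplitSize(1L, 32 * MB, blockSize);
        System.out.println(smaller / MB + " MB -> " + approxNumSplits(fileSize, smaller) + " splits");
    }
}
```

So, to get splits larger than a block you raise the minimum, and to get splits smaller than a block you lower the maximum.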

We know a task can be a map task or a reduce task.

A map task executes on an input split, and
a reduce task executes on the intermediate output generated by the map tasks, as shown in the figure below.





Mapper:

1. The mapper is the first phase of a MapReduce job.
2. A mapper works on 1 split of data, typically 1 block. By default 1 split is 1 block. If your split size is not the same as your block size, extra overhead is added to your MapReduce processing to divide the data in HDFS (stored internally as blocks) into splits; this overhead is absent when the split is the same size as a block. You may then ask why we have a choice of adjusting the split size and how the split size affects M/R performance. I will cover this topic in a separate future blog post.
3. The MapReduce framework tries to run each map task close to its data (data locality).
4. Several map tasks run in parallel on different machines, each working on a different portion of the data.
5. A mapper reads data from one split in the form of (in_key, in_value) and emits output in the form of (out_key1, out_value1).
6. A mapper can emit any number of output (key, value) pairs, depending on your mapper logic, i.e. 0, 1, or n.
7. Mapper output is created in the local file system of the data node where the mapper is running, rather than in HDFS, because mapper output is temporary in nature and replication is not required.

How does a mapper read data from a split?

When writing a map function you do not have to deal with how data is read from the split. That is taken care of by a MapReduce framework class called RecordReader. In your driver code you only say what your data looks like by choosing an input format (TextInputFormat, SequenceFileInputFormat, NLineInputFormat, etc.), and the matching RecordReader is used. The RecordReader is responsible for providing (in_key, in_value) to your mapper.

8. The map function in your mapper is called for each record that the RecordReader reads from your split.
           a. An input split consists of records. For each record in the input split, the map function is called.
           b. Each record is sent to the map function as a key-value pair.
           c. When writing a map function, keep only one record in mind.
           d. A mapper does not keep track of how many records it has processed or how many records are still to come.
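The record-at-a-time behaviour described above can be pictured with a small plain-Java simulation. This is only a sketch under stated assumptions, not Hadoop code: RecordLoopSketch, LineMapper and runMapper are hypothetical names, and the loop imitates a line-oriented reader that hands each line to map together with its starting offset.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative simulation only: plain Java, no Hadoop classes involved.
class RecordLoopSketch {

    // Hypothetical stand-in for a mapper: it sees exactly one record per call.
    interface LineMapper {
        void map(long offset, String line, List<String> out);
    }

    // Stand-in for the reader plus the framework loop: break the split into
    // lines and call map once per line with (offset of the line, line text).
    static List<String> runMapper(String split, LineMapper mapper) {
        List<String> out = new ArrayList<>();
        long offset = 0;
        for (String line : split.split("\n")) {
            mapper.map(offset, line, out);
            // +1 for the newline; assumes single-byte characters
            offset += line.length() + 1;
        }
        return out;
    }

    public static void main(String[] args) {
        String split = "101,f,3000\n102,m,4000\n103,f,5000";
        // The map logic only knows about the single record it was handed.
        List<String> out = runMapper(split, (offset, line, sink) -> {
            String[] fields = line.split(",");
            sink.add(offset + ":" + fields[1] + "=" + fields[2]);
        });
        System.out.println(out); // [0:f=3000, 11:m=4000, 22:f=5000]
    }
}
```

The map logic never sees the split as a whole, which is why it should be written with a single record in mind.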

Reducer:

1. The reduce function runs only after all mapper tasks have completed.
2. After the mapper phase, all the intermediate values for a given intermediate key are grouped together to form a list.
3. A reducer operates on a key and its list of values.
4. When writing a reducer, keep ONLY one key and its list of values in mind.
5. Reduce operates ONLY on one key and its list of values at a time.
6. For better load balancing you should have more than one reducer. The user can increase or decrease the number of reducers.
7. A reducer does not rely on data locality: the intermediate data generated by your mappers is sent to wherever the reducer logic is running, i.e. the data is moved to where your logic is.
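The grouping step in points 2 to 5 can be sketched in plain Java as follows. This is a simplified illustration, not how Hadoop implements the shuffle: GroupByKeySketch and its methods are hypothetical names, everything runs in memory on one machine, and summing is just an example of reduce logic.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative simulation only: plain Java, no Hadoop classes involved.
class GroupByKeySketch {

    // Collect intermediate (key, value) pairs into key -> list of values.
    // A TreeMap keeps the keys sorted, similar to the sorted order reducers see.
    static Map<String, List<Integer>> group(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Example reduce logic: it is given ONE key and that key's list of values.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Pretend these pairs were emitted by several mappers, in no particular order.
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        pairs.add(new AbstractMap.SimpleEntry<>("b", 2));
        pairs.add(new AbstractMap.SimpleEntry<>("a", 1));
        pairs.add(new AbstractMap.SimpleEntry<>("b", 3));

        Map<String, List<Integer>> grouped = group(pairs);
        System.out.println(grouped); // {a=[1], b=[2, 3]}
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            System.out.println(e.getKey() + "\t" + reduce(e.getKey(), e.getValue()));
        }
    }
}
```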

Now that we understand some basics of MapReduce, see my post MapReduce - Basic Programming for how to write a mapper, reducer and driver class.

http://hadoopmapred.blogspot.in/2014/01/mapreduce-basic-programming.html
 

MapReduce Multiple Inputs - Multiple Outputs

How do we read multiple files and run different mapper logic on each of them? Here is the code for that.

Example: we have 2 files with different layouts. Find the sum of salaries gender-wise.

File 1: emp1.txt

101,f,3000
102,m,4000
103,f,5000
104,m,5000
105,m,9000

File 2: emp2.txt

201,aaaa,m,10000,11
202,b,m,30000,11
203,c,f,6000,14
204,dd,f,90000,14
205,ee,m,10000,13

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MultipleFiles {

    // Mapper for emp1.txt: gender is field 1, salary is field 2.
    public static class Map1 extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable k, Text v, Context con) throws IOException, InterruptedException {
            String line = v.toString();
            String[] words = line.split(",");
            String sex = words[1];
            int sal = Integer.parseInt(words[2]);
            con.write(new Text(sex), new IntWritable(sal));
        }
    }

    // Mapper for emp2.txt: gender is field 2, salary is field 3.
    public static class Map2 extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable k, Text v, Context con) throws IOException, InterruptedException {
            String line = v.toString();
            String[] words = line.split(",");
            String sex = words[2];
            int sal = Integer.parseInt(words[3]);
            con.write(new Text(sex), new IntWritable(sal));
        }
    }

    // Reducer: sums all salaries received for one gender.
    public static class Red extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text sex, Iterable<IntWritable> salaries, Context con)
                throws IOException, InterruptedException {
            int tot = 0;
            for (IntWritable sal : salaries) {
                tot += sal.get();
            }
            con.write(sex, new IntWritable(tot));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();
        String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
        Path p1 = new Path(files[0]); // emp1.txt
        Path p2 = new Path(files[1]); // emp2.txt
        Path p3 = new Path(files[2]); // output directory
        Job j = Job.getInstance(c, "multiple");
        j.setJarByClass(MultipleFiles.class);
        // No setMapperClass() call here: MultipleInputs binds a mapper to each input path.
        j.setReducerClass(Red.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
        MultipleInputs.addInputPath(j, p1, TextInputFormat.class, Map1.class);
        MultipleInputs.addInputPath(j, p2, TextInputFormat.class, Map2.class);
        FileOutputFormat.setOutputPath(j, p3);
        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }

}
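To check what the job should produce for the two sample files, the same logic can be run locally in plain Java. This is only a sketch for verifying the expected totals, with no Hadoop involved; SalaryByGenderSketch and its methods are hypothetical names, and the field positions mirror the two mappers above.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative local check only: plain Java, no Hadoop classes involved.
class SalaryByGenderSketch {

    static final String[] EMP1 = {
        "101,f,3000", "102,m,4000", "103,f,5000", "104,m,5000", "105,m,9000"
    };
    static final String[] EMP2 = {
        "201,aaaa,m,10000,11", "202,b,m,30000,11", "203,c,f,6000,14",
        "204,dd,f,90000,14", "205,ee,m,10000,13"
    };

    // Adds every record's salary to the running total of its gender.
    static void addRecords(String[] lines, int genderIdx, int salaryIdx, Map<String, Integer> totals) {
        for (String line : lines) {
            String[] fields = line.split(",");
            totals.merge(fields[genderIdx], Integer.parseInt(fields[salaryIdx]), Integer::sum);
        }
    }

    static Map<String, Integer> totals() {
        Map<String, Integer> totals = new TreeMap<>();
        addRecords(EMP1, 1, 2, totals); // same field positions as Map1
        addRecords(EMP2, 2, 3, totals); // same field positions as Map2
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(totals()); // {f=104000, m=68000}
    }
}
```

With the sample data, the job's output should therefore contain a total of 104000 for f and 68000 for m.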

==========================================================================================================================

When we run a MapReduce job with hadoop jar [jar name] [driver class name] [input path] [output path_folder name],

3 kinds of files are created in the output directory:

1. _SUCCESS file - a 0 KB file that indicates the job succeeded.

2. part-r-00000 files - where 00000 is the first number in the sequence of output files that were generated.

3. Logs - job logs are stored here.

Now, to write data to different output files named by the user, see the example below.

I have a file that contains Order Details, Member Details, Item Details, Tax Details and Tender Details.

Based on a column value in this file, the job should write the records to different files.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;

public class keyCount {

    public static class MapClass extends Mapper<Object, Text, Text, Text> {

        private final Text k = new Text();
        private final Text val = new Text();

        // Running sequence number; incremented once per input record.
        int iOrdSeq = 1;

        // Builds one output record from a fixed-width input line.
        // Assumes every line is at least 43 characters long.
        private static String buildRecord(String line, String sep, int seq, boolean trailingSep) {
            String strOrdClubNo = line.substring(5, 9);
            String strOrdCountryCode = "US";
            String strOrdDate = line.substring(15, 21);
            String strOrdTime = line.substring(21, 25);
            String strOrdOrderID = line.substring(36, 43);
            String strOrdChannelCode = "7";

            StringBuilder strVal = new StringBuilder();
            strVal.append(strOrdClubNo).append(sep);
            strVal.append(strOrdCountryCode).append(sep);
            strVal.append(strOrdDate).append(sep);
            strVal.append(strOrdTime).append(sep);
            strVal.append(seq).append(sep);
            strVal.append(strOrdOrderID).append(sep);
            strVal.append(strOrdChannelCode);
            if (trailingSep) {
                strVal.append(sep);
            }
            return strVal.toString();
        }

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            String key1 = line.substring(12, 14);
            System.out.println("value is 1:::::" + value);
            System.out.println("Key1:" + key1);
            /*
             * WC-TYPE : 10 -- OrderFile ; WC-TYPE : 28 -- MemberFile ; WC-TYPE
             * : 31 -- ItemFile ; WC-TYPE : 37 -- TaxFile ; WC-TYPE : 44 --
             * TenderFile
             */
            String record = null;
            if ("10".equals(key1)) {
                // Order records: pipe-separated, use the running sequence number.
                record = buildRecord(line, "|", iOrdSeq, false);
            } else if ("28".equals(key1) || "31".equals(key1)
                    || "37".equals(key1) || "44".equals(key1)) {
                // Member, item, tax and tender records: comma-separated with a
                // trailing comma and a fixed sequence number of 1.
                record = buildRecord(line, ",", 1, true);
            }
            iOrdSeq++;
            if (record == null) {
                // Unknown record type: skip it instead of re-emitting the previous key and value.
                return;
            }

            k.set(key1);
            val.set(record);
            System.out.println("Key and Key Hashcode : " + k.toString()
                    + "\t" + k.hashCode());
            System.out.println("Partition Number : "
                    + (k.hashCode() & Integer.MAX_VALUE) % 5);

            context.write(k, val);
            context.setStatus(context.getCurrentKey().toString());
        }

    }

    public static class ReducerClass extends Reducer<Text, Text, NullWritable, Text> {

        private MultipleOutputs<NullWritable, Text> mos;

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            System.out.println("REDUCER Start");

            // The record type decides which named output each value goes to.
            String key1 = key.toString();
            for (Text value : values) {
                if ("10".equals(key1)) {
                    mos.write("ORDER", NullWritable.get(), value);
                } else if ("28".equals(key1)) {
                    mos.write("MEMBER", NullWritable.get(), value);
                } else if ("31".equals(key1)) {
                    mos.write("ITEM", NullWritable.get(), value);
                } else if ("37".equals(key1)) {
                    mos.write("TAX", NullWritable.get(), value);
                } else if ("44".equals(key1)) {
                    // TENDER is declared with a Text key, so pass the Text key, not a String.
                    mos.write("TENDER", key, value);
                }
            }

        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // Closing MultipleOutputs flushes all named output files.
            mos.close();
        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();

        if (otherArgs.length != 2) {
            System.err.println("Usage: keyCount <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "SAMSTLogFileGeneration");
        job.setJarByClass(keyCount.class);
        job.setMapperClass(MapClass.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(ReducerClass.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // One named output per record type; the reducer writes to these by name.
        MultipleOutputs.addNamedOutput(job, "ORDER", TextOutputFormat.class,
                NullWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "MEMBER", TextOutputFormat.class,
                NullWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "ITEM", TextOutputFormat.class,
                NullWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "TAX", TextOutputFormat.class,
                NullWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "TENDER", TextOutputFormat.class,
                Text.class, Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
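The "Partition Number" debug lines in the mapper use the same expression as Hadoop's default hash partitioning: (hashCode & Integer.MAX_VALUE) % number of reducers. The plain-Java sketch below shows that expression on its own. It is an illustration under assumptions: HashPartitionSketch is a hypothetical name, the reducer count of 5 is taken from the debug lines rather than from the driver (which does not set it), and String.hashCode() is used here, which is not the same hash as Text.hashCode(), so the actual partition numbers in a real job may differ.

```java
// Illustrative sketch only: plain Java, not the Hadoop partitioner class.
class HashPartitionSketch {

    // Masking with Integer.MAX_VALUE clears the sign bit, so the result of the
    // modulo is always in the range 0 .. numReducers - 1, even for negative hashes.
    static int partitionFor(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        int numReducers = 5; // assumed, matching the "% 5" in the debug output
        String[] recordTypes = {"10", "28", "31", "37", "44"};
        for (String type : recordTypes) {
            System.out.println(type + " -> partition " + partitionFor(type, numReducers));
        }
    }
}
```

All records with the same key get the same partition number, which is what guarantees that one reducer sees every value for that key.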