1. Introduction to Developing Hadoop Applications
- Introducing MapReduce concepts and history
- Describing how MapReduce works at a high level and how data flows through it
The typical example of a MapReduce application is Word Count. The input consists of many files that are split among the TaskTracker nodes where the files are located. Each split contains multiple records; here, a record is a line. The Map function receives key-value pairs and uses only the value (i.e. the line) to emit one occurrence at a time for each word. A Combine function then aggregates the occurrences and passes them to the Shuffle phase. The latter is handled by the framework and gathers the output of the prior functions by key before sending it to the reducers. The Reduce function takes the list of all occurrences (i.e. values) of a word (i.e. key), sums them up and returns the total number of times the word has been seen.
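The data flow described above can be sketched without any Hadoop dependency. The following Python snippet is only an in-memory illustration, not the code of the bundled wordcount example: the function names (map_line, combine, shuffle, reduce_word, word_count) are hypothetical, and each "split" is assumed to be a plain list of lines.

```python
from collections import Counter, defaultdict


def map_line(offset, line):
    """Map: ignore the key (here a byte offset) and emit (word, 1) per word."""
    return [(word, 1) for word in line.split()]


def combine(pairs):
    """Combine: pre-aggregate the occurrences emitted by a single mapper."""
    counts = Counter()
    for word, n in pairs:
        counts[word] += n
    return list(counts.items())


def shuffle(mapper_outputs):
    """Shuffle: gather the values of all mappers by key, sorted by key."""
    grouped = defaultdict(list)
    for pairs in mapper_outputs:
        for word, n in pairs:
            grouped[word].append(n)
    return sorted(grouped.items())


def reduce_word(word, counts):
    """Reduce: sum all the occurrences of one word."""
    return word, sum(counts)


def word_count(splits):
    """Run map + combine per split, then shuffle and reduce."""
    mapper_outputs = []
    for lines in splits:  # one mapper per split
        pairs, offset = [], 0
        for line in lines:
            pairs.extend(map_line(offset, line))
            offset += len(line) + 1  # +1 for the newline
        mapper_outputs.append(combine(pairs))
    return dict(reduce_word(w, c) for w, c in shuffle(mapper_outputs))


if __name__ == "__main__":
    print(word_count([["the cat sat", "the dog"], ["the end"]]))
```

In a real job the framework, not user code, performs the shuffle; it is written out here only to make the grouping by key visible.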
Figure: MapReduce example: Word Count
Run the word count example:
1. Prepare a set of input text files:
$ mkdir -p /user/user01/1.1/IN1
$ cp /etc/*.conf /user/user01/1.1/IN1 2> /dev/null
$ ls /user/user01/1.1/IN1 | wc -l
2. Run the word count application using the previously created files:
$ hadoop jar /opt/mapr/hadoop/hadoop-2.4.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1-mapr-1408.jar wordcount /user/user01/1.1/IN1 /user/user01/1.1/OUT1
3. Check the job output
$ wc -l /user/user01/1.1/OUT1/part-r-00000
$ more /user/user01/1.1/OUT1/part-r-00000
Trying a binary file as input:
$ mkdir -p /user/user01/1.1/IN2
$ cp /bin/cp /user/user01/1.1/IN2/mybinary
$ file /user/user01/1.1/IN2/mybinary
$ strings /user/user01/1.1/IN2/mybinary | more
$ hadoop jar /opt/mapr/hadoop/hadoop-0.20.2/hadoop-0.20.2-dev-examples.jar wordcount /user/user01/1.1/IN2/mybinary /user/user01/1.1/OUT2
$ more /user/user01/1.1/OUT2/part-r-00000
Look for references to the word ATUH in the input and in the output:
$ strings /user/user01/1.1/IN2/mybinary | grep -c ATUH
$ egrep -ac ATUH /user/user01/1.1/OUT2/part-r-00000
Figure: MapReduce execution summary and data flow
MapReduce Workflow:
- Load production data into HDFS with tools like Sqoop for SQL data or Flume for log data, or with traditional tools, since MapR-FS supports POSIX operations and NFS access.
- Analyze, Store, Read.
The InputFormat object is responsible for validating the job input, splitting files among mappers and instantiating the RecordReader. By default, the size of an input split equals the size of a block, which is 64 MB in Hadoop, or the size of a chunk in MapR, which is 256 MB. Each input split references a set of records, which are broken into key-value pairs for the Mapper. The TaskTracker passes the input split to the RecordReader constructor, which reads the records one by one and passes them to the mapper as key-value pairs. By default, the RecordReader treats a line as a record. This can be changed by extending the RecordReader and InputFormat classes to define different records in the input file, for example multi-line records.
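As a rough illustration of the splitting and record-reading steps, the sketch below cuts a file of a given size into (offset, length) splits and reads line records from a byte buffer. It is a simplification under stated assumptions: compute_splits and read_records are hypothetical names, the real InputFormat applies additional rules when sizing the last split, and records that straddle a split boundary are not handled here.

```python
MB = 1024 * 1024


def compute_splits(file_size, split_size):
    """Return (offset, length) pairs covering file_size bytes."""
    splits = []
    offset = 0
    while offset < file_size:
        length = min(split_size, file_size - offset)
        splits.append((offset, length))
        offset += length
    return splits


def read_records(data):
    """Yield (byte offset, line) pairs, one record per line of `data` (bytes)."""
    offset = 0
    for raw in data.splitlines(keepends=True):
        yield offset, raw.rstrip(b"\r\n")
        offset += len(raw)


if __name__ == "__main__":
    # A 600 MB file with 256 MB splits (the MapR chunk size quoted above).
    for offset, length in compute_splits(600 * MB, 256 * MB):
        print(offset // MB, length // MB)
    # Key = offset of the line, value = the line itself.
    print(list(read_records(b"first line\nsecond line\n")))
```

The (offset, line) pairs mirror what the mapper receives with line-oriented input: the key is the position of the line in the file and the value is its content.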
The Partitioner takes the output generated by the Map functions and hashes the record keys to create partitions. By default, each partition is passed to one reducer; this behavior can be overridden. As part of the Shuffle operation, the partitions are then sorted and merged in preparation for sending them to the reducers. Once an intermediate partition is complete, it is sent over the network using protocols such as RPC or HTTP.
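Hash partitioning can be sketched in a few lines. The snippet below is an assumption-laden illustration: partition and partition_and_sort are hypothetical names, and CRC32 stands in for the key's hash code, because Python's built-in hash() of a string changes between processes.

```python
import zlib


def partition(key, num_reducers):
    """Map a key to a reducer index with a stable hash (CRC32 as a stand-in)."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def partition_and_sort(pairs, num_reducers):
    """Distribute (key, value) pairs into one sorted partition per reducer."""
    partitions = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        partitions[partition(key, num_reducers)].append((key, value))
    # Each partition is sorted by key before it is handed to its reducer.
    return [sorted(p) for p in partitions]


if __name__ == "__main__":
    map_output = [("dog", 1), ("cat", 1), ("dog", 1), ("ant", 1)]
    for index, part in enumerate(partition_and_sort(map_output, 3)):
        print(index, part)
```

Because the partition index depends only on the key, every occurrence of a given word ends up at the same reducer, which is what makes the final sum correct.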
The result of a MapReduce job is written to an output directory:
- an empty file named _SUCCESS is created to indicate the success of the job,
- the history of the job is captured under the _logs/history* directory,
- the output of the reduce job is captured under part-r-00000, part-r-00001, ...
- if you run a map-only job the output will be part-m-00000, part-m-00001, ...
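A script that consumes the job output can rely on this layout. The sketch below assumes the output directory is reachable through a regular file system path (as in the commands above, which read it with wc and more); job_output_files is a hypothetical helper name.

```python
from pathlib import Path


def job_output_files(output_dir):
    """Return the sorted part file names of a job that finished successfully."""
    out = Path(output_dir)
    # The empty _SUCCESS marker is only present when the job succeeded.
    if not (out / "_SUCCESS").exists():
        raise RuntimeError(f"no _SUCCESS marker in {out}")
    # part-r-* comes from reducers, part-m-* from map-only jobs.
    return sorted(p.name for p in out.glob("part-[mr]-*"))


if __name__ == "__main__":
    # Path taken from the word count example above.
    print(job_output_files("/user/user01/1.1/OUT1"))
```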
Hadoop Job Scheduling
Two schedulers are available in Hadoop; the one in use is declared in mapred-site.xml:
- Fair Scheduler (the default): resources are shared evenly across pools (a pool is a slot of resources) and each user has their own pool. Custom pools can be configured with a guaranteed minimum share to prevent starvation. This scheduler supports preemption.
- Capacity Scheduler: resources are shared across queues. The administrator configures hierarchical queues (each a percentage of the total resources in the cluster) to control access to resources. Queues have ACLs to control user access, and soft and hard limits can also be configured per user within a queue. This scheduler supports resource-based scheduling and job priorities.
Figure: YARN architecture
Hadoop Job Management
Depending on the MapReduce version, there are different ways to manage Hadoop jobs:
- MRv1: through web UIs (JobTracker, TaskTracker), MapR metrics database, hadoop job CLI.
- MRv2 (YARN): through web UIs (Resource Manager, Node Manager, History Server), MapR metrics database (for future releases), mapred CLI.
The DistributedShell example
$ yarn jar /opt/mapr/hadoop/hadoop-2.4.1/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.4.1-mapr-1408.jar -shell_command /bin/ls -shell_args /user/user01 -jar /opt/mapr/hadoop/hadoop-2.4.1/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.4.1-mapr-1408.jar
Check the application logs
$ cd /opt/mapr/hadoop/hadoop-2.4.1/logs/userlogs/application_1430664875648_0001/
$ cat container_1430664875648_0001_01_000002/stdout # stdout file
$ cat container_1430664875648_0001_01_000002/stderr # stderr file
The logs can also be accessed from the Resource Manager Web UI at http://node-ip:8088/
to be continued