Pivotal Knowledge Base

フォローする

Oozieワークフローを利用したMap Reduceの実行方法

環境

製品 バージョン
PHD 1.1.1
Oozie 3.3.2

目的

本記事は、Oozieを利用してhadoopディストリビューションで提供されるMap Reduceプログラムや jarファイルを実行する方法を記述している。本記事は、アプリケーションやワークフローの開始点並びに、基本設定として活用することができる。

前提条件

  • Oozieを動作させる基本的な知識
  • PHDクラスタ

使用する例

  • hadoop-mapreduce-examples.jarによるWordCountプログラム
  • hadoop-mapreduce-examples.jarは、Pivotal HD内のjar ファイルのシンボリックリンクとして設定されている

以下手順を示す。

1) hadoop-mapreduce-examples.jarの解凍。Pivotal HD クラスタにおいては、jarファイルは /usr/lib/gphd/hadoop-mapreduceディレクトリに保存されている。

[hadoop@hdm1 test]$ jar xf hadoop-mapreduce-examples.jar
[hadoop@hdm1 test]$ ls
hadoop-mapreduce-examples.jar META-INF org

2) WordCountプログラムに関連付けられたクラスファイルが保存されているディレクトリに移動する。

[hadoop@hdm1 test]$ cd org/apache/hadoop/examples/
[hadoop@hdm1 examples]$ ls WordCount*
WordCount.class WordCount$IntSumReducer.class WordCount$TokenizerMapper.class

WordCountプログラムにおいて、mapper クラス名はWordCount$TokenizerMapper.class、さらにreducer クラス名はWordCount$IntSumReducer.classと定義されている。 以降の手順でOozieのworkflow.xmlを定義する際に、これらのクラス名を使用する。

3) job.propertiesファイルを作成。Oozieジョブのパラメータは、javaのプロパティファイル(.properties)もしくは、Hadoopの設定xmlファイル(.xml)により付与される 。今回は、.propertiesファイルを使用する。

nameNode=hdfs://phdha
jobTracker=hdm1.phd.local:8032
queueName=default
examplesRoot=examplesoozie oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/map-reduce
outputDir=map-reduce

各設定項目の意味:
nameNode = HDFSにアクセスするためのNameNodeパスの変数を定義。フォーマットは、次の形式となる。
hdfs://<nameservice> または hdfs://<namenode_host>:<port>
jobTracker = Yarnを実装しているケースでは、Resource Managerのアドレスをこの変数で定義する 。フォーマットは次の形式となる:<resourcemanager_hostname>:<port>
queueName = キャパシティースケジューラやフェアスケジューラ等により定義されたキューの名前。デフォルトでは「default」と設定。
examplesRoot = ワークフローの環境変数
oozie.wf.application.path = 実行されるworkflow.xmlを保存するHDFSへのパスを定義する環境変数
outputDir = 出力先を定義する変数

注意: oozie.libpathにより パラメータを定義することも可能。oozie.libpath の定義によりMapReduceのプログラムに必要なすべてのライブラリを保存することができる。ただし、ここでは使用しない。

設定例:

oozie.libpath=${nameNode}/$(user.name)/share/lib

4) workflow.xmlを作成。workflow.xmlは、実行される一連の動作の順序や 制御依存関係(Control Dependency Direct Acyclic Graph)を定義する。

あるアクションから別のアクションへの制御依存関係とは、最初のアクションが完了するまでは次のアクションは実行されないということである。

詳細に関しては以下のドキュメントを参照すること。

http://oozie.apache.org/docs/3.3.2/WorkflowFunctionalSpec.html.

<workflow-app xmlns="uri:oozie:workflow:0.1" name="map-reduce-wf">
<start to="mr-node"/>
<action name="mr-node">
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}"/>
</prepare>

<configuration>
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>mapreduce.map.class</name>
<value>org.apache.hadoop.examples.WordCount$TokenizerMapper</value>
</property>
<property>
<name>mapreduce.reduce.class</name>
<value>org.apache.hadoop.examples.WordCount$IntSumReducer</value>
</property>
<property>
<name>mapreduce.combine.class</name>
<value>org.apache.hadoop.examples.WordCount$IntSumReducer</value>
</property>
<property>
<name>mapred.output.key.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapred.output.value.class</name>
<value>org.apache.hadoop.io.IntWritable</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>/user/${wf:user()}/${examplesRoot}/input-data/text</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}</value>
</property>
</configuration>
</map-reduce>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>

5) Oozieジョブに関連するすべてのファイルを保存するディレクトリをHDFSに作成する。前述の手順で作成されたworkflow.xmlをこのディレクトリに保存する

[hadoop@hdm1 map-reduce]$ hdfs dfs -mkdir -p /user/hadoop/examplesoozie/map-reduce
[hadoop@hdm1 map-reduce]$ hdfs dfs -copyFromLocal workflow.xml /user/hadoop/examplesoozie/map-reduce/workflow.xml

6) Oozieジョブ用に作成したディレクトリ配下に、libという名前のディレクトリを作成する。 このディレクトリは、必要なライブラリやjarファイルの保存に使用する。 

[hadoop@hdm1 map-reduce]$ hdfs dfs -mkdir -p /user/hadoop/examplesoozie/map-reduce/lib

7) ディレクトリの作成後、hadoop-mapreduce-examples.jarを コピーする。  

[hadoop@hdm1 map-reduce]$ hdfs dfs -copyFromLocal /usr/lib/gphd/hadoop-mapreduce/hadoop-mapreduce-examples.jar /user/hadoop/examplesoozie/map-reduce/lib/hadoop-mapreduce-examples.jar

8) 作成したワークフローを利用し、wordcountのサンプルジョブを実行することができる。

[hadoop@hdm1 ~]$ oozie job -oozie http://localhost:11000/oozie -config examplesoozie/map-reduce/job.properties -run

9) 以下のようにジョブステータスを確認することが出来る。

[hadoop@hdm1 ~]$ oozie job -oozie http://localhost:11000/oozie -info 0000009-140529162032574-oozie-oozi-W
Job ID : 0000009-140529162032574-oozie-oozi-W
------------------------------------------------------------------------------------------------------------------------------------
Workflow Name : map-reduce-wf
App Path      : hdfs://phdha/user/hadoop/examplesoozie/map-reduce
Status        : SUCCEEDED
Run           : 0
User          : hadoop
Group         : -
Created       : 2014-05-30 00:31 GMT
Started       : 2014-05-30 00:31 GMT
Last Modified : 2014-05-30 00:32 GMT
Ended         : 2014-05-30 00:32 GMT
CoordAction ID: -

Actions
------------------------------------------------------------------------------------------------------------------------------------
ID                                                                            Status    Ext ID                 Ext Status Err Code
------------------------------------------------------------------------------------------------------------------------------------
0000009-140529162032574-oozie-oozi-W@:start:                                  OK        -                      OK         -
------------------------------------------------------------------------------------------------------------------------------------
0000009-140529162032574-oozie-oozi-W@mr-node                                  OK        job_1401405229971_0022 SUCCEEDED  -
------------------------------------------------------------------------------------------------------------------------------------
0000009-140529162032574-oozie-oozi-W@end                                      OK        -                      OK         -
------------------------------------------------------------------------------------------------------------------------------------

10) ジョブが完了したら、workflow.xmlで定義したディレクトリで出力結果を確認することが出来る。

[hadoop@hdm1 ~]$ hdfs dfs -cat /user/hadoop/examplesoozie/output-data/map-reduce/part-r-00000
SSH:/var/empty/sshd:/sbin/nologin	1
Server:/var/lib/pgsql:/bin/bash	1
User:/var/ftp:/sbin/nologin	1
Yarn:/home/yarn:/sbin/nologin	1
adm:x:3:4:adm:/var/adm:/sbin/nologin	1
bin:x:1:1:bin:/bin:/sbin/nologin	1
console	1
daemon:x:2:2:daemon:/sbin:/sbin/nologin	1
ftp:x:14:50:FTP	1
games:x:12:100:games:/usr/games:/sbin/nologin	1
gopher:x:13:30:gopher:/var/gopher:/sbin/nologin	1
gpadmin:x:500:500::/home/gpadmin:/bin/bash	1
hadoop:x:503:501:Hadoop:/home/hadoop:/bin/bash	1
halt:x:7:0:halt:/sbin:/sbin/halt	1

補足

1. workflow.xmlが正しく指定されていない場合、以下のような例外処理が発生する 。例えば、workflow.xml 内の、mapreduce.map.class のスペルがmapreduce.mapper.class と間違っている場合、あるいはmapreduce.reduce.classがmapreduce.reducer.classと間違っている場合等が考えられる。

2014-05-29 16:29:26,870 ERROR [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:hadoop (auth:SIMPLE) cause:java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
2014-05-29 16:29:26,874 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable

2. workflow.xmlのサンプルは以下のドキュメントを参照のこと。
https://github.com/yahoo/oozie/wiki/Oozie-WF-use-cases

3. 参考:wordcount.javaプログラム

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount  ");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

コメント

Powered by Zendesk