Pivotal Knowledge Base

フォローする

Pivotal HDBテーブルのデータを読み取り実行するMap Reduceジョブのサンプル

問題提起

HDBテーブルのデータを読み取り、そして処理するためにどのようにMapReduceジョブを設計すれば良いのか。

以下の事例を通して学習する:

HDBテーブル”employee_expenditure” は従業員の月次経費情報を保管している。テーブルからデータを読み取り、各従業員の経費合計額を計算するMapReduceジョブを開発しなければならないものとする。

前提:

  • EclipseのようなIDE環境を用意 – すべての人はJAVAの専門家というわけではないので、単に作業を容易にするため

ソリューション手順:

手順 1: HDBテーブル"employee_expenditure"を作成し、ある程度のデータを入れる

CREATE TABLE employee_expenditure (employee_id int, expenditure_amt int, expenditure_month text) DISTRIBUTED RANDOMLY;
insert into employee_expenditure values (1, 20000, 'Jan');
insert into employee_expenditure values (2, 10000, 'Jan');
insert into employee_expenditure values (1, 15000, 'Feb');
insert into employee_expenditure values (2, 10000, 'Feb');
insert into employee_expenditure values (3, 500, 'Mar');
insert into employee_expenditure values (1, 20000, 'Mar');

select * from employee_expenditure;
employee_id | expenditure_amt | expenditure_month -------------+-----------------+------------------- 1 |           20000 | Jan 2 |           10000 | Jan 1 |           15000 | Feb 2 |           10000 | Feb 3 |             500 | Mar 1 |           20000 | Mar

手順 2: Javaプロジェクトを作成する。ここでは、プロジェクト名を”HAWQMapReduceProgramming”とする。Eclipseでは、 File -> New -> Java Project の順で作成

手順 3: “lib” という名前のフォルダーを作成し、このフォルダーに Map Reduce プログラムに必要とされるすべてのhadoop jarファイルを保存する。Eclipseでは、File -> New -> Folder の順で作成

手順 4: ローカルファイルシステムから、必要とされるすべてのライブラリを“lib”フォルダーにインポートする。以下の jar ファイルが必要である。

  • HDB Mapreduce Input / Output に関する jar ファイル。デフォルトではこれらのファイルはHDBホストの /usr/local/hawq/lib/postgresql/hawq-mr-io 配下にある。必要とされるライブラリを、単に “lib” フォルダーへドラッグ・アンド・ドロップすれば良い。
  • Hadoop Map Reduce jar ファイル。デフォルトでは、これらのファイルは PHDインストール環境の /usr/lib/gphd/hadoop-mapreduce/ 配下にある。
  • Hadoop Common jar ファイル。デフォルトで、PHDインストール環境の下記のディレクトリにある全ての jar をインポート。.
    • /usr/lib/gphd/hadoop/
    • /usr/lib/gphd/hadoop/lib

手順 5: パッケージを作成する。ここでは、パッケージ名をcom.HAWQMapReduce.appsとし、配下に必要となるクラスを作成する。単に必要となるすべてのクラスを1つのパッケージに入れると良いであろう。Eclipseでは、File -> New -> Packageの順で作成

手順 6: Javaクラスを作成する。ここでは、 AggregatedExpenditureとしcom.HAWQMapReduce.appsパッケージの配下に置く。

手順 7: Map Reduceコードを記述し、各従業員の経費合計額を計算するよう実装する。

//以下はパッケージ名
package com.HAWQMapReduce.apps;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//以下はHDB独自実装のために追加するクラス
import com.pivotal.hawq.mapreduce.HAWQException;
import com.pivotal.hawq.mapreduce.HAWQInputFormat;
import com.pivotal.hawq.mapreduce.HAWQRecord;

//ここから、クラス定義を開始
public class AggregateExpenditure extends Configured implements Tool {
	
/* Mapクラスの名前はHAWQMapperで、Mapperを継承する。Mapperの一部として、単にHDBテーブルemployee_expenditureからemployee_idとexpenditure_amtカラムを読み込み、
集約処理のためReducerにそれらを渡す */ public static class HAWQMapper extends Mapper<Object,HAWQRecord,IntWritable,IntWritable> { protected void setup(Mapper<Object,HAWQRecord, IntWritable,IntWritable>.Context context) throws IOException, InterruptedException { } public void map (Object Key, HAWQRecord value, Context context) throws IOException, InterruptedException { int employeeidToGroup=0; int expenditureToAggregate=0; //ここで、HDBテーブルからデータを読み込む。その際、以下の引数の1、2が、それぞれHDBテーブルのカラム1、カラム2を参照するよう指定する。 try { employeeidToGroup=value.getInt(1); expenditureToAggregate=value.getInt(2); } catch (HAWQException hawqE){ throw new IOException(hawqE.getMessage()); } context.write(new IntWritable(employeeidToGroup), new IntWritable(expenditureToAggregate)); } } //以下はreducerのクラスであり、上出のMappaerからキーとバリューを受け取る。ここでは、同じキーの全てのバリューに対して、当該値を加算していくような処理を繰り返す。 public static class HAWQReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(IntWritable key,Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum=0; for (IntWritable val: values) { sum +=val.get(); } result.set(sum); context.write(key,result); } }   //ここからコード実行の main クラス public static void main (String[] args) throws Exception { int res=ToolRunner.run(new Configuration(), new AggregateExpenditure(),args); System.exit(res); } public int run(String[] args) throws Exception { if(args.length < 3) { System.out.println("The argument's for the jar are: <hostname:port/database> <table_name> <output_path> [username][password]"); return 2; } //以下のプロパティはMap Reduceプログラムの設定を定義するものであり、Map Reduceプログラムをコントロールする実体である。 Job job=Job.getInstance(getConf()); job.setJobName("hawq-mapreduce"); job.setJarByClass(AggregateExpenditure.class); job.setMapperClass(HAWQMapper.class); job.setReducerClass(HAWQReducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); String database_url=args[0]; String table_name=args[1]; String output_path=args[2]; String username=null; if(args.length > 3) { username=args[3]; } String password=null; if(args.length > 4) { password=args[4]; } job.setInputFormatClass(HAWQInputFormat.class); HAWQInputFormat.setInput(job.getConfiguration(), database_url,username, password, table_name); FileOutputFormat.setOutputPath(job, new Path(output_path)); int res = job.waitForCompletion(true) ? 0 : 1; return res; } }

手順 8: 作成されたMap Reduce プログラムを1つの jar ファイルにエクスポートし、ここでは"HAWQAggregateExpenditure.jar"とする。Eclipseでは File -> Export, and chose Java -> JAR file の順で作成

手順 9: LIBJARS、HADOOP_CLASSPATH の値を設定し、jarファイルを実行する。

必須環境変数を export:

echo export LIBJARS=/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-hadoop-javadoc.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-ao.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-common.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-tool.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/lib/postgresql-9.2-1003-jdbc4.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/lib/snakeyaml-1.12.jar
export HADOOP_CLASSPATH=/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-hadoop-javadoc.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-ao.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-common.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-tool.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/lib/postgresql-9.2-1003-jdbc4.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/lib/snakeyaml-1.12.jar

Jarを実行:

hadoop jar /binary/HAWQAggregateExpenditure.jar com.HAWQMapReduce.apps.AggregateExpenditure -libjars $LIBJARS localhost:5432/gpadmin employee_expenditure /tmp/expenditure

ただし、各設定値の内容は以下の通り:

生成されたjarファイル名 :-------------> /binary/HAWQAggregateExpenditure.jar

クラスファイル名 :-------------------> com.HAWQMapReduce.apps.AggregateExpenditure"

追加jarファイルをサポートするためのフラグ : -----> -libjars

データベースへのログインURL :-----------------------> localhost:5432/gpadmin

テーブル名 :-------------------------------> employee_expenditure

HDFS出力ディレクトリ :---------------------------> /tmp/expenditure

手順 10: /tmp/expenditure ディレクトリ中のデータを確認する。

 [root@phd11-client demo]# hdfs dfs -cat /tmp/expenditure/*
155000
220000
3500

手順 11(任意) : PXFを使ってテーブルを作成し、SQLで結果を見ることもできる。

CREATE EXTERNAL TABLE aggregate_expenditure (employee_id int, expenditure_aggregate int) LOCATION ('pxf://phd11-nn.saturn.local:50070/tmp/expenditure/?Fragmenter=HdfsDataFragmenter&Accessor=LineReaderAccessor&Resolver=StringPassResolver') FORMAT 'TEXT' (DELIMITER E'\t');
[gpadmin@phd11-client ~]$ psql -c "SELECT * FROM aggregate_expenditure;"
 employee_id | expenditure_aggregate
-------------+-----------------------
           1 |                 55000
           2 |                 20000
           3 |                   500

 

 

コメント

Powered by Zendesk