Pivotal Knowledge Base

Sample MapReduce job to pull and process data from a Pivotal HDB table

Question

How to design a MapReduce job to read data from an HDB table and process it

Case study we will walk through for learning purposes:

HDB table "employee_expenditure" holds information about an employee's expenses in every month. We have to develop a Map Reduce job which reads data from the table and provides us total expenditure done by each employee respectively.

Prerequisite:

  • An IDE such as Eclipse; it will just make your life easier, because not everyone is a Java expert.

Solution Steps:

Step 1: Create a HDB table "employee_expenditure" and populate some data.

 CREATE TABLE employee_expenditure (employee_id int, expenditure_amt int, expenditure_month text) DISTRIBUTED RANDOMLY;
insert into employee_expenditure values (1, 20000, 'Jan');
insert into employee_expenditure values (2, 10000, 'Jan');
insert into employee_expenditure values (1, 15000, 'Feb');
insert into employee_expenditure values (2, 10000, 'Feb');
insert into employee_expenditure values (3, 500, 'Mar');
insert into employee_expenditure values (1, 20000, 'Mar');

select * from employee_expenditure;
 employee_id | expenditure_amt | expenditure_month
-------------+-----------------+-------------------
           1 |           20000 | Jan
           2 |           10000 | Jan
           1 |           15000 | Feb
           2 |           10000 | Feb
           3 |             500 | Mar
           1 |           20000 | Mar
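
For reference, the aggregation the MapReduce job will perform can be expressed directly in SQL; a quick sanity check against the sample data, run the same way as the psql examples later in this article:

psql -c "SELECT employee_id, SUM(expenditure_amt) AS total_expenditure FROM employee_expenditure GROUP BY employee_id ORDER BY employee_id;"

The job's mapper will emit (employee_id, expenditure_amt) pairs and the reducer will sum the values per employee_id, reproducing this result.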

Step 2: Create a Java project, let's say "HAWQMapReduceProgramming". In Eclipse, go to: File -> New -> Java Project

Step 3: Create a folder named "lib"; this folder will hold all the required Hadoop jars for the MapReduce program. In Eclipse, go to: File -> New -> Folder

Step 4: Import all the required libraries into the "lib" folder from your local filesystem. We need the jars below (see the copy sketch after this list).

  • HDB MapReduce input/output jars. By default they are located under /usr/local/hawq/lib/postgresql/hawq-mr-io on the HDB hosts. Just drag and drop the required libraries into the "lib" folder.
  • Hadoop MapReduce jars. By default they are located under /usr/lib/gphd/hadoop-mapreduce/ on your PHD installation.
  • Hadoop Common jars. By default they are located under the directories below on your PHD installation; take all the jars.
    • /usr/lib/gphd/hadoop/
    • /usr/lib/gphd/hadoop/lib
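
If you are developing on a workstation rather than on the cluster itself, you can pull the jars down first and then drag them into "lib". A minimal sketch, assuming passwordless ssh and the hypothetical host names hdb-master and phd-node:

mkdir -p ~/HAWQMapReduceProgramming/lib
# HDB MapReduce input/output jars from the HDB host
scp 'gpadmin@hdb-master:/usr/local/hawq/lib/postgresql/hawq-mr-io/*.jar' ~/HAWQMapReduceProgramming/lib/
scp 'gpadmin@hdb-master:/usr/local/hawq/lib/postgresql/hawq-mr-io/lib/*.jar' ~/HAWQMapReduceProgramming/lib/
# Hadoop MapReduce and Hadoop Common jars from a PHD node
scp 'gpadmin@phd-node:/usr/lib/gphd/hadoop-mapreduce/*.jar' ~/HAWQMapReduceProgramming/lib/
scp 'gpadmin@phd-node:/usr/lib/gphd/hadoop/*.jar' ~/HAWQMapReduceProgramming/lib/
scp 'gpadmin@phd-node:/usr/lib/gphd/hadoop/lib/*.jar' ~/HAWQMapReduceProgramming/lib/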

Step 5: Create a package, let's say com.HAWQMapReduce.apps, under which we will create the required class. It's just good practice to put all the required classes into a package. In Eclipse, go to: File -> New -> Package

Step 6: Create a Java class, let's say AggregateExpenditure, under the package com.HAWQMapReduce.apps.

Step 7: Write the MapReduce code to implement the aggregation of the expenditure made by each employee.

//Below is the package name
package com.HAWQMapReduce.apps;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//The below are the classes added for HAWQ specific implementation
import com.pivotal.hawq.mapreduce.HAWQException;
import com.pivotal.hawq.mapreduce.HAWQInputFormat;
import com.pivotal.hawq.mapreduce.HAWQRecord;

//From below the class definition starts
public class AggregateExpenditure extends Configured implements Tool {
	
/*The map class is HAWQMapper, which extends Mapper. In the mapper we simply read the employee_id and expenditure_amt columns from the HAWQ table employee_expenditure and pass them to the reducer class for aggregation */
	
	public static class HAWQMapper extends 
	Mapper<Object,HAWQRecord,IntWritable,IntWritable> {
		protected void setup(Mapper<Object,HAWQRecord, IntWritable,IntWritable>.Context context) 
				throws IOException, InterruptedException {
		}
		public void map(Object key, HAWQRecord value, Context context)
				throws IOException, InterruptedException {
			int employeeidToGroup=0;
			int expenditureToAggregate=0;
			//Here we read data from the HAWQ table, where indices 1 and 2 refer to columns 1 and 2 of the HAWQ table respectively
			try {
				employeeidToGroup=value.getInt(1);
				expenditureToAggregate=value.getInt(2);
			}
			catch (HAWQException hawqE){
				throw new IOException(hawqE.getMessage());
			}
			context.write(new IntWritable(employeeidToGroup), new IntWritable(expenditureToAggregate));
		}
	}
	
	//Below is the reducer class; it receives the key and values from the mapper above, iterates over the values for the same key, and keeps adding them.
	public static class HAWQReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
		private IntWritable result = new IntWritable();
		public void reduce(IntWritable key,Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			int sum=0; 
			for (IntWritable val: values) {
				sum +=val.get();
			}
			result.set(sum);
			context.write(key,result);
		}
	}
 
	//Here is the main method, from where code execution starts.
	public static void main (String[] args) throws Exception {
		int res=ToolRunner.run(new Configuration(), new AggregateExpenditure(),args);
		System.exit(res);
	}	
	public int run(String[] args) throws Exception {
		if(args.length < 3) {
			System.out.println("The arguments for the jar are: <hostname:port/database> <table_name> <output_path> [username] [password]");
			return 2;
		}
	//The properties below define the configuration of the MapReduce job; they are the controlling body of the MapReduce program.
		Job job=Job.getInstance(getConf());
		job.setJobName("hawq-mapreduce");
		job.setJarByClass(AggregateExpenditure.class);
		job.setMapperClass(HAWQMapper.class);
		job.setReducerClass(HAWQReducer.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);
		String database_url=args[0];
		String table_name=args[1];
		String output_path=args[2];
		String username=null;
		if(args.length > 3) {
			username=args[3];
		}
		String password=null;
		if(args.length > 4) {
			password=args[4];
		}
		job.setInputFormatClass(HAWQInputFormat.class);
		HAWQInputFormat.setInput(job.getConfiguration(), database_url,username, password, table_name);
		FileOutputFormat.setOutputPath(job, new Path(output_path));
		int res = job.waitForCompletion(true) ? 0 : 1;
		return res;
	}
}

Step 8: Export the MapReduce program created above to a jar file, let's say "HAWQAggregateExpenditure.jar". In Eclipse, go to: File -> Export and choose Java -> JAR file. A command-line alternative is sketched below.
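
If you prefer to build outside Eclipse, the same jar can be produced from the command line. A rough sketch, assuming the source lives under src/com/HAWQMapReduce/apps (the default Eclipse layout) and the jars gathered in Step 4 sit under lib/:

cd ~/HAWQMapReduceProgramming
mkdir -p classes
# compile against the Hadoop and HDB jars collected in the lib folder
javac -cp "lib/*" -d classes src/com/HAWQMapReduce/apps/AggregateExpenditure.java
# package the compiled classes into the jar used in Step 9
jar cf HAWQAggregateExpenditure.jar -C classes .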

Step 9: Set the values for LIBJARS and HADOOP_CLASSPATH, and then execute the jar.

Export the required variables:

export LIBJARS=/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-hadoop-javadoc.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-ao.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-common.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-tool.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/lib/postgresql-9.2-1003-jdbc4.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/lib/snakeyaml-1.12.jar

export HADOOP_CLASSPATH=/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-hadoop-javadoc.jar:/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-ao.jar:/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-common.jar:/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-tool.jar:/usr/local/hawq/lib/postgresql/hawq-mr-io/lib/postgresql-9.2-1003-jdbc4.jar:/usr/local/hawq/lib/postgresql/hawq-mr-io/lib/snakeyaml-1.12.jar
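
If you would rather not maintain the long jar lists by hand, the same variables can be derived from the directory contents. A convenience sketch using standard shell globbing; it picks up every jar under hawq-mr-io, which is a superset of the explicit list above:

# build the comma-separated LIBJARS list from the directory contents
HAWQ_MR_DIR=/usr/local/hawq/lib/postgresql/hawq-mr-io
export LIBJARS=$(echo $HAWQ_MR_DIR/*.jar $HAWQ_MR_DIR/lib/*.jar | tr ' ' ',')
# HADOOP_CLASSPATH is a Java classpath, so it is colon separated
export HADOOP_CLASSPATH=$(echo $LIBJARS | tr ',' ':')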

Execute the jar:

hadoop jar /binary/HAWQAggregateExpenditure.jar com.HAWQMapReduce.apps.AggregateExpenditure -libjars $LIBJARS localhost:5432/gpadmin employee_expenditure /tmp/expenditure

where:

  • Name of the jar file created: /binary/HAWQAggregateExpenditure.jar
  • Name of the main class: com.HAWQMapReduce.apps.AggregateExpenditure
  • Flag to supply additional jar files: -libjars
  • Database login URL: localhost:5432/gpadmin
  • Table name: employee_expenditure
  • HDFS output directory: /tmp/expenditure

Step 10: Verify the data in the /tmp/expenditure directory.

 [root@phd11-client demo]# hdfs dfs -cat /tmp/expenditure/*

1	55000
2	20000
3	500
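
Each reducer writes its own output file, so you can also list the directory to confirm the job finished and to see how the output is laid out. A quick check; the exact part file names depend on the number of reducers:

hdfs dfs -ls /tmp/expenditure
# expect a _SUCCESS marker plus one part-r-NNNNN file per reducer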

Step 11 (Optional): You can also create an external table using PXF to view the output with SQL.

CREATE EXTERNAL TABLE aggregate_expenditure (employee_id int, expenditure_aggregate int) LOCATION ('pxf://phd11-nn.saturn.local:50070/tmp/expenditure/?Fragmenter=HdfsDataFragmenter&Accessor=LineReaderAccessor&Resolver=StringPassResolver') FORMAT 'TEXT' (DELIMITER E'\t');

[gpadmin@phd11-client ~]$ psql -c "SELECT * FROM aggregate_expenditure;"

 employee_id | expenditure_aggregate
-------------+-----------------------
           1 |                 55000
           2 |                 20000
           3 |                   500

Comments

  • srinivasa chary

    After running this example, I am getting this exception:

    14/03/28 13:26:11 INFO mapreduce.Job: Running job: job_1395767449482_0046
    14/03/28 13:26:19 INFO mapreduce.Job: Job job_1395767449482_0046 running in uber mode : false
    14/03/28 13:26:19 INFO mapreduce.Job: map 0% reduce 0%
    14/03/28 13:26:19 INFO mapreduce.Job: Job job_1395767449482_0046 failed with state FAILED due to: Application application_1395767449482_0046 failed 1 times due to AM Container for appattempt_1395767449482_0046_000001 exited with exitCode: 1 due to: Exception from container-launch:
    org.apache.hadoop.util.Shell$ExitCodeException:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:202)
    at org.apache.hadoop.util.Shell.run(Shell.java:129)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:322)
    at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:230)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:242)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:68)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
