MapReduce实现reduce端join（多数据源）- ITeye - 娱乐之横扫全球

MapReduce实现reduce端join（多数据源）- ITeye

2019年03月25日10时42分16秒 | 作者: 凌寒 | 标签: 数据源,假如,选用 | 浏览: 1154

在写MR的时候经常会遇到多数据源join的问题。如果是简单的分析任务，采用Hive处理就好；如果逻辑复杂一点，就需要自己写MR。

 

多数据源采用MultipleInputs类的addInputPath方法添加（每个输入路径可以指定各自的InputFormat和Mapper）。

 

Job类

public class EfcOrderProRangeOdJob extends Configured implements Tool {
 //TODO 途径
 private final static String INTPUT_A = "D:/order/order/";
 private final static String INTPUT_B = "D:/order/address/";
 private final static String OUTPUT = "D:/testAAAAA/";
// private final static String OUTPUT = "/warehouse/tmp/pt_eft_order_pro_range/";
 private final static String OUTPUT_TABLE = "fct_pt_icr_trade_day";
 public static void main(String[] args) {
 try {
 int res = ToolRunner.run(new Configuration(), new EfcOrderProRangeOdJob(), args);
 System.exit(res);
 } catch (Exception e) {
 e.printStackTrace();
 @Override
 public int run(String[] args) throws Exception {
 try {
 String start = "20130217"; //TODO
 Configuration conf = ConfUtil.getConf(getConf());
 conf.set("start", start);
 Job job1 = Job.getInstance(conf, "pt_eft_order_pro_range_first");
 Path pathOrder = new Path(INTPUT_A);
 Path pathAddress = new Path(INTPUT_B);
 Path output = new Path(OUTPUT + start + "/");
 FileSystem fs = FileSystem.get(conf);
 if(fs.exists(output)){
 fs.delete(output,true);
 job1.setMapOutputKeyClass(TextPair.class);
 job1.setMapOutputValueClass(Text.class);
 FileOutputFormat.setOutputPath(job1, output);
 MultipleInputs.addInputPath(job1, pathOrder, TextInputFormat.class, EfcOrderProRangeOrderMapper.class);
 MultipleInputs.addInputPath(job1, pathAddress, TextInputFormat.class, EfcOrderProRangeAddressMapper.class);
 job1.setReducerClass(EfcOrderProRangeReducer.class);
 job1.setJarByClass(EfcOrderProRangeOdJob.class);
 Job job2 = Job.getInstance(conf,"pt_eft_order_pro_range_second");
 FileInputFormat.setInputPaths(job2, output);
 job2.setMapperClass(EfcOrderProRangeSecondMapper.class);
 job2.setMapOutputKeyClass(Text.class);
 job2.setMapOutputValueClass(IntWritable.class);
 TableMapReduceUtil.initTableReducerJob(OUTPUT_TABLE, EfcOrderProRangeSecondReducer.class, job2);
 return JobChainHandler.handleJobChain(job1, job2, "pt_eft_order_pro_range");
 } catch (Exception e) {
 e.printStackTrace();
 return 0;
 public static class TextPair implements WritableComparable TextPair {
 private Text first;
 private Text second;
 public TextPair() {
 set(new Text(), new Text());
 public TextPair(String first, String second) {
 set(new Text(first), new Text(second));
 public TextPair(Text first, Text second) {
 set(first, second);
 public void set(Text first, Text second) {
 this.first = first;
 this.second = second;
 public Text getFirst() {
 return first;
 public Text getSecond() {
 return second;
 public void write(DataOutput out) throws IOException {
 first.write(out);
 second.write(out);
 public void readFields(DataInput in) throws IOException {
 first.readFields(in);
 second.readFields(in);
 public int compareTo(TextPair tp) {
 return first.compareTo(tp.first);
}

 

mapper1类

public class EfcOrderProRangeOrderMapper extends Mapper LongWritable, Text, TextPair, Text {
 private static final int ORDER_ID_INDEX = 2;
 private static final int ORDER_STATUS_INDEX = 5;
 private static final String EFFECTIVE_STATUS = "3";
 private static final String COL_SPLITER = "\001";
 @Override
 public void map(LongWritable key, Text value, Context context) {
 try {
 String [] order = value.toString().split(COL_SPLITER);
 String orderId = order[ORDER_ID_INDEX];
 String status = order[ORDER_STATUS_INDEX];
 if(!EFFECTIVE_STATUS.equals(status)){
 return;
 TextPair textPair = new TextPair(new Text(orderId),new Text("order"));
 context.write(textPair, new Text(status));
 } catch (Exception e) {
 e.printStackTrace();
}

 

mapper2类

public class EfcOrderProRangeAddressMapper extends Mapper LongWritable, Text, TextPair, Text {
 //TODO 经过hivemeta去取index
 private static final int ORDER_ID_INDEX = 0;
 private static final int PROVINCE_ID_INDEX = 1;
 private static final String COL_SPLITER = "\001";
 @Override
 public void map(LongWritable key, Text value, Context context) {
 try {
 String [] address = value.toString().split(COL_SPLITER);
 String orderId = address[ORDER_ID_INDEX];
 String province = address[PROVINCE_ID_INDEX];
 TextPair textPair = new TextPair(new Text(orderId),new Text("address"));
 context.write(textPair, new Text(province));
 } catch (Exception e) {
 e.printStackTrace();
}

 

reducer端做join操作：通过TextPair中的second字段判断每条记录的来源（order或address），从而取得需要的维度。

public class EfcOrderProRangeReducer extends Reducer TextPair,Text,Text,Text {
 private static final String COL_SPLITER = "\001";
 @Override
 protected void reduce(TextPair key, Iterable Text values, Context context) {
 try {
 Text tag = key.getSecond();
 Text orderId = key.getFirst();
 String status = null;String province = null;
 StringBuilder out = new StringBuilder();
 for (Text value : values) {
 if(tag.toString().equals("order")){
 status = value.toString();
 if(tag.toString().equals("address")){
 province = value.toString();
 if (province != null status != null){
 out.append(orderId.toString()).append(COL_SPLITER).append(status).append(COL_SPLITER).append(province);
 context.write(null, new Text(out.toString()));
 } catch (Exception e) {
 e.printStackTrace();
}

 

版权声明
本文来源于网络,版权归原作者所有,其内容与观点不代表娱乐之横扫全球立场。转载文章仅为传播更有价值的信息,如采编人员采编有误或者版权原因,请与我们联系,我们核实后立即修改或删除。

猜您喜欢的文章