Java Big Data Development (3): Hadoop (17) - MapReduce 4

November 21, 2021
This article introduces Java Big Data Development (3): Hadoop (17) - MapReduce 4, covering basic usage, practical tips, and underlying mechanisms. I hope you find it helpful.
Introduction: in the previous section we covered Hadoop serialization. In this section we will build a custom serializable bean and work through a hands-on case.

Hands-on serialization case study


Requirement


Compute the total upstream traffic, downstream traffic, and total traffic consumed by each phone number.


(1) Input data



1  13736230513  192.196.100.1   www.jd.com       2481   24681   200
2  13846544121  192.196.100.2                    264    0       200
3  13956435636  192.196.100.3                    132    1512    200
4  13966251146  192.168.100.1                    240    0       404
5  18271575951  192.168.100.2   www.jd.com       1527   2106    200
6  11184188413  192.168.100.3   www.jd.com       4116   1432    200
7  13590439668  192.168.100.4                    1116   954     200
8  15910133277  192.168.100.5   www.hao123.com   3156   2936    200
9  13729199489  192.168.100.6                    240    0       200
10 13630577991  192.168.100.7   www.shouhu.com   6960   690     200
11 15043685818  192.168.100.8   www.baidu.com    3659   3538    200
12 15959002129  192.168.100.9   www.jd.com       1938   180     500
13 13560439638  192.168.100.10                   918    4938    200
14 13470253144  192.168.100.11                   180    180     200
15 13682846555  192.168.100.12  www.qq.com       1938   2910    200
16 13992314666  192.168.100.13  www.gaga.com     3008   3720    200
17 13509468723  192.168.100.14  www.qinghua.com  7335   110349  404
18 18390173782  192.168.100.15  www.sogou.com    9531   2412    200
19 13975057813  192.168.100.16  www.baidu.com    11058  48243   200
20 13768778790  192.168.100.17                   120    120     200
21 13568436656  192.168.100.18  www.alibaba.com  2481   24681   200
22 13568436656  192.168.100.19                   1116   954     200

(2) Input data format:


7   13560436666   120.196.100.99    1116         954            200

id  phone number  IP                upstream     downstream     HTTP status code


(3) Expected output data format


13560436666        1116              954               2070

phone number       upstream traffic  downstream traffic  total traffic


Requirement analysis

Map stage: read one line, take the phone number as the output key, and wrap the upstream and downstream traffic in a custom FlowBean as the output value. Reduce stage: for each phone number, accumulate the upstream and downstream traffic and emit the totals. Because FlowBean is shuffled between Map and Reduce, it must implement Hadoop's Writable interface.


Writing the MapReduce program


(1) Write the bean class for traffic statistics


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// 1 Implement the Writable interface
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 2 Deserialization instantiates the bean via reflection, so a no-arg constructor is required
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Reset all three fields at once, so the mapper can reuse a single bean instance
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // 3 Serialization method
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // 4 Deserialization method
    // 5 Fields must be read in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // 6 toString, so the result prints cleanly to the output file
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
}

(2) Write the Mapper class

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    FlowBean v = new FlowBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 Read one line
        String line = value.toString();

        // 2 Split the fields
        String[] fields = line.split("\t");

        // 3 Build the key and value
        // Take the phone number
        String phoneNum = fields[1];

        // Take the upstream and downstream traffic, counting from the end of the
        // array because the URL field is missing in some records
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);

        k.set(phoneNum);
        v.set(upFlow, downFlow);

        // 4 Write out
        context.write(k, v);
    }
}
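
A detail worth pausing on: the URL field is absent in some records, which is why the traffic fields are indexed from the end of the array rather than from the front. A small standalone sketch (the sample lines are illustrative, assuming tab-separated records with the URL column simply omitted) shows that this works for both layouts:

public class SplitDemo {
    public static void main(String[] args) {
        // Hypothetical sample lines: one with a URL field, one without
        String withUrl    = "1\t13736230513\t192.196.100.1\twww.jd.com\t2481\t24681\t200";
        String withoutUrl = "2\t13846544121\t192.196.100.2\t264\t0\t200";

        for (String line : new String[] { withUrl, withoutUrl }) {
            String[] fields = line.split("\t");
            // Counting from the end picks the right columns in both cases
            System.out.println(fields[1] + " up=" + fields[fields.length - 3]
                    + " down=" + fields[fields.length - 2]);
        }
    }
}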

(3) Write the Reducer class

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

        long sum_upFlow = 0;
        long sum_downFlow = 0;

        // 1 Iterate over all beans, accumulating upstream and downstream traffic separately
        for (FlowBean flowBean : values) {
            sum_upFlow += flowBean.getUpFlow();
            sum_downFlow += flowBean.getDownFlow();
        }

        // 2 Wrap the totals in a result bean
        FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);

        // 3 Write out
        context.write(key, resultBean);
    }
}
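
One detail the loop relies on: Hadoop reuses the same FlowBean instance across iterations of values, so the code copies the primitive counters out of each bean rather than holding references to the beans themselves; stored references would all end up pointing at the last record.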


(4) Write the Driver class


import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

        // Adjust the input and output paths to match your own machine
        args = new String[] { "d:/input/inputflow", "d:/output1" };

        // 1 Get the configuration and a job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 Locate this program's jar via the driver class
        job.setJarByClass(FlowsumDriver.class);

        // 3 Set the Mapper and Reducer classes for this job
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 4 Set the mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job, together with its configuration and jar, and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
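
One operational note: FileOutputFormat fails the job if the output directory already exists. A small optional helper, not part of the original code (the class and method names are illustrative), deletes a stale output directory so the job can be rerun:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper (not part of the original tutorial)
public class OutputDirUtil {

    // Delete the output directory if it already exists, so the job can be rerun
    public static void deleteIfExists(Configuration conf, String dir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path(dir);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true); // true = delete recursively
        }
    }
}

In the driver, a call like OutputDirUtil.deleteIfExists(configuration, args[1]); placed before FileOutputFormat.setOutputPath would make repeated local runs painless.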


(5) Run and inspect the results



11184188413  4116   1432    5548
13470253144  180    180     360
13509468723  7335   110349  117684
13560439638  918    4938    5856
13568436656  3597   25635   29232
13590439668  1116   954     2070
13630577991  6960   690     7650
13682846555  1938   2910    4848
13729199489  240    0       240
13736230513  2481   24681   27162
13768778790  120    120     240
13846544121  264    0       264
13956435636  132    1512    1644
13966251146  240    0       240
13975057813  11058  48243   59301
13992314666  3008   3720    6728
15043685818  3659   3538    7197
15910133277  3156   2936    6092
15959002129  1938   180     2118
18271575951  1527   2106    3633
18390173782  9531   2412    11943


This case study is important and worth mastering: real-world MapReduce development routinely passes custom serializable beans between stages. As a quick sanity check on the output, note that 13568436656 appears twice in the input but only once here, with its traffic summed (2481 + 1116 = 3597 up, 24681 + 954 = 25635 down, 29232 total), which is exactly the aggregation the reducer performs.



This article was first shared on the WeChat public account 跟我一块儿学大数据 (java_big_data).