Java大数据开发(三)Hadoop(23)-自定义InputFormat案例

2021年11月21日 阅读数:13
这篇文章主要向大家介绍Java大数据开发(三)Hadoop(23)-自定义InputFormat案例,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

导读:在实际开发中,hadoop框架自带的inputformat类型不能知足全部应用场景,须要自定义inputformat来解决实际问题。java


自定义inputformat案例实操


不管HDFS仍是MapReduce,在处理小文件时效率都很是低,但又不免面临处理大量小文件的场景,此时,就须要有相应解决方案。能够自定义InputFormat实现小文件的合并。swift


1. 需求

将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。

2. 实现步骤

(1) 自定义一个类继承FileInputFormat
① 重写isSplitable()方法,返回false不可切割
② 重写createRecordReader(),建立自定义的RecordReader对象,并初始化
(2) 重写RecordReader,实现一次读取一个完整的文件封装为KV
采用IO流一次读取一个文件输出到value中,由于设置了不可切片,最终把全部文件都封装到了value中
获取文件路径信息+名称,并设置key
(3) 在输出的时候,使用SequenceFileOutPutFormat输出合并文件
(4) 设置Driver

自定义inputformat
   
   
   
    
    
public class WholeFileInputformat extends FileInputFormat<TextBytesWritable>{ @Override public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { WholeRecordReader recordReader = new WholeRecordReader(); recordReader.initialize(split, context); return recordReader;   } }

自定义RecordReader类
public class WholeRecordReader extends RecordReader<Text, BytesWritable>{  FileSplit split;  Configuration configuration;  Text k = new Text();  BytesWritable v = new BytesWritable();  boolean isProgress = true;    @Override  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {    // 初始化    this.split = (FileSplit) split;    configuration = context.getConfiguration();  }  @Override  public boolean nextKeyValue() throws IOException, InterruptedException {    // 核心业务逻辑处理    if (isProgress) {      byte[] buf = new byte[(int) split.getLength()];            // 1 获取fs对象      Path path = split.getPath();      FileSystem fs = path.getFileSystem(configuration);            // 2 获取输入流      FSDataInputStream fis = fs.open(path);            // 3 拷贝      IOUtils.readFully(fis, buf, 0, buf.length);            // 4 封装v      v.set(buf, 0, buf.length);            // 5 封装k      k.set(path.toString());            // 6 关闭资源      IOUtils.closeStream(fis);      isProgress = false;      return true;    }    return false;  }  @Override  public Text getCurrentKey() throws IOException, InterruptedException {    return k;  }  @Override  public BytesWritable getCurrentValue() throws IOException, InterruptedException {    return v;  }  @Override  public float getProgress() throws IOException, InterruptedException {    // TODO Auto-generated method stub    return 0;  }  @Override  public void close() throws IOException {    // TODO Auto-generated method stub    }}

SequenceFileMapper
   
   
   
    
    
public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{
@Override protected void map(Text key, BytesWritable value, Mapper<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException { context.write(key, value); } }

SequenceFileReducer
   
   
   
    
    
public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable>{
@Override protected void reduce(Text key, Iterable<BytesWritable> values, Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException { // 循环写出 for (BytesWritable value : values) { context.write(key, value); } } }

SequenceFileDriver
   
   
   
    
    
public class SequenceFileDriver {
public static void main(String[] args) throws Exception, IOException {
// 输入输出路径须要根据本身电脑上实际的输入输出路径设置 args = new String[] { "e:/input/inputinputformat", "e:/output4" };
// 1 获取job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf);
// 2 设置jar包存储位置、关联自定义的mapper和reducer job.setJarByClass(SequenceFileDriver.class); job.setMapperClass(SequenceFileMapper.class); job.setReducerClass(SequenceFileReducer.class);
// 7设置输入的inputFormat job.setInputFormatClass(WholeFileInputformat.class); // 8设置输出的outputFormat job.setOutputFormatClass(SequenceFileOutputFormat.class);
// 3 设置map输出端的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class);
// 4 设置最终输出端的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class);
// 5 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }

关注「跟我一块儿学大数据」
跟我一块儿学大数据

本文分享自微信公众号 - 跟我一块儿学大数据(java_big_data)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。微信