`
ghost_face
  • 浏览: 52793 次
社区版块
存档分类
最新评论

MR实现将同一个key的内容分配到同一个输出文件

阅读更多

MapReduce程序默认的输出文件个数:

首先,根据setNumReduceTasks(int num)这个方法,

其次,根据Map的输出文件个数。

一般情况下,同一个key的数据,可能会被分散到不同的输出文件中。倘若我们要对某一个特定的key的所有value值进行遍历,则需要将包含该key的所有文件作为输入文件。当数据比较庞大时,这样的操作会浪费资源。如果同一个Key的所有的value值都会被分配到同一个文件中,就会比较理想。

在Hadoop-core包中,有个类MultiplyOutputs可以实现以上功能(其实就是在reduce中加一两句话,其他不变)。代码如下:

 

package io;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class MultipleOut extends Configured implements Tool {

    static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            int index = line.indexOf(" ");
            if (index != -1) {
                context.write(new Text(line.substring(0, index)),
                        new Text(value.toString().substring(index + 1)));
            }
        }
    }


// 只需要在reduce中添加几句代码,其他部分不需要改动
    static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            MultipleOutputs mo = new MultipleOutputs(context);
            for (Text val : values) {
             //key.toString():表示输出文件名以Key命名,注意是相对路径
                mo.write(key, val, key.toString() + "/");
            }
            //一定要close,否则无数据
            mo.close();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        String path = "你的输入路径";
        if (strings.length != 1) {
            System.out.println("input:" + path);
            System.out.print("arg:<out>");
            return 1;
        }
        Configuration conf = getConf();
        Job job = new Job(conf, "MultipleOut");
        job.setJarByClass(MultipleOut.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(path));
        FileOutputFormat.setOutputPath(job, new Path(strings[0]));

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int rst = ToolRunner.run(conf, new MultipleOut(), args);
        System.exit(rst);
    }
}

 结果如图片所示,目录1,2,3,4,5是五个Key。

 



 

  • 大小: 12.4 KB
0
2
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics