MapReduce,组合式,迭代式,链式
前面介绍一些怎样用户类制定自己的类,来达到减少中间数据:http://www.cnblogs.com/liqizhou/archive/2012/05/14/2499498.html
1.迭代式mapreduce
一些复杂的任务难以用一次mapreduce处理完成,需要多次mapreduce才能完成任务,例如Pagrank,Kmeans算法都需要多次的迭代,关于mapreduce迭代在mahout中运用较多。有兴趣的可以参考一下mahout的源码。
在map/reduce迭代过程中,思想还是比较简单,就像类似for循环一样,前一个mapreduce的输出结果,作为下一个mapreduce的输入,任务完成后中间结果都可以删除。如代码所以:
Configuration conf1 = new Configuration(); Job job1 = new Job(conf1,"job1"); ..... FileInputFormat.addInputPath(job1,InputPaht1); FileOutputFromat.setOoutputPath(job1,Outpath1); job1.waitForCompletion(true); //sub Mapreduce Configuration conf2 = new Configuration(); Job job2 = new Job(conf1,"job1"); ..... FileInputFormat.addInputPath(job2,Outpath1); FileOutputFromat.setOoutputPath(job2,Outpath2); job2.waitForCompletion(true); //sub Mapreduce Configuration conf3 = new Configuration(); Job job3 = new Job(conf1,"job1"); ..... FileInputFormat.addInputPath(job3,Outpath2); FileOutputFromat.setOoutputPath(job3,Outpath3); job3.waitForCompletion(true); .....
下面列举一个mahout怎样运用mapreduce迭代的,下面的代码快就是mahout中kmeans的算法的代码,在main函数中用一个while循环来做mapreduce的迭代,其中:runIteration()是一次mapreduce的过程。
但个人感觉现在的mapreduce迭代设计不太满意的地方。
1. 每次迭代,如果所有Job(task)重复创建,代价将非常高。
2.每次迭代,数据都写入本地和读取本地,I/O和网络传输的代价比较大。
好像Twister和Haloop的模型能过比较好的解决这些问题,但他们抽象度不够高,支持的计算有限。
期待着下个版本hadoop更好的支持迭代算法。
//main function while (!converged && iteration <= maxIterations) { log.info("K-Means Iteration {}", iteration); // point the output to a new directory per iteration Path clustersOut = new Path(output, AbstractCluster.CLUSTERS_DIR + iteration); converged = runIteration(conf, input, clustersIn, clustersOut, measure.getClass().getName(), delta); // now point the input to the old output directory clustersIn = clustersOut; iteration++; } private static boolean runIteration(Configuration conf, Path input, Path clustersIn, Path clustersOut, String measureClass, String convergenceDelta) throws IOException, InterruptedException, ClassNotFoundException { conf.set(KMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString()); conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass); conf.set(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, convergenceDelta); Job job = new Job(conf, "KMeans Driver running runIteration over clustersIn: " + clustersIn); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(ClusterObservations.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Cluster.class); job.setInputFormatClass(SequenceFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setMapperClass(KMeansMapper.class); job.setCombinerClass(KMeansCombiner.class); job.setReducerClass(KMeansReducer.class); FileInputFormat.addInputPath(job, input); FileOutputFormat.setOutputPath(job, clustersOut); job.setJarByClass(KMeansDriver.class); HadoopUtil.delete(conf, clustersOut); if (!job.waitForCompletion(true)) { throw new InterruptedException("K-Means Iteration failed processing " + clustersIn); } FileSystem fs = FileSystem.get(clustersOut.toUri(), conf); return isConverged(clustersOut, conf, fs); }
2.依赖关系组合式MapReduce
我们可以设想一下MapReduce有3个子任务job1,job2,job3构成,其中job1和job2相互独立,job3要在job1和job2完成之后才执行。这种关系就叫复杂数据依赖关系的组合时mapreduce。hadoop为这种组合关系提供了一种执行和控制机制,hadoop通过job和jobControl类提供具体的编程方法。Job除了维护子任务的配置信息,还维护子任务的依赖关系,而jobControl控制整个作业流程,把所有的子任务作业加入到JobControl中,开启JobControl的线程即可运行程序。
要注意的地方就是hadoop的JobControl类实现了线程Runnable接口。我们需要实例化一个线程来让它启动。直接调用JobControl的run()方法,线程将无法结束。
下面给出伪代码:
Configuration job1conf = new Configuration(); Job job1 = new Job(job1conf,"Job1"); .........//job1 其他设置 Configuration job2conf = new Configuration(); Job job2 = new Job(job2conf,"Job2"); .........//job2 其他设置 Configuration job3conf = new Configuration(); Job job3 = new Job(job3conf,"Job3"); .........//job3 其他设置 job3.addDepending(job1);//设置job3和job1的依赖关系 job3.addDepending(job2); JobControl JC = new JobControl("123"); JC.addJob(job1);//把三个job加入到jobcontorl中 JC.addJob(job2); JC.addJob(job3); Thread jcThread = new Thread(JC); jcThread.start();
3.链式MapReduce
首先看一下例子,来说明为什么要有链式MapReduce,假设在统计单词是,会出现这样的词,make,made,making等,他们都属于一个词,在单词累加的时候,都归于一个词。解决的方法为用一个单独的Mapreduce任务可以实现,单增加了多个Mapreduce作业,将增加整个作业处理的周期,还增加了I/O操作,因而处理效率不高。
一个较好的办法就是在核心的MapReduce之外,增加一个辅助的Map过程,然后将这个辅助的Map过程和核心的Mapreudce过程合并为一个链式的Mapreduce,从而完成整个作业。hadoop提供了专门的链式ChainMapper和ChainReducer来处理链式任务,ChainMapper允许一个Map任务中添加多个Map的子任务,ChainReducer可以在Reducer执行之后,在加入多个Map的子任务。其调用形式如下:
ChainMapper.addMapper(...); ChainReducer.addMapper(...); //addMapper()调用的方法形式如下: public static void addMapper(JOb job, Class<? extends Mapper> mclass, Class<?> inputKeyClass, Class<?> inputValueClass, Class<?> outputKeyClass, Class<?> outputValueClass, Configuration conf ){ }
其中,ChainReducer专门提供了一个setRreducer()方法来设置整个作业唯一的Reducer。
note:这些Mapper和Reducer之间传递的键和值都必须保持一致。
下面举个例子:用ChainMapper把Map1加如并执行,然后用ChainReducer把Reduce和Map2加入到Reduce过程中。代码如下:Map1.class 要实现map方法
public void function throws IOException { Configuration conf = new Configuration(); Job job = new Job(conf); job.setJobName("ChianJOb"); // 在ChainMapper里面添加Map1 Configuration map1conf = new Configuration(false); ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class, Text.class, Text.class, true, map1conf); // 在ChainReduce中加入Reducer,Map2; Configuration reduceConf = new Configuration(false); ChainReducer.setReducer(job, Reduce.class, LongWritable.class, Text.class, Text.class, Text.class, true, map1conf); Configuration map2Conf = new Configuration(); ChainReducer.addMapper(job, Map2.class, LongWritable.class, Text.class, Text.class, Text.class, true, map1conf); job.waitForCompletion(true); }
相关推荐
近几年,研究者扩展和改进原始MapReduce,已开发了若干迭代式MapReduce以更好地为大数据处理而支持迭代计算。对迭代式MapReduce编程框架进行综合评述,较详细地阐述了这些研究成果,给出了它们各自的基本思想,并...
MapReduce求行平均值--标准差--迭代器处理--MapReduce案例
mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce ...
通过实例让你真正明白mapreduce填空式、分布(分割)编程 通过实例让你真正明白mapreduce填空式、分布(分割)编程
【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业...比较Spark和MapReduce执行迭代应用的性能差异源码+学习说明(课程作业).zip
1. 启 动 全 分 布 模 式 Hadoop 集 群 , 守护进程 包 括 NameNode 、 DataNode 、 SecondaryNameNode、ResourceManager、NodeManager 和 JobHistoryServer。 2. 在 Hadoop 集群主节点上搭建 MapReduce 开发环境 ...
与迭代 MR 框架比较的实验 我们为在迭代 MapReduce 框架上运行迭代算法而实现的示例代码 来自 KAIST 菲尔
Guagua目前主要支持的是同步的Master-Workers结构的迭代式计算框架,今后我们希望能够支持异步方式的迭代计算框架,2012年Google MapReduce之父Jeff Dean发表了一篇论文,上面提到了对神经网络深度模型的支持,文章...
MapReduce发明人关于MapReduce的介绍
【MapReduce篇07】MapReduce之数据清洗ETL1
图解MapReduce,系统介绍Hadoop MapReduce工作过程原理
简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接
Hadoop 用mapreduce实现Wordcount实例,绝对能用
(2)打开网站localhost:8088和localhost:50070,查看MapReduce任务启动情况 (3)写wordcount代码并把代码生成jar包 (4)运行命令 (1):把linus下的文件放到hdfs上 (2):运行MapReduce (5):查看运行结果 ...
MapReduce简单程序示例
MPI等并行计算方法缺少高层并行编程模型,为了克服这一缺陷,MapReduce借鉴了Lisp函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型 上升到构架:统一构架,为程序员隐藏系统层细节 MPI等...
MapReduce的实现细节,对mapreduce的具体实现讲解
hadoop网站通过SVN下载下来的mapreduce代码。欢迎现在学习!
mapreduce example
mapreduce