MapReduce编程模型是什么

75次阅读
没有评论

共计 7903 个字符,预计需要花费 20 分钟才能阅读完成。

这篇文章主要讲解了“MapReduce 编程模型是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着丸趣 TV 小编的思路慢慢深入,一起来研究和学习“MapReduce 编程模型是什么”吧!

MapReduce:大型集群上的简单数据处理

摘要

MapReduce 是一个设计模型,也是一个处理和产生海量数据的一个相关实现。用户指定一个用于处理一个键值(key-value)对生成一组 key/value 对形式的中间结果的 map 函数,以及一个将中间结果键相同的键值对合并到一起的 reduce 函数。许多现实世界的任务都能满足这个模型,如这篇文章所示。

使用这个功能形式实现的程序能够在大量的普通机器上并行执行。这个运行程序的系统关心下面的这些细节:输入数据的分区、一组机器上调度程序执行、处理机器失败问题,以及管理所需的机器内部的通信。这使没有任何并行处理和分布式系统经验的程序员能够利用这个大型分布式系统的资源。

我们的 MapReduce 实现运行在一个由普通机器组成的大规模集群上,具有很高的可扩展性:一个典型的 MapReduce 计算会在几千台机器上处理许多 TB 的数据。程序员们发现这个系统很容易使用:目前已经实现了几百个 MapReduce 程序,在 Google 的集群上,每天有超过一千个的 MapReduce 工作在运行。

一、  介绍

在过去的 5 年中,本文作者和许多 Google 的程序员已经实现了数百个特定用途的计算程序,处理了海量的原始数据,包括抓取到的文档、网页请求日志等,计算各种衍生出来的数据,如反向索引、网页文档的图形结构的各种表示、每个 host 下抓取到的页面数量的总计、一个给定日期内的最频繁查询的集合等。大多数这种计算概念明确。然而,输入数据通常都很大,并且计算必须分布到数百或数千台机器上以确保在一个合理的时间内完成。如何并行计算、分布数据、处理错误等问题使这个起初很简单的计算,由于增加了处理这些问题的很多代码而变得十分复杂。

为了解决这个复杂问题,我们设计了一个新的抽象模型,它允许我们将想要执行的计算简单的表示出来,而隐藏其中并行计算、容错、数据分布和负载均衡等很麻烦的细节。我们的抽象概念是受最早出现在 lisp 和其它结构性语言中的 map 和 reduce 启发的。我们认识到,大多数的计算包含对每个在输入数据中的逻辑记录执行一个 map 操作以获取一组中间 key/value 对,然后对含有相同 key 的所有中间值执行一个 reduce 操作,以此适当的合并之前的衍生数据。由用户指定 map 和 reduce 操作的功能模型允许我们能够简单的进行并行海量计算,并使用 re-execution 作为主要的容错机制。

这项工作的最大贡献是提供了一个简单的、强大的接口,使我们能够自动的进行并行和分布式的大规模计算,通过在由普通 PC 组成的大规模集群上实现高性能的接口来进行合并。

第二章描述了基本的编程模型,并给出了几个例子。第三章描述了一个为我们的聚类计算环境定制的 MapReduce 接口实现。第四章描述了我们发现对程序模型很有用的几个优化。第六章探索了 MapReduce 在 Google 内部的使用,包括我们在将它作为生产索引系统重写的基础的一些经验。第七章讨论了相关的和未来的工作。

二、  编程模型

这个计算输入一个 key/value 对集合,产生一组输出 key/value 对。MapReduce 库的用户通过两个函数来标识这个计算:Map 和 Reduce。

Map,由用户编写,接收一个输入对,产生一组中间 key/value 对。MapReduce 库将具有相同中间 key I 的聚合到一起,然后将它们发送给 Reduce 函数。

Reduce,也是由用户编写的,接收中间 key I 和这个 key 的值的集合,将这些值合并起来,形成一个尽可能小的集合。通常,每个 Reduce 调用只产生 0 或 1 个输出值。这些中间值经过一个迭代器(iterator)提供给用户的 reduce 函数。这允许我们可以处理由于数据量过大而无法载入内存的值的链表。

2.1 例子

考虑一个海量文件集中的每个单词出现次数的问题,用户会写出类似于下面的伪码:

 

Map 函数对每个单词增加一个相应的出现次数(在这个例子中仅仅为“1”)。Reduce 函数将一个指定单词所有的计数加到一起。

此外,用户使用输入和输出文件的名字、可选的调节参数编写代码,来填充一个 mapreduce 规格对象,然后调用 MapReduce 函数,并把这个对象传给它。用户的代码与 MapReduce 库(C++ 实现)连接到一起。。附录 A 包含了这个例子的整个程序。

2.2 类型

尽管之前的伪代码中使用了字符串格式的输入和输出,但是在概念上,用户定义的 map 和 reduce 函数需要相关联的类型:

map  (k1, v1)                      —         list(k2, v2)

reduce  (k2, list(v2))                —          list(v2)

也就是说,输入的键和值和输出的键和值来自不同的域。此外,中间结果的键和值与输出的键和值有相同的域。

MapReduce 的 C ++ 实现与用户定义的函数使用字符串类型进行参数传递,将类型转换的工作留给用户的代码来处理。

2.3 更多的例子

这里有几个简单有趣的程序,能够使用 MapReduce 计算简单的表示出来。

分布式字符串查找(Distributed Grep):map 函数将匹配一个模式的行找出来。Reduce 函数是一个恒等函数,只是将中间值拷贝到输出上。

URL 访问频率计数(Count of URL Access Frequency):map 函数处理 web 页面请求的日志,并输出 URL, 1。Reduce 函数将相同 URL 的值累加到一起,生成一个 URL, total count 对。

翻转网页连接图(Reverse Web-Link Graph):map 函数为在一个名为 source 的页面中指向目标(target)URL 的每个链接输出 target, source 对。Reduce 函数将一个给定目标 URL 相关的所有源(source)URLs 连接成一个链表,并生成对:target, list(source)。

主机关键向量指标(Term-Vector per Host):一个检索词向量将出现在一个文档或是一组文档中最重要的单词概述为一个 word, frequency 对链表。Map 函数为每个输入文档产生一个 hostname, term vector(hostname 来自文档中的 URL)。Reduce 函数接收一个给定 hostname 的所有文档检索词向量,它将这些向量累加到一起,将罕见的向量丢掉,然后生成一个最终的 hostname, term vector 对。

倒排索引(Inverted Index):map 函数解析每个文档,并生成一个 word, document ID 序列。Reduce 函数接收一个给定单词的所有键值对,所有的输出对形成一个简单的倒排索引。可以通过对计算的修改来保持对单词位置的追踪。

分布式排序(Distributed Sort):map 函数将每个记录的 key 抽取出来,并生成一个 key, record 对。Reduce 函数不会改变任何的键值对。这个计算依赖了在 4.1 节提到的分区功能和 4.2 节提到的排序属性。

三、  实现

MapReduce 接口有很多不同的实现,需要根据环境来做出合适的选择。比如,一个实现可能适用于一个小的共享内存机器,而另一个实现则适合一个大的 NUMA 多处理器机器,再另一个可能适合一个更大的网络机器集合。

这一章主要描述了针对在 Google 内部广泛使用的计算环境的一个实现:通过交换以太网将大量的普通 PC 连接到一起的集群。在我们的环境中:

(1)  机器通常是双核 x86 处理器、运行 Linux 操作系统、有 2 -4G 的内存。

(2)  使用普通的网络硬件—通常是 100Mb/ s 或者是 1Gb/ s 的机器带宽,但是平均值远小于带宽的一半。

(3)  由数百台或者数千台机器组成的集群,因此机器故障是很平常的事

(4)  存储是由直接装在不同机器上的便宜的 IDE 磁盘提供。一个内部的分布式文件系统用来管理存储这些磁盘上的数据。文件系统在不可靠的硬件上使用副本机制提供了可用性和可靠性。

(5)  用户将工作提交给一个调度系统,每个工作由一个任务集组成,通过调度者映射到集群中可用机器的集合上。

3.1 执行概述

通过自动的将输入数据分区成 M 个分片,Map 调用被分配到多台机器上运行。数据的分片能够在不同的机器上并行处理。使用分区函数(如,hash(key) mod R)将中间结果的 key 进行分区成 R 个分片,Reduce 调用也被分配到多台机器上运行。分区的数量(R)和分区函数是由用户指定的。

 

独立的工作机器的计数器值周期性的传送到 master(附在 ping 的响应上)master 将从成功的 map 和 reduce 任务上获取的计数器值进行汇总,当 MapReduce 操作完成时,将它们返回给用户的代码。当前的计数器值也被显示在了 master 的状态页面上,使人们能够看到当前计算的进度。当汇总计数器值时,master 通过去掉同一个 map 或 reduce 任务的多次执行所造成的影响来防止重复计数。(重复执行可能会在我们使用备用任务和重新执行失败的任务时出现。)

一些计数器的值是由 MapReduce 库自动维护的,如已处理的输入 key/value 对的数量和已生成的输出 key/value 对的数量。

用户发现计数器对检查 MapReduce 操作的行为很有用处。例如,在一些 MapReduce 操作中,用户代码可能想要确保生成的输出对的数量是否精确的等于已处理的输入对的数量,或者已处理的德国的文档数量在已处理的所有文档数量中是否被容忍。

五、  性能

在这章中,我们测试两个运行在一个大规模集群上的 MapReduce 计算的性能。一个计算在大约 1TB 的数据中进行特定的模式匹配,另一个计算对大约 1TB 的数据进行排序。

这两个程序能够代表实际中大量的由用户编写的 MapReduce 程序,一类程序将数据从一种表示方式转换成另一种形式;另一类程序是从海里的数据集中抽取一小部分感兴趣的数据。

5.1 集群配置

所有的程序运行在一个由将近 1800 台机器组成的集群上。每个机器有两个 2GHz、支持超线程的 Intel Xeon 处理器、4GB 的内存、两个 160GB 的 IDE 磁盘和一个 1Gbps 的以太网链路,这些机器部署在一个两层的树状交换网络中,在根节点处有大约 100-200Gbps 的带宽。所有的机器都采用相同的部署,因此任意两个机器间的 RTT 都小于 1ms。

在 4GB 内存里,有接近 1 -1.5GB 用于运行在集群上的其它任务。程序在一个周末的下午开始执行,这时主机的 CPU、磁盘和网络基本都是空闲的。

5.2 字符串查找(Grep)

这个 grep 程序扫描了大概 1010 个 100 字节大小的记录,查找出现概率相对较小的 3 个字符的模式(这个模式出现在 92337 个记录中)。输入被分割成接近 64MB 的片(M=15000),整个输出被放到一个文件中(R=1)。

  

图 3:对于排序程序的不同执行过程随时间的数据传输速率

图 3(a)显示了排序程序的正常执行过程。左上方的图显示了输入读取的速率,这个速率峰值大约为 13GB/s,因为所有的 map 任务执行完成,速率也在 200 秒前下降到了 0。注意,这里的输入速率比字符串查找的要小,这是因为排序程序的 map 任务花费了大约一半的处理时间和 I / O 带宽将终结结果输出到它们的本地磁盘上,字符串查找相应的中间结果输出几乎可以忽略。

左边中间的图显示了数据通过网络从 map 任务发往 reduce 任务的速率。这个缓慢的数据移动在第一个 map 任务完成时会尽快开始。图中的第一个峰值是启动了第一批大概 1700 个 reduce 任务(整个 MapReduce 被分配到大约 1700 台机器上,每个机器每次最多只执行一个 reduce 任务)。这个计算执行大概 300 秒后,第一批 reduce 任务中的一些执行完成,我们开始执行剩下的 reduce 任务进行数据处理。所有的处理在计算开始后的大约 600 秒后完成。

左边下方的图显示了 reduce 任务就爱那个排序后的数据写到最终的输出文件的速率。在第一个处理周期完成到写入周期开始间有一个延迟,因为机器正在忙于对中间数据进行排序。写入的速率会在 2 -4GB/ s 上持续一段时间。所有的写操作会在计算开始后的大约 850 秒后完成。包括启动的开销,整个计算耗时 891 秒,这与 TeraSort benchmark 中的最好记录 1057 秒相似。

一些事情需要注意:因为我们的位置优化策略,大多数数据从本地磁盘中读取,绕开了网络带宽的显示,所以输入速率比处理速率和输出速率要高。处理速率要高于输出速率,因为输出过程要将排序后的数据写入到两个拷贝中(为了可靠性和可用性,我们将数据写入到两个副本中)。我们将数据写入两个副本,因为我们的底层文件系统为了可靠性和可用性提供了相应的机制。如果底层文件系统使用容错编码(erasure coding)而不是复制,写数据的网络带宽需求会降低。

5.4 备用任务的作用

在图 3(b)中,我们显示了一个禁用备用任务的排序程序的执行过程。执行的流程与如 3(a)中所显示的相似,除了有一个很长的尾巴,在这期间几乎没有写入行为发生。在 960 秒后,除了 5 个 reduce 任务的所有任务都执行完成。然而,这些落后者只到 300 秒后才执行完成。整个计算任务耗时 1283 秒,增加了大约 44% 的时间。

5.5 机器故障

在图 3(c)中,我们显示了一个排序程序的执行过程,在计算过程开始都的几分钟后,我们故意 kill 掉了 1746 个工作进程中的 200 个。底层的调度者会迅速在这些机器上重启新的工作进程(因为只有进程被杀掉,机器本身运行正常)。

工作进程死掉会出现负的输入速率,因为一些之前已经完成的 map 工作消失了(因为香港的 map 工作进程被 kill 掉了),并且需要重新执行。这个 map 任务会相当快的重新执行。整个计算过程在 933 秒后完成,包括了启动开销(仅仅比普通情况多花费了 5% 的时间)。

六、  经验

我们在 2003 年 2 月完成了 MapReduce 库的第一个版本,并在 2003 年 8 月做了重大的改进,包括位置优化、任务在工作机器上的动态负载均衡执行等。从那时起,我们惊喜的发现,MapReduce 库能够广泛的用于我们工作中的各种问题。它已经被用于 Google 内部广泛的领域,包括:

大规模机器学习问题

Google 新闻和 Froogle 产品的集群问题

抽取数据用于公众查询的产品报告

从大量新应用和新产品的网页中抽取特性(如,从大量的位置查询页面中抽取地理位置信息)

大规模图形计算

表 1:2004 年 8 月运行的 MapReduce 任务

在每个工作的最后,MapReduce 库统计了工作使用的计算资源。在表 1 中,我们看到一些 2004 年 8 月在 Google 内部运行的 MapReduce 工作的一些统计数据。

6.1 大规模索引

目前为止,MapReduce 最重要的应用之一就是完成了对生产索引系统的重写,它生成了用于 Google 网页搜索服务的数据结构。索引系统的输入数据是通过我们的爬取系统检索到的海量文档,存储为就一个 GFS 文件集合。这些文件的原始内容还有超过 20TB 的数据。索引程序是一个包含了 5 -10 个 MapReduce 操作的序列。使用 MapReduce(代替了之前版本的索引系统中的 adhoc 分布式处理)有几个优点:

索引程序代码是一个简单、短小、易于理解的代码,因为容错、分布式和并行处理都隐藏在了 MapReduce 库中。比如,一个计算程序的大小由接近 3800 行的 C ++ 代码减少到使用 MapReduce 的大约 700 行的代码。

MapReduce 库性能非常好,以至于能够将概念上不相关的计算分开,来代替将这些计算混合在一起进行,避免额外的数据处理。这会使索引程序易于改变。比如,对之前的索引系统做一个改动大概需要几个月时间,而对新的系统则只需要几天时间。

索引程序变得更易于操作,因为大多数由于机器故障、机器处理速度慢和网络的瞬间阻塞等引起的问题都被 MapReduce 库自动的处理掉,而无需人为的介入。

七、  相关工作

许多系统都提供了有限的程序模型,并且对自动的并行计算使用了限制。比如,一个结合函数可以在 logN 时间内在 N 个处理器上对一个包含 N 个元素的数组使用并行前缀计算,来获取所有的前缀 [6,9,13]。MapReduce 被认为是这些模型中基于我们对大规模工作计算的经验的简化和精华。更为重要的是,我们提供了一个在数千个处理器上的容错实现。相反的,大多数并行处理系统只在较小规模下实现,并将机器故障的处理细节交给了程序开发者。

Bulk Synchronous Programming 和一些 MPI 源于提供了更高层次的抽象使它更易于让开发者编写并行程序。这些系统和 MapReduce 的一个关键不同点是 MapReduce 开发了一个有限的程序模型来自动的并行执行用户的程序,并提供了透明的容错机制。

我们的位置优化机制的灵感来自于移动磁盘技术,计算用于处理靠近本地磁盘的数据,减少数据在 I / O 子系统或网络上传输的次数。我们的系统运行在挂载几个磁盘的普通机器上,而不是在磁盘处理器上运行,但是一般方法是类似的。

我们的备用任务机制与 Charlotte 系统中采用的 eager 调度机制类似。简单的 Eager 调度机制有一个缺点,如果一个给定的任务造成反复的失败,整个计算将以失败告终。我们通过跳过损坏计算路的机制,解决了这个问题的一些情况。

MapReduce 实现依赖了内部集群管理系统,它负责在一个大规模的共享机器集合中分发和运行用户的任务。尽管不是本篇文章的焦点,但是集群管理系统在本质上与像 Condor 的其它系统类似。

排序功能是 MapReduce 库的一部分,与 NOW-Sort 中的操作类似。源机器(map 工作进程)将将要排序的数据分区,并将其发送给 R 个 Reduce 工作进程中的一个。每个 reduce 工作进程在本地对这些数据进行排序(如果可能的话就在内存中进行)。当然 NOW-Sort 没有使 MapReduce 库能够广泛使用的用户定义的 Map 和 Reduce 函数。

River 提供了一个编程模型,处理进程通过在分布式队列上发送数据来进行通信。像 MapReduce 一样,即使在不均匀的硬件或系统颠簸的情况下,River 系统依然试图提供较好的平均性能。River 系统通过小心的磁盘和网络传输调度来平衡完成时间。通过限制编程模型,MapReduce 框架能够将问题分解成很多细颗粒的任务,这些任务在可用的工作进程上动态的调度,以至于越快的工作进程处理越多的任务。这个受限制的编程模型也允许我们在工作将要结束时调度冗余的任务进行处理,这样可以减少不均匀情况下的完成时间。

BAD-FS 与 MapReduce 有完全不同的编程模型,不像 MapReduce,它是用于在广域网下执行工作的。然而,它们有两个基本相似点。(1)两个系统都使用了重新执行的方式来处理因故障而丢失的数据。(2)两个系统都本地有限调度原则来减少网络链路上发送数据的次数。

TASCC 是一个用于简化结构的高可用性的网络服务。像 MapReduce 一样,它依靠重新执行作为一个容错机制。

感谢各位的阅读,以上就是“MapReduce 编程模型是什么”的内容了,经过本文的学习后,相信大家对 MapReduce 编程模型是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是丸趣 TV,丸趣 TV 小编将为大家推送更多相关知识点的文章,欢迎关注!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计7903字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)