Learning Spark 中文版 -- 第四章 -- 使用键值对(2)

本贴最后更新于 2102 天前,其中的信息可能已经事过景迁

Actions Available on Pair RDDs (键值对 RDD 可用的 action)

  和 transformation(转换)一样,键值对 RDD 也可以使用基础 RDD 上的 action(开工),并且键值对 RDD 有一些利用键值对数据特性的的 action,如下表:

表 4-3 键值对 RDD 上的 action

函数名 描述 例子 结果
countByKey() 计算每个键元素的总数 rdd.countByKey() {(1,1),(3,2)}
collectAsMap() 结果收集成一个 map 便于查询 rdd.collectAsMap() Map{(1,2),(3,4),(3,6)}
lookup(key) 根据键返回值 rdd.lookup(3) [4,6]

  键值对 RDD 还有很多其他保存 RDD 的 action,我们将在第五章进行讨论。

Data Partitioning(Advanced)(数据分区)

  我们这一章讨论的最后一个 Spark 特性就是如何控制节点间的数据分区。在分布式程序中,主机间的通信代价高昂,所以把数据安排妥当来最小化网络间的通信可以极大地提高性能。很像单台机器的程序需要为数据选择正确的数据结构,Spark 能够控制 RDD 的分区来减少网络间通信。分区不会对所有的应用都有用,举个例子,如果给定的 RDD 值只被扫描一次,那么预先对其分区没有什么意义。只有多次使用如 join 这样的的键操作的 RDD,分区才有意义。稍后会有一些例子。

  Spark 中的所有键值对 RDD 都可以使用分区,因为系统的分组函数是根据每个元素的键。尽管 Spark 没有明确地控制每个键所对应的工作节点(也因为系统在某些工作节点失败的情况下也能正常运行),它允许程序能够确保一组键会出现在同一个节点上。例如,你可以选择哈希分区(hashpartition)将一个 RDD 划分成 100 个分区,这样模除 100 后有相同哈希值的键会出现在一个节点上。或者你可以使用区间分区(range-partition)按区间对键进行分区,这样键在相同范围内的元素会在相同的节点上。

  举个简单例子,想想一个内存中保存大量用户信息的应用,(UserId,UserInfo)组成的键值对 RDD,UserInfo 包含用户话题订阅列表。这个应用定期把这个表和一个记录了过去五分钟发生的点击事件的小文件结合,就是一个(UserId,LinkInfo)键值对,记录了用户五分钟内点击网站链接的信息的日志。举例来讲,我们想统计用户访问和他们订阅主题无关的链接数量。我们可以执行 Spark 的 join 操作,把 UserInfo 和 LinkInfo 键值对根据 UserId 键分组。Example4-22 展示了这个例子:

Example 4-22. Scala simple application

// Initialization code; we load the user info from a Hadoop SequenceFile on HDFS.
// This distributes elements of userData by the HDFS block where they are found,
// and doesn't provide Spark with any way of knowing in which partition a
// particular UserID is located.
//初始化代码;我们从HDFS的Hadoop SequenceFile加载用户信息,它通过他们找到的HDFS block来分发userData元素,Spark并不知道每个UserId在分区中的位置
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
// Function called periodically to process a logfile of events in the past 5 minutes;
// we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.
//函数定期被调用处理过去五分钟的事件日志,(假定这个SequenceFIle包含(UserId,Link//Info)键值对

def processNewLogs(logFileName: String) {
    val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
    val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs
    val offTopicVisits = joined.filter {
        case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
            !userInfo.topics.contains(linkInfo.topic)
    }.count()
    println("Number of visits to non-subscribed topics: " + offTopicVisits)
}

  这段代码能够达到我们的目的,但是效率会很差。这是因为每次调用 processNewLogs() 时调用的 join() 操作都不知道键在数据集的分区方式。默认情况下,这些操作会用 hash 值混洗两个数据集的所有键,把具有相同哈希值的键发送到相同的机器中,然后在这台机器上 join 相同键的元素(如图 4-4)。因为我们知道 userData 表比每五分钟的点击事件日志大很多,这浪费了大量工作:userData 表每次被调用都要在通过网络把数据打乱再用哈希值对键分组,有时候用户表甚至没有变化也要这样做。

image


  改正其实很简单:在程序开始时对 userData 使用 partitionBy() 转换(transformation)来把数据进行哈希分区。还需传递一个 spark.HashPartitioner 对象给 partitionBy,如例所示:

Example 4-23. Scala custom partitioner
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
                .partitionBy(new HashPartitioner(100)) // Create 100 partitions
                .persist()

  processNewLogs() 可以保持不变:事件 RDD 是从本地进入的 processNewLogs() 方法,并且在本方法中只被使用了一次,所以时间 RDD 被指定分区就没什么好处。因为我们构建 userData 时调用了 partitionBy(),Spark 会立即知道它被哈希分区了,调用 join() 时会利用这些信息。特别是当我们调用 userData.join(events) 时,Spark 会只混洗 eventsRDD,向包含用户数据的相应哈希分区的机器发送带有每个特定 UserId 的事件(如图 4-5)。结果就是只有少量数据需要在网络中通信,程序极大提高速度。

image
  注意一点 partitionBy() 是 transformation(转换),所以他总是返回一个新 RDD,原始 RDD 不会被改变。RDD 一旦创建可以永不改变。因此,持久化并且保存 partitionBy() 后的 userData 的结果是很重要的,而不是保存原始的 sequenceFile()。并且,把 100 传递给 partitionBy()表示分区的数量,这会控制相同数量的并行 task(任务)在 RDD 执行后续的操作(例如:join);通常,这个数量至少和集群上的核心数一样大。

在 partitionBy()之后未能持久化会导致后续使用 RDD 重复对数据分区。没有持久化,已分区 RDD 的使用将会导致对 RDD 完整继承关系的重新求导。这是 partitionBy() 的弊端,这会导致跨网络的重复分区和数据洗牌,类似于没有指定分区的情况。

  实际上,Spark 的许多操作会自动生成附加分区信息的 RDD,并且很多操作会利用这些分区信息,除了 join()。举个例子,sortBykey()groupByKey() 会分别生成区间分区和哈希分区。另一方面,类似 map() 操作产生的新 RDD 会忘记父 RDD 的分区信息,因为这种操作理论上有可能修改每条记录的键信息。后面部分会介绍如何决定 RDD 分区,和 Spark 不同的操作如何影响分区。

Java,Python 和 Scala 三者的 API 受益于分区的方式并无二致。但是,在 Python 中,你不能把一个 Hash Partitioner 对象传递给 partitionBy;你可以直接传递分区要求的数量(例如:rdd.partitionBy(100))。

Determining an RDD's Patitioner(决定 RDD 的分区器)

  在 Scala 和 Java 中,你可以通过 partitioner 的属性决定 RDD 如何分区(或者 Java 中的 partitioner() 方法)。这回返回一个 scala.Option 对象,一个 Scala 中包含可能存在可能不存在对象的容器类。你还可以调用 Option 对象的 isDefined() 来检查是否有值,get() 方法返回这个值。如果有值,会是一个 spark.Partioner 对象。这本质上是一个表示 RDD 每个键的分区的函数;稍后会详细介绍。

  利用 partitioner 属性是个在 Spark shell 中测试 Spark 不同操作对分区影响的好手段,还能够检查你想在程序中执行的操作是否符合正确的结果(见 Example4-24)。

Example 4-24. Determining partitioner of an RDD

scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> pairs.partitioner
res0: Option[spark.Partitioner] = None

scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14

scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)

  在这个简单的 shell 会话中,我们创建了 (Int,Int) 键值对 RDD,其初始化是没有任何分区信息(Option 对象的值是 None)。然后我们通过对第一个 RDD 哈希分区创建了第二个 RDD。如果我们实际上想在后面的操作使用已定义的 partitioned,我们应该在例子第三行输入的末尾加上 persist()。这和在之前例子中需要对 userData 使用 persist() 的原因是相同的:如果不适用 persist(),后续 RDD 的 action 计算分区的整个继承关系,这会导致键值对被一遍又一遍地哈希分区。

Operations That Benefit from Partitioning(从分区获益的操作)

  很多 Spark 的操作会导致在网络间根据键对数据洗牌。这些操作都可以通过分区进行优化。像 Spark1.0,通过分区可以优化的操作有:cogroup(),groupWith(),join(),lefOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey()和lookup()

  对于运行单独 RDD 上的操作,如 reduceByKey(),运行一个分区 RDD 会导致每个键的所有值在单个机器上计算,只需要最后把本地归约的值从工作节点发送回主节点。对于二元运算,如 cogroup()join(),预分区(pre-partitioning)会导致至少一个 RDD(已知分区器的 RDD)不被混洗。如果 RDD 都是相同的分区器并且缓存在相同的机器上或者其中之一仍未被计算,那么不会发生网络间的数据洗牌。

Operations That Affect Partitioning(影响分区的操作)

  Spark 内部知道操作是如何影响分区的,自动对会为数据分区的操作创建的 RDD 设置分区器。举例来讲,假设你调用了 join() 来连接两个 RDD;因为有相同键的元素已经被哈希分区到相同的机器上了,Spark 知道结果就是哈希分区,在 join 产生的结果上的操作如 reduceByKey() 会明显变快。

  另一方面,有些无法确保会生成已知分区的转换,输出的 RDD 不会有分区器集。举例来说,如果你对一个哈希分区的键值对 RDD 调用 map()map() 中的函数参数理论上可以改变每个元素的键,所以结果不会包含分区器。Spark 不会分析你的函数来检查是否改变了键,而是提供了两个操作,mapvalues()flatMapValues() 来保证每个元组的键未被改变。

  总结一下,所有会输出分区器的操作:cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(),mapValues() (如果父RDD有分区器), flatMapValues() (如果父RDD有分区器), and filter() (如果父RDD有分区器)。剩下的操作不会产生分区器。

  对于二元操作,输出分区器的设置取决于父 RDD 的分区器。默认情况下,使用哈希分区器,分区的数量由操作的并行度确定。但是,如果父 RDD 其中之一有分区器集,那该分区器会设置为分区器,如果所有的父 RDD 都有分区器集,那么设置分区器为第一个父分区器。

Example:PageRank(例:PageRank)

  我们认为 PageRank(网页排名)算法是一个典型的会因分区提升效率的例子。PageRank 算法是以谷歌的 Larry Page 命名的,用于衡量特定网页相对于搜索引擎索引中的其他网页而言的重要程度。这可以用来为网页排名,也可以是论文或用户影响力。

  PageRank 是一个会执行很多 join 的迭代算法,所以是使用 RDD 分区的好样本。这个算法包含两个数据集:一个是(pageId,linkList),包含每个网页的邻居列表(该页面包含其他页面的链接,这个链接页面称为邻居页面);另一个是(pageID,rank),包含每个网页的当前排名。它的计算流程大致如下:

1.把每个页面的初始级别设置为 1.0.

2.每次迭代,页面 p 发送 rank(p)/numNeighbors(p) 的贡献给它的邻居网站(它有链接的页面)。

3.设置页面 p 级别为 0.5+0.85*收到的贡献。

  最后两步重复迭代多次,在这个过程中,算法会渐渐收敛到每个网页正确的 PageRank 值。实际上,通常进行十次迭代。

Example 4-25 gives the code to implement PageRank in Spark.

Example 4-25. Scala PageRank

// Assume that our neighbor list was saved as a Spark objectFile
//假设邻页列表存在了Spark的objectFile中。
val links = sc.objectFile[(String, Seq[String])]("links")
                .partitionBy(new HashPartitioner(100))
                .persist()
                
// Initialize each page's rank to 1.0; since we use mapValues, the resulting RDD
// will have the same partitioner as links
//把每个页面的初始值设为1,因为使用mapValue,所以RDD会有和链接相同的分区器
var ranks = links.mapValues(v => 1.0)

// Run 10 iterations of PageRank
//运行十遍PageRank的迭代
for (i <- 0 until 10) {
    val contributions = links.join(ranks).flatMap {
        case (pageId, (links, rank)) =>
            links.map(dest => (dest, rank / links.size))
    }
    ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)
}
// Write out the final ranks
ranks.saveAsTextFile("ranks")

  就是这样!算法开始时,rankRDD 对每个元素设置 1.0 的初始值,每次迭代都会更新 rank 变量。PageRank 算法在 Spark 中表达非常简单:首先将当前等级 RDD 和静态链接 RDD 通过 join() 结合,这是为了得到(页面 ID,链接列表,页面等级)元组,然后使用 flatMap 生成“贡献”值发送给每个邻居页面。我们把贡献值按页面 ID 求和并且设置页面的等级为 0.15+0.85*收到的贡献

  虽然代码本身很简单,但是这个例子为了确保使用高效的方式分区 RDD 和最小化网络通信做了很多事情:

1.注意每次迭代都是 links.join(ranks)。因为 links 是一个静态数据集,程序开始就用 partitionBy 对它分区了,所以这个 RDD 并不需要在网络间对数据洗牌。实际上,linksRDD 可能会比 ranksRDD 大很多,因为它保存了每个页面的邻居页面列表,所以这种优化比简单地实现 PageRank(如,使用简单的 MapReduce)减少了大量的网络任务。

2.同理,我们把 linksRDD persist() 避免对其迭代。

3.当我们第一次创建 ranksRDD 时,我们使用 mapValues() 而不是 map() 来保存父 RDD(links)的分区信息,所以我们第一次 join 开销很小。

4.再循环体中,我们在 reduceByKey() 之后执行 mapValues();因为 reduceByKey() 的结果已经被哈希分区了,这将使得将 map 的结果与下一次迭代中的链接结合起来更有效率。

为了最大限度地发挥分区优化的潜力,当你不改变键的值时应该使用 mapValues() flatMapValues()

Custom Partitioners(定制分区器)

  尽管哈希分区器和区间分区器可以在很多场景使用,Spark 仍然允许你通过提供一个定制的 Partitioner 对象来自定义 RDD 分区方式。这可以帮助你利用特定领域的知识来减少网络通信的消耗。

  举例来说,假如我们想使用 PageRank 算法计算一组 web 页面,以页面 URL 做 RDD 的键,即 PageID 为 URL,使用哈希分区的话,域名相同后缀不相同的 URL 不会在一个分区(如,http://www.cnn.com/WORLDhttp://www.cnn.com/US)。我们知道同一个域名中的链接往往彼此连接。由于 PageRank 需要在每次迭代时将每个页面的消息发送给每个邻居,因此定制分区器有助于将这些页面分组到相同的分区中。我们可以定制一个分区器把相同域名的 URL 分区到一个节点上。

  定制分区器需要是 org.apache.spark.Partitioner 的子类并且实现三个方法:

  • numPartitions:Int,返回你创建分区的数量。

  • getPartition(key:Any):Int,返回对应键的分区 ID(0 到 numPartitions-1)。

  • equals(),标准 Java 相等方法。这个实现很重要,因为 Spark 需要测试你的分区器与其它实例是否相等来判断两个 RDD 的分区是否是一种方式。

  有一点需要注意的是如果你的算法中依赖了 Java 的 hashCode() 方法,这有可能返回一个负数。你需要确保 getPartition() 不会返回负数。

  Example4-26 展示了一个我们之前描述的域名分区器,这个分区器只对每个 URL 的域名进行哈希分区。

Example 4-26. Scala custom partitioner
class DomainNamePartitioner(numParts: Int) extends Partitioner {
    override def numPartitions: Int = numParts
    override def getPartition(key: Any): Int = {
        val domain = new Java.net.URL(key.toString).getHost()
        val code = (domain.hashCode % numPartitions)
        if (code < 0) {
            code + numPartitions // Make it non-negative
        } else {
            code
    }
}
// Java equals method to let Spark compare our Partitioner objects
override def equals(other: Any): Boolean = other match {
    case dnp: DomainNamePartitioner =>
        dnp.numPartitions == numPartitions
    case _ =>
        false
    }
}

  注意在 equals() 方法中,我们使用 Scala 的模式匹配操作测试 other 是否是一个 DomainNamePartitioner 对象,如果是的就跳进里面的方法;这和使用 Java 的 intanceof 是一样的。

  使用定制分区器非常简单:把它传给 partitionBy() 方法就行了。Spark 中很多基于数据洗牌的操作,如 join()groupByKey(),也可以使用可选的 Partitioner 对象来控制分区的输出。

  在 Java 中创建一个定制分区器和 Scala 很相似:直接继承 spark.Partitioner 类并且实现需要的方法就行。

  在 Python 中,你不需要继承 Partitioner 类,但是需要给 RDD.partitionBy() 方法传递一个哈希函数作为额外的参数。示例如下:

Example 4-27. Python custom partitioner

import urlparse

def hash_domain(url):
    return hash(urlparse.urlparse(url).netloc)
    
rdd.partitionBy(20, hash_domain) # Create 20 partitions

  注意一点你传递的哈希函数会作为和其他 RDD 比较的标识。如果你想使用相同的分区器对多个 RDD 分区,那么需要传递相同的函数对象(如,全局函数)而不是为每个创建一个 lambda 表达式。

Conclusion(总结)

  本章中,我们了解了 Spark 中处理键值对数据的特殊函数。第三章学到的技术对键值对仍然适用。下一章节,我们将了解如何加载保存数据。

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 549 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...