spark RDD 基础

本贴最后更新于 2268 天前,其中的信息可能已经水流花落

RDD 全称叫做弹性分布式数据集(Resilient Distributed Datasets),它是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能通过其他 RDD 转换而创建,为此,RDD 支持丰富的转换操作 ( 如: map, join, filter, groupBy 等),通过这种转换操作,新的 RDD 则包含了如何从其他 RDDs 衍生所必需的信息,所以说 RDDs 之间是有依赖关系的。

基于 RDDs 之间的依赖,RDDs 会形成一个有向无环图 DAG,该 DAG 描述了整个流式计算的流程,实际执行的时候,RDD 是通过血缘关系(Lineage)一气呵成的,即使出现数据分区丢失,也可以通过血缘关系重建分区。

总结起来,基于 RDD 的流式计算任务可描述为:从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组确定性操作构成的 DAG,然后写回稳定存储。另外 RDD 还可以将数据集缓存到内存中,使得在多个操作之间可以重用数据集,基于这个特点可以很方便地构建迭代型应用(图计算、机器学习等)或者交互式数据分析应用。

可以说 Spark 最初也就是实现 RDD 的一个分布式系统,后面通过不断发展壮大成为现在较为完善的大数据生态系统,简单来讲,Spark-RDD 的关系类似于 Hadoop-MapReduce 关系。

RDD 特点

RDD 表示只读的分区的数据集,对 RDD 进行改动,只能通过 RDD 的转换操作,由一个 RDD 得到一个新的 RDD,新的 RDD 包含了从其他 RDD 衍生所必需的信息。

RDDs 之间存在依赖,RDD 的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化 RDD 来切断血缘关系。

分区

如下图所示,RDD 逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个 compute 函数得到每个分区的数据。

如果 RDD 是通过已有的文件系统构建,则 compute 函数是读取指定文件系统中的数据,如果 RDD 是通过其他 RDD 转换而来,则 compute 函数是执行转换逻辑将其他 RDD 的数据进行转换。

只读

如下图所示,RDD 是只读的,要想改变 RDD 中的数据,只能在现有的 RDD 基础上创建新的 RDD。

由一个 RDD 转换到另一个 RDD,可以通过丰富的操作算子实现,不再像 MapReduce 那样只能写 map 和 reduce 了,如下图所示。

RDD 的操作算子包括两类,一类叫做 transformations,它是用来将 RDD 进行转化,构建 RDD 的血缘关系;另一类叫做 actions,它是用来触发 RDD 的计算,得到 RDD 的相关计算结果或者将 RDD 保存的文件系统中。下图是 RDD 所支持的操作算子列表。

依赖

RDDs 通过操作算子进行转换,转换得到的新 RDD 包含了从其他 RDDs 衍生所必需的信息,RDDs 之间维护着这种血缘关系,也称之为依赖。如下图所示,依赖包括两种,一种是窄依赖,RDDs 之间分区是一一对应的,另一种是宽依赖,下游 RDD 的每个分区与上游 RDD(也称之为父 RDD)的每个分区都有关,是多对多的关系。

!

通过 RDDs 之间的这种依赖关系,一个任务流可以描述为 DAG(有向无环图),如下图所示,在实际执行过程中宽依赖对应于 Shuffle(图中的 reduceByKey 和 join),窄依赖中的所有转换操作可以通过类似于管道的方式一气呵成执行(图中 map 和 union 可以一起执行)。

缓存

如果在应用程序中多次使用同一个 RDD,可以将该 RDD 缓存起来,该 RDD 只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该 RDD 的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。

如下图所示,RDD-1 经过一系列的转换后得到 RDD-n 并保存到 hdfs,RDD-1 在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的 RDD-1 转换到 RDD-m 这一过程中,就不会计算其之前的 RDD-0 了。

Checkpoint

虽然 RDD 的血缘关系天然地可以实现容错,当 RDD 的某个分区数据失败或丢失,可以通过血缘关系重建。但是对于长时间迭代型应用来说,随着迭代的进行,RDDs 之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。

为此,RDD 支持 checkpoint 将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为 checkpoint 后的 RDD 不需要知道它的父 RDDs 了,它可以从 checkpoint 处拿到数据。

小结

总结起来,给定一个 RDD 我们至少可以知道如下几点信息:1、分区数以及分区方式;2、由父 RDDs 衍生而来的相关依赖信息;3、计算每个分区的数据,计算步骤为:1)如果被缓存,则从缓存中取的分区的数据;2)如果被 Checkpoint,则从 Checkpoint 处恢复数据;3)根据血缘关系计算分区的数据。

编程模型

在 Spark 中,RDD 被表示为对象,通过对象上的方法调用来对 RDD 进行转换。经过一系列的 Transformations 后,就可以调用 Actions 触发 RDD 的计算,Action 可以是向应用程序返回结果( count, collect 等),或者是向存储系统保存数据(** saveAsTextFile **等)。在 Spark 中,只有遇到 Action,才会执行 RDD 的计算(即懒执行),这样在运行时可以通过管道的方式传输多个转换。

要使用 Spark,开发者需要编写一个 Driver 程序,它被提交到集群以调度运行 Worker,如下图所示。Driver 中定义了一个或多个 RDD,并调用 RDD 上的 action,Worker 则执行 RDD 分区计算任务。

应用举例

下面介绍一个简单的 Spark 应用程序实例 WordCount,统计一个数据集中每个单词出现的次数,首先将从 HDFS 中加载数据得到原始 RDD-0,其中每条记录为数据中的一行句子,经过一个 flatMap 操作,将一行句子切分为多个独立的词,得到 RDD-1,再通过 map 操作将每个词映射为 key-value 形式,其中 key 为词本身,value 为初始计数值 1,得到 RDD-2,将 RDD-2 中的所有记录归并,统计每个词的计数,得到 RDD-3,最后将其保存到 HDFS。


object WordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: WordCount <inputfile> <outputfile>");
      System.exit(1);
    }
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    val result = sc.textFile(args(0))
                   .flatMap(line => line.split(" "))
                   .map(word => (word, 1))
                   .reduceByKey(_ + _)
    result.saveAsTextFile(args(1))
  }
}

结语

基于 RDD 实现的 Spark 相比于传统的 Hadoop MapReduce 有什么优势呢?总结起来应该至少有三点:

1)RDD 提供了丰富的操作算子,不再是只有 map 和 reduce 两个操作了,对于描述应用程序来说更加方便;

2)通过 RDDs 之间的转换构建 DAG,中间结果不用落地;

3)RDD 支持缓存,可以在内存中快速完成计算。

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 550 关注
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    89 引用 • 113 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...