SparkStreaming Dstream 的 Transformation/output 算子(window,updateStateByKey)

本贴最后更新于 2450 天前,其中的信息可能已经时移世异

Ttransformation 算子

SparkStreaming Dstream 的 Transformation 算子和 SparkCore 的操作基本类似,毕竟 DStream 的底层还是 RDD.

TransformationMeaning
map对传入的每个元素返回一个新的元素
flatMap对传入的每个元素返回一个或多个元素
filter过滤掉不符合条件的元素/选择符合条件的元素
union将两个DStream合并
count返回元素的个数
reduce对所有的Values进行聚合
countByValue按值分组并统计每组的总数,返回(K,V)的格式
reduceByKey按K分组,对Values进行聚合
cogroup对两个Dstream进行链接操作,一个Key连接起来的两个RDD的数据,都会以Iterable'<'V'>'的形式出现在一个Tuple中
join对两个Dstream进行链接操作,每个连接起来的Pair作为Dstream的RDD的一个新元素
transform对Dstream进行转换,转换成RDD,Dataset.
updateStateBykey为每个Key维护一个状态,并进行更新
window对滑动窗口内的数据执行操作
可以看到大部分的算子跟RDD的操作算子基本相同,独有的算子如`transform,updateStateByKey,window`,也是SparkStreaming中比较有用的算子。

window

ac7a14b4f0604b3f9fd247a3b626aaa9-image.png

window:滑动窗口,最早在计算机网络中接触,用于做流量控制,这里的滑动窗口用于选择某一段时间内的 RDD 构成一个 Dstream 进行计算,实时流计算设置的 Batch interval 只能让我们获取一段一段的数据,每一段数据之间是不会交叉重叠的,对于数据分析来说,两组数据之间可能存在断层,阶跃。window 可以让数据之间进行平滑的过度。当然有一定的局限性,如果我们的目标仅仅里类似对数据进行一些处理,不去探寻数据见的关系的话,就没有必要,因为这会让每一台拿数据进行多次处理次数取决于 window 算子的参数。

使用:

参数 1:窗口长度
参数 2:滑动间隔
这两个参数必须是 batch inveral 的整数倍。
类似上图,窗口长度为 3 秒,时间间隔为 2 秒。每三秒获取的 inputDStream 聚合为一个窗口进行计算,间隔两秒再计算一次...

相关算子:

TransformationMeaning
window对每个滑动窗口的数据执行自定义计算
countByWindow对每个窗口的数据执行count操作
countByVaueAndWindow对每个窗口的数据执行countByValue操作
reduceByWindow对每个窗口的数据执行reduce操作
reduceBykeyAndWindow对每个窗口的数据执行reduceBykey操作
groupBykeyAndWindow对每个窗口的数据执行groupBykey操作
使用起来比较简单:只需要把wordcounnt中的Dstream执行window操作得到一个Dstream,对这个Dstream进行后续的操作即是对window的操作.
//3秒作为一个窗口,间隔为2秒
JavaPairDStream<String,Integer> pairDStreamWindows = pairDStream.window(Durations.seconds(3),Durations.seconds(2));
JavaPairDStream<String,Integer> wordCount =  pairDStreamWindows.reduceByKey((x1,x2)->(x1+x2));
//scala
def wordCountwindow(){
  val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount");
  val ssc  = new StreamingContext(conf,Seconds(1))
  val words = ssc.socketTextStream("localhost",9999)
				  .flatMap((line)=>line.split(" "))
				  .map(x=>(x,1))
	
  //window()此函数有多种重载。
  val w3window = words.window(Seconds(3),Seconds(2))
  
  val wordcount = w3window.reduceByKey(_+_)
  wordcount.print()
  ssc.start()
  ssc.awaitTermination()
}

如此我们从一个每一秒进行一次 wordcount 的统计就变成了每隔 2 秒统计 3 秒内的数据作为一个窗口统计一次 wordcount。

def wordCountRBwindow(){
  val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount");
  val ssc  = new StreamingContext(conf,Seconds(1))
  val words = ssc.socketTextStream("localhost",9999)
    .flatMap((line)=>line.split(" "))
    .map(x=>(x,1))

  //reduceByKeyAndWindow(reduceFunc,windowDuration,slideDuration),此函数有多种参数重载。
  val wordcount20 = words.reduceByKeyAndWindow((x1:Int,x2:Int)=>x1+x2,Seconds(20),Seconds(2))

  wordcount20.print()
  ssc.start()
  ssc.awaitTermination()
}

其他的 window 算子类似。print 算子是为了触发 job 操作。

updateStateByKey

updateStateBykey 可以为每一个 Key 维护一个 state,并在每次数据产生时对 state 进行更新。前提条件是必须开启CheckPoint机制。checkPoint 可以保证在内存中长期存贮的 state 故障丢失的时候可以得到恢复。

  • 定义一个 state,可以为任意数据类型。
  • 设置更新函数,更新函数会会执行如何来进行更新。

对于每一个 batch 的数据,spark 都会为之前已经存在的 key 更新 state[无论这个 batch 中是否有和 key 相同的数据],对于新出现的 key 也会进行 state 个更新。如果更新函数返回 none,那么 key 的 state 就会被删除。

回顾前面的 wordcount 程序,从开始的离线操作记录某个文件的 wordcount。到实时的 batch inveral 一个时间延迟内的 wordcount。再到滑动窗口的多个 batch inveral 内的 worcount。
而使用 updateStateByKey,可以实现全局的 worcount。我们用每个 key 的 state 保存 word 的 count,每次一 batch 将数据 state 进行累加,即每一次 batch 和前面所有的数据进行一次 worcount 的 reduceByKey 操作。
< 如果不借助 updateStateByKey 我们就需要将没给 batch 的 wordCount 保存/缓存起来,进行累加。>
开启 Checkpoint,并使用而使用 updateStateByKey:

private static  void wordCountUpdateStateByKey() throws InterruptedException{
	SparkConf conf = new SparkConf().setAppName("ssBSDataSource").setMaster("local[*]");

	JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
	 //开启checkpoint
	jsc.checkpoint("hdfs://spark:9000/checkPointDir");

	//ReceiverInputDStream
	JavaReceiverInputDStream lines =   jsc.socketTextStream("localhost",9999);

	JavaDStream listDstream =  lines.flatMap(line-> Arrays.asList(line.split(" ")).iterator());
	JavaPairDStream pairDStream =  listDstream.mapToPair(x->new Tuple2<>(x,1));
	
	//这里不再使用reduceByKey 而使用updateStateByKey
	JavaPairDStream wordCount =  pairDStream.updateStateByKey(
			new Function2, Optional, Optional>() {
				public Optional call(List values,Optional state) throws Exception {
					int vl =0;
					if(state.isPresent()){
						vl = state.get();
					}
					for(Integer value : values) {
						vl += value;
					}

					return Optional.of(vl);
				}
			});
	//这里是Lambda版本
	JavaPairDStream wordCount2 =  pairDStream.updateStateByKey(
				(values,state)->{
					int vl =0;
					if(state.isPresent()){
						vl = state.get();
					}
					for(Integer value : values) {
						vl += value;
					}
					return Optional.of(vl);
				 });

	wordCount.print();
	jsc.start();
	jsc.awaitTermination();
	jsc.close();
}

这里的 Optional 类是 Java8 中的一种防御性检查机制,用于消除空指针异常。详情请看我的《Java8 实战》拥抱变化,下载原书籍,第十章:用 Optional 取代 null。

上述的这些 Transformation 算子都是懒加载的都不能出发 job 操作。最终的操作结果需要 output 操作才能输出/保存起来。

Output 算子:

TransformationMeaning
print直接输出batch中的前十条数据,通常用来测试,在没有output操做的时候用于触发job
saveAsTextFile(prefix, [suffix])将每个batch的数据保存到文件中。每个batch的文件的命名格式为:prefix-TIME_IN_MS[.suffix]
savaAsObjectFile同上,但是将每个batch的数据以序列化对象的方式,保存到SequenceFile中。
saveAsHadoopFile同上,将数据保存到Hadoop中
foreachRDD最常用的output操作,遍历DStream中的每个产生的RDD,进行处理。可以将每个RDD中的数据写入外部存储,比如文件、数据库、缓存等。通常在其中,是针对RDD执行action操作的,比如foreach。
>Dstream中的所有的操作最终都是由output操作触发的,没有output操作spark是不会执行前面的操作逻辑。其中较为特殊的是foreachRDD,尽管是用了这个output算子也不会触发job,在其中还需要action算子才能够真正的开始触发job执行.

foreachRDD:

顾名思义,遍历 RDD,然后对每个 RDD 进行操作。多数情况下时进行持久化操作,写入到外部存储。通常需要建立一个 Connection,比如 JDBC Connection。然后通过 Connnection 将数据写入外部存储。如下:
1、在 foreach 操作的外部创建 Connection,这种方式会导致 Connection 对象被序列化后传输到每个 task 中,实际上这种 Connection 对象是不能被序列化的,所以这是一种错误的操作。

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()
  rdd.foreach { 
	record => connection.send(record)
  }
}

2、在 foreach 内部创建 Connection 对象,这种方式会让每一条数据都创建一个 Connection,非常消耗内存和性能。

dstream.foreachRDD { rdd =>
  rdd.foreach { 
    val connection = createNewConnection()
	record => connection.send(record)
	connection.close()
  }
}

3、使用 foreachPartition 代替 foreach,这样只会对一个 Partition 创建一个 Connection,节省开销。

dstream.foreachRDD { 
  rdd =>rdd.foreachPartition { 
		partitionOfRecords =>
		val connection = createNewConnection()
		partitionOfRecords.foreach(record => connection.send(record))
		connection.close()
        }
}

总结:

SparkStreaming 的操作算子也分为两类,Transformation 和 output(类似 Action),其中最实用的 window 和 updateStateByKey,以及最常用的 foreachRDD,对于 foreachRDD 需要注意写法,是一个值得优化的地方。

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 549 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
rzx
此生最怕深情被辜负,最怕兄弟成陌路。对世界充满善意,同时又充满深深的恨意,我渴望天降甘霖福泽众生,又渴望灭世洪水重创世纪。 广州