Flink 笔记 1---- 计算实时热门商品

本贴最后更新于 1919 天前,其中的信息可能已经时移俗易

代码和具体分析详见 http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/

实战案例介绍

本案例将实现一个“实时热门商品”的需求,我们可以将“实时热门商品”翻译成程序员更好理解的需求:每隔 5 分钟输出最近一小时内点击量最多的前 N 个商品。将这个需求进行分解我们大概要做这么几件事情:

  • 抽取出业务时间戳,告诉 Flink 框架基于业务时间做窗口
  • 过滤出点击行为数据
  • 按一小时的窗口大小,每 5 分钟统计一次,做滑动窗口聚合(Sliding Window)
  • 按每个窗口聚合,输出每个窗口中点击量前 N 名的商品

入口 StreamExecutionEnvironment

Flink 程序的第一步是创建一个 StreamExecutionEnvironment 。这是一个入口类,可以用来设置参数和创建数据源以及提交任务。

创建输入源 createInput

核心 DataStream

DataStream 是 Flink 中做流处理的核心 API,上面定义了非常多常见的操作(如,过滤、转换、聚合、窗口、关联等)。

时间类型

ProcessingTime:处理时间:消息被计算处理的时间,也就是由机器的系统时间来决定。
EventTime:事件时间:事件发生时的时间,一般就是数据本身携带的时间。
IngestionTime:摄取时间:事件进入流处理系统的时间

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Watermark

是用来追踪业务事件的概念,可以理解成 EventTime 世界中的时钟,用来指示当前处理到什么时刻的数据了。assignTimestampsAndWatermarks 指定 watermark。
当事件的时间戳是单调递增的,可以将每条数据的业务时间就当做 Watermark。这里我们用 AscendingTimestampExtractor 来实现时间戳的抽取和 Watermark 的生成。
真实业务场景一般都是存在乱序的,所以一般使用 BoundedOutOfOrdernessTimestampExtractor。

过滤事件

DataStream<UserBehavior> pvData = timedData
    .filter(new FilterFunction<UserBehavior>() {
      @Override
      public boolean filter(UserBehavior userBehavior) throws Exception {
        // 过滤出只有点击的数据
        return userBehavior.behavior.equals("pv");
      }
    });

窗口计算

DataStream<ItemViewCount> windowedData = pvData
    .keyBy("itemId")
    .timeWindow(Time.minutes(60), Time.minutes(5))
    .aggregate(new CountAgg(), new WindowResultFunction());

分组 .keyBy("itemId")

窗口内容具体可看 http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/
此处使用滑动窗口(1 小时窗口,5 分钟滑动一次)

Time Window 时间窗口,是根据时间对数据流进行分组的

翻滚时间窗口(Tumbling Time Window) .timeWindow(Time.minutes(1))
例如统计每一分钟中用户购买的商品的总数。
将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口

滑动时间窗口(Sliding Time Window).timeWindow(Time size, Time slide)
例如我们可以每 30 秒计算一次最近一分钟用户购买的商品总数。
窗口是不间断的,需要平滑地进行窗口聚合,一个元素可以对应多个窗口

Count Window 计数窗口,是根据元素个数对数据流进行分组的。

翻滚计数窗口(Tumbling Count Window).countWindow(100)
例如每 100 个用户购买行为事件统计购买总数。
每当窗口中填满 100 个元素了,就会对窗口进行计算。

滑动计数窗口(Sliding Window) .countWindow(100, 10)
例如计算每 10 个元素计算一次最近 100 个元素的总和。

Session Window 会话窗口

.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
.sum(1)
将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。如上图所示,就是需要计算每个用户在活跃期间总共购买的商品数量,如果用户 30 秒没有活动则视为会话断开(假设 raw data stream 是单个用户的购买行为流。)

聚合计算

.aggregate(AggregateFunction af, WindowFunction wf) 的第二个参数 WindowFunction 将每个 key 每个窗口聚合后的结果带上其他信息进行输出

ProcessFunction

ProcessFunction 是 Flink 提供的一个 low-level API,用于实现更高级的功能。它主要提供了定时器 timer 的功能(支持 EventTime 或 ProcessingTime)。本案例中我们将利用 timer 来判断何时收齐了某个 window 下所有商品的点击量数据。

@Override
		public void processElement(
			ItemViewCount input,
			Context context,
			Collector<String> collector) throws Exception {

			// 每条数据都保存到状态中
			itemState.add(input);
			// 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
			context.timerService().registerEventTimeTimer(input.windowEnd + 1);
		}

		@Override
		public void onTimer(
			long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
			// 获取收到的所有商品点击量
			List<ItemViewCount> allItems = new ArrayList<>();
			for (ItemViewCount item : itemState.get()) {
				allItems.add(item);
			}
			// 提前清除状态中的数据,释放空间
			itemState.clear();
			// 按照点击量从大到小排序
			allItems.sort(new Comparator<ItemViewCount>() {
				@Override
				public int compare(ItemViewCount o1, ItemViewCount o2) {
					return (int) (o2.viewCount - o1.viewCount);
				}
			});
		}

ListState

使用 ListState 来存储收到的每条 ItemViewCount 消息,保证在发生故障时,状态数据的不丢失和一致性。ListState 是 Flink 提供的类似 Java List 接口的 State API,它集成了框架的 checkpoint 机制,自动做到了 exactly-once 的语义保证。

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...