flink 自定义 TableSink 输出数据到控制台

本贴最后更新于 1652 天前,其中的信息可能已经渤澥桑田

官方文档 中关于 TableSink 仅给出了接口名称和少量描述,关于接口方法的解释所提甚少。

为了能更好地理解 TableSink 的功能和实现方式,这里写了一个简单的场景:
读取一个英文文本文件,统计各单词出现的次数,并将统计结果转为 Table 对象,再把 Table 内容输出到控制台。

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

    EnvironmentSettings settings = EnvironmentSettings
                    .newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode().build();

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);


    // 从 hdfs 读取文件
    DataStreamSource<String> text = streamEnv.readTextFile("hdfs://xxxx/file");

    // 统计
    SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text
            .flatMap(new Tokenizer())
            .keyBy(0)
            .sum(1);

    // 将统计结果转为表对象
    Table countsTable = tableEnv.fromDataStream(counts, "word,count");

    // 创建 TableSink
    PrintTableSink sink = PrintTableSink.ofConsole();

    // 注册 TableSink
    tableEnv.registerTableSink(
            "ConsoleSinkTable",
            new String[]{"word", "count"},
            new TypeInformation[]{Types.STRING, Types.INT},
            sink
    );

    // 将表内容输出
    countsTable.insertInto("ConsoleSinkTable");

    // 启动任务
    streamEnv.execute("this is job name");

}

/**
 * 分词器
 */
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        
        String[] words = value.toLowerCase().split("\\W+");

        for (String word : words) {
            if (word.length() > 0) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}


自定义 TableSink 的实现:

public class PrintTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {

    private String target;
    private PrintTableSinkFunction sinkFunction;

    public PrintTableSink(String target) {
        this.target = target;

        /**
         * 重点!!!
         *
         * PrintTableSinkFunction 是一个自定义的 SinkFunction
         * 描述了当接收到一条数据时,该如何 sink 的具体逻辑
         */
        this.sinkFunction = new PrintTableSinkFunction(target);
    }

    /**
     * 添加当流被消费时的 sink 逻辑
     */
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream.addSink(this.sinkFunction);
    }

    /**
     * 对 "流" 添加 sink 逻辑(单条数据)
     */
    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        dataStream.addSink(this.sinkFunction);
    }

    /**
     * 对 "批" 添加 sink 逻辑(多条数据)
     */
    @Override
    public void emitDataSet(DataSet<Row> dataSet) {

        try {

            List<Row> elements = dataSet.collect();
            for (Iterator<Row> it = elements.iterator(); it.hasNext(); ) {
                Row row = it.next();
                this.sinkFunction.invoke(row);
            }

            dataSet.print();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    /**
     * 当 StreamTableEnvironment.registerTableSink() 时,会通过此方法完成 TableSink 对象的创建。
     *
     * @param strings          字段名列表
     * @param typeInformations 字段类型列表
     * @return
     */
    @Override
    public TableSink<Row> configure(String[] strings, TypeInformation<?>[] typeInformations) {
        PrintTableSink sink = new PrintTableSink(target);
        sink.fieldNames = strings;
        sink.fieldTypes = typeInformations;

        return sink;
    }

    /**
     * 表的字段列表
     */
    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    /**
     * 表字段的数据类型
     */
    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    /**
     * 表字段类型的描述信息
     */
    @Override
    public TypeInformation<Row> getOutputType() {
        return Types.ROW_NAMED(fieldNames, fieldTypes);
    }


    /**
     * 这里定义了当接收到一条数据时,该如何 sink 的具体逻辑
     */
    public static class PrintTableSinkFunction implements SinkFunction<Row> {
        private static Logger LOG = LoggerFactory.getLogger(PrintTableSink.class);
        private String target;

        public PrintTableSinkFunction(String target) {
            this.target = target;
        }

        @Override
        public void invoke(Row row, Context context) throws Exception {
            switch (target) {
                case "Console":
                    System.out.println(row);
                    break;
                case "Logger":
                    LOG.info(row.toString());
                    break;
                default:
            }
        }

        @Override
        public void invoke(Row value) throws Exception {
            invoke(value, null);
        }
    }

}

如果需要将数据保存到 数据库、Redis、Hadoop 平台 等地方,只需在上面的 PrintTableSinkFunction 内修改输出逻辑即可。

这里只演示了写入的两种情况(AppendStreamTableSink、BatchTableSink),还有支持删除和修改的 RetractStreamTableSink、UpsertStreamTableSink 接口没有演示,我尽量这几天补上 :)

注:以上代码基于 flink 1.9

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
LyZane
我的眼神里,是自由光。 昆明

推荐标签 标签

  • 职场

    找到自己的位置,萌新烦恼少。

    126 引用 • 1699 回帖
  • 开源

    Open Source, Open Mind, Open Sight, Open Future!

    396 引用 • 3416 回帖
  • Vue.js

    Vue.js(读音 /vju ː/,类似于 view)是一个构建数据驱动的 Web 界面库。Vue.js 的目标是通过尽可能简单的 API 实现响应的数据绑定和组合的视图组件。

    261 引用 • 662 回帖
  • LaTeX

    LaTeX(音译“拉泰赫”)是一种基于 ΤΕΧ 的排版系统,由美国计算机学家莱斯利·兰伯特(Leslie Lamport)在 20 世纪 80 年代初期开发,利用这种格式,即使使用者没有排版和程序设计的知识也可以充分发挥由 TeX 所提供的强大功能,能在几天,甚至几小时内生成很多具有书籍质量的印刷品。对于生成复杂表格和数学公式,这一点表现得尤为突出。因此它非常适用于生成高印刷质量的科技和数学类文档。

    9 引用 • 32 回帖 • 166 关注
  • ReactiveX

    ReactiveX 是一个专注于异步编程与控制可观察数据(或者事件)流的 API。它组合了观察者模式,迭代器模式和函数式编程的优秀思想。

    1 引用 • 2 回帖 • 126 关注
  • 旅游

    希望你我能在旅途中找到人生的下一站。

    85 引用 • 895 回帖
  • Kotlin

    Kotlin 是一种在 Java 虚拟机上运行的静态类型编程语言,由 JetBrains 设计开发并开源。Kotlin 可以编译成 Java 字节码,也可以编译成 JavaScript,方便在没有 JVM 的设备上运行。在 Google I/O 2017 中,Google 宣布 Kotlin 成为 Android 官方开发语言。

    19 引用 • 33 回帖 • 28 关注
  • 小薇

    小薇是一个用 Java 写的 QQ 聊天机器人 Web 服务,可以用于社群互动。

    由于 Smart QQ 从 2019 年 1 月 1 日起停止服务,所以该项目也已经停止维护了!

    34 引用 • 467 回帖 • 693 关注
  • danl
    62 关注
  • Python

    Python 是一种面向对象、直译式电脑编程语言,具有近二十年的发展历史,成熟且稳定。它包含了一组完善而且容易理解的标准库,能够轻松完成很多常见的任务。它的语法简捷和清晰,尽量使用无异义的英语单词,与其它大多数程序设计语言使用大括号不一样,它使用缩进来定义语句块。

    536 引用 • 672 回帖
  • 导航

    各种网址链接、内容导航。

    37 引用 • 168 回帖
  • Sandbox

    如果帖子标签含有 Sandbox ,则该帖子会被视为“测试帖”,主要用于测试社区功能,排查 bug 等,该标签下内容不定期进行清理。

    368 引用 • 1212 回帖 • 581 关注
  • 架构

    我们平时所说的“架构”主要是指软件架构,这是有关软件整体结构与组件的抽象描述,用于指导软件系统各个方面的设计。另外还有“业务架构”、“网络架构”、“硬件架构”等细分领域。

    140 引用 • 441 回帖
  • OpenResty

    OpenResty 是一个基于 NGINX 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。

    17 引用 • 36 关注
  • 黑曜石

    黑曜石是一款强大的知识库工具,支持本地 Markdown 文件编辑,支持双向链接和关系图。

    A second brain, for you, forever.

    10 引用 • 85 回帖
  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 395 关注
  • 国际化

    i18n(其来源是英文单词 internationalization 的首末字符 i 和 n,18 为中间的字符数)是“国际化”的简称。对程序来说,国际化是指在不修改代码的情况下,能根据不同语言及地区显示相应的界面。

    7 引用 • 26 回帖
  • 笔记

    好记性不如烂笔头。

    303 引用 • 777 回帖
  • JWT

    JWT(JSON Web Token)是一种用于双方之间传递信息的简洁的、安全的表述性声明规范。JWT 作为一个开放的标准(RFC 7519),定义了一种简洁的,自包含的方法用于通信双方之间以 JSON 的形式安全的传递信息。

    20 引用 • 15 回帖 • 16 关注
  • 电影

    这是一个不能说的秘密。

    120 引用 • 597 回帖 • 1 关注
  • SOHO

    为成为自由职业者在家办公而努力吧!

    7 引用 • 55 回帖 • 93 关注
  • IBM

    IBM(国际商业机器公司)或万国商业机器公司,简称 IBM(International Business Machines Corporation),总公司在纽约州阿蒙克市。1911 年托马斯·沃森创立于美国,是全球最大的信息技术和业务解决方案公司,拥有全球雇员 30 多万人,业务遍及 160 多个国家和地区。

    16 引用 • 53 回帖 • 121 关注
  • Bootstrap

    Bootstrap 是 Twitter 推出的一个用于前端开发的开源工具包。它由 Twitter 的设计师 Mark Otto 和 Jacob Thornton 合作开发,是一个 CSS / HTML 框架。

    18 引用 • 33 回帖 • 683 关注
  • Swift

    Swift 是苹果于 2014 年 WWDC(苹果开发者大会)发布的开发语言,可与 Objective-C 共同运行于 Mac OS 和 iOS 平台,用于搭建基于苹果平台的应用程序。

    34 引用 • 37 回帖 • 497 关注
  • Hprose

    Hprose 是一款先进的轻量级、跨语言、跨平台、无侵入式、高性能动态远程对象调用引擎库。它不仅简单易用,而且功能强大。你无需专门学习,只需看上几眼,就能用它轻松构建分布式应用系统。

    9 引用 • 17 回帖 • 598 关注
  • 微软

    微软是一家美国跨国科技公司,也是世界 PC 软件开发的先导,由比尔·盖茨与保罗·艾伦创办于 1975 年,公司总部设立在华盛顿州的雷德蒙德(Redmond,邻近西雅图)。以研发、制造、授权和提供广泛的电脑软件服务业务为主。

    8 引用 • 44 回帖
  • 新人

    让我们欢迎这对新人。哦,不好意思说错了,让我们欢迎这位新人!
    新手上路,请谨慎驾驶!

    51 引用 • 226 回帖