0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统

这篇具有很好参考价值的文章主要介绍了0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


在 《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。
select word, count(1) as `count` from source group by word;
+--------------------------------+----------------------+
|                           word |                count |
+--------------------------------+----------------------+
|                              A |                    3 |
|                              B |                    1 |
|                              C |                    2 |
|                              D |                    2 |
|                              E |                    1 |
+--------------------------------+----------------------+

在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。

Sink

Sink用于将Reduce结果输出到外部系统。它也是通过一个表(Table)来表示结构。这个和MapReduce思路中的Map很类似。

Print

为了简单起见,我们让Sink的表连接的外部系统是print。这样我们就可以在控制台上看到数据。

    # define the sink
    my_sink_ddl = """
        CREATE TABLE WordsCountTableSink (
            `word` STRING,
            `count` BIGINT
        ) WITH (
            'connector' = 'print'
        );
    """
    t_env.execute_sql(my_sink_ddl).print()

需要强调的是,我们没有给sink的表创建主键。这个会在后面文章中作为一个对比案例进行分析。
这一步只能创建表和连接器,具体执行还要执行下一步。

Execute

因为source和WordsCountTableSink是两张表,分别表示数据的输入和输出结构。如果要打通输入和输出,则需要将source表中的数据通过某些计算,插入到WordsCountTableSink表中。于是我们主要使用的是insert into指令。

    # execute insert
    my_select_ddl = """
        insert into WordsCountTableSink
        select word, count(1) as `count`
        from source
        group by word
    """
    t_env.execute_sql(my_select_ddl).wait()

完整代码

import argparse
import logging
import sys

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)

def word_count(input_path):
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_batch_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)

    # define the source
    my_source_ddl = """
            create table source (
                word STRING
            ) with (
                'connector' = 'filesystem',
                'format' = 'csv',
                'path' = '{}'
            )
        """.format(input_path)
    t_env.execute_sql(my_source_ddl).print()
    tab = t_env.from_path('source')

    # define the sink
    my_sink_ddl = """
        CREATE TABLE WordsCountTableSink (
            `word` STRING,
            `count` BIGINT
        ) WITH (
            'connector' = 'print'
        );
    """
    t_env.execute_sql(my_sink_ddl).print()
    
    # execute insert
    my_select_ddl = """
        insert into WordsCountTableSink
        select word, count(1) as `count`
        from source
        group by word
    """
    t_env.execute_sql(my_select_ddl).wait()

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input)

执行命令如下

python sql_print.py --input input1.csv

输出结果如下

Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform CANNOT be used with method=DIRECT_READ.
OK
OK
+I[A, 3]
+I[B, 1]
+I[C, 2]
+I[D, 2]
+I[E, 1]

因为使用的是批处理模式(in_batch_mode),我们看到Flink将所有数据计算完整成,成批的执行了新增操作(+代表新增)。这块对比我们将在后续将流处理时介绍区别。
附上input1.csv内容文章来源地址https://www.toymoban.com/news/detail-721443.html

"A",
"B",
"C",
"D",
"A",
"E",
"C",
"D",
"A",

到了这里,关于0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 0基础学习PyFlink——用户自定义函数之UDTF

    在《0基础学习PyFlink——用户自定义函数之UDF》中,我们讲解了UDF。本节我们将讲解表值函数——UDTF 我们对比下UDF和UDTF 可以发现: UDF比UDTF多了func_type和udf_type参数; UDTF的返回类型比UDF的丰富,多了两个List类型:List[DataType]和List[str]; 特别是最后一点,可以认为是UDF和UD

    2024年02月07日
    浏览(28)
  • 0基础学习PyFlink——用户自定义函数之UDTAF

    在前面几篇文章中,我们分别介绍了UDF、UDTF和UDAF这三种用户自定义函数。本节我们将介绍最后一种函数:UDTAF——用户自定义表值聚合函数。 UDTAF函数即具备了UDTF的特点,也具备UDAF的特点。即它可以像《0基础学习PyFlink——用户自定义函数之UDTF》介绍的UDTF那样 可以返回任

    2024年02月08日
    浏览(39)
  • 0基础学习PyFlink——事件时间和运行时间的窗口

    在 《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》一文中,我们使用的是运行时间(Tumbling ProcessingTime Windows)作为窗口的参考时间: 而得到的结果也是不稳定的。 这是因为每次运行时,CPU等系统资源的繁忙程度是不一样的,这就影响了最后的运行结果。 为了让结果稳

    2024年02月05日
    浏览(42)
  • 0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)

    在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们发现如果窗口内元素个数没有达到窗口大小时,计算个数的函数是不会被调用的。如下图中红色部分 那么有没有办法让上图中(B,2)和(D,5)也会被计算呢? 这就可以使用本节介绍的时间滚动窗口。它不

    2024年02月06日
    浏览(32)
  • Explainable AI (XAI) 帮助机器学习模型理解外部世界,并找出影响预测结果的最重要因素

    作者:禅与计算机程序设计艺术 Explainable Artificial Intelligence (XAI)是一种通过可解释的方式来帮助机器学习系统理解自身运作方式,进而更好地被人类所理解的领域。其发展历史可以追溯到1987年IBM Watson团队发表的一篇文章《The Vision of AI: A Cognitive View of the Future》中提出的概

    2024年02月06日
    浏览(31)
  • flink重温笔记(六):Flink 流批一体 API 开发—— 数据输出 sink

    前言:今天是学习 flink 的第七天啦!学习了 flink 中 sink(数据槽) 部分知识点,这一部分只要是解决数据处理之后,数据到哪里去的问题,我觉得 flink 知识点虽然比较难理解,但是代码跑通后,逻辑还是比较有趣的! Tips:毛爷爷说过:“宜将剩勇追穷寇,不可沽名学霸王

    2024年02月21日
    浏览(37)
  • 【Jetson目标检测SSD-MobileNet应用实例】(四)在Jetson上使用CSI摄像头进行视频推理并输出检测结果

    【Jetson目标检测SSD-MobileNet应用实例】(一)win11中配置SSD-MobileNet网络训练境搭建 【Jetson目标检测SSD-MobileNet应用实例】(二)制作自己的数据集–数据集的采集、标注、预处理 【Jetson目标检测SSD-MobileNet应用实例】(三)训练自己的检测模型和推理测试 关于Jetson nano或者NX上的

    2023年04月10日
    浏览(28)
  • 写出以下代码的输出结果

    1,输出执行结果 2,闭包作用域相关题 3,闭包题,写出以下代码的输出结果 4,面向对象面试题: 写出执行结果 5,输出结果: 6,写出代码的执行结果

    2024年02月13日
    浏览(41)
  • 单片机学习 11-中断系统(定时器中断+外部中断)

    ​ 中断是为使单片机具有对外部或内部随机发生的事件实时处理而设置的,中断功能的存在,很大程度上提高了单片机处理外部或内部事件的能力。它也是单片机最重要的功能之一,是我们学习单片机必须要掌握的。很多初学者被困在中断中,学了很久仍然不知道中断究竟是

    2024年02月05日
    浏览(38)
  • 【机器学习】全网最全模型评价指标(性能指标、YOLOv5训练结果分析、轻量化指标、混淆矩阵详解)【基础收藏】

    在目标检测任务中,我们常用的评价指标一般有两种,一种是使用Pascal VOC的评价指标,一种是更加严格的COCO评价指标,一般后者会更常用点。 如何判断一个检测结果是否正确。目前最常用的方式就是去计算检测框与真实框的IOU,然后 根据IOU去判别两个框是否匹配 。 常见指

    2024年02月04日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包