pyflink map函数例子

这篇具有很好参考价值的文章主要介绍了pyflink map函数例子。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

[root@master pyflink]# cat k102.py 
from pyflink.datastream import StreamExecutionEnvironment

# 创建 StreamExecutionEnvironment 对象
env = StreamExecutionEnvironment.get_execution_environment()

# 读取文件,创建 DataStream 对象
data_stream = env.read_text_file('/root/pyflink/test.log')

# 对每行数据添加字符串 'aaaa'
new_stream = data_stream.map(lambda x:  'aaaa->'+x)

# 输出到控制台
new_stream.print()

# 执行任务
env.execute('Add "aaaa" to each line')

[root@master pyflink]# python k102.py 
4> aaaa->3333333333333333,ccccccccccccc
2> aaaa->222222233333333,hhhhhhhhhhhhh
3> aaaa->1111111111111111,aaaaaaaaaaaaaa
1> aaaa->4444444444444444,eeeeeeeeeeeee
4> aaaa->1111111111111111,ddddddddddddd
1> aaaa->111111122222222,ffffffffffffff
3> aaaa->2222222222222222,bbbbbbbbbbbbbb
[root@master pyflink]# cat test.log 
1111111111111111,aaaaaaaaaaaaaa
2222222222222222,bbbbbbbbbbbbbb
3333333333333333,ccccccccccccc
1111111111111111,ddddddddddddd
4444444444444444,eeeeeeeeeeeee
111111122222222,ffffffffffffff
222222233333333,hhhhhhhhhhhhh
[root@master pyflink]# python k102.py 
2> aaaa->1111111111111111,aaaaaaaaaaaaaa
2> aaaa->2222222222222222,bbbbbbbbbbbbbb
3> aaaa->3333333333333333,ccccccccccccc
3> aaaa->1111111111111111,ddddddddddddd
4> aaaa->4444444444444444,eeeeeeeeeeeee
4> aaaa->111111122222222,ffffffffffffff
1> aaaa->222222233333333,hhhhhhhhhhhhh

在上述代码中,map() 函数接受一个 lambda 表达式作为参数,用于对每行数据进行处理。lambda 表达式中的 

x 变量表示每行数据,x + 'aaaa' 表示将每行数据末尾添加字符串 'aaaa'。

最后通过 print() 方法将处理后的数据输出到控制台。

对数据流应用映射转换。转换调用的MapFunction

数据流的每个元素。每个MapFunction调用只返回一个元素。文章来源地址https://www.toymoban.com/news/detail-467896.html

到了这里,关于pyflink map函数例子的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 0基础学习PyFlink——用户自定义函数之UDF

    PyFlink中关于用户定义方法有: UDF:用户自定义函数。 UDTF:用户自定义表值函数。 UDAF:用户自定义聚合函数。 UDTAF:用户自定义表值聚合函数。 这些字母可以拆解如下: UD表示User Defined(用户自定义); F表示Function(方法); T表示Table(表); A表示Aggregate(聚合); Aggr

    2024年02月08日
    浏览(35)
  • 0基础学习PyFlink——用户自定义函数之UDTF

    在《0基础学习PyFlink——用户自定义函数之UDF》中,我们讲解了UDF。本节我们将讲解表值函数——UDTF 我们对比下UDF和UDTF 可以发现: UDF比UDTF多了func_type和udf_type参数; UDTF的返回类型比UDF的丰富,多了两个List类型:List[DataType]和List[str]; 特别是最后一点,可以认为是UDF和UD

    2024年02月07日
    浏览(28)
  • 0基础学习PyFlink——用户自定义函数之UDTAF

    在前面几篇文章中,我们分别介绍了UDF、UDTF和UDAF这三种用户自定义函数。本节我们将介绍最后一种函数:UDTAF——用户自定义表值聚合函数。 UDTAF函数即具备了UDTF的特点,也具备UDAF的特点。即它可以像《0基础学习PyFlink——用户自定义函数之UDTF》介绍的UDTF那样 可以返回任

    2024年02月08日
    浏览(39)
  • Elasticsearch(ES)(版本7.x)创建索引报错:Faile to parse mapping [_doc] Root mapping definition has unsupported

    Elasticsearch(ES)(版本7.x)创建索引报错: 因es7.0版本之后不再需要type doc,把上面语句中的doc删掉,再运行就可以创建索引了。 如果还需要type doc则需要增加include_type_name=true即可解决。 示例:

    2024年02月16日
    浏览(32)
  • 矩阵指数函数的计算及例子

            最近在学习矩阵指数函数相关的知识,遇到了一些比较有意思的计算方法,写出来供大家一同学习。         矩阵指数函数的计算是线性系统学习过程中非常重要的部分,下面我将给出四种常见的计算矩阵指数函的方法: (1)定义法: 对于给定的矩阵,计算的

    2024年02月03日
    浏览(28)
  • c语言,函数的址传递例子

    编码如下: #include stdio.h void swap(int* x,int* y ){ int tmp; tmp=*x; *x=*y; *y=tmp ; }; int main() { int a=4; int b=5; printf(\\\"befern\\\"); printf(\\\"a=%dn\\\",a); printf(\\\"b=%dn\\\",b); swap(a,b); printf(\\\"aftern\\\"); printf(\\\"a=%dn\\\",a); printf(\\\"b=%dn\\\",b); return 0; } 输出的结果: befer a=4 b=5 after a=5 b=4

    2024年02月06日
    浏览(30)
  • Mysql 创建存储过程和函数及各种例子

    1.1.1 语法结构 无参的存储过程 有参数的存储过程 删除存储过程: 1.1.2 简单解释 部分语法简单介绍: delimiter $$ $$ 是分隔符,用其他符号也行,比如一个 $ 或者 // 等 定义变量: DECLARE 例子: @符号 使用 SET 直接赋值变量,变量名以 @ 开头:如: set @dogNum = 1002; 其他使用例子如

    2024年02月06日
    浏览(28)
  • py的函数讲解

    前言:本章节我们来讲函数,主播略微感觉到有点小难,友友们需要认真看 目录 一.初始函数 1.1关于函数 1.2举例 1.3小结 二.函数的基础语法 2.1关于函数的语法 2.2举例 2.3小结 三.函数的参数 3.1关于函数的参数 3.2举例 3.3小结 四.函数的返回值定义语法 4.1关于函数返回值的定义

    2024年01月22日
    浏览(33)
  • yolov5数据读取报错:train: No labels found in /root/yolov5-master/VOCData/dataSet_path/train.cache

    这个问题是由于路径设置错误导致的,以下几个文件的路径都要保持一致。 (1)yolov5-master/VOCData/xml_to_yolo.py 这个文件是将xml格式的label转为txt格式,这个地方建议直接改为绝对路径。  (2)yolov5-mastertrain.py train文件里面的ROOT也需要改为yolov5-master所在路径,后续代码都使用

    2024年02月13日
    浏览(34)
  • ElasticSearch创建索引报错:mapper_parsing_exception Root mapping definition has unsupported parameters

    ElasticSearch版本号:5.6.14,这个错误和ES版本有一定的关系,还是先交代下版本号,免得有的读者根据我的方法操作后无效 错误翻译: mapper_parsing_exception :映射解析异常 Root mapping definition has unsupported parameters :根映射定义包含不受支持的参数 错误映射语句1: 错误映射语句

    2024年02月11日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包