pyflink读取文件并行度问题

这篇具有很好参考价值的文章主要介绍了pyflink读取文件并行度问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

[root@master pyflink]# cat /root/pyflink/test.log 
111111111   aaaaa
222222222   bbbbb
111111111   ccccc
222222222   ddddd
333333333   eeeee
111111111   fffff
111111111   ggggg
111111111   eeeee
111111111   hhhhh
111111111   iiiii
111111111   jjjjj
222222222   eeeee


[root@master pyflink]# cat t107.py 
from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.table import StreamTableEnvironment


from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds1 = env.read_text_file("/root/pyflink/test.log",'utf-8')
ds1.print()
env.execute()
[root@master pyflink]# python3 t107.py 
111111111   aaaaa
222222222   bbbbb
111111111   ccccc
222222222   ddddd
333333333   eeeee
111111111   fffff
111111111   ggggg
111111111   eeeee
111111111   hhhhh
111111111   iiiii
111111111   jjjjj
222222222   eeeee


并发读取乱序:

[root@master pyflink]# cat t107.py 
from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.table import StreamTableEnvironment


from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(3)
ds1 = env.read_text_file("/root/pyflink/test.log",'utf-8')
ds1.print()
env.execute()
[root@master pyflink]# python3 t107.py 
3> 111111111   iiiii
3> 111111111   jjjjj
3> 222222222   eeeee
2> 111111111   fffff
2> 111111111   ggggg
1> 111111111   aaaaa
1> 222222222   bbbbb
2> 111111111   eeeee
2> 111111111   hhhhh
1> 111111111   ccccc
1> 222222222   ddddd
1> 333333333   eeeee文章来源地址https://www.toymoban.com/news/detail-454579.html

到了这里,关于pyflink读取文件并行度问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 启动nginx报错:invalid number of arguments in “root“ directive in,是文件路径书写问题

    无法启动nginx,错误日志提示如下: 原因: 这个一个比较常见的问题,配置文件里面应该有路径有问题 注意在:这里如果路径名称有空格要用引号引起来,否则会被当成2个路径解析。 如上,提示nginx.conf文件的208行, 改成这样就没事了:

    2024年02月09日
    浏览(34)
  • 记录springboot在k8s下无法读取文件问题

    //加载配置文件 File file = ResourceUtils.getFile(\\\"classpath:/template/job.yaml\\\");  /对象映射  V1Job v1Job = (V1Job) Yaml.load(file); 开发的时候使用上面的方法可以读取文件数据,但是部署到k8s容器中之后,读取文件出现报错,找不到文件。于是改成了下面的写法: 成功读取文件数据。上述两种

    2024年02月15日
    浏览(33)
  • Python读取excel文件往Elasticsearch数据插入时遇到的问题

    背景:需要完成一个功能,使用python读取一个excel文件进行读取数据,然后将这些数据直接保存到Elasticsearch中。 用到的工具:python、Elasticsearch 一、问题描述 在将项目部署到甲方的时候,出现用户导入文件无法进行正常插入到Elasticsearch中的情况,当时看的服务器的日志,报

    2023年04月09日
    浏览(34)
  • STM32H7并行读取AD7606数据以及片内AD值不准解决办法

    先了解一下AD7606,16位,单电源,200k采样率,8路,除了贵没有其他缺点,数据相当的稳,一个5V供电,不用运放的情况下采集电压精度可以达到1mv,非常Nice 与单片机相连 单片机 调用代码 测试发现AD采集到的电压要远小于实际电压,H7的AD还是16位的,不能这么拉跨吧,在网上

    2024年02月11日
    浏览(31)
  • 解决C#报“MSB3088 未能读取状态文件*.csprojAssemblyReference.cache“问题

        今天在使用vscode软件+C#插件,编译.cs文件时,发现如下warning: 图(1) C#报cache没有更新     出现该warning的原因: 当前.cs文件修改了,但是其缓存文件*.csprojAssemblyReference.cache没有更新,需要重新清理一下工程,再编译、运行即可。     命令如下:     或者手动在vs里

    2024年02月11日
    浏览(35)
  • ADB读取和备份安卓应用数据(无Root)

    某一个特殊设备上的APP白屏无法打开,需要将数据库数据保留下来并导出,研究了几个方法最后得以获取数据。 进入shell, 执行run-as + 包名 ,就可以直接以root权限进入该应用的沙盒中查看包括数据库、xml、各种信息文件。接下来可以通过pull或者copy命令将需要的数据转移至

    2024年02月04日
    浏览(25)
  • 【昕宝爸爸小模块】深入浅出之针对大Excel做文件读取问题

    ➡️博客首页       https://blog.csdn.net/Java_Yangxiaoyuan        欢迎优秀的你👍点赞、🗂️收藏、加❤️关注哦。        本文章CSDN首发,欢迎转载,要注明出处哦!        先感谢优秀的你能认真的看完本文,有问题欢迎评论区交流,都会认真回复! 在POI中,提供

    2024年01月18日
    浏览(36)
  • 记录开发环境docker上的一次springboot无法读取更新的配置文件的问题

    背景:一般开发环境的管理不是很严格,当对代码进行一些组件的添加时,往往需要修改spring的配置文件,有的时候为了保险起见,回预先备份原本的配置文件,我采取在./config中创建了一个名为bak-日期的目录,将原本的配置文件mv到该目录下,将新的配置文件移到config目录

    2024年02月11日
    浏览(38)
  • U盘重装MAC OS遇到 无法识别文件系统和磁盘无法读取问题

    一台windows的电脑,一个差不多够大的U盘我这里差不多用了13G左右。 .dmg文件好难搞,不是难下,是下载转载后无法识别,所以很需要 带引导的os文件. 一个windows下dmg镜像恢复工具,这里用的是TransMac 10.4 ,没有注册码没事,有15天体验期。 1.插入U盘或其他存储设备 2.以管理员

    2024年02月21日
    浏览(28)
  • go获取文件md5后接着读取file对象EOF的问题记录

    目录 背景 分析 第一步:读取文件,获得file对象 第二步:获取文件md5 第三步:获取到md5后继续使用(读取)file对象,发现一次就读完了(EOF),但实际一次不可能读完 代码如下: 如上代码中,在获得该文件的md5后,file对象此时已指向文件末尾,因此接着读取必然是EOF直接

    2024年01月19日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包