pyflink flat_map

这篇具有很好参考价值的文章主要介绍了pyflink flat_map。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

# -*- coding: utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
from abc import ABC, abstractmethod
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
from pyflink.datastream.state import MapStateDescriptor
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.typeinfo import Types, TypeInformation
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.common.serialization import SimpleStringSchema
import json
import re
from datetime import datetime
from elasticsearch import Elasticsearch
from pyflink.datastream.functions import RuntimeContext, FlatMapFunction

import re
import redis


# 创建 StreamExecutionEnvironment 对象
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# 读取文件,创建 DataStream 对象
data_stream = env.read_text_file('/root/pyflink/elink_test.txt')

# 对每行数据添加字符串 'aaaa'

class LogEvent:
    buss_seq = None
    message = None
    
    def __init__(self, bus_seq,message,index_name):
        self.bus_seq = bus_seq
        self.message = message
        self.index_name= index_name

    def to_dict(self):
        return {
            "bus_seq": self.bus_seq,
            "message": self.message,
            "index_name" : self.index_name
        }
class MyMapFunction(FlatMapFunction):
   def open(self, runtime_context: RuntimeContext):
     pool = redis.ConnectionPool(host='127.0.0.1',port=6379,max_connections=50)
     self.r = redis.Redis(connection_pool=pool)
   def close(self):
     self.r.close()

   def flat_map(self,line):
      process_id='';
      bus_seq=''
      if not line.startswith("ES"):
          return 
      if '<Serial>' in line:
         try:
             pat=re.compile(r"<Serial>(\d+)</Serial>")
             bus_seq=pat.search(line).group(1)
             process_id=line.split()[1]
             self.r.set(process_id,bus_seq)
         except:
             return 
      process_id=line.split()[1]
      if not len(process_id)==6 :
          process_id=line.split()[2]
      try: 
          bus_seq=self.r.get(process_id).decode('UTF-8') 
      except:
          return 
      #self.r.delete(process_id)
      #log_event = LogEvent(bus_seq.decode('UTF-8'),line)
      #LogEvent['bus_seq']=bus_seq.decode('UTF-8')
      try:
         datetime.now().strftime("%Y-%m-%d")
         index_name='flink-test'+date_str
         log_event=LogEvent(bus_seq,line,index_name)
      except:
          return 
      yield log_event.to_dict()
    
env.add_jars("file:///root/lib/flink-sql-connector-elasticsearch7-3.0.1-1.16.jar")
date_str = datetime.now().strftime("%Y-%m-%d")
es7_sink = Elasticsearch7SinkBuilder() \
    .set_bulk_flush_max_actions(1) \
    .set_emitter(ElasticsearchEmitter.static_index('flink-test2023-06-07')) \
    .set_hosts(['127.0.0.1:9200']) \
    .build()
      

#new_stream = data_stream.map(MyMapFunction()).sink_to(es7_sink)
new_stream = data_stream.flat_map(MyMapFunction(),output_type=Types.MAP(Types.STRING(),Types.STRING())).sink_to(es7_sink)
#new_stream = data_stream.map(MyMapFunction(),output_type=Types.MAP(Types.STRING(),Types.STRING()))
#new_stream = data_stream.map(MyMapFunction(),output_type=Types.MAP(Types.STRING(),Types.STRING()))
# 输出到控制台
#new_stream.print()

# 执行任务
env.execute('Add "bus_seq" to each line')文章来源地址https://www.toymoban.com/news/detail-475726.html

到了这里,关于pyflink flat_map的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Visual Studio Code Import “flask“ could not be resolvedPylance

    首先,我使用的是 Visual Studio Code ,来编辑 python 代码, 1.查看现象是否与我遇到的情况相同, flask 下面 有 波浪线 在 Visual Studio Code 的 TERMINAL 窗口里 运行 :pip --version 查看 pip 安装的包 被放在 python 3.10 里,如下图所示 而 我的 Visual Studio Code 使用的 python 版本是: 3.11.1 所以

    2023年04月14日
    浏览(35)
  • 51.pyinstaller打包后,打开exe程序提示SyntaxError: Non-UTF-8 code starting with '\x90' in file的问题

    问题: 最后开发了一款小工具,然后确定一切测试没有问题,想通过pyinstaller将其打包成exe,像类似的打包以前也经常打包的,复杂一点的也都是打包成功的,但这里感觉程序很简单,打包居然出现了以下错误。 我的python版本是3.8.9,然后pyinstaller版本是5.9.0,不知道会不会是

    2024年02月11日
    浏览(32)
  • 数组扁平化flat方法的多种实现

    flat() 执行效果: toString() 注意:map()处理空值的问题 执行效果: JSON.stringify() 注意:map()处理空值的问题 执行效果: while() {} 执行效果: reduce() 对于一个数组,使用 reduce() 方法遍历其中的每个元素,如果元素是一个数组,则递归调用扁平化函数;否则,将元素添加到结果数

    2024年02月13日
    浏览(34)
  • JavaScript 数组展平方法: flat() 和 flatMap()

    从 ES2019 中开始引入了一种扁平化数组的新方法,可以展平任何深度的数组。 flat() 方法创建一个新数组,其中所有子数组元素以递归方式连接到特定深度。 语法:array.flat(depth) array : flat() 方法将在给定的数组中使用。 depth :可选参数,指定展平的深度,默认情况下,深度

    2024年02月09日
    浏览(33)
  • python flat_map字典写入es

    [root@master pyflink]# cat test.txt  aaaaa 111111 bbbbb 222222 ccccc 333333 ddddd 444444 eeeee 555555 [root@master pyflink]# cat test.py  # -*- coding: utf-8 -*- from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction from abc import ABC, abstractmethod from pyfl

    2024年02月09日
    浏览(73)
  • vue3+vant Failed to resolve import “E:/code3/jianmu-user-yd/node_modules/vant/lib/vant/es/icon/style

    Failed to resolve import \\\"E:/code3/jianmu-user-yd/node_modules/vant/lib/vant/es/icon/style\\\" from \\\"srcmain.js\\\". Does the file exist? 解决办法:在vite.config.js中配置:

    2024年02月06日
    浏览(33)
  • 论文阅读:DLME = Deep Local-flatness Manifold Embedding

    Author: Zelin Zang, Siyuan Li, Di Wu and Stan Z Li. 1-4: Westlake University 流形学习(ML, Manifold learning)旨在从高维数据中识别低维结构和嵌入,然而我们发现现有工作在采样不足的现实数据集上效果不佳。一般的ML方法对数据结构进行建模然后构造一个低维embedding,但是采样不足的现实数据

    2024年02月09日
    浏览(73)
  • Linux 内存模型(Memory: the flat, the discontiguous, and the sparse)

    计算机系统中的物理内存是一种宝贵的资源,因此人们付出了大量的努力来有效地管理它。由于现代系统上存储器体系结构的复杂性,这项任务变得更加困难。有几个抽象层处理如何布局物理内存的细节;其中一个简单地称为“内存模型”。内核中支持三个模型,但其中一个

    2024年02月14日
    浏览(30)
  • 0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统

    在 《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。 在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。 Sink用于将Reduce结果输出到外部系统。它也是通过

    2024年02月08日
    浏览(30)
  • PyFlink核心知识点

    四层 说明 备注 SteamGraph 代码生成的最初的图 表示程序的拓扑结构 JobGraph 将多个符合条件的节点,链接为一个节点 可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗 ExecutionGraph JobGraph的并行化版本 是调度层最核心的数据结构 PhysicalGraph JobManager根据ExecutionGra

    2024年04月27日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包