pyflink get_list_state

这篇具有很好参考价值的文章主要介绍了pyflink get_list_state。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

# -*- coding: utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import RuntimeContext, FlatMapFunction, MapFunction
import json
import re
import logging
import sys
from pyflink.datastream.state import ValueStateDescriptor, MapStateDescriptor,ListStateDescriptor
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, TypeInformation
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
from  pyflink.datastream.connectors import  DeliveryGuarantee
from pyflink.common.serialization import SimpleStringSchema
from datetime import datetime


logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(asctime)s-%(levelname)s-%(message)s")
logger = logging.getLogger(__name__)

# 创建 StreamExecutionEnvironment 对象
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(3)
#env.add_jars("file:///root/pyflink/flink-sql-connector-kafka_2.11-1.14.4.jar")


## kafka


TEST_KAFKA_SERVERS = "1.1.223.1:9092,1.1.223.2:9092,1.1.223.3:9092"
TEST_KAFKA_TOPIC = "new-clpf-gaps-topic"
TEST_GROUP_ID =  "clpf_gaps_group_test"


def get_kafka_customer_properties(kafka_servers: str, group_id: str):
    properties = {
        "bootstrap.servers": kafka_servers,
        "fetch.max.bytes": "67108864",
        "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "enable.auto.commit": "false",  # 关闭kafka 自动提交,此处不能传bool 类型会报错
        "group.id": group_id,
        "max.poll.records": "150"
    }
    return properties


properties = get_kafka_customer_properties(TEST_KAFKA_SERVERS, TEST_GROUP_ID)


class LogEvent:
    # id表示全局流水
    id = None
    # source ip
    source = None
    #进程名字
    fileTag= None
    #文件名字
    fileName = None
    #场景码
    serviceCode = None
    #系统名字
    appName= None
    #时间戳
    timestamp = None
    #偏移量
    offset = None

    def __init__(self, id,source, fileTag,fileName, serviceCode,appName,timestamp,offset,message,index_name):
        self.id=id
        self.source = source
        self.fileTag = fileTag
        self.fileName = fileName
        self.serviceCode = serviceCode
        self.appName = appName
        self.timestamp= timestamp
        self.offset = offset
        self.message = message
        self.index_name = index_name

    def to_dict(self):
        return {
            "id": str(self.id),
            "source": str(self.source),
            "fileTag": str(self.fileTag),
            "fileName":str(self.fileName),
            "serviceCode":str(self.serviceCode),
            "appName":str(self.appName),
            "timestamp":self.timestamp,
            "offset":str(self.offset),
            "message":self.message,
            "index_name": self.index_name
        }

    def get_source(self):
         return self.source


class MyMapFunction(FlatMapFunction):
    def open(self, runtime_context: RuntimeContext):
        self.process_id_to_bus_seq = runtime_context.get_map_state(MapStateDescriptor('process_id_map_bus_seq', Types.STRING(), Types.STRING()))
        self.gapslist=runtime_context.get_list_state(ListStateDescriptor('process_list', Types.LIST(Types.STRING())))

    def flat_map(self, raw_message):
        id = ''
        source =''
        fileTag =''
        fileName =''
        serviceCode =''
        appName =''
        timestamp =''
        process_id = ''
        offset =''
        message =''
        unique_key =''
        index_name = ''
        bus_seq = ''
        try:
           raw_message = raw_message.replace("\n", "")
           #print(raw_message)
           out=json.loads(raw_message)
           message = out['message']
           source = out['source']
           fileTag = out['file_tag']
           serviceCode='00000'
           appName=out['app_name']
           timestamp=str(out.get('time_nano'))
           offset=out.get('offset')
           fileName=out.get('file_name')
           # pattern = r".*?接收数据.*?\d{26}"
           # matchObj = re.match(pattern, message)
        except:
             #logger.info('1111111111111111111111111111111')
               return
        date_str = datetime.now().strftime("%Y%m%d")
        index_name = 'flink-log-clpf-gapstest-' + str(date_str)
        if '开始分级日志' in message:
            self.process_id_to_bus_seq.clear()
            self.gapslist.clear()
            # 记录加到缓存
            self.gapslist.add([str(message)])
            return
        self.has_start='0'
        for x in self.gapslist.get():
             if  '开始分级日志' in x[0]:
                 self.has_start='1'
                 break;
        #<BUSS_SEQ_NO>20601020230621010072249201</BUSS_SEQ_NO>
        #if  "</BUSS_SEQ_NO>" in message:
        if re.findall("\<BUSS_SEQ_NO\>\d+\<\/BUSS_SEQ_NO\>",message):
              pat = re.compile(r"\<BUSS_SEQ_NO\>(\d+)\<\/BUSS_SEQ_NO\>")
              bus_seq = pat.search(message).group(1)
              self.process_id_to_bus_seq.put('id', bus_seq)
              self.gapslist.add([str(message)])
              id=bus_seq
              for output_message in  self.gapslist.get():
                      log_event = LogEvent(id, source, fileTag, fileName, serviceCode, appName, timestamp, offset,output_message[0], index_name)
                      yield log_event.to_dict()
              self.gapslist.clear()
              self.has_start='0'
              #log_event = LogEvent(id, source, fileTag, fileName, serviceCode, appName, timestamp, offset,message, index_name)
              #yield log_event.to_dict()
              return
               
        if   self.has_start == '1':
             self.gapslist.add([str(message)])
             return
        id= self.process_id_to_bus_seq.get('id')
        if not id:
           id ='0'
        log_event = LogEvent(id, source, fileTag, fileName, serviceCode, appName, timestamp, offset, message, index_name)
        yield log_event.to_dict()


data_stream = env.add_source(
    FlinkKafkaConsumer(topics=TEST_KAFKA_TOPIC,
        properties=properties,
        deserialization_schema=SimpleStringSchema()) \
        .set_commit_offsets_on_checkpoints(True) \
        .set_start_from_latest()
).name(f"消费{TEST_KAFKA_TOPIC}主题数据")

#env.add_jars("file:///root/pyflink/flink-sql-connector-elasticsearch7-3.0.1-1.16.jar")

# .set_hosts(['1.1.101.32:9200','1.1.101.33:9200','1.1.101.38:9200']) \
es_sink = Elasticsearch7SinkBuilder() \
        .set_bulk_flush_backoff_strategy(FlushBackoffType.EXPONENTIAL, 5, 1000) \
        .set_emitter(ElasticsearchEmitter.dynamic_index('index_name')) \
        .set_hosts(['1.1.101.32:9200','1.1.101.33:9200','1.1.101.38:9200']) \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .set_bulk_flush_interval(1000) \
        .set_connection_request_timeout(30000) \
        .set_connection_timeout(31000) \
        .set_socket_timeout(32000) \
        .build()

def get_line_key(line):
    message = ''
    try:
        message = line.replace("\n", "")
        source = json.loads(message)['source']
    except:
        source = '999999'
    return source


data_stream.key_by(get_line_key).flat_map(MyMapFunction(),output_type=Types.MAP(Types.STRING(), Types.STRING())).sink_to(es_sink).set_parallelism(3)
#data_stream.key_by(get_line_key).flat_map(MyMapFunction(),output_type=Types.MAP(Types.STRING(), Types.STRING())).print()
#data_stream.key_by(get_line_key).flat_map(MyMapFunction()).print()

# 执行任务
env.execute('flink_clpf_gaps_test')


 文章来源地址https://www.toymoban.com/news/detail-504472.html

到了这里,关于pyflink get_list_state的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • (一)SQL state [99999]; error code [17056]; 不支持的字符集 (在类路径中添加 orai18n.jar): ZHS16GBK; nested exception

    不支持的字符集 (在类路径中添加 orai18n.jar): ZHS16GBK (一)SQL state [99999]; error code [17056]; 不支持的字符集 (在类路径中添加 orai18n.jar): ZHS16GBK; nested exception (二)java.sql.SQLException: 不支持的字符集 (在类路径中添加 orai18n.jar): ZHS16GBK 目录 一、报错解答 1、报错 2、背景 3、原

    2024年02月14日
    浏览(31)
  • sentinel整合nacos鉴权403问题(Nacos get changed dataId error, code: 403)

    由于spring-cloud-starter-alibaba-sentinel-2.2.1.RELEASE所依赖的spring-cloud-alibaba-sentinel-datasource-2.2.1.RELEASE不支持nacos鉴权,需要升级spring-cloud-starter-alibaba-sentinel版本或升级spring-cloud-alibaba-sentinel-datasource至2.2.2.RELEASE或以上版本。推荐仅升级spring-cloud-alibaba-sentinel-datasource至2.2.2.RELEASE,对其

    2024年02月11日
    浏览(26)
  • go对象的创建和使用 orm map对象和List 时间 json get/post请求

    GORM 指南 | 入门指南 |《GORM 中文文档 v2》| Go 技术论坛 GoFrame gmap遍历hashmap listmap treemap使用技巧_Golang_脚本之家 HTTPClient-基本使用 - GoFrame (ZH)-Latest - GoFrame官网 - 类似PHP-Laravel, Java-SpringBoot的Go企业级开发框架 go语言http请求(一)_go http import-CSDN博客 GO 发起HTTP请求调用接口_go-h

    2024年04月11日
    浏览(27)
  • MAC安装mysqlclient失败,× Getting requirements to build wheel did not run successfully.│ exit code: 1╰─

    问题: 在使用django项目安装mysqlclient时出现以下错误: 解决方案: 1、先去登录brew官网: macOS(或 Linux)缺失的软件包的管理器 — Homebrew 复制以下代码,到mac终端运行: /bin/bash -c \\\"$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)\\\" 安装好后根据提示运行一下两端

    2024年02月14日
    浏览(29)
  • Nacos开启鉴权后读取不到配置文件,get data from Nacos error,dataId:http error, code=403,dataId=

    报错信息 解决办法 我开始没加鉴权,使用的是application.yml是可以的,加了之后要将application.yml换成bootstrap.yml。 spring cloud Alibaba组件版本依赖关系 2021.x 分支 适配 Spring Boot 2.4,Spring Cloud 2021.x 版本及以上的 Spring Cloud Alibaba 版本按从新到旧排列如下表(最新版本用*标记):

    2024年03月23日
    浏览(32)
  • Task ‘wrapper‘ not found in project ‘:app‘.* Try:Run gradle tasks to get a list of available task.

    浅记录一下今天在 gitee 导入 android 项目时产生的问题~ 提示的错误是:在项目中没有包装器任务 解决方式:在 build.gradle 中加上,重新 Try Again 即可!

    2024年02月16日
    浏览(31)
  • 记Kubernetes(k8s) 集群报错:FATA[0000] listing images: rpc error: code = Unavailable desc = connection err

    💖The Begin💖点点关注,收藏不迷路💖 》报错详解: 根据输出信息,看起来 crictl 工具在尝试列出容器镜像时遇到了连接问题。错误信息显示 crictl 默认尝试使用多个端点进行连接,但由于默认设置已被弃用,建议您手动设置端点。 crictl 尝试使用 /var/run/dockershim.sock 等端点进

    2024年04月16日
    浏览(22)
  • 0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统

    在 《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。 在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。 Sink用于将Reduce结果输出到外部系统。它也是通过

    2024年02月08日
    浏览(29)
  • pyflink map函数例子

    [root@master pyflink]# cat k102.py  from pyflink.datastream import StreamExecutionEnvironment # 创建 StreamExecutionEnvironment 对象 env = StreamExecutionEnvironment.get_execution_environment() # 读取文件,创建 DataStream 对象 data_stream = env.read_text_file(\\\'/root/pyflink/test.log\\\') # 对每行数据添加字符串 \\\'aaaa\\\' new_stream = data_st

    2024年02月07日
    浏览(32)
  • PyFlink核心知识点

    四层 说明 备注 SteamGraph 代码生成的最初的图 表示程序的拓扑结构 JobGraph 将多个符合条件的节点,链接为一个节点 可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗 ExecutionGraph JobGraph的并行化版本 是调度层最核心的数据结构 PhysicalGraph JobManager根据ExecutionGra

    2024年04月27日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包