pyflink中的状态ttl设置

这篇具有很好参考价值的文章主要介绍了pyflink中的状态ttl设置。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

# -*- coding: gbk -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import RuntimeContext, FlatMapFunction, MapFunction
import json
import re
import logging
import sys
from pyflink.datastream.state import ValueStateDescriptor, MapStateDescriptor
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, TypeInformation
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
from  pyflink.datastream.connectors import  DeliveryGuarantee
from pyflink.common.serialization import SimpleStringSchema
from datetime import datetime
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig

logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(asctime)s-%(levelname)s-%(message)s")
logger = logging.getLogger(__name__)

# 创建 StreamExecutionEnvironment 对象
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
#env.add_jars("file:///root/pyflink/flink-sql-connector-kafka_2.11-1.14.4.jar")

TEST_KAFKA_SERVERS = "1.1.101.39:9092,1.1.101.40:9092,1.1.101.42:9092"
TEST_KAFKA_TOPIC = "elink-midsys-flink-topic"
TEST_GROUP_ID = "pyflink_elink_midsys"


def get_kafka_customer_properties(kafka_servers: str, group_id: str):
    properties = {
        "bootstrap.servers": kafka_servers,
        "fetch.max.bytes": "67108864",
        "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "enable.auto.commit": "false",  # 关闭kafka 自动提交,此处不能传bool 类型会报错
        "group.id": group_id,
    }
    return properties


properties = get_kafka_customer_properties(TEST_KAFKA_SERVERS, TEST_GROUP_ID)


class LogEvent:
    # id表示全局流水
    id = None
    # source ip
    source = None
    #进程名字
    fileTag= None
    #文件名字
    fileName = None
    #场景码
    serviceCode = None
    #系统名字
    appName= None
    #时间戳
    timestamp = None
    #偏移量
    offset = None

    def __init__(self, id,source, fileTag,fileName, serviceCode,appName,timestamp,offset,message,index_name):
        self.id=id
        self.source = source
        self.fileTag = fileTag
        self.fileName = fileName
        self.serviceCode = serviceCode
        self.appName = appName
        self.timestamp= timestamp
        self.offset = offset
        self.message = message
        self.index_name = index_name

    def to_dict(self):
        return {
            "id": str(self.id),
            "source": str(self.source),
            "fileTag": str(self.fileTag),
            "fileName":str(self.fileName),
            "serviceCode":str(self.serviceCode),
            "appName":str(self.appName),
            "timestamp":self.timestamp,
            "offset":str(self.offset),
            "message":self.message,
            "index_name": self.index_name
        }


class MyMapFunction(FlatMapFunction):
    def open(self, runtime_context: RuntimeContext):
         ttl_config = StateTtlConfig \
            .new_builder(Time.seconds(120)) \
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
            .build()
         desciption_map=MapStateDescriptor('process_id_map_bus_seq', Types.STRING(), Types.STRING())
         desciption_map.enable_time_to_live(ttl_config)
         self.process_id_to_bus_seq = runtime_context.get_map_state(desciption_map)

    def flat_map(self, raw_message):
        id = ''
        source =''
        fileTag =''
        fileName =''
        serviceCode =''
        appName =''
        timestamp =''
        process_id = ''
        offset =''
        message =''
        unique_key =''
        try:
           raw_message = raw_message.replace("\n", "")
           #print(raw_message)
           out=json.loads(raw_message)
           message = out['message']
           source = out['source']
           fileTag = out['file_tag']
           serviceCode='00000'
           appName=out['app_name']
           timestamp=str(out.get('time_nano'))
           offset=out.get('offset')
           fileName=out.get('file_name')
           pattern = r".*?接收数据.*?\d{26}"
           matchObj = re.match(pattern, message)
        except:
             #logger.info('1111111111111111111111111111111')
             return
        if matchObj:
            try:
                if  self.process_id_to_bus_seq.contains(unique_key):
                   self.process_id_to_bus_seq.remove(unique_key)
                pat = re.compile(r".*?接收数据.*?(\d{26}).*?")
                bus_seq = pat.search(message).group(1)
                process_id = message.split()[1]
                unique_key=source+'_'+ appName +'_'+ fileTag +'_'+str(process_id)
                self.process_id_to_bus_seq.put(unique_key, bus_seq)
            except:
                #print('ValueError:', e)
                #logger.info('22222222222222222222222222222222')
                return
        try:         
            process_id = message.split()[1]
            unique_key=source+'_'+ appName +'_'+ fileTag +'_'+str(process_id)
        except:
            #print('ValueError:', e)
            #logger.info('333333333333333333333')
            return
        try:
            bus_seq = self.process_id_to_bus_seq.get(unique_key)
        except:
            return
        if not bus_seq:
            bus_seq = '0'
        id=bus_seq
        # self.r.delete(process_id)
        # log_event = LogEvent(bus_seq.decode('UTF-8'),message)
        # LogEvent['bus_seq']=bus_seq.decode('UTF-8')
        date_str = datetime.now().strftime("%Y%m%d")
        index_name = 'flink-log-elink-midsys-'+ str(date_str)
        try:
            log_event = LogEvent(id,source, fileTag,fileName, serviceCode,appName,timestamp,offset,message,index_name)
        except:
            return
        #print(log_event.to_dict())
   
        yield log_event.to_dict()
     


data_stream = env.add_source(
    FlinkKafkaConsumer(topics=TEST_KAFKA_TOPIC,
        properties=properties,
        deserialization_schema=SimpleStringSchema()) \
        .set_commit_offsets_on_checkpoints(True) \
        .set_start_from_latest()
).name(f"消费{TEST_KAFKA_TOPIC}主题数据")

#env.add_jars("file:///root/pyflink/flink-sql-connector-elasticsearch7-3.0.1-1.16.jar")

# .set_hosts(['1.1.101.32:9200','1.1.101.33:9200','1.1.101.38:9200']) \
es_sink = Elasticsearch7SinkBuilder() \
        .set_bulk_flush_backoff_strategy(FlushBackoffType.EXPONENTIAL, 5, 1000) \
        .set_emitter(ElasticsearchEmitter.dynamic_index('index_name')) \
        .set_hosts(['1.1.101.32:9200','1.1.101.33:9200','1.1.101.38:9200']) \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .set_bulk_flush_max_actions(100) \
        .set_bulk_flush_interval(1000) \
        .set_connection_request_timeout(30000) \
        .set_connection_timeout(31000) \
        .set_socket_timeout(32000) \
        .build()


def get_line_key(line):
    message = ''
    try:
        message = line.replace("\n", "")
        source = json.loads(message)['source']
    except:
        source = '999999'
    return source

data_stream.key_by(get_line_key).flat_map(MyMapFunction(),output_type=Types.MAP(Types.STRING(), Types.STRING())).set_parallelism(2).sink_to(es_sink).set_parallelism(3)
#data_stream.key_by(get_line_key).flat_map(MyMapFunction(),output_type=Types.MAP(Types.STRING(), Types.STRING())).print()

# 执行任务
env.execute('xxx')
 文章来源地址https://www.toymoban.com/news/detail-543951.html

到了这里,关于pyflink中的状态ttl设置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink流批一体计算(16):PyFlink DataStream API

    目录 概述 Pipeline Dataflow 代码示例WorldCount.py 执行脚本WorldCount.py 概述 Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。 用户实现的Flink程序是由Stream和Transformation这两个基本构建块组

    2024年02月11日
    浏览(45)
  • Flink流批一体计算(18):PyFlink DataStream API之计算和Sink

    目录 1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。 2. File Sink File Sink Format Types  Row-encoded Formats  Bulk-encoded Formats  桶分配 滚动策略 3. 如何输出结果 Print 集合数据到客户端,execute_and_collect方法将收集数据到客户端内存 将结果发送到DataStream sink conne

    2024年02月11日
    浏览(38)
  • Flink流批一体计算(19):PyFlink DataStream API之State

    目录 keyed state Keyed DataStream 使用 Keyed State 实现了一个简单的计数窗口 状态有效期 (TTL) 过期数据的清理 全量快照时进行清理 增量数据清理 在 RocksDB 压缩时清理 Operator State算子状态 Broadcast State广播状态 keyed state Keyed DataStream 使用 keyed state,首先需要为DataStream指定 key(主键)

    2024年02月10日
    浏览(37)
  • Flink流批一体计算(11):PyFlink Tabel API之TableEnvironment

    目录 概述 设置重启策略 什么是flink的重启策略(Restartstrategy) flink的重启策略(Restartstrategy)实战 flink的4种重启策略 FixedDelayRestartstrategy(固定延时重启策略) FailureRateRestartstrategy(故障率重启策略) NoRestartstrategy(不重启策略) 配置State Backends 以及 Checkpointing Checkpoint 启用和配置

    2024年02月13日
    浏览(52)
  • Flink流批一体计算(12):PyFlink Tabel API之构建作业

    目录 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表 2. 创建一个作业 3. 提交作业Submitting PyFlink Jobs 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    浏览(42)
  • Flink流批一体计算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table SQL connectors创建 StreamExecutionEnvironment 编写一个 Flink Python DataSt

    2024年02月11日
    浏览(42)
  • Flink流批一体计算(14):PyFlink Tabel API之SQL查询

    举个例子 查询 source 表,同时执行计算 Table API 查询 Table 对象有许多方法,可以用于进行关系操作。 这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。 这些关系操作可以由多个方法调用组成,例如 table.group_by(...).select(...)。 Table API 文档描述了流和批

    2024年02月12日
    浏览(41)
  • Flink流批一体计算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 创建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用来: ·创建 Table ·将 Table 注册成临时表 ·执行 SQL 查询 ·注册用户自定义的 (标量,表值,或者聚合) 函数 ·配置作业 ·管理 Python 依赖 ·提交作业执行 创建 source 表 创建 sink

    2024年02月12日
    浏览(40)
  • Flink流批一体计算(15):PyFlink Tabel API之SQL写入Sink

    目录 举个例子 写入Sink的各种情况 1. 将结果数据收集到客户端 2. 将结果数据转换为Pandas DataFrame,并收集到客户端 3. 将结果写入到一张 Sink 表中 4. 将结果写入多张 Sink 表中 举个例子 将计算结果写入给 sink 表 写入Sink的各种情况 1. 将结果数据收集到客户端 你可以使用 TableR

    2024年02月11日
    浏览(40)
  • 大数据职业技能大赛样题(数据采集与实时计算:使用Flink处理Kafka中的数据)

           编写Scala代码,使用Flink消费Kafka中Topic为order的数据并进行相应的数据统计计算(订单信息对应表结构order_info,订单详细信息对应表结构order_detail(来源类型和来源编号这两个字段不考虑,所以在实时数据中不会出现),同时计算中使用order_info或order_detail表中create_ti

    2024年03月24日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包