pylink消费kafka写入ES

这篇具有很好参考价值的文章主要介绍了pylink消费kafka写入ES。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

# -*- coding: utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import MapFunction, RuntimeContext, KeyedProcessFunction
from abc import ABC, abstractmethod
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import MapFunction, RuntimeContext, KeyedProcessFunction
from pyflink.datastream.state import MapStateDescriptor
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.typeinfo import Types, TypeInformation
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, \
    FlushBackoffType
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.common.serialization import SimpleStringSchema
import json
import re
from datetime import datetime
from elasticsearch import Elasticsearch
from pyflink.datastream.functions import RuntimeContext, FlatMapFunction
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema

# 创建 StreamExecutionEnvironment 对象
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.add_jars("file:///root/flink-sql-connector-kafka_2.11-1.14.4.jar")

TEST_KAFKA_SERVERS = "127.0.0.1:9092"
TEST_KAFKA_TOPIC = "topic_elink"
TEST_GROUP_ID = "pyflink_group"


def get_kafka_customer_properties(kafka_servers: str, group_id: str):
    properties = {
        "bootstrap.servers": kafka_servers,
        "fetch.max.bytes": "67108864",
        "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "enable.auto.commit": "false",  # 关闭kafka 自动提交,此处不能传bool 类型会报错
        "group.id": group_id,
    }
    return properties


properties = get_kafka_customer_properties(TEST_KAFKA_SERVERS, TEST_GROUP_ID)


class LogEvent:
    # id表示全局流水
    id = None
    # source ip
    source = None
    #进程名字
    fileTag= None
    #文件名字
    fileName = None
    #场景码
    serviceCode = None
    #系统名字
    appName= None
    #时间戳
    timestamp = None
    #偏移量
    offset = None

    def __init__(self, id,source, fileTag,fileName, serviceCode,appName,timestamp,offset,message,index_name):
        self.id=id
        self.source = source
        self.fileTag = fileTag
        self.fileName = fileName
        self.serviceCode = serviceCode
        self.appName = appName
        self.timestamp= timestamp
        self.offset = offset
        self.message = message
        self.index_name = index_name

    def to_dict(self):
        return {
            "id": str(self.id),
            "source": str(self.source),
            "fileTag": str(self.fileTag),
            "fileName":str(self.fileName),
            "serviceCode":str(self.serviceCode),
            "appName":str(self.appName),
            "timestamp":str(self.timestamp),
            "offset":str(self.offset),
            "message":self.message,
            "index_name": self.index_name
        }


class MyMapFunction(FlatMapFunction):
    def open(self, runtime_context: RuntimeContext):
        self.process_id_to_bus_seq = runtime_context.get_map_state(
            MapStateDescriptor('process_id_map_bus_seq', Types.STRING(), Types.STRING()))

    def close(self):
        pass

    def flat_map(self, raw_message):
        id = ''
        source=''
        fileTag=''
        fileName=''
        serviceCode=''
        appName=''
        timestamp=''
        process_id = ''
        offset=''
        message=''
        raw_message = raw_message.replace("\n", "")
        out=json.loads(raw_message)
        message = out['message']
        source = out['source']
        fileTag = out['file_tag']
        serviceCode=''
        appName=out['app_name']
        timestamp=out['@timestamp']
        offset=out['log']['offset']
        fileName=out['log']['file']['path']


        pattern = r".*?接收数据.*?\d{26}"
        matchObj = re.match(pattern, message)
        if matchObj:
            try:
                pat = re.compile(r".*?接收数据.*?(\d{26}).*?")
                bus_seq = pat.search(message).group(1)
                process_id = message.split()[1]
                self.process_id_to_bus_seq.put(process_id, bus_seq)
            except:
                return
        process_id = message.split()[1]
        bus_seq = self.process_id_to_bus_seq.get(process_id)
        if not bus_seq:
            bus_seq = '0'
        id=bus_seq
        # self.r.delete(process_id)
        # log_event = LogEvent(bus_seq.decode('UTF-8'),message)
        # LogEvent['bus_seq']=bus_seq.decode('UTF-8')
        date_str = datetime.now().strftime("%Y-%m-%d")
        index_name = 'flink-log-elink-' + date_str
        try:
            log_event = LogEvent(id,source, fileTag,fileName, serviceCode,appName,timestamp,offset,message,index_name)
        except:
            return
        #print(log_event.to_dict())
        yield log_event.to_dict()


data_stream = env.add_source(
    FlinkKafkaConsumer(topics=TEST_KAFKA_TOPIC,
                       properties=properties,
                       deserialization_schema=SimpleStringSchema()) \
        .set_commit_offsets_on_checkpoints(True) \
        .set_start_from_latest()
).name(f"消费{TEST_KAFKA_TOPIC}主题数据")

env.add_jars("file:///root/lib/flink-sql-connector-elasticsearch7-3.0.1-1.16.jar")

es7_sink = Elasticsearch7SinkBuilder() \
    .set_bulk_flush_max_actions(1) \
    .set_emitter(ElasticsearchEmitter.dynamic_index('index_name')) \
    .set_hosts(['127.0.0.1:9200']) \
    .build()


def get_line_key(line):
    message = ''
    message = line.replace("\n", "")
    line = json.loads(message)['message']
    try:
        process_id = line.split()[1]
    except:
        process_id = '9999'
    return process_id


# data_stream.key_by(get_line_key).flat_map(MyMapFunction(),output_type=Types.MAP(Types.STRING(),Types.STRING())).sink_to(es7_sink)
data_stream.key_by(get_line_key).flat_map(MyMapFunction(),
                                          output_type=Types.MAP(Types.STRING(), Types.STRING())).sink_to(es7_sink)

# 执行任务
env.execute('Add "bus_seq" to each line')


 文章来源地址https://www.toymoban.com/news/detail-481780.html

到了这里,关于pylink消费kafka写入ES的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink流批一体计算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table SQL connectors创建 StreamExecutionEnvironment 编写一个 Flink Python DataSt

    2024年02月11日
    浏览(44)
  • 第3、4章 Kafka 生产者 和 消费者 ——向 Kafka 写入数据 和读取数据

    重要的特性: 消息通过 队列来进行交换 每条消息仅会传递给一个消费者 消息传递有先后顺序,消息被消费后从队列删除(除非使用了消息优先级) 生产者或者消费者可以动态加入 传送模型: 异步即发即弃:生产者发送一条消息,不会等待收到一个响应 异步请求、应答:

    2024年02月20日
    浏览(40)
  • Flink SQL和Table API实现消费kafka写入mysql

    1、构建 table环境 2、构建source kafka 方式一:API 方式二:Flink SQL 3、构建sink mysql  4、写入将source表写入sink表 方式一:API 方式二:Flink SQL 5、手动执行 6、测试 (1)连接kafka生产者 (2)造数据 (3)mysql查看入库情况

    2024年01月16日
    浏览(44)
  • Flink流批一体计算(18):PyFlink DataStream API之计算和Sink

    目录 1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。 2. File Sink File Sink Format Types  Row-encoded Formats  Bulk-encoded Formats  桶分配 滚动策略 3. 如何输出结果 Print 集合数据到客户端,execute_and_collect方法将收集数据到客户端内存 将结果发送到DataStream sink conne

    2024年02月11日
    浏览(39)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定义 Sink 消费 Kafka 数据写入 RocketMQ

    这里的 maven 依赖比较冗余,推荐大家都加上,后面陆续优化。 注意: 1、此程序中所有的相关配置都是通过 Mysql 读取的(生产环境中没有直接写死的,都是通过配置文件动态配置),大家实际测试过程中可以将相关配置信息写死。 2、此程序中 Kafka 涉及到了 Kerberos 认证操作

    2024年02月03日
    浏览(51)
  • pyflink map 字典写入ES

    [root@master pyflink]# cat test.py  # -*- coding: utf-8 -*- from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction from abc import ABC, abstractmethod from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import  MapFunction

    2024年02月10日
    浏览(39)
  • Flink流批一体计算(15):PyFlink Tabel API之SQL写入Sink

    目录 举个例子 写入Sink的各种情况 1. 将结果数据收集到客户端 2. 将结果数据转换为Pandas DataFrame,并收集到客户端 3. 将结果写入到一张 Sink 表中 4. 将结果写入多张 Sink 表中 举个例子 将计算结果写入给 sink 表 写入Sink的各种情况 1. 将结果数据收集到客户端 你可以使用 TableR

    2024年02月11日
    浏览(40)
  • pyflink kafka es

    # -*- coding: utf-8 -*- from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction from abc import ABC, abstractmethod from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunctio

    2024年02月08日
    浏览(29)
  • Flink DataStream之从Kafka读数据

    搭建Kafka 参考:centos7下kafka2.12-2.1.0的安装及使用_kafka2.12-2.1.0 steam_QYHuiiQ的博客-CSDN博客  启动zookeeper 启动kafka 查看进程  创建topic 查看topic列表 导入pom依赖 新建类 启动程序 在终端向kafka生产数据,同时观察程序控制台flink的读取情况  如图说明flink从kafka成功读取数据。

    2024年02月13日
    浏览(52)
  • C# 快速写入日志 不卡线程 生产者 消费者模式

    有这样一种场景需求,就是某个方法,对耗时要求很高,但是又要记录日志到数据库便于分析,由于访问数据库基本都要几十毫秒,可在方法里写入BlockingCollection,由另外的线程写入数据库。 可以看到,在我的机子上面,1ms写入了43条日志。

    2024年02月15日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包