pyflink kafka es

这篇具有很好参考价值的文章主要介绍了pyflink kafka es。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

# -*- coding: utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
from abc import ABC, abstractmethod
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
from pyflink.datastream.state import MapStateDescriptor
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.typeinfo import Types, TypeInformation
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.common.serialization import SimpleStringSchema
import json
import re
from datetime import datetime
from elasticsearch import Elasticsearch
from pyflink.datastream.functions import RuntimeContext, FlatMapFunction
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema


import re
import redis


# 创建 StreamExecutionEnvironment 对象
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.add_jars("file:///root/flink-sql-connector-kafka_2.11-1.14.4.jar")

TEST_KAFKA_SERVERS = "127.0.0.1:9092"
TEST_KAFKA_TOPIC = "topic_elink"
TEST_GROUP_ID = "pyflink_group"
def get_kafka_customer_properties(kafka_servers: str, group_id: str):
    properties = {
        "bootstrap.servers": kafka_servers,
        "fetch.max.bytes": "67108864",
        "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "enable.auto.commit": "false",  # 关闭kafka 自动提交,此处不能传bool 类型会报错
        "group.id": group_id,
    }
    return properties
properties = get_kafka_customer_properties(TEST_KAFKA_SERVERS, TEST_GROUP_ID)


class LogEvent:
    buss_seq = None
    message = None
    index_name = None
    
    def __init__(self, bus_seq,message,index_name):
        self.bus_seq = bus_seq
        self.message = message
        self.index_name= index_name

    def to_dict(self):
        return {
            "bus_seq": self.bus_seq,
            "message": self.message,
            "index_name" : self.index_name
        }
   
class MyMapFunction(FlatMapFunction):
   def open(self, runtime_context: RuntimeContext):
       self.process_id_to_bus_seq = runtime_context.get_map_state(MapStateDescriptor('process_id', Types.STRING(), Types.STRING()))
      
   def close(self):
       pass

   def flat_map(self,line):
      bus_seq=''
      process_id=''
      message=''
      message = line.replace("\n", "")
      line = json.loads(message)['message']
      if not line.startswith("ES"):
          return 
      if '<Serial>' in line:
         try:
             pat=re.compile(r"<Serial>(\d+)</Serial>")
             bus_seq=pat.search(line).group(1)
             process_id=line.split()[1]
             self.process_id_to_bus_seq.put(process_id, bus_seq)
         except:
             return 
      process_id=line.split()[1]
      if not len(process_id)==6 :
          process_id=line.split()[2]
      bus_seq=self.process_id_to_bus_seq.get(process_id)
      if not bus_seq:
          bus_seq='0'
      #self.r.delete(process_id)
      #log_event = LogEvent(bus_seq.decode('UTF-8'),line)
      #LogEvent['bus_seq']=bus_seq.decode('UTF-8')
      date_str = datetime.now().strftime("%Y-%m-%d")
      index_name='flink-test'+date_str
      try:
         log_event=LogEvent(bus_seq,line,index_name)
      except:
          return 
      yield log_event.to_dict()
    
  
data_stream = env.add_source(
        FlinkKafkaConsumer(topics=TEST_KAFKA_TOPIC,
                           properties=properties,
                           deserialization_schema=SimpleStringSchema()) \
            .set_commit_offsets_on_checkpoints(True) \
            .set_start_from_latest()
    ).name(f"消费{TEST_KAFKA_TOPIC}主题数据")


env.add_jars("file:///root/lib/flink-sql-connector-elasticsearch7-3.0.1-1.16.jar")

es7_sink = Elasticsearch7SinkBuilder() \
    .set_bulk_flush_max_actions(1) \
    .set_emitter(ElasticsearchEmitter.dynamic_index('index_name')) \
    .set_hosts(['127.0.0.1:9200']) \
    .build()
def get_line_key(line):
          message=''
          message = line.replace("\n", "")
          line = json.loads(message)['message']
          try:
              process_id=line.split()[1]
              if not len(process_id)==6 :
                  process_id=line.split()[2]
          except:
              process_id='9999'
          return    process_id 
#data_stream.key_by(get_line_key).flat_map(MyMapFunction(),output_type=Types.MAP(Types.STRING(),Types.STRING())).sink_to(es7_sink)
data_stream.key_by(get_line_key).flat_map(MyMapFunction(),output_type=Types.MAP(Types.STRING(),Types.STRING())).sink_to(es7_sink)


# 执行任务
env.execute('Add "bus_seq" to each line')文章来源地址https://www.toymoban.com/news/detail-480580.html

到了这里,关于pyflink kafka es的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink DataStream之从Kafka读数据

    搭建Kafka 参考:centos7下kafka2.12-2.1.0的安装及使用_kafka2.12-2.1.0 steam_QYHuiiQ的博客-CSDN博客  启动zookeeper 启动kafka 查看进程  创建topic 查看topic列表 导入pom依赖 新建类 启动程序 在终端向kafka生产数据,同时观察程序控制台flink的读取情况  如图说明flink从kafka成功读取数据。

    2024年02月13日
    浏览(40)
  • flink1.16使用消费/生产kafka之DataStream

    flink高级版本后,消费kafka数据一种是Datastream 一种之tableApi。 上官网 Kafka | Apache Flink 引入依赖 flink和kafka的连接器,里面内置了kafka-client 使用方法 很简单一目了然。 topic和partition  反序列化 其实就是实现接口 DeserializationSchema 的deserialize()方法 把byte转为你想要的类型。 起

    2024年02月16日
    浏览(32)
  • Visual Studio Code Import “flask“ could not be resolvedPylance

    首先,我使用的是 Visual Studio Code ,来编辑 python 代码, 1.查看现象是否与我遇到的情况相同, flask 下面 有 波浪线 在 Visual Studio Code 的 TERMINAL 窗口里 运行 :pip --version 查看 pip 安装的包 被放在 python 3.10 里,如下图所示 而 我的 Visual Studio Code 使用的 python 版本是: 3.11.1 所以

    2023年04月14日
    浏览(35)
  • 51.pyinstaller打包后,打开exe程序提示SyntaxError: Non-UTF-8 code starting with '\x90' in file的问题

    问题: 最后开发了一款小工具,然后确定一切测试没有问题,想通过pyinstaller将其打包成exe,像类似的打包以前也经常打包的,复杂一点的也都是打包成功的,但这里感觉程序很简单,打包居然出现了以下错误。 我的python版本是3.8.9,然后pyinstaller版本是5.9.0,不知道会不会是

    2024年02月11日
    浏览(32)
  • vue3+vant Failed to resolve import “E:/code3/jianmu-user-yd/node_modules/vant/lib/vant/es/icon/style

    Failed to resolve import \\\"E:/code3/jianmu-user-yd/node_modules/vant/lib/vant/es/icon/style\\\" from \\\"srcmain.js\\\". Does the file exist? 解决办法:在vite.config.js中配置:

    2024年02月06日
    浏览(33)
  • 0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统

    在 《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。 在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。 Sink用于将Reduce结果输出到外部系统。它也是通过

    2024年02月08日
    浏览(30)
  • PyFlink核心知识点

    四层 说明 备注 SteamGraph 代码生成的最初的图 表示程序的拓扑结构 JobGraph 将多个符合条件的节点,链接为一个节点 可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗 ExecutionGraph JobGraph的并行化版本 是调度层最核心的数据结构 PhysicalGraph JobManager根据ExecutionGra

    2024年04月27日
    浏览(42)
  • 搭建PyFlink环境(2)

            😄伙伴们,好久不见!这里是 叶苍ii         ❀  作为一名大数据博主,我一直致力于分享最新的技术趋势和实战经验。近期,我在参加Flink的顾客营销项目,使用了PyFlink项目进行数据处理和分析。         ❀  在这个文章合集中,我将与大家分享我的实战经验,

    2024年02月19日
    浏览(19)
  • pyflink map函数例子

    [root@master pyflink]# cat k102.py  from pyflink.datastream import StreamExecutionEnvironment # 创建 StreamExecutionEnvironment 对象 env = StreamExecutionEnvironment.get_execution_environment() # 读取文件,创建 DataStream 对象 data_stream = env.read_text_file(\\\'/root/pyflink/test.log\\\') # 对每行数据添加字符串 \\\'aaaa\\\' new_stream = data_st

    2024年02月07日
    浏览(33)
  • pyflink读取文件并行度问题

    [root@master pyflink]# cat /root/pyflink/test.log  111111111   aaaaa 222222222   bbbbb 111111111   ccccc 222222222   ddddd 333333333   eeeee 111111111   fffff 111111111   ggggg 111111111   eeeee 111111111   hhhhh 111111111   iiiii 111111111   jjjjj 222222222   eeeee [root@master pyflink]# cat t107.py  from pyflink.common import WatermarkStrateg

    2024年02月05日
    浏览(18)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包