pyflink 写ES并发和串行

这篇具有很好参考价值的文章主要介绍了pyflink 写ES并发和串行。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

写ES并发执行:
# -*- coding: utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
from abc import ABC, abstractmethod
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
from pyflink.datastream.state import MapStateDescriptor
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.typeinfo import Types, TypeInformation
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.common.serialization import SimpleStringSchema
import json
import re
from datetime import datetime
from elasticsearch import Elasticsearch


import re
import redis


# 创建 StreamExecutionEnvironment 对象
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# 读取文件,创建 DataStream 对象
data_stream = env.read_text_file('/root/pyflink/elink_test.txt')
# 对每行数据添加字符串 'aaaa'
class LogEvent:
    bus_seq = None
    line_number = None
    event_received_time = None
    app_name = None
    source_module_name = None
    source = None
    filename = None
    message = None
    serial_id = None
    thread_id = None
class MyMapFunction(MapFunction):
   def open(self, runtime_context: RuntimeContext):
     pool = redis.ConnectionPool(host='127.0.0.1',port=6379,max_connections=50)
     self.r = redis.Redis(connection_pool=pool)
   def close(self):
     self.r.close()

   def map(self,line):
    process_id='';
    bus_seq=''
    if not line.startswith("ES"):
        return 
    if '<Serial>' in line:
       pat=re.compile(r"<Serial>(\d+)</Serial>")
       bus_seq=pat.findall(line)
       process_id=line.split()[1]
       self.r.set(process_id,bus_seq[0])
    process_id=line.split()[1]
    if not len(process_id)==6 :
        process_id=line.split()[2]
     
    bus_seq=self.r.get(process_id) 
    if not bus_seq:
        return 
    #self.r.delete(process_id)
    log_event = LogEvent()
    LogEvent.bus_seq=bus_seq.decode('UTF-8')
    LogEvent.message=line
    #return (log_event.bus_seq.decode('UTF-8'),log_event.message)
    return LogEvent
    
class EsSink(MapFunction):
   def open(self, runtime_context: RuntimeContext):
     self.es = Elasticsearch("http://127.0.0.1:9200")

   def close(self):
     pass

   def map(self,LogEvent):
     try:
        data = {
          "@timestamp": datetime.now().strftime( "%Y-%m-%dT%H:%M:%S.000+0800" ),
           "content" : LogEvent.message,
            "bus_seq" : LogEvent.bus_seq
          }
     except:
          return
     self.es.index( index="flink_test",  document=data )
      
new_stream = data_stream.map(MyMapFunction()).map(EsSink()).set_parallelism(3)

# 输出到控制台
#new_stream.print()

# 执行任务
env.execute('Add "bus_seq" to each line')

88行:
self.es.index( index="flink_test",  document=data )
     
     

[root@master pyflink]# python process_log.py 
process_log.py:88: ElasticsearchWarning: Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.17/security-minimal-setup.html to enable security.
  self.es.index( index="flink_test",  document=data )
process_log.py:88: ElasticsearchWarning: Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.17/security-minimal-setup.html to enable security.
  self.es.index( index="flink_test",  document=data )
process_log.py:88: ElasticsearchWarning: Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.17/security-minimal-setup.html to enable security.
  self.es.index( index="flink_test",  document=data 
  
  
 写ES串行执行:
 
 [root@master pyflink]# python process_log.py 
process_log.py:88: ElasticsearchWarning: Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.17/security-minimal-setup.html to enable security.
  self.es.index( index="flink_test",  document=data )
  
  文章来源地址https://www.toymoban.com/news/detail-472378.html

到了这里,关于pyflink 写ES并发和串行的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • CentOS 8 执行yum命令报错:Failed to set locale, defaulting to C.UTF-8

    今天Docker新搞了一个 CentOS 镜像,在运行基于该镜像的容器,执行 yum 命令时,遇到了如下报错: 目前使用 docker pull centos 不指定tag,拉取的latest,centos的版本为 8.4.2105 其中 Error: Failed to download metadata for repo \\\'appstream\\\': Cannot prepare internal mirrorlist: No URLs in mirrorlist 是yum镜像源的问

    2024年02月11日
    浏览(38)
  • 51.pyinstaller打包后,打开exe程序提示SyntaxError: Non-UTF-8 code starting with '\x90' in file的问题

    问题: 最后开发了一款小工具,然后确定一切测试没有问题,想通过pyinstaller将其打包成exe,像类似的打包以前也经常打包的,复杂一点的也都是打包成功的,但这里感觉程序很简单,打包居然出现了以下错误。 我的python版本是3.8.9,然后pyinstaller版本是5.9.0,不知道会不会是

    2024年02月11日
    浏览(32)
  • MySQL执行异常: Illegal mix of collations (utf8mb4_0900_ai_ci IMPLICIT) and (utf8mb4_general_ci...

    Mysql生产库存储过程升级后执行报错:SQL错误( 1267 ) : Illegal mix of collations (utf8mb4_0900_ai _ci IMPLICIT) and (utf8mb4_general_ci IMPLICIT) for operation ‘=’。根据错误提示,报错原因应该是=号两侧内容的排序规则(collation)不一致导致的报错。 产生这个问题一种情况是两个字段的排序规则不

    2024年02月16日
    浏览(26)
  • 执行SQL文件出现【Unknown collation “utf8mb4_0900_ai_ci”】的解决方案

    从服务器MySQL中导出数据为SQL执行脚本后,在本地执行导出的SQL脚本。 报错:Unknown collation “utf8mb4_0900_ai_ci” 打开SQL脚本,查看 utf8mb4_0900_ai_ci ,这是字段的字符集。 1、MySQL 版本不一致。 2、字符集编码不支持。 1、升级 MySQL 数据库版本 将本地5.7版本的 MySQL数据库升

    2024年02月11日
    浏览(27)
  • 【MySQL异常解决】MySQL执行SQL文件出现【Unknown collation ‘utf8mb4_0900_ai_ci‘】的解决方案

    从服务器MySQL中导出数据为SQL执行脚本后,在本地电脑执行导出的SQL脚本, 报错: Unknown collation ‘utf8mb4_0900_ai_ci‘ 打开SQL脚本,查看 utf8mb4_0900_ai_ci ,这是字段的字符集。 1、MySQL 版本不一样; 2、utf8mb4_0900_ai_ci 在 MySQL 8 以下是不被支持的,检查发现本地数据库为5.7,

    2024年02月16日
    浏览(26)
  • 并发编程5:如何执行任务?

    目录 1、线程中执行任务的方式 2、Executor 框架 2.1 - 线程的执行策略 2.2 - 线程池 2.3 - Executor 的生命周期 2.4 - 延任务与周期任务 3、找出可利用的并行性-代码示例 3.1 - 单线程的 I/O 操作 3.2 - 携带任务结果的 Callable 与 Future(重要) 3.3 - 使用 Future 实现页面渲染器 3.5 - Completio

    2024年02月11日
    浏览(23)
  • 控制goroutine 的并发执行数量

    正常项目,协程数量超过十万就需要引起重视。如果有上百万goroutine,一般是有问题的。 但并不是说协程数量的上限是100多w 1048575的来自类似如下的demo代码: 执行后,很快报错 panic: too many concurrent operations on a single file or socket (max 1048575) 但这个是因为fmt.Printf导致的: 对单个

    2024年02月11日
    浏览(27)
  • SpringBoot整合ES,使用java操作ES并发请求

    对于java操作整合es有两种方案我先分别介绍然后解释一下最后我的选择为什么 1)、9300:TCP    spring-data-elasticsearch:transport-api.jar;    通过对9300端口建立一个长连接,但是因为springboot 版本不同, transport-api.jar 不同,不能适配 es 版本,并且7.x 已经不建议使用,8 以后就要废

    2023年04月08日
    浏览(35)
  • 深入理解高并发编程 - 线程的执行顺序

    在Java中,线程的执行顺序是由操作系统的调度机制决定的,具体顺序是不确定的,取决于多个因素,如操作系统的调度策略、线程的优先级、线程的状态转换等。因此,不能对线程的执行顺序做出可靠的假设。 以下是一个简单的Java代码示例,演示了多个线程的执行顺序是不

    2024年02月14日
    浏览(42)
  • 使用 Goroutine 和 Channel 来实现更复杂的并发模式,如并发任务执行、并发数据处理,如何做?

    使用 Goroutine 和 Channel 来实现更复杂的并发模式是 Go 语言的强大特性之一。 下面分别介绍如何实现并发任务执行和并发数据处理: 并发任务执行: 假设您有一些任务需要并发地执行,您可以使用 Goroutine 来同时执行这些任务,然后使用 Channel 来汇总结果。 下面是一个示例,

    2024年01月22日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包