pyflink map 字典写入ES

这篇具有很好参考价值的文章主要介绍了pyflink map 字典写入ES。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

[root@master pyflink]# cat test.py 
# -*- coding: utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
from abc import ABC, abstractmethod
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
from pyflink.datastream.state import MapStateDescriptor
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.typeinfo import Types, TypeInformation
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.common.serialization import SimpleStringSchema
import json
import re
from datetime import datetime
from elasticsearch import Elasticsearch
from pyflink.datastream.functions import RuntimeContext, FlatMapFunction

import re
import redis


# 创建 StreamExecutionEnvironment 对象
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# 读取文件,创建 DataStream 对象
data_stream = env.read_text_file('/root/pyflink/test.txt')
class MyMapFunction(MapFunction):
   def open(self, runtime_context: RuntimeContext):
       pass
   def close(self):
       pass

   def map(self,line):
      r_value=str(int(line.split(' ')[1]) + 1)
      dict1={}
      dict1['r_value']=r_value
      return dict1

env.add_jars("file:///root/lib/flink-sql-connector-elasticsearch7-3.0.1-1.16.jar")
date_str = datetime.now().strftime("%Y-%m-%d")
es7_sink = Elasticsearch7SinkBuilder() \
    .set_bulk_flush_max_actions(1) \
    .set_emitter(ElasticsearchEmitter.static_index('flink-test2023-06-07')) \
    .set_hosts(['127.0.0.1:9200']) \
    .build()
      
new_stream = data_stream.map(MyMapFunction(),output_type=Types.MAP(Types.STRING(),Types.STRING())).sink_to(es7_sink)
# 输出到控制台

# 执行任务
env.execute('Add "bus_seq" to each line')文章来源地址https://www.toymoban.com/news/detail-497361.html

到了这里,关于pyflink map 字典写入ES的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • YOLOV7算法(一)test.py代码学习记录

    代码链接 :https://github.com/WongKinYiu/yolov7 输入指令 参数解析 上述代码中的参数基本与源码保持一致,只是修改了部分路径。 coco.yaml 如果已经提前下载好了coco2017数据集,可以注释掉代码: 根据解析参数,执行test() 模型加载 如果代码在gpu上运行,则将所有浮点参数和缓冲转

    2024年02月11日
    浏览(32)
  • Jenkins服务器连接JMeter分布式中的test-master

    Jenkins想要连接test-master就要通过代理 将下载好的agent.jar传输到test-master机器上的/usr/local(实际上任何目录都可以)下 然后我们在/usr/local目录下输入: (这个是在Jenkins页面自己生成的命令) 输入完成之后会提示我们连接成功,且这个 命令行窗口不可以关闭 回到Jenkins网页发

    2024年02月11日
    浏览(30)
  • [excel与dict] python 读取excel内容并放入字典、将字典内容写入 excel文件

    一 读取excel内容、并放入字典 1 读取excel文件 2 读取value,舍弃行号 3 读取为字典 一 读取excel内容、并放入字典(完整代码) 二、将字典内容写入 excel文件 1 假设已有字典内容为: 即student列表里有4个字典, 第一个字典里面有3对key-value \\\"num\\\": 1, \\\"name\\\": \\\"cod1\\\", \\\"wfm\\\": 0.1 2 导入Workb

    2024年02月04日
    浏览(36)
  • openssl3.2/test/certs - 004 - cross root and root cross cert

    索引贴 openssl3.2 - 官方demo学习 - test - certs // file my_openssl_linux_log_doc_004.txt // openssl3.2/test/certs - 004 - cross root and root cross cert // -------------------------------------------------------------------------------- // 官方脚本原始内容 // -------------------------------------------------------------------------------- //

    2024年01月24日
    浏览(36)
  • golang--sync.map(安全字典)

    引言:在Go语言中,多个goroutine之间安全地共享数据是一项挑战。为了解决这个问题,Go语言提供了sync包,并在其中引入了sync.Map类型。sync.Map是一种并发安全的映射数据结构,它提供了高效的并发访问方式,避免了显式的锁操作。本文将深入探讨sync.Map的使用方法和底层实现

    2024年02月13日
    浏览(35)
  • openssl3.2 - 官方demo学习 - test - certs - 001 - Primary root: root-cert

    实验前置条件为 openssl3.2 - linux脚本(.sh)调用openssl命令行参数的简单确认方法 做官方第2个实验时, 才发现, 自己记录的管道文件没有考虑-extfile后面带的管道. 重新修正了openssl的入口日志记录实现, 现在ok了. 命令行 - 原始 openssl genpkey -algorithm rsa -pkeyopt rsa_keygen_bits:2048 -out root-

    2024年01月22日
    浏览(28)
  • 35 | 并发安全字典sync.Map (下)

    我们在上一篇文章中谈到了,由于并发安全字典提供的方法涉及的键和值的类型都是interface{ },所以我们在调用这些方法的时候,往往还需要对键和值的实际类型进行检查。 这里大致有两个方案。我们上一篇文章中提到了第一种方案,在编码时就完全确定键和值的类型,然后

    2024年01月24日
    浏览(28)
  • openssl3.2/test/certs - 003 - genroot “Root CA“ root-key2 root-cert2

    索引贴 = openssl3.2 - 官方demo学习 - test - certs // openssl3.2/test/certs - 003 - genroot “Root CA” root-key2 root-cert2 // -------------------------------------------------------------------------------- // 官方原始脚本 // -------------------------------------------------------------------------------- ./mkcert.sh genroot “Root CA” roo

    2024年01月23日
    浏览(32)
  • YOLOv5源码逐行超详细注释与解读(4)——验证部分val(test).py

    本篇文章主要是对YOLOv5项目的验证部分。这个文件之前是叫test.py,后来改为 val.py 。 在之前我们已经学习了推理部分 detect.py 和训练部分 train.py 这两个,而我们今天要介绍的验证部分 val.py 这个文件主要是 train.py 每一轮训练结束后, 用 val.py 去验证当前模型的mAP、混淆矩阵等

    2023年04月15日
    浏览(74)
  • YOLOv7教程系列:一、基于自定义数据集训练专属于自己的目标检测模型(保姆级教程,含数据集预处理),包含对train.py/test.py/detect.py/export.py详细说明

    YOLOv7作为YOLO系列的又一大巅峰之作,下面将介绍利用自己的数据集训练YOLOv7模型。 github代码链接:https://github.com/WongKinYiu/yolov7 目前版本为v0.1 运行环境如下: ubuntu20.04 cuda11.0 cudnn8.0.4 python3.8 torch1.12.0 torchvision0.11.0 在data目录下新建Annotations, images, ImageSets, labels 四个文件夹 i

    2024年01月22日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包