python flat_map字典写入es

这篇具有很好参考价值的文章主要介绍了python flat_map字典写入es。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

[root@master pyflink]# cat test.txt 
aaaaa 111111
bbbbb 222222
ccccc 333333
ddddd 444444
eeeee 555555


[root@master pyflink]# cat test.py 
# -*- coding: utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
from abc import ABC, abstractmethod
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
from pyflink.datastream.state import MapStateDescriptor
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.typeinfo import Types, TypeInformation
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.common.serialization import SimpleStringSchema
import json
import re
from datetime import datetime
from elasticsearch import Elasticsearch
from pyflink.datastream.functions import RuntimeContext, FlatMapFunction

import re
import redis


# 创建 StreamExecutionEnvironment 对象
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# 读取文件,创建 DataStream 对象
data_stream = env.read_text_file('/root/pyflink/test.txt')
class MyMapFunction(FlatMapFunction):
   def open(self, runtime_context: RuntimeContext):
       pass
   def close(self):
       pass

   def flat_map(self,line):
      r_value=str(int(line.split(' ')[1]) + 1)
      dict1={}
      dict1['r_value']=r_value
      yield dict1

env.add_jars("file:///root/lib/flink-sql-connector-elasticsearch7-3.0.1-1.16.jar")
date_str = datetime.now().strftime("%Y-%m-%d")
es7_sink = Elasticsearch7SinkBuilder() \
    .set_bulk_flush_max_actions(1) \
    .set_emitter(ElasticsearchEmitter.static_index('flink-test2023-06-07')) \
    .set_hosts(['127.0.0.1:9200']) \
    .build()
      
new_stream = data_stream.flat_map(MyMapFunction(),output_type=Types.MAP(Types.STRING(),Types.STRING())).sink_to(es7_sink)
# 输出到控制台

# 执行任务
env.execute('Add "bus_seq" to each line')文章来源地址https://www.toymoban.com/news/detail-488183.html

到了这里,关于python flat_map字典写入es的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 35 | 并发安全字典sync.Map (下)

    我们在上一篇文章中谈到了,由于并发安全字典提供的方法涉及的键和值的类型都是interface{ },所以我们在调用这些方法的时候,往往还需要对键和值的实际类型进行检查。 这里大致有两个方案。我们上一篇文章中提到了第一种方案,在编码时就完全确定键和值的类型,然后

    2024年01月24日
    浏览(37)
  • 【网络编程】学习成果day7:用sqlite3和fgetc将字典写入数据库中的数据表。

    1.将字典写入数据库中的数据表 代码: 运行结果:

    2024年02月09日
    浏览(44)
  • Elasticsearch(ES)(版本7.x)创建索引报错:Faile to parse mapping [_doc] Root mapping definition has unsupported

    Elasticsearch(ES)(版本7.x)创建索引报错: 因es7.0版本之后不再需要type doc,把上面语句中的doc删掉,再运行就可以创建索引了。 如果还需要type doc则需要增加include_type_name=true即可解决。 示例:

    2024年02月16日
    浏览(43)
  • 机顶盒晶晨s905l3b芯片刷第三方系统+安卓9 root教程+armbian写入EMMC教程

    机顶盒s905l3b芯片刷第三方系统+安卓9 root教程+刷armbian写入EMMC教程 最近我在装修房子,看抖音刷到了HAOS系统(全屋智能-安装homeassistant),就心血来潮到咸鱼市场购买了一个机顶盒,机顶盒的基本配置是:中兴ZXV10 B860AV3.2-M,CPU晶晨S905l3-b,支持杜比,4g运行32g闪存真实配置中国

    2024年01月24日
    浏览(362)
  • yolov5数据读取报错:train: No labels found in /root/yolov5-master/VOCData/dataSet_path/train.cache

    这个问题是由于路径设置错误导致的,以下几个文件的路径都要保持一致。 (1)yolov5-master/VOCData/xml_to_yolo.py 这个文件是将xml格式的label转为txt格式,这个地方建议直接改为绝对路径。  (2)yolov5-mastertrain.py train文件里面的ROOT也需要改为yolov5-master所在路径,后续代码都使用

    2024年02月13日
    浏览(47)
  • ElasticSearch创建索引报错:mapper_parsing_exception Root mapping definition has unsupported parameters

    ElasticSearch版本号:5.6.14,这个错误和ES版本有一定的关系,还是先交代下版本号,免得有的读者根据我的方法操作后无效 错误翻译: mapper_parsing_exception :映射解析异常 Root mapping definition has unsupported parameters :根映射定义包含不受支持的参数 错误映射语句1: 错误映射语句

    2024年02月11日
    浏览(56)
  • 数组扁平化flat方法的多种实现

    flat() 执行效果: toString() 注意:map()处理空值的问题 执行效果: JSON.stringify() 注意:map()处理空值的问题 执行效果: while() {} 执行效果: reduce() 对于一个数组,使用 reduce() 方法遍历其中的每个元素,如果元素是一个数组,则递归调用扁平化函数;否则,将元素添加到结果数

    2024年02月13日
    浏览(44)
  • JavaScript 数组展平方法: flat() 和 flatMap()

    从 ES2019 中开始引入了一种扁平化数组的新方法,可以展平任何深度的数组。 flat() 方法创建一个新数组,其中所有子数组元素以递归方式连接到特定深度。 语法:array.flat(depth) array : flat() 方法将在给定的数组中使用。 depth :可选参数,指定展平的深度,默认情况下,深度

    2024年02月09日
    浏览(41)
  • Python 字典 get()函数使用详解,字典获取值

    「作者主页」: 士别三日wyx 「作者简介」: CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「推荐专栏」: 小白零基础《Python入门到精通》 get() 可以根据键 「获取值」 语法 参数 key :(必选)指定要搜索的键 value :(可选)如果键不存在,就返回

    2024年02月14日
    浏览(35)
  • Python 列表转字典:实现列表和字典之间的转换

    Python 列表转字典:实现列表和字典之间的转换 在 Python 中,列表(List)和字典(Dictionary)是两种常见的数据类型。列表用于存储一组有序的元素,而字典则是一组无序的键值对。有时候我们需要将一个列表转换成一个字典,或者将一个字典转换成一个列表。这种需求在实际

    2024年02月11日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包