10年大数据专家,使用Flink实现索引数据到Elasticsearch,快来学

这篇具有很好参考价值的文章主要介绍了10年大数据专家,使用Flink实现索引数据到Elasticsearch,快来学。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

使用Flink处理数据时,可以基于Flink提供的批式处理(Batch Processing)和流式处理(Streaming Processing)API来实现,分别能够满足不同场景下应用数据的处理。这两种模式下,输入处理都被抽象为Source Operator,包含对应输入数据的处理逻辑;输出处理都被抽象为Sink Operator,包含了对应输出数据的处理逻辑。这里,我们只关注输出的Sink Operator实现。

flink elasticsearch,大数据,flink,elasticsearch

Flink批式处理模式,运行Flink Batch Job时作用在有界的输入数据集上,所以Job运行的时间是有时限的,一旦Job运行完成,对应的整个数据处理应用就已经结束,比如,输入是一个数据文件,或者一个Hive SQL查询对应的结果集,等等。在批式处理模式下处理数据的输出时,主要需要实现一个自定义的OutputFormat,然后基于该OutputFormat来构建一个Sink,下面看下OutputFormat接口的定义,如下所示:

flink elasticsearch,大数据,flink,elasticsearch

上面,configure()方法用来配置一个OutputFormat的一些输出参数;open()方法用来实现与外部存储系统建立连接;writeRecord()方法用来实现对Flink Batch Job处理后,将数据记录输出到外部存储系统。开发Batch Job时,通过调用DataSet的output()方法,参数值使用一个OutputFormat的具体实现即可。后面,我们会基于Elasticsearch来实现上面接口中的各个方法。

flink elasticsearch,大数据,flink,elasticsearch

Flink流式处理模式,运行Flink Streaming Job时一般输入的数据集为流数据集,也就是说输入数据元素会持续不断地进入到Streaming Job的处理过程中,但你仍然可以使用一个HDFS数据文件作为Streaming Job的输入,即使这样,一个Flink Streaming Job启动运行后便会永远运行下去,除非有意外故障或有计划地操作使其终止。在流式处理模式下处理数据的输出时,我们需要是实现一个SinkFunction,它指定了如下将流数据处理后的结果,输出到指定的外部存储系统中,下面看下SinkFunction的接口定义,如下所示:

flink elasticsearch,大数据,flink,elasticsearch

通过上面接口可以看到,需要实现一个invoke()方法,实现该方法来将一个输入的IN value输出到外部存储系统中。一般情况下,对一些主流的外部存储系统,Flink实现了一下内置(社区贡献)的SinkFunction,我们只需要配置一下就可以直接使用。而且,对于Streaming Job来说,实现的SinkFunction比较丰富一些,可以减少自己开发的工作量。开发Streaming Job时,通过调用DataStream的addSink()方法,参数是一个SinkFlink的具体实现。下面,我们分别基于批式处理模式和批式处理模式,分别使用或实现对应组件将Streaming Job和Batch Job的处理结果输出到Elasticsearch中:

基于Flink DataSteam API实现

在开发基于Flink的应用程序过程中,发现Flink Streaming API对Elasticsearch的支持还是比较好的,比如,如果想要从Kafka消费事件记录,经过处理最终将数据记录索引到Elasticsearch 5.x,可以直接在Maven的POM文件中添加如下依赖即可:

flink elasticsearch,大数据,flink,elasticsearch

我们使用Flink Streaming API来实现将流式数据处理后,写入到Elasticsearch中。其中,输入数据源是Kafka中的某个Topic;输出处理结果到lasticsearch中,我们使用使用Transport API的方式来连接Elasticsearch,需要指定Transport地址和端口。具体实现,对应的Scala代码,如下所示:

flink elasticsearch,大数据,flink,elasticsearch

flink elasticsearch,大数据,flink,elasticsearch

上面有关数据索引到Elasticsearch的处理中, 最核心的就是创建一个ElasticsearchSink,然后通过DataStream的API调用addSink()添加一个Sink,实际是一个SinkFunction的实现,可以参考Flink对应DataStream类的addSink()方法代码,如下所示:

def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T] = 
  stream.addSink(sinkFunction)

基于Flink DataSet API实现

目前,Flink还没有在Batch处理模式下实现对应Elasticsearch对应的Connector,需要自己根据需要实现,所以我们基于Flink已经存在的Streaming处理模式下已经实现的Elasticsearch Connector对应的代码,经过部分修改,可以直接拿来在Batch处理模式下,将数据记录批量索引到Elasticsearch中。我们基于Flink 1.6.1版本,以及Elasticsearch 6.3.2版本,并且使用Elasticsearch推荐的High Level REST API来实现(为了复用Flink 1.6.1中对应的Streaming处理模式下的Elasticsearch 6 Connector实现代码,我们选择使用该REST Client),需要在Maven的POM文件中添加如下依赖:

flink elasticsearch,大数据,flink,elasticsearch

我们实现的各个类的类图及其关系,如下图所示:

flink elasticsearch,大数据,flink,elasticsearch

如果熟悉Flink Streaming处理模式下Elasticsearch对应的Connector实现,可以看到上面的很多类都在
org.apache.flink.streaming.connectors.elasticsearch包里面存在,其中包括批量向Elasticsearch中索引数据(内部实现了使用BulkProcessor)。上图中引入的ElasticsearchApiCallBridge,目的是能够实现对Elasticsearch不同版本的支持,只需要根据Elasticsearch不同版本中不同Client实现,进行一些适配,上层抽象保持不变。

如果需要在Batch处理模式下批量索引数据到Elasticsearch,可以直接使用ElasticsearchOutputFormat即可实现。但是创建ElasticsearchOutputFormat,需要几个参数:

private ElasticsearchOutputFormat(  
  Map<String, String> bulkRequestsConfig,   
  List<HttpHost> httpHosts,    
  ElasticsearchSinkFunction<T> elasticsearchSinkFunction,  
  DocWriteRequestFailureHandler failureHandler,    
  RestClientFactory restClientFactory) {  
  super(new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory),  
        bulkRequestsConfig, elasticsearchSinkFunction, failureHandler);
}

当然,我们可以通过代码中提供的Builder来非常方便的创建一个ElasticsearchOutputFormat。下面,我们看下我们Flink Batch Job实现逻辑。

  • 实现ElasticsearchSinkFunction

我们需要实现ElasticsearchSinkFunction接口,实现一个能够索引数据到Elasticsearch中的功能,代码如下所示:

final ElasticsearchSinkFunction<String> elasticsearchSinkFunction = new ElasticsearchSinkFunction<String>() {  
  @Override  
  public void process(String element, RuntimeContext ctx, RequestIndexer indexer)
  {    
    indexer.add(createIndexRequest(element, parameterTool));  
  }   
  private IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {  
    LOG.info("Create index req: " + element);   
    JSONObject o = JSONObject.parseObject(element);    
    return Requests.indexRequest()          
      .index(parameterTool.getRequired("es-index"))     
      .type(parameterTool.getRequired("es-type"))         
      .source(o); 
  } 
};

上面代码,主要是把一个将要输出的数据记录,通过RequestIndexer来实现索引到Elasticsearch中。

  • 读取Elasticsearch配置参数

配置连接Elasticsearch的参数。从程序输入的ParameterTool中读取Elasticsearch相关的配置:

flink elasticsearch,大数据,flink,elasticsearch

  • 创建ElasticsearchOutputFormat

创建一个我们实现的ElasticsearchOutputFormat,代码片段如下所示:

flink elasticsearch,大数据,flink,elasticsearch

上面很多配置项指定了向Elasticsearch中进行批量写入的行为,在ElasticsearchOutputFormat内部会进行设置并创建
Elasticsearch6BulkProcessorIndexer,优化索引数据处理的性能。

  • 实现Batch Job主控制流程

最后我们就可以构建我们的Flink Batch应用程序了,代码如下所示:

flink elasticsearch,大数据,flink,elasticsearch

我们输入的HDFS文件中,是一些已经加工好的JSON格式记录行,这里为了简单,直接将原始JSON字符串索引到Elasticsearch中,而没有进行更多其他的处理操作。

flink elasticsearch,大数据,flink,elasticsearch文章来源地址https://www.toymoban.com/news/detail-861700.html

到了这里,关于10年大数据专家,使用Flink实现索引数据到Elasticsearch,快来学的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Elasticsearch使用reindex命令同步跨集群索引数据(跨服务器)

    参考(不需要写协议名称http) 修改新的ES集群的配置,修改config/elasticsearch.yml ,添加上面的配置,修改完之后重新启动服务。 2. 在新的ES集群中执行请求 请求url:_reindex 请求方式:post 请求体 注意点 1.请求需要在新的ES集群中执行 2.如果执行有个别数据报错type类型无法转换,会终

    2024年02月14日
    浏览(44)
  • 【粉丝福利社】Elasticsearch 通过索引阻塞实现数据保护深入解析(文末送书-完结)

    🏆 作者简介,愚公搬代码 🏆《头衔》:华为云特约编辑,华为云云享专家,华为开发者专家,华为产品云测专家,CSDN博客专家,CSDN商业化专家,阿里云专家博主,阿里云签约作者,腾讯云优秀博主,腾讯云内容共创官,掘金优秀博主,51CTO博客专家等。 🏆《近期荣誉》:

    2024年04月14日
    浏览(38)
  • Centos7安装RabbitMQ与使用(超详细),21年大数据开发面经分享

    public static void main(String[] args) throws Exception{ Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); /** * 为了保险起见,防止生产方未启动队列未创建的情况下消费方启动后报404异常,最好在消费方中也声明创建队列,注意消费方和生产方声明的队列必须

    2024年04月16日
    浏览(41)
  • Spring Boot Elasticsearch7.6.2实现创建索引、删除索引、判断索引是否存在、获取/添加/删除/更新索引别名、单条/批量插入、单条/批量更新、删除数据、递归统计ES聚合的数据

    注意:我的版本是elasticsearch7.6.2、spring-boot-starter-data-elasticsearch-2.5.6 引入依赖 有时候你可能需要查询大批量的数据,建议加上下面配置文件

    2024年02月13日
    浏览(47)
  • Elasticsearch Dump的详细安装和迁移es索引和数据的使用教程

    如果希望将数据导出到本地文件而不是通过编程方式处理,可以考虑使用Elasticsearch的导出工具,如 Elasticsearch Dump (Elasticdump)或 Elasticsearch Exporter 。这些工具可以将Elasticsearch索引中的数据导出为可用于后续处理的文件格式,如JSON或CSV,本文主要介绍使用Elasticsearch Dump进行索

    2024年02月14日
    浏览(33)
  • 【ElasticSearch】基于Docker 部署 ElasticSearch 和 Kibana,使用 Kibana 操作索引库,以及实现对文档的增删改查

    Elasticsearch 和 Kibana 是强大的工具,用于构建实时搜索和数据可视化解决方案。Elasticsearch 是一个分布式、高性能的搜索引擎,可以用于存储和检索各种类型的数据,从文本文档到地理空间数据。Kibana 则是 Elasticsearch 的可视化工具,用于实时分析和可视化大规模数据集。 在本

    2024年02月06日
    浏览(66)
  • 2023年大数据开题报告详细模版

    🙌秋名山码民的主页 😂oi退役选手,Java、大数据、单片机、IoT均有所涉猎,热爱技术,技术无罪 🎉欢迎关注🔎点赞👍收藏⭐️留言📝 获取源码,添加WX 首先,开题报告是你后续论文研究内容的书面表达形式,百度给出的作用如下: 确定研究方向和目标:开题报告帮助研

    2024年02月07日
    浏览(22)
  • elasticsearch批量索引数据示例

       示例数据文件document.json(index表示在索引中增加或替换现有文档,create表示如果文档不存在则添加文档,delete表示删除文档): { \\\"index\\\": { \\\"_index\\\": \\\"addr\\\", \\\"_type\\\": \\\"contact\\\", \\\"_id\\\": 1 }} { \\\"name\\\": \\\"Fyodor Dostoevsky\\\", \\\"country\\\": \\\"RU\\\" } { \\\"create\\\": { \\\"_index\\\": \\\"addr\\\", \\\"_type\\\": \\\"contact\\\", \\\"_id\\\": 2 }} { \\\"name\\\": \\\"Erich M

    2024年02月08日
    浏览(35)
  • Elasticsearch中复制一个索引数据到新的索引中

    我有时候,需要调试一个已经存在的ES索引,需要从已有的索引复制数据到新的索引中去。 这里我借助一个GUI工具,来解决这个问题,底层它是使用Reindex的API实现索引数据复制的。利用Reindex API搞不定这个事情,原索引mapping结构不会被复制。 # 步骤 选中已存在的redix菜单,准

    2024年02月22日
    浏览(34)
  • 使用PyTorch实现混合专家(MoE)模型

    Mixtral 8x7B 的推出在开放 AI 领域引发了广泛关注,特别是混合专家(Mixture-of-Experts:MoEs)这一概念被大家所认知。混合专家(MoE)概念是协作智能的象征,体现了“整体大于部分之和”的说法。MoE模型汇集了各种专家模型的优势,以提供更好的预测。它是围绕一个门控网络和一

    2024年01月17日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包