Flink 系例 之 Connectors 连接 ElasticSearch

这篇具有很好参考价值的文章主要介绍了Flink 系例 之 Connectors 连接 ElasticSearch。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

通过使用 Flink DataStream Connectors 数据流连接器连接到 ElasticSearch 搜索引擎的文档数据库 Index,并提供数据流输入与输出操作;

示例环境

java.version: 1.8.x
flink.version: 1.11.1
elasticsearch:6.x

示例数据源 (项目码云下载)

Flink 系例 之 搭建开发环境与数据

示例模块 (pom.xml)

Flink 系例 之 DataStream Connectors 与 示例模块

数据流输入

DataStreamSource.java

package com.flink.examples.elasticsearch;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.Map;
/**
 * @Description 从elasticsearch中获取数据并输出到DataStream数据流中
 */
public class DataStreamSource {
    /**
     * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
     */

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<TUser> dataStream = env.addSource(new RichSourceFunction<TUser>(){
            private RestClientBuilder builder = null;
            //job开始执行,调用此方法创建数据源连接对象,该方法主要用于打开连接
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                builder = RestClient.builder(new HttpHost("192.168.1.3", 9200, "http"));
            }
            //执行查询并对数据进行封装
            @Override
            public void run(SourceContext<TUser> ctx) throws Exception {
                Gson gson = new Gson();
                RestHighLevelClient client = null;
                //匹配查询
                SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
                sourceBuilder.query(QueryBuilders.matchQuery("sex", 1));
                //定义索引库
                SearchRequest request = new SearchRequest();
                request.types("doc");
                request.indices("flink_demo");
                request.source(sourceBuilder);
                try {
                    client = new RestHighLevelClient(builder);
                    SearchResponse response = client.search(request, new Header[]{});
                    SearchHits hits = response.getHits();
                    System.out.println("查询结果有" + hits.getTotalHits() + "条");
                    for (SearchHit searchHits : hits ) {
                        Map<String,Object> dataMap = searchHits.getSourceAsMap();
                        TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
                        ctx.collect(user);
                    }
                    //ID查询
//                    GetRequest request = new GetRequest( "flink_demo","doc","NeMaoXQBElQ9wTD5MOfB");
//                    client = new RestHighLevelClient(builder);
//                    GetResponse getResponse = client.get(request, new Header[]{});
//                    Map<String,Object> dataMap = getResponse.getSourceAsMap();
//                    TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
//                    ctx.collect(user);
                }catch(IOException ioe){
                    ioe.printStackTrace();
                }finally {
                    if (client != null){
                        client.close();
                    }
                }
            }
            //Job结束时调用
            @Override
            public void cancel() {
                try {
                    super.close();
                } catch (Exception e) {
                }
                builder = null;
            }
        });
        dataStream.print();
        env.execute("flink es to data job");
    }

}

数据流输出

DataStreamSink.java

package com.flink.examples.elasticsearch;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @Description 将DataStream数据流输出到elasticsearch中
 */
public class DataStreamSink {

    /**
     * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
     */

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(2);
        //1.设置Elasticsearch连接,创建索引数据
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("192.168.1.3", 9200, "http"));
        //创建数据源对象 ElasticsearchSink
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String user, RuntimeContext ctx, RequestIndexer indexer) {
                        Gson gson = new Gson();
                        Map<String,Object> map = gson.fromJson(user, Map.class);
                        indexer.add(Requests.indexRequest()
                                .index("flink_demo")
                                .type("doc")
                                .source(map));
                    }
                }
        );
        // 设置批量写数据的最大动作量,对批量请求的配置;这指示接收器在每个元素之后发出,否则它们将被缓冲
        esSinkBuilder.setBulkFlushMaxActions(10);
        //刷新前缓冲区的最大数据大小(以MB为单位)
        esSinkBuilder.setBulkFlushMaxSizeMb(500);
        //论缓冲操作的数量或大小如何都要刷新的时间间隔
        esSinkBuilder.setBulkFlushInterval(4000);

        //2.写入数据到流中
        //封装数据
        TUser user = new TUser();
        user.setId(9);
        user.setName("wang1");
        user.setAge(23);
        user.setSex(1);
        user.setAddress("CN");
        user.setCreateTimeSeries(System.currentTimeMillis());
        DataStream<String> input = env.fromElements(user).map((MapFunction<TUser, String>) value -> new Gson().toJson(value));
        //3.将数据写入到Elasticearch中
        input.addSink(esSinkBuilder.build());
        env.execute("flink data to es job");
    }

}

数据展示

flink-connector-elasticsearch,Flink,elasticsearch,搜索引擎,flink文章来源地址https://www.toymoban.com/news/detail-592846.html

到了这里,关于Flink 系例 之 Connectors 连接 ElasticSearch的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Cloud Gateway系例—参数配置(CORS 配置、SSL、元数据)

    你可以配置网关来控制全局或每个路由的 CORS 行为。两者都提供同样的可能性。 “global” CORS配置是对 Spring Framework CorsConfiguration 的URL模式的映射。下面的例子配置了 CORS。 Example 77. application.yml 在前面的例子中,对于所有GET请求的路径,允许来自 docs.spring.io 的请求的CORS请求

    2024年02月13日
    浏览(37)
  • flink连接kafka

    示例 以下属性在构建 KafkaSource 时是必须指定的: Bootstrap server,通过 setBootstrapServers(String) 方法配置 消费者组 ID,通过setGroupId(String) 配置 要订阅的 Topic / Partition, 用于解析 Kafka消息的反序列化器(Deserializer) 起始消费位点 Kafka source 能够通过位点初始化器(OffsetsInitialize

    2024年02月22日
    浏览(33)
  • Flink 之 Kafka连接器

    Flink附带了一个通用的Kafka连接器,它试图跟踪Kafka客户端的最新版本。Kafka的客户端版本会在Flink不同版本间发生变化。现代Kafka客户端向后兼容broker 0.10.0版本及以后的版本。 用法 Kafka Source 提供了一个构造器类来构建KafkaSource的实例。下面代码展示如何构建一个KafkaSource来消

    2023年04月08日
    浏览(34)
  • Flink系列之:Flink CDC深入了解MySQL CDC连接器

    增量快照读取是一种读取表快照的新机制。与旧的快照机制相比,增量快照具有许多优点,包括: (1)在快照读取期间,Source 支持并发读取 (2)在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint (3)在快照读取之前,Source 不需要数据库锁权限。 如果希望 source 并行运

    2024年02月02日
    浏览(38)
  • 【flink sql】kafka连接器

    Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。 前面已经介绍了flink sql创建表的语法及说明:【flink sql】创建表 这篇博客聊聊怎么通过flink sql连接kafka 以下的连接器元数据可以在表定义中通过元数据列的形式获取。 R/W 列定义了一个元数据是可读的(R)还是可写的(

    2024年02月08日
    浏览(36)
  • 最新版Flink CDC MySQL同步MySQL(一)_flink 连接mysql(1)

    下载 连接器 SQL jar (或 自行构建 )。 将下载的jar包放在FLINK_HOME/lib/. 重启Flink集群。 注意 :目前2.4以上版本需要进行自行编译构建。本文笔者自行进行构建上传的 6.使用 Flink CDC 对 MySQL 进行流式 ETL 本教程将展示如何使用 Flink CDC 快速构建 MySQL的流式 ETL。 假设我们将产品数

    2024年04月26日
    浏览(35)
  • Flink系列之:JDBC SQL 连接器

    Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Append Upsert Mode JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。本文档描述了针对关系型数据库如何通过建立 JDBC 连接器来执行 SQL 查询。 如果在 DDL 中定义了主键,JDBC sink 将以 upsert 模式与外

    2024年02月02日
    浏览(33)
  • Flink系列之:Elasticsearch SQL 连接器

    Sink: Batch Sink: Streaming Append Upsert Mode Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中。本文档描述运行 SQL 查询时如何设置 Elasticsearch 连接器。 连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。 如果 DDL 中没有定义主键,那么

    2024年02月04日
    浏览(40)
  • 【Flink实战】Flink hint更灵活、更细粒度的设置Flink sql行为与简化hive连接器参数设置

    SQL 提示(SQL Hints)是和 SQL 语句一起使用来改变执行计划的。本章介绍如何使用 SQL 提示来实现各种干预。 SQL 提示一般可以用于以下: 增强 planner:没有完美的 planner, SQL 提示让用户更好地控制执行; 增加元数据(或者统计信息):如\\\"已扫描的表索引\\\"和\\\"一些混洗键(shu

    2024年04月25日
    浏览(25)
  • flink cdc 连接posgresql 数据库相关问题整理

    01 、flink posgresql cdc 前置工作 1,更改配置文件postgresql.conf wal_level是必须更改的,其它参数选着性更改,如果同步表数量超过10张建议修改为合适的值 更改配置文件postgresql.conf完成,需要重启pg服务生效,所以一般是在业务低峰期更改 2,新建用户并且给用户复制流权限 3,发

    2024年02月07日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包