Elasticsearch 集成--Flink 框架集成

这篇具有很好参考价值的文章主要介绍了Elasticsearch 集成--Flink 框架集成。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、Flink 框架介绍

Elasticsearch 集成--Flink 框架集成,ElasticSearch,flink,大数据

       Apache Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
Apache Spark 掀开了内存计算的先河,以内存作为赌注,赢得了内存计算的飞速发展。
但是在其火热的同时,开发人员发现,在 Spark 中,计算框架普遍存在的缺点和不足依然没
有完全解决,而这些问题随着 5G 时代的来临以及决策者对实时数据分析结果的迫切需要而
凸显的更加明显:
  •  数据精准一次性处理(Exactly-Once
  • 乱序数据,迟到数据
  •  低延迟,高吞吐,准确性
  •  容错性
        Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。在
Spark 火热的同时,也默默地发展自己,并尝试着解决其他计算框架的问题。
慢慢地,随着这些问题的解决, Flink 慢慢被绝大数程序员所熟知并进行大力推广,阿里公
司在 2015 年改进 Flink ,并创建了内部分支 Blink ,目前服务于阿里集团内部搜索、推荐、
广告和蚂蚁等大量核心实时业务。

二、框架集成

2.1创建 Maven 项目

依赖

<?xml version="1.0" encoding="UTF-8"?>
<project
        xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.lun.es</groupId>
    <artifactId>flink-elasticsearch</artifactId>
    <version>1.0</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <!-- jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.11.1</version>
        </dependency>
    </dependencies>
</project>

功能实现

package com.xmx.es;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlinkElasticsearchSinkTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
        //httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

        // use a ElasticsearchSink.Builder to create an ElasticsearchSink
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    public IndexRequest createIndexRequest(String element) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element);
                        return Requests.indexRequest()
                                .index("my-index")
                                //.type("my-type")
                                .source(json);
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );

        // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
        esSinkBuilder.setBulkFlushMaxActions(1);

        // provide a RestClientFactory for custom configuration on the internally createdREST client
        // esSinkBuilder.setRestClientFactory(
        // restClientBuilder -> {
        // restClientBuilder.setDefaultHeaders(...)
        // restClientBuilder.setMaxRetryTimeoutMillis(...)
        // restClientBuilder.setPathPrefix(...)
        // restClientBuilder.setHttpClientConfigCallback(...)
        // }
        // );
        source.addSink(esSinkBuilder.build());
        env.execute("flink-es");
    }
}

 文章来源地址https://www.toymoban.com/news/detail-702036.html

到了这里,关于Elasticsearch 集成--Flink 框架集成的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink-cdc同步mysql数据到elasticsearch

    CDC是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 cdc项目地址:https://github.com/ver

    2024年02月13日
    浏览(87)
  • Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架

    Flink CDC 是基于数据库日志 CDC(Change Data Capture)技术的实时数据集成框架,支持了全增量一体化、无锁读取、并行读取、表结构变更自动同步、分布式架构等高级特性。配合 Flink 优秀的管道能力和丰富的上下游生态,Flink CDC 可以高效实现海量数据的实时集成。Flink CDC 社区发

    2024年02月04日
    浏览(55)
  • 10年大数据专家,使用Flink实现索引数据到Elasticsearch,快来学

    使用Flink处理数据时,可以基于Flink提供的批式处理(Batch Processing)和流式处理(Streaming Processing)API来实现,分别能够满足不同场景下应用数据的处理。这两种模式下,输入处理都被抽象为Source Operator,包含对应输入数据的处理逻辑;输出处理都被抽象为Sink Operator,包含了

    2024年04月29日
    浏览(29)
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 数据到 Elasticsearch、Kafka

    Dinky 是一个开箱即用的一站式实时计算平台以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架致力于流批一体和湖仓一体的建设与实践。本文以此为FlinkSQL可视化工具。 Flink SQL 使得使用标准 SQL 开发流式应用变得简单,免去代码开发。 Flink CDC 本文使用 MySQL CDC 连接器 允许从

    2024年02月16日
    浏览(46)
  • 【天衍系列 04】深入理解Flink的ElasticsearchSink组件:实时数据流如何无缝地流向Elasticsearch

    Flink的Elasticsearch Sink是用于将Flink数据流(DataStream)中的数据发送到Elasticsearch的组件。它是Flink的一个连接器(Connector),用于实现将实时处理的结果或数据持续地写入Elasticsearch集群中的索引中。 下面是一些关于Flink的Elasticsearch Sink的基础概念: 数据源(Source) :Flink数据流

    2024年02月20日
    浏览(54)
  • Flink 输出至 Elasticsearch

    【1】引入 pom.xml 依赖 【2】 ES6 Scala 代码,自动导入的 scala 包需要修改为 scala._ 否则会出现错误。 【3】 ES6 输出展示

    2024年02月04日
    浏览(39)
  • 实时Flink与Elasticsearch的整合

    在大数据处理领域,实时数据处理和搜索功能是非常重要的。Apache Flink 是一个流处理框架,它可以处理大量实时数据,提供高性能和低延迟的数据处理能力。Elasticsearch 是一个分布式搜索引擎,它可以索引和搜索大量文档,提供快速、准确的搜索结果。在实际应用中,Flink 和

    2024年02月22日
    浏览(30)
  • Flink 系例 之 Connectors 连接 ElasticSearch

    通过使用 Flink DataStream Connectors 数据流连接器连接到 ElasticSearch 搜索引擎的文档数据库 Index,并提供数据流输入与输出操作; 示例环境 示例数据源 (项目码云下载) Flink 系例 之 搭建开发环境与数据 示例模块 (pom.xml) Flink 系例 之 DataStream Connectors 与 示例模块 数据流输入

    2024年02月16日
    浏览(47)
  • flink 13.5 sink elasticsearch-7

    mysql 数据-- flink sql --es mysql flink elasticsearch 5.7.20-log 13.5 7.12.0 官网可以下载包 flink-sql-connector-elasticsearch7_2.11-1.13.6.jar https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/

    2024年02月14日
    浏览(39)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包