Elasticsearch 集成---Spark Streaming 框架集成

这篇具有很好参考价值的文章主要介绍了Elasticsearch 集成---Spark Streaming 框架集成。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一.Spark Streaming 框架介绍

Spark Streaming Spark core API 的扩展,支持实时数据流的处理,并且具有可扩展,
高吞吐量,容错的特点。
数据可以从许多来源获取,如 Kafka Flume Kinesis TCP sockets
并且可以使用复杂的算法进行处理,这些算法使用诸如 map reduce join window 等高
级函数表示。 最后,处理后的数据可以推送到文件系统,数据库等。 实际上,您可以将
Spark 的机器学习和图形处理算法应用于数据流。

二.框架集成

1. 创建 Maven 项目

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.es</groupId>
    <artifactId>es-sparkstreaming</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- elasticsearch的客户端 -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- elasticsearch依赖2.x的log4j -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!--        <dependency>-->
        <!--            <groupId>com.fasterxml.jackson.core</groupId>-->
        <!--            <artifactId>jackson-databind</artifactId>-->
        <!--            <version>2.11.1</version>-->
        <!--        </dependency>-->
        <!--        &lt;!&ndash; junit单元测试 &ndash;&gt;-->
        <!--        <dependency>-->
        <!--            <groupId>junit</groupId>-->
        <!--            <artifactId>junit</artifactId>-->
        <!--            <version>4.12</version>-->
        <!--        </dependency>-->
    </dependencies>
</project>

2.功能实现

package com.atguigu.es

import org.apache.http.HttpHost
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentType

object SparkStreamingESTest {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")
        val ssc = new StreamingContext(sparkConf, Seconds(3))

        val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
        ds.foreachRDD(
            rdd => {
                rdd.foreach(
                    data => {
                        val client = new RestHighLevelClient(
                            RestClient.builder(new HttpHost("localhost",9200, "http"))
                        )

                        val ss = data.split(" ")

                        val request = new IndexRequest()
                        request.index("product").id(ss(0))
                        val json =
                            s"""
                              | {  "data" : "${ss(1)}" }
                              |""".stripMargin
                        request.source(json, XContentType.JSON)

                        val response: IndexResponse = client.index(request, RequestOptions.DEFAULT)
                        println(response.getResult)
                        client.close()
                    }
                )
            }
        )

        ssc.start()
        ssc.awaitTermination()
    }
}

3.界面截图

三.安装NetCat

1.下载网址:netcat 1.11 for Win32/Win64

Elasticsearch 集成---Spark Streaming 框架集成,ElasticSearch,elasticsearch,spark,大数据

2.解压压缩包

右键zip文件-->解压到当前文件夹

3.配置环境变量

右键此电脑-->属性-->高级系统设置-->环境变量

Elasticsearch 集成---Spark Streaming 框架集成,ElasticSearch,elasticsearch,spark,大数据

Elasticsearch 集成---Spark Streaming 框架集成,ElasticSearch,elasticsearch,spark,大数据

四.测试

Window + R  重新启动cmd命令窗口

4.1测试:输入 nc -l -p 9999
4.2 启动测试

Elasticsearch 集成---Spark Streaming 框架集成,ElasticSearch,elasticsearch,spark,大数据

4.3 cmd输入 1001 jianzi

Elasticsearch 集成---Spark Streaming 框架集成,ElasticSearch,elasticsearch,spark,大数据

 4.4 postman 查看

get    http://127.0.0.1:9200/product/_doc/1001

Elasticsearch 集成---Spark Streaming 框架集成,ElasticSearch,elasticsearch,spark,大数据文章来源地址https://www.toymoban.com/news/detail-672993.html

到了这里,关于Elasticsearch 集成---Spark Streaming 框架集成的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ElasticSearch与PHP的集成:如何使用PHP与ElasticSearch进行交互

    Elasticsearch是一个基于Lucene库的搜索引擎,它提供了实时、可扩展和可伸缩的搜索功能。它通常用于构建实时搜索、分析和数据可视化应用程序。PHP是一种广泛使用的服务器端脚本语言,它可以与Elasticsearch集成以实现高效的搜索功能。 在本文中,我们将讨论如何使用PHP与Ela

    2024年02月22日
    浏览(86)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(51)
  • SpringBoot集成 ElasticSearch

    对于ElasticSearch比较陌生的小伙伴可以先看看ElasticSearch的概述ElasticSearch安装、启动、操作及概念简介 好的开始啦~ 1.1、导入依赖 新版本配置方式(推荐使用) 新的配置方式使用的是 High Level REST Client 的方式来替代之前的 Transport Client 方式,使用的是 HTTP 请求,和 Kibana 一样使

    2023年04月20日
    浏览(39)
  • ElasticSearch集成SpringBoot实践

    Search API Search Request,用于搜索文档,聚合,相关的任何操作,还提供了高亮显示结果文档的方法 使用SearchSourceBuilder,大多数控制搜索行为的选项都可以在SearchSourceBuilder上设置 构建查询,搜索查询是使用QueryBuilder对象创建的,ES的查询DSL支持的每一种搜索查询类型都有一个

    2024年02月03日
    浏览(40)
  • SpringBoot 集成 Elasticsearch

    版本说明详见 Elasticsearch 下载 kibana下载 ik分词器下载 2.1 解压,在elasticsearch-7.8.0plugins 路径下新建ik目录 2.2 将ik分词器解压放入ik目录 2.3 扩展词汇测试示例 2.3.1 ik/config 目录下新建custom.dic文件 2.3.2 编辑custom.dic文件,加入新词汇 注意:custom.dic文件内容的格式的编码为UTF-8格

    2024年02月14日
    浏览(40)
  • SpringBoot集成ElasticSearch

    实现搜索并高亮 在线体验:http://www.sixkey-world.top

    2024年03月17日
    浏览(47)
  • 五、Elasticsearch 集成

    Spring Data 是一个用于简化数据库开发的开源框架。其主要目标是使得对数据的访问变得方便快捷, Spring Data 可以极大的简化操作数据库的写法,可以在几乎不用写实现的情况下,实现对数据的访问和操作。除了 CRUD 外,还包括如分页、排序等一些常用的功能。 Spring Data 的官

    2024年04月12日
    浏览(24)
  • MyBatis与Elasticsearch集成

    MyBatis是一种高效的Java数据访问框架,它可以简化数据库操作,提高开发效率。Elasticsearch是一个开源的搜索和分析引擎,它可以提供实时、可扩展的搜索功能。在某些场景下,我们可能需要将MyBatis与Elasticsearch集成,以实现高效的数据存储和搜索功能。 在本文中,我们将讨论

    2024年02月20日
    浏览(23)
  • 应用日志集成到ElasticSearch

    阿里sls集成日志步骤 安装docker容器 Docker安装 拉取镜像: 启动: 拷贝容器中的数据文件到宿主机: 设置权限 配置filebeat 修改样例如下: 日志输入源 : 使用 filebeat.inputs 部分定义了两个日志输入源。每个输入源监视不同路径下的日志文件,并根据路径指定的类型和字段进行

    2024年04月17日
    浏览(42)
  • 六、SpringBoot集成elasticsearch

    目录 官网API介绍 1、新建maven项目 2、检查elasticsearch依赖的版本 3、配置RestHighLevelClient对象 4、使用springboot-test测试API的使用 Java API Client https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/index.html Java REST Client(rest-high-level-client): https://www.elastic.co/guide/en/elasticsearc

    2024年02月09日
    浏览(66)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包