Scala连接ES客户端

这篇具有很好参考价值的文章主要介绍了Scala连接ES客户端。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.前言

大家好,我是楚生辉,在未来的日子里我们一起来学习大数据相关的技术,一起努力奋斗,遇见更好的自己!

本文详细的介绍了如何使用Scala语言连接上Elasticsearch客户端,有需要的小伙伴可以自行获取与学习~

2.ES工具类

package com.xxxx

import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializeConfig
import org.apache.http.HttpHost
import org.elasticsearch.action.bulk.BulkRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.search.{SearchRequest, SearchResponse}
import org.elasticsearch.client.indices.GetIndexRequest
import org.elasticsearch.client.{RequestOptions, RestClient, RestClientBuilder, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.search.{SearchHit, SearchHits}
import org.elasticsearch.search.builder.SearchSourceBuilder
import java.util
import scala.collection.mutable.ListBuffer
/**
 * ES工具类
 * 用于对ES读写操作
 */
object MyEsutils {
    def searchField(indexName: String, fieldName: String): List[String] = {
        // 先判断索引是否存在
        val request = new GetIndexRequest(indexName)
        val bool: Boolean = esClient.indices().exists(request, RequestOptions.DEFAULT)
        if (!bool){
            return null
        }
        // 正常从ES中提取指定的字段
        val mids: ListBuffer[String] = ListBuffer[String]()
        val searchRequest = new SearchRequest(indexName)
        val searchSourceBuilder = new SearchSourceBuilder()
        searchSourceBuilder.fetchSource(fieldName,null).size(10000)
        searchRequest.source(searchSourceBuilder)
        val searchResponse: SearchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT)
        val hits: Array[SearchHit] = searchResponse.getHits.getHits
        
        for (hit <- hits) {
            val sourceAsMap: util.Map[String, AnyRef] = hit.getSourceAsMap
            val mid: String = sourceAsMap.get(fieldName).toString
            mids.append(mid)
        }
        mids.toList
    } 
    // ES客户端对象
    val esClient : RestHighLevelClient = build()
    
    // 创建ES客户端
    def build():RestHighLevelClient = {
        val host: String = "localhost"
        val port: String = "9200"
        val builder: RestClientBuilder = RestClient.builder(new HttpHost(host,port.toInt))
        val client = new RestHighLevelClient(builder)
        client
    }
    // 关闭ES对象
    def close():Unit={
        if (esClient != null){
            esClient.close()
        }
    }
    /*
     * 1.批量写
     * 2.幂等写
     */
    def bulkSave(indexName:String,docs: List[(String,AnyRef)]):Unit = {
        val bulkRequest = new BulkRequest()
        for ((docId,docObj) <- docs) {
            val indexRequest = new IndexRequest(indexName)
            val dataJson: String = JSON.toJSONString(docObj, new SerializeConfig(true))
            indexRequest.source(dataJson,XContentType.JSON)
            indexRequest.id(docId)
            bulkRequest.add(indexRequest)
        }
        esClient.bulk(bulkRequest,RequestOptions.DEFAULT)
    }
}
  • 使用方法
MyEsutils.bulkSave(indexName,orderWides)
MyEsutils.searchField(indexName,orderWides)

写入ES中,就要规划,是写入到一个索引中,还是分割索引(依据什么进行分割),建索引字段的类型,模板是什么,以及方便后续查询起索引别名文章来源地址https://www.toymoban.com/news/detail-812341.html

3.ES客户端常见API使用

package com.xxxx

import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializeConfig
import org.apache.http.HttpHost
import org.apache.lucene.search.TotalHits
import org.elasticsearch.action.bulk.BulkRequest
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.get.{GetRequest, GetResponse}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.search.{SearchRequest, SearchResponse}
import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.client.{RequestOptions, RestClient, RestClientBuilder, RestHighLevelClient}
import org.elasticsearch.common.text.Text
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.index.query.{BoolQueryBuilder, MatchQueryBuilder, QueryBuilder, QueryBuilders, RangeQueryBuilder, TermQueryBuilder}
import org.elasticsearch.index.reindex.UpdateByQueryRequest
import org.elasticsearch.script.{Script, ScriptType}
import org.elasticsearch.search.SearchHit
import org.elasticsearch.search.aggregations.bucket.terms.{ParsedTerms, Terms, TermsAggregationBuilder}
import org.elasticsearch.search.aggregations.metrics.{AvgAggregationBuilder, ParsedAvg}
import org.elasticsearch.search.aggregations.{Aggregation, AggregationBuilder, AggregationBuilders, Aggregations, BucketOrder}
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.elasticsearch.search.fetch.subphase.highlight.{HighlightBuilder, HighlightField}
import org.elasticsearch.search.sort.SortOrder
import org.elasticsearch.search.suggest.term.TermSuggestion.Score

import java.util

/**
 * 测试ES客户端
 */
object EsTest {
    def main(args: Array[String]): Unit = {

        // post()
        // bulk()
        // getById()
        // searchByFilter()
        searchByAggs()
        close()

        /*
         增:幂等
         */
        def put(): Unit = {
            val indexRequest = new IndexRequest()
            // 指定索引
            indexRequest.index("movie_test")
            // 指定doc
            val movie: Movie = Movie("1001", "速度与激情1")
            val movieJson: String = JSON.toJSONString(movie, new SerializeConfig(true))
            indexRequest.source(movieJson, XContentType.JSON)
            indexRequest.index("movie_test")
            indexRequest.id("1001")
            client.index(indexRequest, RequestOptions.DEFAULT)
        }

        /*
         增:非幂等写,不指定id
         */
        def post(): Unit = {
            val indexRequest = new IndexRequest()
            // 指定索引
            indexRequest.index("movie_test")
            // 指定doc
            val movie: Movie = Movie("1002", "速度与激情2")
            val movieJson: String = JSON.toJSONString(movie, new SerializeConfig(true))
            indexRequest.source(movieJson, XContentType.JSON)
            indexRequest.index("movie_test")
            client.index(indexRequest, RequestOptions.DEFAULT)
        }

        /*
        批量写
         */
        def bulk(): Unit = {
            val bulkRequest = new BulkRequest()
            val movies: List[Movie] = List[Movie](
                Movie("1002", "长津湖"),
                Movie("1003", "熊出没"),
                Movie("1004", "狙击手"),
                Movie("1005", "长门桥")
            )
            for (movie <- movies) {
                // 指定索引
                val indexRequest = new IndexRequest("movie_test")
                val movieJson: String = JSON.toJSONString(movie, new SerializeConfig(true))
                indexRequest.source(movieJson, XContentType.JSON)
                // 如果是幂等,就指定id,不是幂等就不指定
                indexRequest.id(movie.id)

                // 将indexRequest加入到bulk批次中
                bulkRequest.add(indexRequest)
            }
            // 最后一次批次执行
            client.bulk(bulkRequest, RequestOptions.DEFAULT)
        }

        /*
        修改:单条修改
         */
        def update(): Unit = {
            val updateRequest = new UpdateRequest()
            updateRequest.index("movie_test")
            updateRequest.id("1001")
            // 把docid为1001的数据,修改movie_name的值
            updateRequest.doc("movie_name", "功夫")
            client.update(updateRequest, RequestOptions.DEFAULT)
        }

        /*
       修改:条件修改 把电影名为速度与激情的都修改
        */
        def updateByQuery(): Unit = {
            val updateByQueryRequest = new UpdateByQueryRequest("movie_test")

            // query
            val boolQueryBuilder: BoolQueryBuilder = QueryBuilders.boolQuery()
            val termQueryBuilder: TermQueryBuilder = QueryBuilders.termQuery("movie_name.keyword", "速度与激情")
            boolQueryBuilder.filter(termQueryBuilder)
            updateByQueryRequest.setQuery(boolQueryBuilder)
            // update
            val params = new util.HashMap[String, AnyRef]()
            params.put("newName", "湄公河行动")
            val script = new Script(
                ScriptType.INLINE,
                Script.DEFAULT_SCRIPT_LANG,
                "ctx._source['movie_name']=params.newName",
                params
            )
            updateByQueryRequest.setScript(script)
            client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT)
        }

        /*
        删除
         */
        def delete(): Unit = {
            val deleteRequest = new DeleteRequest("movie_test", "1001")
            client.delete(deleteRequest, RequestOptions.DEFAULT)
        }

        /*
        查询:单条查询
         */
        def getById(): Unit = {
            val getRequest = new GetRequest("movie_test", "1001")
            val response: GetResponse = client.get(getRequest, RequestOptions.DEFAULT)
            println(response)
            // {"_index":"movie_test","_type":"_doc","_id":"1001","_version":2,"_seq_no":1,"_primary_term":1,"found":true,"_source":{"id":"1001","movie_name":"速度与激情1"}}
            val dataStr: String = response.getSourceAsString
            println(dataStr)
        }


        /*
        查询:条件查询
        查询doubanScore>=5.0,关键词搜索red sea
        关键词高亮显示,显示第一页,每页2条,按照查询doubanScore从大到小排序
         */
        def searchByFilter(): Unit = {
            // 这样复杂的条件搜索,先写DSL
            /*
            POST movie_index/_update_by_query{"query":{"bool":{"filter":[{"range":{"doubanScore":{"gte":5.0}}}],"must":[{"match":{"name":"redsea"}}]}},"highlight":{"fields":{"name":{}}},"from":0,"size":2,"sort":[{"doubanScore":{"order":"desc"}}]}
             */
            val searchRequest = new SearchRequest("movie_index")
            val searchSourceBuilder = new SearchSourceBuilder()
            // bool
            val boolQueryBuilder: BoolQueryBuilder = QueryBuilders.boolQuery()
            // filter
            val rangeQueryBuilder: RangeQueryBuilder = QueryBuilders.rangeQuery("doubanScore").gte(5.0)
            boolQueryBuilder.filter(rangeQueryBuilder)
            // must
            val matchQueryBuilder: MatchQueryBuilder = QueryBuilders.matchQuery("name", "red sea")
            boolQueryBuilder.must(matchQueryBuilder)
            searchSourceBuilder.query(boolQueryBuilder)
            // 高亮
            val highlightBuilder = new HighlightBuilder()
            highlightBuilder.field("name")
            searchSourceBuilder.highlighter(highlightBuilder)
            // 分页
            searchSourceBuilder.from(0)
            searchSourceBuilder.size(2)
            // 排序
            searchSourceBuilder.sort("doubanScore", SortOrder.DESC)

            searchRequest.source(searchSourceBuilder)
            val searchResponse: SearchResponse = client.search(searchRequest, RequestOptions.DEFAULT)

            // 获取总条数
            val totalDocs: Long = searchResponse.getHits.getTotalHits.value
            // 获取明细数据
            val hits: Array[SearchHit] = searchResponse.getHits.getHits
            for (hit <- hits) {
                // 提取数据
                val dataJson: String = hit.getSourceAsString()
                // 提取高亮
                val highlightFields: util.Map[String, HighlightField] = hit.getHighlightFields
                val highlightField: HighlightField = highlightFields.get("name")
                val fragments: Array[Text] = highlightField.getFragments
                val highlightValue: String = fragments(0).toString
                println("明细数据" + dataJson)
                println("高亮数据" + highlightValue)
            }
        }
        /*
        查询:聚合查询
        查询每位演员参演的电影的平均分,倒叙排列
         */
        def searchByAggs(): Unit = {
            // GET/movie_index/_search{"aggs":{"groupbyactorname":{"terms":{"field":"actorList.name.keyword","size":10,"order":{"doubanscoreavg":"desc"}},"aggs":{"doubanscoreavg":{"avg":{"field":"doubanScore"}}}}},"size":0}
            val searchRequest = new SearchRequest("movie_index")
            val searchSourceBuilder = new SearchSourceBuilder()
            // 不要明细
            searchSourceBuilder.size(0)
            // group
            val termsAggregationBuilder: TermsAggregationBuilder = AggregationBuilders
            .terms("groupbyactorname")
            .field("actorList.name.keyword")
            .size(10)
            .order(BucketOrder.aggregation("doubanscoreavg", false))
            // avg
            val avgAggregationBuilder: AvgAggregationBuilder = AggregationBuilders.avg("doubanscoreavg").field("doubanScore")
            termsAggregationBuilder.subAggregation(avgAggregationBuilder)
            searchSourceBuilder.aggregation(termsAggregationBuilder)

            searchRequest.source(searchSourceBuilder)
            val searchResponse: SearchResponse = client.search(searchRequest, RequestOptions.DEFAULT)

            // 拿到演员与平均分
            val aggregations: Aggregations = searchResponse.getAggregations
            val groupbyactornameParsedTerms: ParsedTerms = aggregations.get[ParsedTerms]("groupbyactorname")
            val buckets: util.List[_ <: Terms.Bucket] = groupbyactornameParsedTerms.getBuckets
            import scala.collection.JavaConverters._
            for (bucket <- buckets.asScala) {
                // 演员名字
                val actorName: String = bucket.getKeyAsString
                // 电影个数
                val movieCount: Long = bucket.getDocCount
                // 平均分
                val aggregations: Aggregations = bucket.getAggregations
                val doubanscoreavgParsedAvg: ParsedAvg = aggregations.get[ParsedAvg]("doubanscoreavg")
                val avgScore: Double = doubanscoreavgParsedAvg.getValue
                println(s"${actorName} 共参演了 ${movieCount} 部,平均分为 ${avgScore}")
            }
        }
    }

    // 声明客户端对象
    var client: RestHighLevelClient = create()

    // 创建客户端对象
    def create(): RestHighLevelClient = {
        val builder: RestClientBuilder = RestClient.builder(new HttpHost("127.0.0.1", 9200))
        val esClient = new RestHighLevelClient(builder)
        esClient
    }

    // 关闭客户端对象
    def close(): Unit = {
        client.close()
        client = null
    }
    case class Movie(id: String, movie_name: String)
}

到了这里,关于Scala连接ES客户端的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ES查询客户端初始化(RestHighLevelClient)

    另外,如果说把es当成一个数据库使用,可以看下开源项目easy-es,操作更方便。文档地址:快速开始 | Easy-Es

    2024年02月11日
    浏览(51)
  • es最大相似度检索(原生与java客户端)

    原生rest: 对“不好”进行分词, \\\"operator\\\": \\\"and\\\" 意思是同时满足。 结果: java RestHighLevelClient

    2024年02月11日
    浏览(48)
  • 袁庭新ES系列15节|Elasticsearch客户端基础操作

    上一章节我们介绍了搭建Elasticsearch集群相关的知识。那么又该如何来操作Elasticsearch集群呢?在ES官网中提供了各种语言的客户端,我们在项目开发过程中有多种Elasticsearch版本和连接客户端可以选择,那么他们有什么区别?这一章节袁老师带领大家来学习Elasticsearch客户端相关

    2024年04月25日
    浏览(48)
  • 【Elasticsearch学习笔记五】es常用的JAVA API、es整合SpringBoot项目中使用、利用JAVA代码操作es、RestHighLevelClient客户端对象

    目录 一、Maven项目集成Easticsearch 1)客户端对象 2)索引操作 3)文档操作 4)高级查询 二、springboot项目集成Spring Data操作Elasticsearch 1)pom文件 2)yaml 3)数据实体类 4)配置类 5)Dao数据访问对象 6)索引操作 7)文档操作 8)文档搜索 三、springboot项目集成bboss操作elasticsearch

    2023年04月09日
    浏览(49)
  • Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据

    🎉工作中遇到这样一个需求场景:由于ES数据库中历史数据过多,占用太多的磁盘空间,需要定期地进行清理,在一定程度上可以释放磁盘空间,减轻磁盘空间压力。 🎈在经过调研之后发现,某服务项目每周产生的数据量已经达到千万级别,单日将近能产生 两百万 的数据量

    2024年02月11日
    浏览(45)
  • 【已解决】Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据

    🎉工作中遇到这样一个需求场景:由于ES数据库中历史数据过多,占用太多的磁盘空间,需要定期地进行清理,在一定程度上可以释放磁盘空间,减轻磁盘空间压力。 🎈在经过调研之后发现,某服务项目每周产生的数据量已经达到千万级别,单日将近能产生 两百万 的数据量

    2024年02月14日
    浏览(45)
  • es(Elasticsearch)客户端Elasticsearch-head安装使用(04Elasticsearch-head安装篇)

    elasticsearch-head是一款专门针对于elasticsearch的客户端工具,用来展示数据。elasticsearch-head是基于JavaScript语言编写的,可以使用npm部署,npm是Nodejs下的包管理器 安裝方式利用npm和nodejs进行安装启动,github中给出的安装方法也是这种,本文就是以这种方式进行解说 es(Elasticsearc

    2024年01月17日
    浏览(48)
  • spring data elasticsearch使用7.x客户端兼容es 8.x和使用ssl构建RestHighLevelClient

    es在7.x中默认加入elastic security组件所以java client需要使用ssl连接es server. es 8.x 中废弃了 RestHighLevelClient ,使用新版的 java api client ,但是spring data elasticsearch还未更新到该版本.所以需要兼容es 8.x 如下是RestHighLevelClient构建方法: spring data elasticsearch客户端依赖(基于spring boot2.7使用最新

    2024年02月13日
    浏览(49)
  • openGauss数据库客户端连接工具之Datastudio安装

    Datastudio使用前电脑必须安装jdk1.8版本或者1.11版本,如未安装可点击以下连接,参考第一步把jdk给安装成功。 点击此处查看jdk安装步骤 Datastudio 下载地址 :软件包|Datastudio 下载完成后,解压安装包,双击exe文件打开软件。 本次使用的Datastudio是3.1.0版本,电脑安装jdk是1.8.0.必

    2023年04月08日
    浏览(55)
  • AWS 中文入门开发教学 36- 连接MySQL - MySQL客户端工具

    在EC2安装 MySQL 客户端工具,连接到 MySQL RDS 数据库实例 拷贝数据库终端节点 安装 MySQL 客户端命令行工具 连接到 MySQL 服务器实例 建立数据表 添加数据

    2024年02月12日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包