1.前言
大家好,我是楚生辉,在未来的日子里我们一起来学习大数据相关的技术,一起努力奋斗,遇见更好的自己!
本文详细地介绍了如何使用Scala语言连接Elasticsearch客户端,有需要的小伙伴可以自行获取与学习~(文章来源:https://www.toymoban.com/news/detail-812341.html)
2.ES工具类
package com.xxxx
import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializeConfig
import org.apache.http.HttpHost
import org.elasticsearch.action.bulk.BulkRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.search.{SearchRequest, SearchResponse}
import org.elasticsearch.client.indices.GetIndexRequest
import org.elasticsearch.client.{RequestOptions, RestClient, RestClientBuilder, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.search.{SearchHit, SearchHits}
import org.elasticsearch.search.builder.SearchSourceBuilder
import java.util
import scala.collection.mutable.ListBuffer
/**
 * Elasticsearch utility object for reading from and writing to ES.
 *
 * All methods share the single client held in `esClient`.
 */
object MyEsutils {

  /** Shared ES client, created once when this object is first initialized. */
  val esClient: RestHighLevelClient = build()

  /**
   * Creates a high-level REST client for a single ES node.
   *
   * @param host node host name (defaults to "localhost")
   * @param port node HTTP port (defaults to 9200)
   * @return a new client; whoever holds it is responsible for closing it
   */
  def build(host: String = "localhost", port: Int = 9200): RestHighLevelClient = {
    val builder: RestClientBuilder = RestClient.builder(new HttpHost(host, port))
    new RestHighLevelClient(builder)
  }

  /** Closes the shared client. Nothing in this object re-creates it afterwards. */
  def close(): Unit = {
    if (esClient != null) {
      esClient.close()
    }
  }

  /**
   * Reads the values of one field from up to `maxSize` documents of an index.
   *
   * @param indexName index to query
   * @param fieldName top-level `_source` field whose value is extracted (looked up with a plain
   *                  map lookup, so dotted/nested paths are not resolved)
   * @param maxSize   maximum number of hits to fetch. The default of 10000 equals ES's default
   *                  `index.max_result_window`, so larger values need that setting raised.
   * @return the field values rendered with `toString`, skipping documents that lack the field.
   *         Returns `null` if the index does not exist.
   */
  def searchField(indexName: String, fieldName: String, maxSize: Int = 10000): List[String] = {
    val indexExists: Boolean =
      esClient.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)
    if (!indexExists) {
      // Kept as null (not Nil) so existing callers can still tell "no index" apart from "no hits".
      null
    } else {
      val searchSourceBuilder = new SearchSourceBuilder()
      // Only pull the one requested field out of _source.
      searchSourceBuilder.fetchSource(fieldName, null).size(maxSize)
      val searchRequest = new SearchRequest(indexName)
      searchRequest.source(searchSourceBuilder)
      val hits: Array[SearchHit] =
        esClient.search(searchRequest, RequestOptions.DEFAULT).getHits.getHits
      hits.toList.flatMap { hit =>
        // The source map or the field value can be null (no _source, field missing).
        // Skip such hits instead of failing with a NullPointerException on `.toString`.
        for {
          source <- Option(hit.getSourceAsMap)
          value <- Option(source.get(fieldName))
        } yield value.toString
      }
    }
  }

  /**
   * Bulk-writes documents to an index.
   *
   * Each document is indexed under its given id, so writing the same batch again overwrites
   * the documents rather than duplicating them (idempotent write).
   *
   * @param indexName target index
   * @param docs      (document id, document object) pairs. Each object is serialized to JSON
   *                  with fastjson.
   */
  def bulkSave(indexName: String, docs: List[(String, AnyRef)]): Unit = {
    // An empty BulkRequest fails ES request validation ("no requests added"), so skip the call.
    if (docs.nonEmpty) {
      // One serializer config serves the whole batch. `true` selects field-based serialization,
      // which reads fields directly; Scala case classes have no JavaBean getters.
      val serializeConfig = new SerializeConfig(true)
      val bulkRequest = new BulkRequest()
      docs.foreach { case (docId, docObj) =>
        val indexRequest = new IndexRequest(indexName)
        indexRequest.source(JSON.toJSONString(docObj, serializeConfig), XContentType.JSON)
        indexRequest.id(docId)
        bulkRequest.add(indexRequest)
      }
      val bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT)
      // Per-item failures come back in the response rather than as an exception.
      // Report them instead of dropping them silently.
      if (bulkResponse.hasFailures()) {
        System.err.println(
          s"MyEsutils.bulkSave: failures while writing to index $indexName: ${bulkResponse.buildFailureMessage()}"
        )
      }
    }
  }
}
- 使用方法(orderWides 为由 (文档id, 文档对象) 二元组组成的 List;fieldName 为要从索引中提取的字段名)
MyEsutils.bulkSave(indexName, orderWides)
MyEsutils.searchField(indexName, fieldName)
写入ES之前需要提前规划:是写入单个索引还是分割索引(依据什么进行分割)、索引字段的类型、使用什么索引模板,以及为方便后续查询而设置的索引别名。(文章来源地址:https://www.toymoban.com/news/detail-812341.html)
3.ES客户端常见API使用
package com.xxxx
import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializeConfig
import org.apache.http.HttpHost
import org.apache.lucene.search.TotalHits
import org.elasticsearch.action.bulk.BulkRequest
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.get.{GetRequest, GetResponse}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.search.{SearchRequest, SearchResponse}
import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.client.{RequestOptions, RestClient, RestClientBuilder, RestHighLevelClient}
import org.elasticsearch.common.text.Text
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.index.query.{BoolQueryBuilder, MatchQueryBuilder, QueryBuilder, QueryBuilders, RangeQueryBuilder, TermQueryBuilder}
import org.elasticsearch.index.reindex.UpdateByQueryRequest
import org.elasticsearch.script.{Script, ScriptType}
import org.elasticsearch.search.SearchHit
import org.elasticsearch.search.aggregations.bucket.terms.{ParsedTerms, Terms, TermsAggregationBuilder}
import org.elasticsearch.search.aggregations.metrics.{AvgAggregationBuilder, ParsedAvg}
import org.elasticsearch.search.aggregations.{Aggregation, AggregationBuilder, AggregationBuilders, Aggregations, BucketOrder}
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.elasticsearch.search.fetch.subphase.highlight.{HighlightBuilder, HighlightField}
import org.elasticsearch.search.sort.SortOrder
import org.elasticsearch.search.suggest.term.TermSuggestion.Score
import java.util
/**
 * Demo program for the Elasticsearch high-level REST client. It covers single and bulk writes,
 * updates, delete, get-by-id, a filtered search with highlighting, and an aggregation.
 */
object EsTest {

  /** Shared client used by every demo method; reset to null by `close()`. */
  var client: RestHighLevelClient = create()

  /** Creates a client for the ES node at 127.0.0.1:9200. */
  def create(): RestHighLevelClient = {
    val builder: RestClientBuilder = RestClient.builder(new HttpHost("127.0.0.1", 9200))
    new RestHighLevelClient(builder)
  }

  /** Closes the shared client and clears the reference; calling it again is a no-op. */
  def close(): Unit = {
    if (client != null) {
      client.close()
      client = null
    }
  }

  /**
   * Runs one demo operation (uncomment the one to try) and then closes the client.
   * The close happens in `finally`, so the client is released even if the request throws.
   */
  def main(args: Array[String]): Unit = {
    try {
      // post()
      // bulk()
      // getById()
      // searchByFilter()
      searchByAggs()
    } finally {
      close()
    }
  }

  /**
   * Create, idempotent: indexes a movie into `movie_test` under the explicit id 1001.
   * Re-running targets the same document instead of adding a new one.
   */
  def put(): Unit = {
    val movie: Movie = Movie("1001", "速度与激情1")
    val movieJson: String = JSON.toJSONString(movie, new SerializeConfig(true))
    val indexRequest = new IndexRequest("movie_test")
    indexRequest.source(movieJson, XContentType.JSON)
    indexRequest.id(movie.id)
    client.index(indexRequest, RequestOptions.DEFAULT)
  }

  /**
   * Create, non-idempotent: indexes a movie into `movie_test` without setting a document id.
   * Each run therefore adds a new document.
   */
  def post(): Unit = {
    val movie: Movie = Movie("1002", "速度与激情2")
    val movieJson: String = JSON.toJSONString(movie, new SerializeConfig(true))
    val indexRequest = new IndexRequest("movie_test")
    indexRequest.source(movieJson, XContentType.JSON)
    client.index(indexRequest, RequestOptions.DEFAULT)
  }

  /** Bulk write: indexes several movies into `movie_test` in one request, each under its own id. */
  def bulk(): Unit = {
    val movies: List[Movie] = List(
      Movie("1002", "长津湖"),
      Movie("1003", "熊出没"),
      Movie("1004", "狙击手"),
      Movie("1005", "长门桥")
    )
    // One serializer config serves the whole batch.
    val serializeConfig = new SerializeConfig(true)
    val bulkRequest = new BulkRequest()
    for (movie <- movies) {
      val indexRequest = new IndexRequest("movie_test")
      indexRequest.source(JSON.toJSONString(movie, serializeConfig), XContentType.JSON)
      // Setting the id makes the write idempotent; leave it unset for non-idempotent writes.
      indexRequest.id(movie.id)
      bulkRequest.add(indexRequest)
    }
    // Execute the whole batch in a single request.
    client.bulk(bulkRequest, RequestOptions.DEFAULT)
  }

  /** Single-document update: sets `movie_name` of document 1001 in `movie_test`. */
  def update(): Unit = {
    val updateRequest = new UpdateRequest()
    updateRequest.index("movie_test")
    updateRequest.id("1001")
    updateRequest.doc("movie_name", "功夫")
    client.update(updateRequest, RequestOptions.DEFAULT)
  }

  /**
   * Update by query: renames every movie whose `movie_name.keyword` equals 速度与激情
   * to 湄公河行动, using an inline script.
   */
  def updateByQuery(): Unit = {
    val updateByQueryRequest = new UpdateByQueryRequest("movie_test")
    // query: exact match on the keyword sub-field
    val boolQueryBuilder: BoolQueryBuilder = QueryBuilders.boolQuery()
    val termQueryBuilder: TermQueryBuilder = QueryBuilders.termQuery("movie_name.keyword", "速度与激情")
    boolQueryBuilder.filter(termQueryBuilder)
    updateByQueryRequest.setQuery(boolQueryBuilder)
    // update: the script reads the new name from its params
    val params = new util.HashMap[String, AnyRef]()
    params.put("newName", "湄公河行动")
    val script = new Script(
      ScriptType.INLINE,
      Script.DEFAULT_SCRIPT_LANG,
      "ctx._source['movie_name']=params.newName",
      params
    )
    updateByQueryRequest.setScript(script)
    client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT)
  }

  /** Delete: removes document 1001 from `movie_test`. */
  def delete(): Unit = {
    val deleteRequest = new DeleteRequest("movie_test", "1001")
    client.delete(deleteRequest, RequestOptions.DEFAULT)
  }

  /** Get by id: fetches document 1001 from `movie_test` and prints the response and its source. */
  def getById(): Unit = {
    val getRequest = new GetRequest("movie_test", "1001")
    val response: GetResponse = client.get(getRequest, RequestOptions.DEFAULT)
    println(response)
    // {"_index":"movie_test","_type":"_doc","_id":"1001","_version":2,"_seq_no":1,"_primary_term":1,"found":true,"_source":{"id":"1001","movie_name":"速度与激情1"}}
    val dataStr: String = response.getSourceAsString
    println(dataStr)
  }

  /**
   * Conditional search on `movie_index`. It filters on doubanScore >= 5.0, requires a match on
   * "red sea" in `name`, and highlights `name`. It returns the first page of 2 hits, sorted by
   * doubanScore descending.
   *
   * Equivalent DSL:
   * {{{
   * POST movie_index/_search
   * {"query":{"bool":{"filter":[{"range":{"doubanScore":{"gte":5.0}}}],
   *                   "must":[{"match":{"name":"red sea"}}]}},
   *  "highlight":{"fields":{"name":{}}},
   *  "from":0,"size":2,
   *  "sort":[{"doubanScore":{"order":"desc"}}]}
   * }}}
   */
  def searchByFilter(): Unit = {
    val boolQueryBuilder: BoolQueryBuilder = QueryBuilders.boolQuery()
    // filter clause
    val rangeQueryBuilder: RangeQueryBuilder = QueryBuilders.rangeQuery("doubanScore").gte(5.0)
    boolQueryBuilder.filter(rangeQueryBuilder)
    // must clause
    val matchQueryBuilder: MatchQueryBuilder = QueryBuilders.matchQuery("name", "red sea")
    boolQueryBuilder.must(matchQueryBuilder)

    val highlightBuilder = new HighlightBuilder()
    highlightBuilder.field("name")

    val searchSourceBuilder = new SearchSourceBuilder()
    searchSourceBuilder.query(boolQueryBuilder)
    searchSourceBuilder.highlighter(highlightBuilder)
    // paging
    searchSourceBuilder.from(0)
    searchSourceBuilder.size(2)
    // sorting
    searchSourceBuilder.sort("doubanScore", SortOrder.DESC)

    val searchRequest = new SearchRequest("movie_index")
    searchRequest.source(searchSourceBuilder)
    val searchResponse: SearchResponse = client.search(searchRequest, RequestOptions.DEFAULT)

    // Total hit count. getTotalHits may be null when total-hit tracking is disabled.
    val totalDocs: Long = Option(searchResponse.getHits.getTotalHits).map(_.value).getOrElse(0L)
    println("总条数" + totalDocs)

    for (hit <- searchResponse.getHits.getHits) {
      val dataJson: String = hit.getSourceAsString()
      // A hit may carry no "name" highlight (or no fragments). Fall back to "" instead of an NPE.
      val highlightValue: String = Option(hit.getHighlightFields.get("name"))
        .flatMap(field => Option(field.getFragments))
        .flatMap(_.headOption)
        .map(_.toString)
        .getOrElse("")
      println("明细数据" + dataJson)
      println("高亮数据" + highlightValue)
    }
  }

  /**
   * Aggregation on `movie_index`: for each actor, the number of movies and the average
   * doubanScore of those movies. Returns the top 10 actors ordered by that average, descending.
   *
   * Equivalent DSL:
   * {{{
   * GET /movie_index/_search
   * {"aggs":{"groupbyactorname":{"terms":{"field":"actorList.name.keyword","size":10,
   *                                        "order":{"doubanscoreavg":"desc"}},
   *          "aggs":{"doubanscoreavg":{"avg":{"field":"doubanScore"}}}}},
   *  "size":0}
   * }}}
   */
  def searchByAggs(): Unit = {
    // group by actor name, buckets ordered by the avg sub-aggregation (descending)
    val termsAggregationBuilder: TermsAggregationBuilder = AggregationBuilders
      .terms("groupbyactorname")
      .field("actorList.name.keyword")
      .size(10)
      .order(BucketOrder.aggregation("doubanscoreavg", false))
    // average score inside each actor bucket
    val avgAggregationBuilder: AvgAggregationBuilder =
      AggregationBuilders.avg("doubanscoreavg").field("doubanScore")
    termsAggregationBuilder.subAggregation(avgAggregationBuilder)

    val searchSourceBuilder = new SearchSourceBuilder()
    // size 0: aggregation results only, no hit details
    searchSourceBuilder.size(0)
    searchSourceBuilder.aggregation(termsAggregationBuilder)

    val searchRequest = new SearchRequest("movie_index")
    searchRequest.source(searchSourceBuilder)
    val searchResponse: SearchResponse = client.search(searchRequest, RequestOptions.DEFAULT)

    // Read actor name, movie count and average score from each bucket.
    val aggregations: Aggregations = searchResponse.getAggregations
    val groupbyactornameParsedTerms: ParsedTerms = aggregations.get[ParsedTerms]("groupbyactorname")
    val buckets: util.List[_ <: Terms.Bucket] = groupbyactornameParsedTerms.getBuckets
    import scala.collection.JavaConverters._
    for (bucket <- buckets.asScala) {
      val actorName: String = bucket.getKeyAsString
      val movieCount: Long = bucket.getDocCount
      val bucketAggregations: Aggregations = bucket.getAggregations
      val doubanscoreavgParsedAvg: ParsedAvg = bucketAggregations.get[ParsedAvg]("doubanscoreavg")
      val avgScore: Double = doubanscoreavgParsedAvg.getValue
      println(s"${actorName} 共参演了 ${movieCount} 部,平均分为 ${avgScore}")
    }
  }

  /** Document model written to `movie_test`; its field names appear as the `_source` keys. */
  final case class Movie(id: String, movie_name: String)
}
到了这里,关于Scala连接ES客户端的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!