Python-ElasticSearch客户端的封装(聚合查询、统计查询、全量数据)

这篇具有很好参考价值的文章主要介绍了Python-ElasticSearch客户端的封装(聚合查询、统计查询、全量数据)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


ES Python客户端介绍

官方提供了两个客户端elasticsearch、elasticsearch-dsl

pip install elasticsearch
pip install elasticsearch-dsl

第二个是对第一个的封装,类似ORM操作数据库,可以.filter、.groupby,个人感觉很鸡肋,star数也不多。平时使用的时候一般会在kibana上测试,然后直接把query拷贝过来获取更多数据,所以这里做下第一个的封装。

封装代码

  1. 封装后依然暴露了es,方便有特殊情况下使用
  2. index一般很少改动,就直接放到对象中了,可以使用set_index修改
  3. 常用的应该是get_doc和get_doc_scroll来获取少量和全量数据

代码测试时使用的是7.17.12版本,大于此版本可能由于官方改动出异常

pip install elasticsearch==7.17.12

es.py

import random
import string
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from typing import List,Dict


class ESClient:

    def __init__(self, host="127.0.0.1",index="", http_auth = None):
        self.index = index
        if http_auth is None:
            self.es = Elasticsearch(hosts=host)
        else:
            self.es = Elasticsearch(hosts=host, http_auth=http_auth)
        print("success to connect " + host)

    def close(self):
        self.es.close()

    # 设置索引
    def set_index(self,index:str):
        self.index = index

    # 创建索引
    def create_index(self, index_name: str, mappings=None):
        res = self.es.indices.create(index=index_name, mappings=mappings)
        return res

    # 删除索引
    def delete_index(self, index_name: str):
        res = self.es.indices.delete(index=index_name)
        return res

    # 获取索引
    def get_index(self, index_name: str):
        res = self.es.indices.get(index=index_name)
        return res

    # 创建文档(单个)
    def create_doc(self,body, _id=''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))):
        res = self.es.create(index=self.index, body=body, id=_id)
        return res

    # 创建文档(批量)
    def create_doc_bulk(self, docs: List[Dict]):
        actions = []
        for doc in docs:
            action = {
                "_index": self.index,
                "_op_type": "create",
                "_id": ''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))
            }
            for k,v in doc.items():
                action[k] = v
            actions.append(action)
        res = bulk(client=self.es, actions=actions)
        return res

    # 删除文档
    def delete_doc(self, doc_id):
        res = self.es.delete(index=self.index, id=doc_id)
        return res

    # 更新文档
    def update_doc(self, doc_id, doc:Dict):
        body = {
            "doc" : doc
        }
        res = self.es.update(index=self.index, id=doc_id, body=body)
        return res

    # 分页获取超过100000的文档
    def get_doc_scroll(self,query:Dict):
        res = self.es.search(index=self.index,size=10000,body=query,search_type="query_then_fetch",scroll="5m")
        data_list = []
        hits = res.get("hits")
        scroll_id = res.get('_scroll_id')
        total_value = 0
        # total 可能为Dict或int
        if isinstance(hits.get('total'),Dict):
            total_value= hits.get('total').get('value')
        else:
            total_value = hits.get('total')

        if total_value>0:
            for data in hits.get('hits'):
                data_list.append(data.get('_source'))
        return scroll_id,data_list

    # 通过scroll_id分页获取后序文档
    def get_doc_by_scroll_id(self,scroll_id):
        page = self.es.scroll(scroll_id=scroll_id,scroll="5m")
        data_list = []
        scroll_id = page.get('_scroll_id')
        for data in page.get('hits').get('hits'):
            data_list.append(data)
        return scroll_id,data_list

    # 清空scroll_id,防止服务端不够用
    def clear_scroll(self,scroll_id):
        self.es.clear_scroll(scroll_id)

    # 获取索引的hits内容(一般用于获取文档id、总数)
    def get_doc_all(self):
        res = self.es.search(index=self.index)
        return res['hits']

    # 获取一个文档
    def get_doc_by_id(self, id_):
        res = self.es.get(index=self.index, id=id_)
        return res["_source"]

    # 获取所有文档的_source内容(小于100000)
    def get_doc(self,query:Dict,size:int=100000):
        query['size'] = size
        res = self.es.search(index=self.index,body=query)
        data_list = []
        hits = res.get("hits")
        total_value = 0
        # total 可能为Dict或int
        if isinstance(hits.get('total'), Dict):
            total_value = hits.get('total').get('value')
        else:
            total_value = hits.get('total')

        if total_value > 0:
            for data in hits.get('hits'):
                data_list.append(data.get('_source'))
        return data_list

    # 聚合查询(分组条件名为group_by,返回buckets)
    def get_doc_agg(self, query):
        res = self.es.search(index=self.index, body=query)
        return res['aggregations']['group_by'].get('buckets')

    # 统计查询(统计条件为stats_by,返回最值、平均值等)
    def get_doc_stats(self,query):
        res = self.es.search(index=self.index,body=query)
        return res['aggregations']["stats_by"]

测试代码

import unittest
from es import ESClient

cli = ESClient(host="http://10.28.144.3:9200",http_auth=["elastic","changeme"])
def test_create_index():
    res = cli.create_index(index_name="test")
    print(res)

def test_delete_index():
    res = cli.delete_index(index_name="test")
    print(res)

def test_get_index():
    res = cli.get_index(index_name="test")
    print(res)

def test_set_index():
    cli.set_index(index="test")

def test_create_doc():
    body = {
        "name": "lady_killer9",
        "age": 19
    }
    res = cli.create_doc(body=body)
    print(res)

def test_create_doc_bulk():
    from copy import deepcopy
    body = {
        "name": "lady_killer9"
    }
    users = []
    for i in range(100001):
        tmp = deepcopy(body)
        tmp["age"] = i
        users.append(tmp)
    res = cli.create_doc_bulk(docs=users)
    print(res)


def test_get_doc_all():
    res = cli.get_doc_all()
    print(res)


def test_get_doc_by_id():
    res = cli.get_doc_by_id("jHALXDQaENQZPM4C9EUt")
    print(res)

def test_get_doc():
    query = {
        "query": {
            "match_all": {

            }
        }
    }
    res = cli.get_doc(query=query,size=20)
    print(res)

def test_update_doc():
    body={
        "name": "lady_killer_after_update"
    }
    res = cli.update_doc(doc_id="jHALXDQaENQZPM4C9EUt",doc=body)
    print(res)


def test_delete_doc():
    res = cli.delete_doc(doc_id="jHALXDQaENQZPM4C9EUt")
    print(res)

def test_get_doc_agg():
    query = {
            "aggs": {
                "group_by": {
                    "terms": {
                        "field": "age"
                    }
                }
            }
    }
    res = cli.get_doc_agg(query=query)
    print(res)

def test_get_doc_stats():
    query = {
            "aggs": {
                "stats_by": {
                    "stats": {
                        "field": "age"
                    }
                }
            }
    }
    res = cli.get_doc_stats(query=query)
    print(res)

def test_get_doc_scroll():
    query = {
        "query": {
            "match_all": {}
        }
    }
    scroll_id,data_list = cli.get_doc_scroll(query=query)
    res = []
    while data_list:
        res.extend(data_list)
        scroll_id,data_list = cli.get_doc_by_scroll_id(scroll_id=scroll_id)
    print(len(res))


if __name__ == '__main__':
    # test_delete_index()
    test_create_index()
    test_get_index()
    # test_set_index()
    # test_create_doc()
    # test_create_doc_bulk()
    # test_get_doc_all()
    # test_update_doc()
    # test_get_doc_by_id()
    # test_get_doc()
    # test_delete_doc()
    # test_get_doc_agg()
    # test_get_doc_stats()
    # test_get_doc_scroll()
    cli.close()

测试截图
Python-ElasticSearch客户端的封装(聚合查询、统计查询、全量数据),python,# Python进阶教程,python,elasticsearch
更多python相关内容:【python总结】python学习框架梳理

本人b站账号:一路狂飚的蜗牛

有问题请下方评论,转载请注明出处,并附有原文链接,谢谢!如有侵权,请及时联系。

参考

github-elasticsearch
github-elasticsearch-dsl文章来源地址https://www.toymoban.com/news/detail-620073.html

到了这里,关于Python-ElasticSearch客户端的封装(聚合查询、统计查询、全量数据)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Python-Python高阶技巧:HTTP协议、静态Web服务器程序开发、循环接收客户端的连接请求

    当前版本号[20231114]。 版本 修改说明 20231114 初版 1.1 网址的概念 网址又称为URL,URL的英文全拼是(Uniform Resoure Locator),表达的意思是 统一资源定位符 ,通俗理解就是网络资源地址。 URL地址:https://www.itcast.com/18/1122/10/E178J2O4000189FH.html 1.2 URL的组成 域名 : IP地址的别名 ,它是用

    2024年02月04日
    浏览(65)
  • python-kafka客户端封装

    本文对python的kafka包做简单封装,方便kafka初学者使用。包安装: kafka_helper.py kafka_test.py Kafka入门,这一篇就够了(安装,topic,生产者,消费者)

    2024年02月09日
    浏览(42)
  • samba客户端的使用与服务端的搭建

    samba是SMB文件共享协议的应用软件,可以让Linux系统和Windows系统之间相互共享资源。 在Linux系统中使用smbd服务器提供提供共享资源;使用smbclient去访问其他smbd服务器所提供的共享资源。 在Windows中有SMB/CIFS服务器和客户端。 SMB(Server Message Block)协议是一种在局域网上共享资

    2024年02月02日
    浏览(34)
  • TCP通信—客户端与客户端的双向通信

    1.实现客户端与客户端之间的TCP双向通信; 2.服务器记录客户端实名连接,并显示客户端数据记录; 3.客户端退出,服务器和另一客户端显示相应提示; 1.定义数组保存客户端文件描述符; 2.主程序负责接收客户端1的数据发送给客户端2; 3.开展一个线程负责接收到客户端2的

    2024年02月15日
    浏览(44)
  • 邮件客户端的配置使用

    winmail收到新邮件不会提示用户,这样用户就容易错过消息 下载foxmail可以解决这个问题 1.实体机进入官网https://www.foxmail.com/,点击下载版本随意 下载好后拖到虚拟机上 1.开始创建账号 因为这几种邮箱都不是我们上个实验的邮箱选择其他邮箱 公网上的邮箱直接输入地址和密码

    2024年03月19日
    浏览(46)
  • git客户端的使用

    分布式版本控制工具。 具有中央服务器仓库和本地仓库。 客户端下载:GitHub Desktop | Simple collaboration from your desktop 左上角: File - New repository 本地的两个仓库: 查看当前仓库的路径并打开: 注意: 路径①是用户可以手动操作文件的工作目录;路径②存放该仓库所有的操作、

    2024年02月06日
    浏览(43)
  • 配置 Git 客户端的代理设置

    如果有用户名密码按照下面命令配置 取消代理 查看配置列表 打开配置界面如下   参考链接 https://m.php.cn/faq/506377.html

    2024年02月13日
    浏览(40)
  • Transport Client 客户端的使用

    官方文档:TransportClient 这里描述操作elasticsearch采用TransportClient这种方式,官方明确表示在ES 7.0版本中将弃用TransportClient客户端,且在8.0版本中完全移除它. properties配置 1)通过配置类 2)使用@PostConstruct注解 如果整合其他客户端时使用这种方式 3)测试代码的时候使用这种 参

    2024年02月09日
    浏览(46)
  • ios客户端学习笔记(七):iOS客户端的UI设计

    iOS客户端的UI设计是指在iOS操作系统上开发应用程序时所涉及的用户界面设计,包括应用程序的布局、颜色、字体、图标等元素的设计。良好的UI设计应该能够提高用户体验,使用户能够轻松地使用应用程序。 在iOS客户端的UI设计中,需要考虑以下几个方面: 应用程序布局应

    2023年04月26日
    浏览(49)
  • JavaWeb 获取客户端的真实IP地址

    通常我们在JavaWeb中获取客户端IP地址只需要使用 request.getRemoteAddr(); 方法即可 如果前端使用了Nginx等反向代理的话,我们使用 request.getRemoteAddr(); 方法获取到的IP地址就是 127.0.0.1 因为经过代理以后,在客户端和服务器之间增加了中间层,因此服务器无法直接拿到客户端的 IP 但

    2024年02月15日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包