数据库➡ES,数据更新同步

这篇具有很好参考价值的文章主要介绍了数据库➡ES,数据更新同步。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、环境:

        mysql5.7(自己部署),elasticsearch:7.4.2(自己部署),centos7.0(虚拟环境),python3.8(自己部署),pycharm(自己安装);--建议可用docker部署mysql和es

2、实现

        ①、mysql数据,estable表字段(ID--unique,VALUE--str is json format)唯一ID,json格式的字符串,(表中存的为需要更新的数据)

        ②、python3.8新建py,操作es和mysql

                 py文件放同一目录下

         a、UpdateEsClass.py
# -*- coding: utf-8 -*-

# 引入json库
import json

# 引入es库
from elasticsearch import Elasticsearch as es
from elasticsearch import helpers as esh

# 引入mysql库
import pymysql as ps


# ES操作
class esoper():

    # 连接
    def __init__(self,host,http_auth,scheme,port):

        self.host = host
        self.http_auth = http_auth
        self.scheme = scheme
        self.port = port

        self.es = es(self.host,http_auth=self.http_auth,scheme=self.scheme,port=self.port)

   # 创建索引
    def create(self,index):

        # 如果不存在创建
        if not self.es.indices.exists(index=index):
            self.es.indices.create(index=index)

    # 查询
    def search(self,index,body,filter_path):

        result = self.es.search(index=index,body=body,filter_path=filter_path)
        return result

    # 批量更新
    def update(self,actions):
        '''
            批量更新样式
            action_3 = {
                "_index": "exam",   # 索引名称
                "_id": 24,  # id
                "_source": {"age": 21, "name": "张无忌", "address": "武当"}, # 需更新的值
            }
            action_list = [action_3]
            esh.bulk(self.es, actions=action_list)
        '''

        esh.bulk(self.es, actions=actions)

        return '更新已完成'


    # 删除
    def delete(self,index,ids):

        for i in ids:
            # self.es.delete(index=index,id=i[0])
            self.es.delete(index=index,id=i[0], ignore=[400, 404])
        # self.es.delete(index=index, id=ids)

        print('删除已完成')

    # 删除索引
    def truncate(self,index):

        self.es.indices.delete(index=index)

        print('删除索引成功')

    # 插入
    def insert(self,index,body):

        for i in body:

            js = json.loads(i[1])

            self.es.index(index=index,body=js,id=i[0])

        print('插入已完成')

    # 关闭连接
    def close(self):

        self.es.transport.close()

# mysql数据库操作
class databasesoper():

    # 数据库初始化
    def __init__(self,host,port,user,password,dataname):

        self.host=host
        self.port=port
        self.user=user
        self.password=password
        self.dataname=dataname
        self.conn = None
        self.cursor = None

    # 连接数据库
    def connect(self):

        # 连接
        self.conn = ps.connect(host=self.host,port=self.port,user=self.user,password=self.password,database=self.dataname)

        # 游标
        self.cursor = self.conn.cursor()

    # 执行查询 查出需要更新的数据
    def exec(self,sql):
        """
        执行sql语句
        :param sql:  sql语句
        :param params: 参数
        :return: sql查询出的结果
        """

        # 定义一个空列表
        list1 = []

        try:
            self.cursor.execute(sql)
            result = self.cursor.fetchall()

            for i in result:
                list1.append(list(i))

            return list1
        except Exception as e:
            print(e)

    # 获取游标
    def getCursor(self):
        """
        获取操作游标
        :return: 操作游标
        """
        return self.cursor

    # 关闭数据库 关闭游标
    def close(self):
        """
        关闭数据库连接和操作游标
        """
        self.cursor.close()
        self.conn.close()

                  b、UpdateEsData.py

# -*- coding: utf-8 -*-

# 时间
import time
# 自定义模块
import UpdateEsClass as uc

# mysql数据库
def mysqlconn():
    # 创建数据库对象,替换为自己的数据库
    mysqlco = uc.databasesoper(host='192.168.10.221', port=3306, user='lwd', password='000000', dataname='esquery')

    # 创建数据库连接
    mysqlco.connect()

    # 查询数据库数据进入内存
    sqlid = 'select user_id from estable'
    sqlvalue = 'select * from estable where user_id = 62295'

    # 执行sql 拿到数据
    resultid = mysqlco.exec(sqlid)
    resultvalue = mysqlco.exec(sqlvalue)

    # 关闭连接
    mysqlco.close()

    # 返回ID和值
    return resultid ,resultvalue
    # 数据格式  (55555,[55555,'{"10001":"你是谁?","10002":"这是一条测试数据。","10003":"标志符号(1-是、2-否)"}'])


def escoon(resultvalue):

    # 创建ES对象,替换为自己的es服务
    es = uc.esoper(['192.168.10.221:9200'], \
                   # 有就设置 账号 密码
                   http_auth=('', ''), \
                   scheme="http", \
                   port=9200, )

    # 创建索引
    es.create(index='person')

    # 需更新的列表
    actions = []

    for action in resultvalue:
        actionformat = {
            "_index": "person",  # 索引名称
            "_id": action[0],  # id
            "_source": eval(action[1]),  # 需更新的值 需要为字典类型 eval转换字符串字典为字典类型
        }

        actions.append(actionformat)

    # 更新数据
    es.update(actions=actions)

    # 查询范式
    body = {
        "query": {
            "match": {
                "_id": "62295"
            }
        },
        "_source": "100083"

    }

    # 查询字段
    filter_path = ['hits.hits._source',
                   'hits.hits._id']

    # 查询
    esquery = es.search(index='person', body=body, filter_path=filter_path)

    # 关闭es
    es.close()

    return esquery

if __name__ == '__main__':

    # 开始时间
    print('开始时间',time.time())

    # 获取数据库数据
    resultid, resultvalue = mysqlconn()

    # 更新查询
    esquery = escoon(resultvalue)

    #
    print(esquery)

    # 结束时间
    print('结束时间', time.time())

3、效果

es 更新数据,sql,elasticsearch文章来源地址https://www.toymoban.com/news/detail-799163.html

到了这里,关于数据库➡ES,数据更新同步的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【es数据库】python 使用Elasticsearch数据库

    Elasticsearch是一个开源的高扩展性搜索引擎,它可以快速地存储、搜索和分析大量的数据。 使用Python语言和Elasticsearch,可以轻松地创建和操作“数据库”和“数据库表”,而且具备分布式和高扩展性的特点,适用于大规模数据存储与搜索场景。 ES是一种文档数据库,它并不像

    2024年02月12日
    浏览(42)
  • 【ES数据库】Elasticsearch安装使用

    Elasticsearch 和 MongoDB/Redis 类似,是非关系型数据库,从索引文档到文档能被搜索到只有一个轻微的延迟,是采用Restful API标准的可扩展和高可用的实时数据分析的全文搜索工具 Elastic Search 的实现原理是,利用内置分词器(Analyzer)对数据库文本进行分词,将解析出的和数据

    2024年02月04日
    浏览(42)
  • elasticsearch 安装 (es数据库安装详细)

    以下操作在debian11下,其它linux版本相同 安装的是8.6.2版本(2023.3),可以直接复制使用 以下操作默认在root下进行,如果切换用户会说明 1.下载安装包 注意:如果要安装kibana,版本尽量要一致 主体程序从这里下载 链接: es 2.创建es的用户 3.创建es存储位置 存放在/var/es(根据自

    2024年02月05日
    浏览(48)
  • datax 同步mongodb数据库到hive(hdfs)和elasticserch(es)

    1.mongodb版本:3.6.3。(有点老了,后来发现flinkcdc都只能监控一张表,多张表无法监控) 2.datax版本:自己编译的DataX-datax_v202210 3.hdfs版本:3.1.3 4.hive版本:3.1.2 1.增量数据:需要每隔1小时将mongodb中17个集合的数据同步至hive,因为有数据生成时间,才用datax查询方式,将上一个

    2023年04月23日
    浏览(51)
  • 【SpringBoot笔记28】SpringBoot集成ES数据库之操作doc文档(创建、更新、删除、查询)

    这篇文章,主要介绍SpringBoot集成ES数据库之操作doc文档(创建、更新、删除、查询)。 目录 一、SpringBoot操作ES文档数据 1.1、创建文档 1.2、更新文档 1.3、删除文档

    2024年02月08日
    浏览(51)
  • ELK(Elasticsearch、Kibana、Logstash)以及向ES导入mysql数据库数据或CSV文件数据,创建索引和可视化数据

    地址:Past Releases of Elastic Stack Software | Elastic 在Products和version处分别选择需要下载的产品和版本,E(elasticsearch)L(logstash)K(kibana)三者版本必须相同 将下载好的elk分别解压到相同路径下 本文中elasticsearch=E=ES=es;L=logstash;K=kibana 一般情况下使用默认配置即可,下面对我的

    2024年02月15日
    浏览(59)
  • Java SpringBoot API 实现ES(Elasticsearch)搜索引擎的一系列操作(超详细)(模拟数据库操作)

    小编使用的是elasticsearch-7.3.2 基础说明: 启动:进入elasticsearch-7.3.2/bin目录,双击elasticsearch.bat进行启动,当出现一下界面说明,启动成功。也可以访问http://localhost:9200/ 启动ES管理:进入elasticsearch-head-master文件夹,然后进入cmd命令界面,输入npm run start 即可启动。访问http

    2024年02月04日
    浏览(57)
  • DB SQL 转 ES DSL(支持多种数据库常用查询、统计、平均值、最大值、最小值、求和语法)...

    1. 简介   日常开发中需要查询 Elasticsearch 中的数据时,一般会采用 RestHighLevelClient 高级客户端封装的API。项目中一般采用一种或多种关系型数据库(如: Mysql 、 PostgreSQL 、 Oracle 等) + NoSQL(如: Elasticsearch )存储方案;不同关系数据库可以采用 Mybatis-Plus 方案屏蔽数据库的方言

    2024年01月17日
    浏览(51)
  • Elasticsearch 系列(六)- ES数据同步和ES集群

    本章将和大家分享ES的数据同步方案和ES集群相关知识。废话不多说,下面我们直接进入主题。 1、数据同步问题 Elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,Elasticsearch也必须跟着改变,这个就是Elasticsearch与mysql之间的数据同步。 在微服务中,负责酒

    2024年04月28日
    浏览(83)
  • Elasticsearch实战-数据同步(解决es数据增量同步)

    之前测试的数据都是一次从mysql导入到es,随着时间的推移,每天都有可能发生增删改查,不可能每次都全量同步,所以需要考虑增量同步问题。 缺点: 耦合性高,服务之间会相互影响 依赖消息队列的可靠性 启动:端口8099

    2024年02月11日
    浏览(70)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包