1、环境:
mysql5.7(自己部署),elasticsearch:7.4.2(自己部署),centos7.0(虚拟环境),python3.8(自己部署),pycharm(自己安装);--建议可用docker部署mysql和es
2、实现
①、mysql数据,estable表字段(ID--unique,VALUE--str is json format)唯一ID,json格式的字符串,(表中存的为需要更新的数据)
②、python3.8新建py,操作es和mysql
py文件放同一目录下
a、UpdateEsClass.py
# -*- coding: utf-8 -*-
# 引入json库
import json
# 引入es库
from elasticsearch import Elasticsearch as es
from elasticsearch import helpers as esh
# 引入mysql库
import pymysql as ps
# ES操作
class esoper():
# 连接
def __init__(self,host,http_auth,scheme,port):
self.host = host
self.http_auth = http_auth
self.scheme = scheme
self.port = port
self.es = es(self.host,http_auth=self.http_auth,scheme=self.scheme,port=self.port)
# 创建索引
def create(self,index):
# 如果不存在创建
if not self.es.indices.exists(index=index):
self.es.indices.create(index=index)
# 查询
def search(self,index,body,filter_path):
result = self.es.search(index=index,body=body,filter_path=filter_path)
return result
# 批量更新
def update(self,actions):
'''
批量更新样式
action_3 = {
"_index": "exam", # 索引名称
"_id": 24, # id
"_source": {"age": 21, "name": "张无忌", "address": "武当"}, # 需更新的值
}
action_list = [action_3]
esh.bulk(self.es, actions=action_list)
'''
esh.bulk(self.es, actions=actions)
return '更新已完成'
# 删除
def delete(self,index,ids):
for i in ids:
# self.es.delete(index=index,id=i[0])
self.es.delete(index=index,id=i[0], ignore=[400, 404])
# self.es.delete(index=index, id=ids)
print('删除已完成')
# 删除索引
def truncate(self,index):
self.es.indices.delete(index=index)
print('删除索引成功')
# 插入
def insert(self,index,body):
for i in body:
js = json.loads(i[1])
self.es.index(index=index,body=js,id=i[0])
print('插入已完成')
# 关闭连接
def close(self):
self.es.transport.close()
# mysql数据库操作
class databasesoper():
# 数据库初始化
def __init__(self,host,port,user,password,dataname):
self.host=host
self.port=port
self.user=user
self.password=password
self.dataname=dataname
self.conn = None
self.cursor = None
# 连接数据库
def connect(self):
# 连接
self.conn = ps.connect(host=self.host,port=self.port,user=self.user,password=self.password,database=self.dataname)
# 游标
self.cursor = self.conn.cursor()
# 执行查询 查出需要更新的数据
def exec(self,sql):
"""
执行sql语句
:param sql: sql语句
:param params: 参数
:return: sql查询出的结果
"""
# 定义一个空列表
list1 = []
try:
self.cursor.execute(sql)
result = self.cursor.fetchall()
for i in result:
list1.append(list(i))
return list1
except Exception as e:
print(e)
# 获取游标
def getCursor(self):
"""
获取操作游标
:return: 操作游标
"""
return self.cursor
# 关闭数据库 关闭游标
def close(self):
"""
关闭数据库连接和操作游标
"""
self.cursor.close()
self.conn.close()
b、UpdateEsData.py文章来源:https://www.toymoban.com/news/detail-799163.html
# -*- coding: utf-8 -*-
# 时间
import time
# 自定义模块
import UpdateEsClass as uc
# mysql数据库
def mysqlconn():
# 创建数据库对象,替换为自己的数据库
mysqlco = uc.databasesoper(host='192.168.10.221', port=3306, user='lwd', password='000000', dataname='esquery')
# 创建数据库连接
mysqlco.connect()
# 查询数据库数据进入内存
sqlid = 'select user_id from estable'
sqlvalue = 'select * from estable where user_id = 62295'
# 执行sql 拿到数据
resultid = mysqlco.exec(sqlid)
resultvalue = mysqlco.exec(sqlvalue)
# 关闭连接
mysqlco.close()
# 返回ID和值
return resultid ,resultvalue
# 数据格式 (55555,[55555,'{"10001":"你是谁?","10002":"这是一条测试数据。","10003":"标志符号(1-是、2-否)"}'])
def escoon(resultvalue):
# 创建ES对象,替换为自己的es服务
es = uc.esoper(['192.168.10.221:9200'], \
# 有就设置 账号 密码
http_auth=('', ''), \
scheme="http", \
port=9200, )
# 创建索引
es.create(index='person')
# 需更新的列表
actions = []
for action in resultvalue:
actionformat = {
"_index": "person", # 索引名称
"_id": action[0], # id
"_source": eval(action[1]), # 需更新的值 需要为字典类型 eval转换字符串字典为字典类型
}
actions.append(actionformat)
# 更新数据
es.update(actions=actions)
# 查询范式
body = {
"query": {
"match": {
"_id": "62295"
}
},
"_source": "100083"
}
# 查询字段
filter_path = ['hits.hits._source',
'hits.hits._id']
# 查询
esquery = es.search(index='person', body=body, filter_path=filter_path)
# 关闭es
es.close()
return esquery
if __name__ == '__main__':
# 开始时间
print('开始时间',time.time())
# 获取数据库数据
resultid, resultvalue = mysqlconn()
# 更新查询
esquery = escoon(resultvalue)
#
print(esquery)
# 结束时间
print('结束时间', time.time())
3、效果
文章来源地址https://www.toymoban.com/news/detail-799163.html
到了这里,关于数据库➡ES,数据更新同步的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!