通过logstash实现mysql与es的双向数据同步

这篇具有很好参考价值的文章主要介绍了通过logstash实现mysql与es的双向数据同步。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

参考题目

  1. 一种基于MySQL和Elasticsearch的数据同步方法及系统
  2. 基于MySQL和Elasticsearch的数据同步方法
  3. 一种基于MySQL和Elasticsearch的数据同步系统
  4. 基于MySQL和Elasticsearch的数据同步技术

目录

1【理论调研】

方案1:使用Logstash实现数据同步

方案2:使用Canal实现数据同步

方案3:使用Debezium实现数据同步

使用其他工具

2【使用Logstash实现MySQL和ES之间的双向数据同步】

2.0【MySQL测试数据库sql导入代码】

2.1【Logstash实现MySQL数据同步至ES】

2.2【Logstash实现ES数据同步至MySQL】

2.2.1【Bug记录】

2.2.2【参考文章】


1【理论调研】

实现MySQL和ES的双向数据同步,可以考虑使用以下几种解决方案:

实现MySQL和Elasticsearch(ES)之间的双向数据同步,需要使用一些工具和技术。以下是一些可能的方法:

方案1:使用Logstash实现数据同步

Logstash是一种流处理工具,可以从不同的来源获取数据并将其转换为指定格式输出到目标存储中,它支持从MySQL数据库读取数据,并将数据写入ES中,也可以从ES中读取数据并将数据写入MySQL数据库中。使用Logstash实现MySQL和ES的双向数据同步,可以按照以下步骤进行:

  1. 在MySQL和ES上安装Logstash;
  2. 配置MySQL和ES的连接信息,包括主机地址、端口、用户名、密码等;
  3. 配置Logstash的输入和输出插件,从MySQL中读取数据并写入ES中,同时从ES中读取数据并写入MySQL中;
  4. 启动Logstash并监控同步过程。

Logstash是一个流处理引擎,可以轻松地将数据从MySQL和ES之间传输。使用Logstash,您可以轻松地将MySQL表的数据导入到ES中,也可以将ES中的数据写回MySQL表中。您可以使用以下配置文件将数据从MySQL同步到ES:

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
    jdbc_user => "myuser"
    jdbc_password => "mypassword"
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    statement => "SELECT * FROM mytable"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "myindex"
    document_id => "%{id}"
  }
}

这将从MySQL的“mytable”表中选择所有行,并将它们写入名为“myindex”的ES索引中。

如果您想将ES中的数据写回MySQL表中,您可以使用类似以下的配置文件:

input {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "myindex"
    query => '{"query": {"match_all": {}}}'
    scroll => "5m"
    docinfo => true
  }
}

output {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
    jdbc_user => "myuser"
    jdbc_password => "mypassword"
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    statement => "UPDATE mytable SET myfield = ? WHERE id = ?"
    prepared_statement_bind_values => ["%{myfield}", "%{[@metadata][_id]}"]
  }
}

这将从名为“myindex”的ES索引中选择所有文档,并将它们写回名为“mytable”的MySQL表中。

方案2:使用Canal实现数据同步

Canal是阿里巴巴开源的一款基于数据库增量日志解析,提供增量数据订阅和消费的组件,它支持从MySQL中读取增量数据,并将数据写入ES中,同时支持从ES中读取数据并将数据写入MySQL中。使用Canal实现MySQL和ES的双向数据同步,可以按照以下步骤进行:

  1. 在MySQL和ES上安装Canal;
  2. 配置MySQL和ES的连接信息,包括主机地址、端口、用户名、密码等;
  3. 配置Canal的实例,包括MySQL的binlog信息、ES的索引信息等;
  4. 启动Canal并监控同步过程。

需要注意的是,在使用Logstash或Canal进行数据同步时,可能会出现数据类型不匹配、数据格式错误、数据丢失等问题,需要根据具体情况进行调整和优化。同时,为了确保数据同步的实时性和准确性,可以考虑增加监控和告警机制。

方案3:使用Debezium实现数据同步

Debezium是一个开源的分布式平台,可在数据源和目标之间实现实时数据流。它支持MySQL和ES之间的数据同步,并支持双向同步。使用Debezium,您可以在MySQL和ES之间实时同步数据更改。您可以按照以下步骤使用Debezium进行双向数据同步:

  • 下载并安装Debezium
  • 配置Debezium以监视MySQL表的更改
  • 配置Debezium以将更改写入ES
  • 配置Debezium以监视ES的更改
  • 配置Debezium以将更改写回MySQL

使用其他工具

除了Logstash和Debezium之外,还有一些其他工具可用于MySQL和ES之间的数据同步。例如,您可以使用StreamSets Data Collector或Apache Nifi来将数据从MySQL导入到ES,并将数据从ES写回MySQL。您还可以编写自己的脚本来执行此操作。无论您选择哪种方法,确保您的同步逻辑能够处理。

2【使用Logstash实现MySQL和ES之间的双向数据同步】

软件版本:

logstash 导入mysql数据,ElasticSearch,logstash,mysql,es,elasticSearch,linux

  1. logstash -f ../config/newsManager/mysql2es.conf
  2. logstash -f ../config/newsManager/es2mysql.conf

logstash 导入mysql数据,ElasticSearch,logstash,mysql,es,elasticSearch,linux

2.0【MySQL测试数据库sql导入代码】

  1. MySQL数据库名称:news_manager
  2. MySQL数据库版本:5.5.40
/*
SQLyog Ultimate v12.09 (64 bit)
MySQL - 5.5.40 : Database - news_manager
*********************************************************************
*/


/*!40101 SET NAMES utf8 */;

/*!40101 SET SQL_MODE=''*/;

/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`news_manager` /*!40100 DEFAULT CHARACTER SET utf8 */;

USE `news_manager`;

/*Table structure for table `item_user` */

DROP TABLE IF EXISTS `item_user`;

CREATE TABLE `item_user` (
  `item_user_id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `item_id` int(11) DEFAULT NULL,
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `status` int(11) DEFAULT '1' COMMENT '1:启用 0:禁用',
  PRIMARY KEY (`item_user_id`),
  KEY `FK_Reference_2` (`user_id`),
  KEY `FK_Reference_3` (`item_id`),
  CONSTRAINT `FK_Reference_2` FOREIGN KEY (`user_id`) REFERENCES `user_info` (`user_id`),
  CONSTRAINT `FK_Reference_3` FOREIGN KEY (`item_id`) REFERENCES `news_item` (`item_id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;

/*Data for the table `item_user` */

insert  into `item_user`(`item_user_id`,`user_id`,`item_id`,`create_time`,`update_time`,`status`) values (1,1,2,'2020-11-23 11:24:16','2020-11-25 10:27:54',1),(2,2,4,NULL,'2020-11-25 09:38:17',1),(3,1,1,'2020-11-24 09:19:58','2020-11-25 09:38:21',1),(5,1,18,NULL,'2020-11-25 09:44:16',1),(6,1,27,'2020-11-25 11:11:35','2020-11-25 11:11:35',1),(7,1,28,'2020-11-25 11:17:59','2020-11-25 11:17:59',1),(8,1,29,'2020-11-25 11:29:14','2020-11-25 11:29:14',1),(9,1,30,'2020-11-25 11:30:54','2020-11-25 11:30:54',1),(10,1,31,'2020-11-25 11:36:51','2020-11-25 11:36:51',1),(11,1,32,'2020-11-25 16:26:23','2020-11-25 16:26:23',1),(12,1,33,'2020-11-25 16:26:37','2020-11-25 16:26:37',1),(13,1,34,'2020-11-26 10:01:29','2020-11-26 10:01:29',1),(14,1,35,'2020-11-26 10:28:53','2020-11-26 10:28:53',1);

/*Table structure for table `logs_info` */

DROP TABLE IF EXISTS `logs_info`;

CREATE TABLE `logs_info` (
  `logs_id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `logs_content` text COLLATE utf8mb4_hungarian_ci,
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  PRIMARY KEY (`logs_id`),
  KEY `FK_Reference_1` (`user_id`),
  CONSTRAINT `FK_Reference_1` FOREIGN KEY (`user_id`) REFERENCES `user_info` (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;

/*Data for the table `logs_info` */

insert  into `logs_info`(`logs_id`,`user_id`,`logs_content`,`create_time`,`update_time`) values (1,1,NULL,NULL,'2020-11-24 09:27:05'),(2,2,NULL,NULL,'2020-11-24 09:27:12'),(3,4,NULL,NULL,'2020-11-23 11:29:06'),(14,1,'woshishenren','2020-11-24 09:24:52','2020-11-24 09:24:52'),(15,1,'woshishenren','2020-11-24 09:25:58','2020-11-24 09:25:58');

/*Table structure for table `news_info` */

DROP TABLE IF EXISTS `news_info`;

CREATE TABLE `news_info` (
  `new_id` int(11) NOT NULL AUTO_INCREMENT,
  `item_id` int(11) DEFAULT NULL,
  `news_title` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,
  `news_image` varchar(255) COLLATE utf8mb4_hungarian_ci DEFAULT NULL,
  `news_content` text COLLATE utf8mb4_hungarian_ci,
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  PRIMARY KEY (`new_id`),
  KEY `FK_Reference_4` (`item_id`),
  CONSTRAINT `FK_Reference_4` FOREIGN KEY (`item_id`) REFERENCES `news_item` (`item_id`)
) ENGINE=InnoDB AUTO_INCREMENT=20 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;

/*Data for the table `news_info` */

insert  into `news_info`(`new_id`,`item_id`,`news_title`,`news_image`,`news_content`,`create_time`,`update_time`) values (1,2,'蓝桥杯比赛',NULL,NULL,NULL,'2020-11-23 09:27:39'),(2,3,'新学期学费',NULL,NULL,NULL,'2020-11-23 09:28:10'),(3,1,'拔河比赛',NULL,'拔河比赛要使劲!!!','2020-11-25 14:57:28','2020-11-25 14:57:32'),(4,18,'街舞比赛',NULL,'一起摇摆~','2020-11-25 15:54:09','2020-11-25 15:54:11'),(10,27,'数学建模',NULL,'一起加油!','2020-11-25 16:10:02','2020-11-25 22:17:19'),(11,29,'班班唱',NULL,'《走向复兴》','2020-11-25 16:12:23','2020-11-25 16:12:23'),(12,1,'篮球比赛',NULL,'冲冲冲~','2020-11-25 16:13:04','2020-11-25 16:13:04'),(13,1,'NECCS',NULL,'冲呀~','2020-11-25 16:27:22','2020-11-26 08:38:03'),(14,18,'卓见杯',NULL,'啦啦啦~','2020-11-25 17:41:32','2020-11-25 22:17:56'),(15,33,'动则升阳',NULL,'年轻不养生,年老养医生!','2020-11-26 00:12:42','2020-11-26 00:12:42'),(16,33,'11月26日',NULL,'筑基修士','2020-11-26 10:02:20','2020-11-26 10:02:20'),(17,35,'大家好',NULL,'333','2020-11-26 10:29:35','2020-11-26 10:29:35'),(18,35,'大家好!!!',NULL,'333','2020-11-26 10:29:45','2020-11-26 10:29:45'),(19,35,'我是新增数据!',NULL,'我是新增数据!','2023-03-15 16:43:01','2023-03-15 16:43:02');

/*Table structure for table `news_item` */

DROP TABLE IF EXISTS `news_item`;

CREATE TABLE `news_item` (
  `item_id` int(11) NOT NULL AUTO_INCREMENT,
  `item_name` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `status` int(11) DEFAULT '1' COMMENT '1:启用 0:禁用',
  PRIMARY KEY (`item_id`)
) ENGINE=InnoDB AUTO_INCREMENT=36 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;

/*Data for the table `news_item` */

insert  into `news_item`(`item_id`,`item_name`,`create_time`,`update_time`,`status`) values (1,'呵呵哒','2020-11-24 15:47:00','2020-11-26 10:28:39',1),(2,'党支部','2020-11-24 15:47:03','2020-11-25 14:44:31',0),(3,'分团委','2020-11-24 15:47:05','2020-11-25 14:43:54',1),(4,'院团委','2020-11-24 15:47:08','2020-11-25 14:44:38',1),(5,'111','2020-11-23 15:22:54','2020-11-25 14:45:55',1),(6,'学生会','2020-11-24 09:27:36','2020-11-25 14:46:01',1),(8,'党支部','2020-11-24 13:51:13','2020-11-25 14:46:07',1),(18,'党支部','2020-11-25 09:11:51','2020-11-25 15:49:06',1),(19,'院团委','2020-11-25 10:42:54','2020-11-25 14:46:16',1),(20,'111','2020-11-25 10:54:12','2020-11-25 14:46:19',1),(21,'学生会','2020-11-25 10:56:21','2020-11-25 14:46:35',1),(22,'党支部','2020-11-25 10:57:35','2020-11-25 14:46:43',1),(23,'分团委','2020-11-25 11:00:20','2020-11-25 14:46:48',1),(24,'院团委','2020-11-25 11:00:47','2020-11-25 14:46:55',1),(25,'qweqwe','2020-11-25 11:01:37','2020-11-25 11:01:37',1),(26,'eqweqweqwe','2020-11-25 11:01:53','2020-11-25 11:01:53',1),(27,'分团委','2020-11-25 11:11:35','2020-11-25 15:49:18',1),(28,'sadsads','2020-11-25 11:17:59','2020-11-25 11:18:40',0),(29,'院团委','2020-11-25 11:29:13','2020-11-25 15:49:25',1),(30,'789','2020-11-25 11:30:54','2020-11-25 11:37:19',0),(31,'zyk','2020-11-25 11:36:51','2020-11-25 11:37:19',0),(32,'委员会','2020-11-25 16:26:23','2020-11-26 08:37:48',0),(33,'委员会~~~','2020-11-25 16:26:37','2020-11-26 10:01:40',1),(34,'演示~','2020-11-26 10:01:29','2020-11-26 10:01:33',0),(35,'筑基修士!!!修改!','2020-11-26 10:28:53','2023-03-15 16:44:39',1);

/*Table structure for table `user_info` */

DROP TABLE IF EXISTS `user_info`;

CREATE TABLE `user_info` (
  `user_id` int(11) NOT NULL AUTO_INCREMENT,
  `user_name` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,
  `user_pwd` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `status` int(11) DEFAULT '1' COMMENT '1:启用 0:禁用',
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=121 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;

/*Data for the table `user_info` */

insert  into `user_info`(`user_id`,`user_name`,`user_pwd`,`create_time`,`update_time`,`status`) values (1,'宋书航','1','2020-11-23 09:30:16','2020-11-25 22:16:25',1),(2,'雨柔子','1','2020-11-23 11:25:41','2020-11-25 22:16:25',1),(4,'王五','1','2020-11-23 11:25:58','2020-11-25 22:16:26',0),(5,'赵柳','1','2020-11-23 11:26:12','2020-11-25 22:16:26',0),(8,'田七','1','2020-11-23 11:26:29','2020-11-25 22:16:27',0),(9,'田七','1','2020-11-23 15:03:23','2020-11-25 22:16:28',0),(10,'田七','1','2020-11-23 15:03:43','2020-11-25 22:16:28',0),(11,'戴沐白','1','2020-11-24 10:45:06','2020-11-25 22:16:29',1),(12,'张小凡','1','2020-11-24 10:45:29','2020-11-25 22:16:29',1),(13,'userName2','1','2020-11-24 10:45:29','2020-11-25 22:16:30',0),(15,'碧瑶','1','2020-11-24 10:45:29','2020-11-25 22:16:31',1),(16,'赵恋凡','1','2020-11-24 10:45:29','2020-11-25 22:16:31',1),(17,'李长寿','1','2020-11-24 10:45:29','2020-11-25 22:16:32',1),(18,'蓝梦娥','1','2020-11-24 10:45:29','2020-11-25 22:16:33',1),(22,'路明非','123456','2020-11-24 10:45:29','2020-11-25 17:44:31',1),(23,'楚子航','123456','2020-11-24 10:45:29','2020-11-25 22:14:26',1),(33,'乔微尼','123456','2020-11-24 10:45:29','2020-11-25 23:05:48',1),(97,'userName86','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(98,'userName87','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(99,'userName88','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(100,'userName89','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(101,'2020年好运来~','123456','2020-11-24 10:45:32','2020-11-26 10:01:06',1),(102,'333','123456','2020-11-24 10:45:32','2020-11-26 10:28:14',1),(103,'666','888','2020-11-24 10:45:32','2020-11-26 10:28:26',1),(104,'userName93','123456','2020-11-24 10:45:32','2020-11-26 10:00:54',0),(105,'userName94','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(106,'userName95','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(107,'userName96','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(108,'userName97','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(109,'userName98','123456','2020-11-24 10:45:32','2020-11-25 10:12:58',0),(110,'userName99','123456','2020-11-24 10:45:32','2020-11-25 10:12:58',0),(111,'userName100','123456','2020-11-24 10:45:33','2020-11-25 10:12:58',0),(115,'萧潜','1','2020-11-25 23:00:16','2020-11-25 23:00:16',1),(116,'演示视频','haha','2020-11-26 10:00:33','2020-11-26 10:27:37',0),(117,'啦啦啦','1','2020-11-26 10:27:14','2020-11-26 10:27:37',0),(118,'演示视频','222','2020-11-26 10:27:23','2020-11-26 10:27:37',0),(119,'实训小组hyy','111','2020-11-26 14:37:14','2020-11-26 14:37:14',1),(120,'我是新增数据!修改!','1111','2023-04-18 20:38:41','2023-04-18 20:48:13',1);

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

2.1【Logstash实现MySQL数据同步至ES】

先启动es,在启动logstash。

大数据周会-本周学习内容总结06【Linux启动ELK步骤】

logstash 导入mysql数据,ElasticSearch,logstash,mysql,es,elasticSearch,linux

input {
	stdin {
    }

    jdbc { # 01
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "root"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from item_user"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "item_user"
	}

    jdbc { # 02
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "root"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from logs_info"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "logs_info"
	}

    jdbc { # 03
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "root"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from news_item"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "news_item"
	}

    jdbc { # 04
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "root"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from news_info"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "news_info"
	}

    jdbc { # 05
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "root"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from user_info"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "user_info"
	}
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
	if[type] == "item_user" { # 01
		elasticsearch {
			hosts => ["1.2.3.4:9200"]
			index => "test_item_user"
			# document_id => "%{id}"
		}
	}

	if[type] == "logs_info" { # 02
		elasticsearch {
			hosts => ["1.2.3.4:9200"]
			index => "test_logs_info"
			# document_id => "%{id}"
		}
	}

	if[type] == "news_item" { # 03
		elasticsearch {
			hosts => ["1.2.3.4:9200"]
			index => "test_news_item"
			# document_id => "%{id}"
		}
	}

	if[type] == "news_info" { # 04
		elasticsearch {
			hosts => ["1.2.3.4:9200"]
			index => "test_news_info"
			# document_id => "%{id}"
		}
	}

	if[type] == "user_info" { # 05
		elasticsearch {
			hosts => ["1.2.3.4:9200"]
			index => "test_user_info"
			# document_id => "%{id}"
		}
	}

    stdout {
        codec => json_lines
    }
}

2.2【Logstash实现ES数据同步至MySQL】

logstash 导入mysql数据,ElasticSearch,logstash,mysql,es,elasticSearch,linux

  1. logstash-plugin install --no-verify logstash-output-jdbc   # Logstash安装插件logstash-output-jdbc
  2. logstash-plugin list   # 查看Logstash已安装的插件

【es与mysql双向同步-通过logstash将es同步至mysql】功能已实现,但是只进行了简单测试。问题包括但不限于:中文乱码、时间戳字段插入错误等。

logstash 导入mysql数据,ElasticSearch,logstash,mysql,es,elasticSearch,linux

logstash 导入mysql数据,ElasticSearch,logstash,mysql,es,elasticSearch,linux

logstash 导入mysql数据,ElasticSearch,logstash,mysql,es,elasticSearch,linux

input {
	elasticsearch {
		hosts => ["hadoop100:9200"]
		index => "test_user_info"
		query => '{ "query": { "match_all": {} } }'
		schedule => "* * * * *"
	}
}

output {
	jdbc {
		driver_jar_path => "/opt/jar/mysql-connector-java-5.1.31.jar"
		driver_class => "com.mysql.jdbc.Driver"
		# user => "root"
		# password => "root"
		# "jdbc:mysql://xxx.xxx.xxx.xxx:xxxx/douyin?autoReconnect=true&user=xxxx@xxxx&password=xxxxx"
		connection_string => "jdbc:mysql://1.2.3.4:3306/school_matriculate?autoReconnect=true&user=root&password=root&useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8"
		# statement => ["insert into tb_videos(md5, Id, view, timestamp) values(?,?,?,?)","[md5]", "[Id]", "[view]", "[timestamp]"]
		# statement => ["INSERT INTO user_info (user_name, user_pwd, create_time, update_time, status) VALUES (?, ?, ?, ?, ?)", "[user_name]", "[user_pwd]", "[create_time]", "[update_time]", "[status]"]
		statement => ["INSERT INTO user_info (user_name, user_pwd) VALUES (?, ?)", "[user_name]", "[user_pwd]"]
	}
}

2.2.1【Bug记录】

logstash 导入mysql数据,ElasticSearch,logstash,mysql,es,elasticSearch,linux

logstash 导入mysql数据,ElasticSearch,logstash,mysql,es,elasticSearch,linux

Unknown setting 'jdbc_user' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'prepared_statement_bind_values' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_password' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_driver_library' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_connection_string' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_driver_class' for jdbc

[2023-04-19T20:27:23,669][ERROR][logstash.agent           ] Failed to execute action

java.sql.SQLException: Access denied for user ''@'upward' (using password: NO)

        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1094) ~[mysql-connector-java-5.1.31.jar:?]

logstash 导入mysql数据,ElasticSearch,logstash,mysql,es,elasticSearch,linux

logstash 导入mysql数据,ElasticSearch,logstash,mysql,es,elasticSearch,linux

[2023-04-19T21:48:53,751][ERROR][com.zaxxer.hikari.pool.HikariPool][main] HikariPool-1 - Exception during pool initialization.

com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server. Attempted reconnect 3 times. Giving up.

logstash 导入mysql数据,ElasticSearch,logstash,mysql,es,elasticSearch,linux

2.2.2【参考文章】

  1. Java:Logstash如何安装插件logstash-output-jdbc_netyeaxi的博客-CSDN博客
  2. logstash的logstash-output-jdbc插件安装_logstash output jdbc_&捕风的汉子&的博客-CSDN博客
  3. logstash-output-jdbc使用
  4. https://github.com/theangryangel/logstash-output-jdbc

  5. https://www.elastic.co/guide/en/logstash/current/index.html文章来源地址https://www.toymoban.com/news/detail-610680.html

到了这里,关于通过logstash实现mysql与es的双向数据同步的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 通过logstash(6.8.6)将es(6.8.6)数据导入clickhouse

    编写logstash数据转换的配置文件 export-csv.yml 需要根据es中数据和导出的原始数据格式对应的clickhouse中字段类型对数据的要求在filter中对数据的处理 启动logstash, -f 指定配置文件不使用logstash默认配置文件路径 ./logstash-6.8.6/bin/logstash -f test.yml clickhouse中创建表, 字段类型需要注

    2024年02月13日
    浏览(43)
  • logstash同步mysql数据到es(三、es模板问题)

     相关问题汇总: logstash同步mysql数据到es(一、es模板问题,请求返回400) logstash同步mysql数据到es(二、jdbc_driver_library问题)_(please check user and group permissions for the p-CSDN博客 logstash同步mysql数据到es(三、es模板问题)-CSDN博客 使用docker实现logstash同步mysql到es-CSDN博客 [INFO ] 2023-12-11 09

    2024年01月17日
    浏览(79)
  • Docker部署Logstash同步Mysql数据到ES

    页面访问 ip:9200端口,出现下面页面部署成功 成功日志

    2024年04月13日
    浏览(41)
  • 通过kafka connector实现mysql数据自动同步es

    整体思路: 1、使用 io.debezium.connector.mysql.MySqlConnector 自动同步数据到kafka消息队列 2、通过listener监听消息队列,代码控制数据插入es ps:其实有更简单的方式:在此基础上使用ElasticsearchSinkConnector、ksql,完成数据的转换与自动同步es,全程无需代码控制,后续本地跑通流程后

    2024年02月08日
    浏览(45)
  • 使用Logstash和JDBC将MySQL的数据导入到Elasticsearch(ES)的过程

    使用Logstash和JDBC将MySQL的数据导入到Elasticsearch(ES)的过程包含多个步骤。请注意,首先你需要准备好的JDBC驱动,Logstash实例,Elasticsearch实例,以及你希望导入的MySQL数据。 安装Logstash JDBC Input Plugin :Logstash包含大量插件,其中一个就是JDBC Input Plugin,可以用于从JDBC兼容的数据库

    2024年02月15日
    浏览(44)
  • 补充:es与mysql之间的数据同步 2 使用分页导入的方式把大量数据从mysql导入es

    本片文章只是对之前写的文章的补充, es与mysql之间的数据同步 http://t.csdn.cn/npHt4 补充一: 之前的文章对于交换机、队列、绑定,使用的是@bean, 而这里使用的是纯注解版 在消费方,声明交换机: 补充二: 之前的文章是直接使用es操作数据,新增和修改,这样做不是很合适

    2024年02月12日
    浏览(51)
  • ELK(Elasticsearch、Kibana、Logstash)以及向ES导入mysql数据库数据或CSV文件数据,创建索引和可视化数据

    地址:Past Releases of Elastic Stack Software | Elastic 在Products和version处分别选择需要下载的产品和版本,E(elasticsearch)L(logstash)K(kibana)三者版本必须相同 将下载好的elk分别解压到相同路径下 本文中elasticsearch=E=ES=es;L=logstash;K=kibana 一般情况下使用默认配置即可,下面对我的

    2024年02月15日
    浏览(59)
  • 使用logstash把mysql同步到es,Kibana可视化查看

    Logstash下载地址:https://www.elastic.co/cn/downloads/logstash

    2024年02月02日
    浏览(42)
  • logstash同步数据从kafka到es集群

    背景:需求是这样的,原始文件是txt文件(每天300个文件),最终想要的结果是每天将txt中的数据加载到es中,开始的想法是通过logstash加载数据到es中,但是对logstash不太熟悉,不知道怎么讲程序弄成读取一个txt文件到es中以后,就将这个txt原始文件备份并且删除掉,然后就想

    2024年02月15日
    浏览(42)
  • Logstash同步MySQL数据到ElasticSearch

    当MySQL数据到一定的数量级,而且索引不能实现时,查询就会变得非常缓慢,所以使用ElasticSearch来查询数据。本篇博客介绍使用Logstash同步MySQL数据到ElasticSearch,再进行查询。 测试环境 Windows系统 MySQL 5.7 Logstash 7.0.1 ElasticSearch 7.0.1 Kibana 7.0.1 ELK工具下载可访问:https://www.elastic

    2024年02月01日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包