Syncing MySQL data to Elasticsearch
I. Install Elasticsearch
#Elasticsearch runs on the JVM, so Java must be installed first
[root@localhost ~]# java -version #check whether Java is already installed
[root@localhost ~]# yum search java #list the available Java packages
[root@localhost ~]# yum install java-17-openjdk-headless.x86_64 #install the java-17-openjdk-headless package
[root@localhost ~]# java -version
openjdk version "17.0.6" 2023-01-17 LTS
OpenJDK Runtime Environment (Red_Hat-17.0.6.0.10-3.el9) (build 17.0.6+10-LTS)
OpenJDK 64-Bit Server VM (Red_Hat-17.0.6.0.10-3.el9) (build 17.0.6+10-LTS, mixed mode, sharing)
#Install Elasticsearch
[root@localhost home]# wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.12.2-linux-x86_64.tar.gz #download Elasticsearch; the package can also be downloaded from the official site https://www.elastic.co/cn/elasticsearch
[root@localhost home]# tar -zxvf elasticsearch-8.12.2-linux-x86_64.tar.gz #extract
#Start Elasticsearch
[root@localhost home]# cd /home/elasticsearch-8.12.2/
[root@localhost elasticsearch-8.12.2]# ./bin/elasticsearch
CompileCommand: exclude org/apache/lucene/util/MSBRadixSorter.computeCommonPrefixLengthAndBuildHistogram bool exclude = true
CompileCommand: exclude org/apache/lucene/util/RadixSelector.computeCommonPrefixLengthAndBuildHistogram bool exclude = true
Feb 23, 2024 11:25:43 AM sun.util.locale.provider.LocaleProviderAdapter <clinit>
WARNING: COMPAT locale provider will be removed in a future release
[2024-02-23T11:25:44,075][ERROR][o.e.b.Elasticsearch ] [localhost] fatal exception while booting Elasticsearch
java.lang.RuntimeException: can not run elasticsearch as root
at org.elasticsearch.server@8.12.2/org.elasticsearch.bootstrap.Elasticsearch.initializeNatives(Elasticsearch.java:282)
at org.elasticsearch.server@8.12.2/org.elasticsearch.bootstrap.Elasticsearch.initPhase2(Elasticsearch.java:167)
at org.elasticsearch.server@8.12.2/org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:72)
See logs for more details.
ERROR: Elasticsearch did not exit normally - check the logs at /home/elasticsearch-8.12.2/logs/elasticsearch.log
ERROR: Elasticsearch exited unexpectedly, with exit code 1
#Check the Elasticsearch log at /home/elasticsearch-8.12.2/logs/elasticsearch.log; in my case it says Elasticsearch cannot be run as root
[root@localhost home]# useradd es #add a dedicated user
[root@localhost home]# passwd es #set its password
Changing password for user es.
New password:
BAD PASSWORD: The password is shorter than 8 characters
Retype new password:
passwd: all authentication tokens updated successfully.
[root@localhost home]# chown -R es:es /home/elasticsearch-8.12.2 #change the owner and group of the installation directory
[root@localhost home]# chmod -R 777 /home/elasticsearch-8.12.2 #adjust permissions
[root@localhost home]# su es #switch to the es user
[es@localhost elasticsearch-8.12.2]$ ./bin/elasticsearch #start Elasticsearch (-d runs it in the background)
[2024-02-23T14:17:51,625][WARN ][o.e.h.n.Netty4HttpServerTransport] [localhost] received plaintext http traffic on an https channel, closing connection Netty4HttpChannel{localAddress=/192.168.243.134:9200, remoteAddress=/192.168.243.1:62306} #this appears when a browser opens http://ip:9200, because Elasticsearch enables security (TLS) by default; set xpack.security.enabled: false in config/elasticsearch.yml
[es@localhost elasticsearch-8.12.2]$ ./bin/elasticsearch #start it again
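With security disabled, a quick sanity check from the shell confirms the node is up (the IP below is the one used throughout this walkthrough):
[es@localhost elasticsearch-8.12.2]$ curl http://192.168.243.134:9200 #should return a small JSON document with the node name, cluster name and version 8.12.2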
II. Install MySQL
For installing MySQL 8, see my other post:
https://editor.csdn.net/md/?articleId=135905811
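Before wiring MySQL to Logstash or canal it is worth confirming the server version, since the connector and canal versions chosen below depend on it (credentials are whatever you set during the MySQL install):
[root@localhost ~]# mysql -uroot -p -e "SELECT VERSION();" #expects something like 8.0.x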
III. Install Logstash
1. Download and install Logstash
[root@localhost home]# wget https://artifacts.elastic.co/downloads/logstash/logstash-8.12.2-linux-x86_64.tar.gz #use the same version as Elasticsearch
[root@localhost home]# tar -zxvf logstash-8.12.2-linux-x86_64.tar.gz #extract
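A quick check that the download works before any configuration (path assumed from the extraction above):
[root@localhost home]# /home/logstash-8.12.2/bin/logstash --version #prints "logstash 8.12.2"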
2. Logstash configuration
logstash.yml
# Settings file in YAML
#
# Settings can be specified either in hierarchical form, e.g.:
# e.g. pipeline batch size and batch delay in hierarchical form:
#   pipeline:
#     batch:
#       size: 125    #pipeline batch size
#       delay: 5     #pipeline batch delay
#
# Or as flat keys:
# the same values expressed as flat keys
# pipeline.batch.size: 125
# pipeline.batch.delay: 5
#
# ------------ Node identity ------------
#
# Use a descriptive name for the node:
#
# node.name: test #node name; must be unique within a cluster, defaults to the Logstash host's hostname
#
# If omitted the node name will default to the machine's host name
#
# ------------ Data path ------------------
#
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
#
# path.data: #directory used by Logstash and its plugins for persistent data
#
# ------------ Pipeline Settings --------------
#
# The ID of the pipeline.
#
# pipeline.id: main #pipeline id, defaults to main
#
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
#
# This defaults to the number of the host's CPU cores.
#
# pipeline.workers: 2 #number of workers that run the filter+output stages in parallel; defaults to the number of CPU cores
#
# How many events to retrieve from inputs before sending to filters+workers
#
# pipeline.batch.size: 125 #maximum number of events a single worker collects from inputs before running filters
#
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
#
# pipeline.batch.delay: 50 #how many milliseconds to wait for further events before dispatching an undersized batch to the workers
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
#
# WARNING: Enabling this can lead to data loss during shutdown
#
# pipeline.unsafe_shutdown: false #when true, force Logstash to exit during shutdown even if in-flight events remain in memory; by default Logstash refuses to quit until all received events have been pushed to the outputs, so enabling this can lose data on shutdown
#
# Set the pipeline event ordering. Options are "auto" (the default), "true" or "false".
# "auto" automatically enables ordering if the 'pipeline.workers' setting
# is also set to '1', and disables otherwise.
# "true" enforces ordering on the pipeline and prevent logstash from starting
# if there are multiple workers.
# "false" disables any extra processing necessary for preserving ordering.
#
# pipeline.ordered: auto #auto enables event ordering only when pipeline.workers is 1; true forces ordering (and prevents startup with multiple workers); false disables the extra processing
#
# Sets the pipeline's default value for `ecs_compatibility`, a setting that is
# available to plugins that implement an ECS Compatibility mode for use with
# the Elastic Common Schema.
# Possible values are:
# - disabled
# - v1
# - v8 (default)
# Pipelines defined before Logstash 8 operated without ECS in mind. To ensure a
# migrated pipeline continues to operate as it did before your upgrade, opt-OUT
# of ECS for the individual pipeline in its `pipelines.yml` definition. Setting
# it here will set the default for _all_ pipelines, including new ones.
#
# pipeline.ecs_compatibility: v8
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline
#
# path.config: #path to the pipeline configuration for the main pipeline
#
# Pipeline configuration string for the main pipeline
#
# config.string: #a string containing the pipeline configuration to use for the main pipeline
#
# At startup, test if the configuration is valid and exit (dry run)
#
# config.test_and_exit: false #when true, validate the configuration at startup and exit (dry run); default false
#
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
#
# config.reload.automatic: false #periodically check whether the configuration has changed and reload the pipeline; default false
#
# How often to check if the pipeline configuration has changed (in seconds)
# Note that the unit value (s) is required. Values without a qualifier (e.g. 60)
# are treated as nanoseconds.
# Setting the interval this way is not recommended and might change in later versions.
#
# config.reload.interval: 3s #how often to check whether the pipeline configuration has changed (the unit suffix is required)
#
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
#
# config.debug: false #when true, log the fully compiled configuration as a debug message; log.level: debug must also be set
#
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
#
# config.support_escapes: false #whether to process escaped characters such as \n and \" in configuration strings
#
# ------------ API Settings -------------
# Define settings related to the HTTP API here.
#
# The HTTP API is enabled by default. It can be disabled, but features that rely
# on it will not work as intended.
#
# api.enabled: true #whether the HTTP API is enabled
#
# By default, the HTTP API is not secured and is therefore bound to only the
# host's loopback interface, ensuring that it is not accessible to the rest of
# the network.
# When secured with SSL and Basic Auth, the API is bound to _all_ interfaces
# unless configured otherwise.
#
# api.http.host: 127.0.0.1 #HTTP API bind address
#
# The HTTP API web server will listen on an available port from the given range.
# Values can be specified as a single port (e.g., `9600`), or an inclusive range
# of ports (e.g., `9600-9700`).
#
# api.http.port: 9600-9700 #HTTP API port (or port range)
#
# The HTTP API includes a customizable "environment" value in its response,
# which can be configured here.
#
# api.environment: "production" #environment value included in HTTP API responses
#
# The HTTP API can be secured with SSL (TLS). To do so, you will need to provide
# the path to a password-protected keystore in p12 or jks format, along with credentials.
#
# api.ssl.enabled: false #whether to secure the API with SSL
# api.ssl.keystore.path: /path/to/keystore.jks #path to the keystore
# api.ssl.keystore.password: "y0uRp4$$w0rD" #keystore password
#
# The availability of SSL/TLS protocols depends on the JVM version. Certain protocols are
# disabled by default and need to be enabled manually by changing `jdk.tls.disabledAlgorithms`
# in the $JDK_HOME/conf/security/java.security configuration file.
#
# api.ssl.supported_protocols: [TLSv1.2,TLSv1.3]
#
# The HTTP API can be configured to require authentication. Acceptable values are
# - `none`: no auth is required (default)
# - `basic`: clients must authenticate with HTTP Basic auth, as configured
# with `api.auth.basic.*` options below
# api.auth.type: none
#
# When configured with `api.auth.type` `basic`, you must provide the credentials
# that requests will be validated against. Usage of Environment or Keystore
# variable replacements is encouraged (such as the value `"${HTTP_PASS}"`, which
# resolves to the value stored in the keystore's `HTTP_PASS` variable if present
# or the same variable from the environment)
#
# api.auth.basic.username: "logstash-user"
# api.auth.basic.password: "s3cUreP4$$w0rD"
#
# When setting `api.auth.basic.password`, the password should meet
# the default password policy requirements.
# The default password policy requires non-empty minimum 8 char string that
# includes a digit, upper case letter and lower case letter.
# Policy mode sets Logstash to WARN or ERROR when HTTP authentication password doesn't
# meet the password policy requirements.
# The default is WARN. Setting to ERROR enforces stronger passwords (recommended).
#
# api.auth.basic.password_policy.mode: WARN
#
# ------------ Module Settings ---------------
# Define modules here. Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and
# above the next, like this:
#module definitions must be written as an array
#module variable names must use the format var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
# modules:
# - name: MODULE_NAME
# var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
# var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
# var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
# var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
#
# Module variable names must be in the format of
#
# var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
# modules:
#
# ------------ Cloud Settings ---------------
# Define Elastic Cloud settings here.
# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
# and it may have an label prefix e.g. staging:dXMtZ...
# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
# cloud.id: <identifier>
#
# Format of cloud.auth is: <user>:<pass>
# This is optional
# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
# cloud.auth: elastic:<password>
#
# ------------ Queuing Settings --------------
#
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Defaults is memory
# internal queuing model for buffering events: "memory" (default) or "persisted" for disk-based acked queueing
# queue.type: memory
#
# If `queue.type: persisted`, the directory path where the pipeline data files will be stored.
# Each pipeline will group its PQ files in a subdirectory matching its `pipeline.id`.
# Default is path.data/queue.
#when queue.type: persisted, the directory where pipeline data files are stored; defaults to path.data/queue
# path.queue:
#
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 64mb
# when queue.type: persisted, the size of the page data files; default 64mb
# queue.page_capacity: 64mb
#
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
#when queue.type: persisted, the maximum number of unread events in the queue; default 0 (unlimited)
# queue.max_events: 0
#
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
#when queue.type: persisted, the total capacity of the queue in bytes; default 1024mb
# queue.max_bytes: 1024mb
#
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#when queue.type: persisted, the maximum number of acked events before a checkpoint is forced; default 1024, 0 for unlimited
# queue.checkpoint.acks: 1024
#
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#when queue.type: persisted, the maximum number of written events before a checkpoint is forced; default 1024, 0 for unlimited
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#when queue.type: persisted, the interval in milliseconds between forced checkpoints on the head page; default 1000, 0 disables periodic checkpoints
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#whether the dead-letter queue is enabled; default false
# dead_letter_queue.enable: false
# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
#maximum size of each dead-letter queue; entries that would push it past this limit are dropped; default 1024mb
# dead_letter_queue.max_bytes: 1024mb
# If using dead_letter_queue.enable: true, the interval in milliseconds where if no further events eligible for the DLQ
# have been created, a dead letter queue file will be written. A low value here will mean that more, smaller, queue files
# may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and
# being available to be read by the dead_letter_queue input when items are written infrequently.
# Default is 5000.
#with the DLQ enabled, how long (in ms) to wait before flushing a dead-letter queue file when no further eligible events arrive; default 5000
# dead_letter_queue.flush_interval: 5000
# If using dead_letter_queue.enable: true, controls which entries should be dropped to avoid exceeding the size limit.
# Set the value to `drop_newer` (default) to stop accepting new events that would push the DLQ size over the limit.
# Set the value to `drop_older` to remove queue pages containing the oldest events to make space for new ones.
#with the DLQ enabled, controls which entries are dropped to stay under the size limit: drop_newer (default) stops accepting new events that would exceed it, drop_older removes the pages holding the oldest events to make room for new ones
# dead_letter_queue.storage_policy: drop_newer
# If using dead_letter_queue.enable: true, the interval that events have to be considered valid. After the interval has
# expired the events could be automatically deleted from the DLQ.
# The interval could be expressed in days, hours, minutes or seconds, using as postfix notation like 5d,
# to represent a five days interval.
# The available units are respectively d, h, m, s for day, hours, minutes and seconds.
# If not specified then the DLQ doesn't use any age policy for cleaning events.
#how long DLQ events remain valid; once the interval expires they may be deleted from the DLQ automatically
# dead_letter_queue.retain.age: 1d
# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#directory where the dead-letter queue data files are stored
# path.dead_letter_queue:
#
# ------------ Debugging Settings --------------
#
# Options for log.level:
# * fatal
# * error
# * warn
# * info (default)
# * debug
# * trace
#log level
# log.level: info
#log path
# path.logs:
#
# ------------ Other Settings --------------
#
# Allow or block running Logstash as superuser (default: true)
# whether Logstash is allowed to run as the superuser (default: true)
# allow_superuser: false
#
# Where to find custom plugins
#where to find custom plugins
# path.plugins: []
#
# Flag to output log lines of each pipeline in its separate log file. Each log filename contains the pipeline.name
# Default is false
# whether each pipeline logs to its own file (the filename contains pipeline.name); default false
# pipeline.separate_logs: false
#
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Monitoring
# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html
#xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.username: logstash_system
#xpack.monitoring.elasticsearch.password: password
#xpack.monitoring.elasticsearch.proxy: ["http://proxy:port"]
#xpack.monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx
#xpack.monitoring.elasticsearch.cloud_auth: logstash_system:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.monitoring.elasticsearch.api_key: "id:api_key"
#xpack.monitoring.elasticsearch.ssl.certificate_authority: "/path/to/ca.crt"
#xpack.monitoring.elasticsearch.ssl.ca_trusted_fingerprint: xxxxxxxxxx
#xpack.monitoring.elasticsearch.ssl.truststore.path: path/to/file
#xpack.monitoring.elasticsearch.ssl.truststore.password: password
# use either keystore.path/keystore.password or certificate/key configurations
#xpack.monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.monitoring.elasticsearch.ssl.keystore.password: password
#xpack.monitoring.elasticsearch.ssl.certificate: /path/to/file
#xpack.monitoring.elasticsearch.ssl.key: /path/to/key
#xpack.monitoring.elasticsearch.ssl.verification_mode: full
#xpack.monitoring.elasticsearch.ssl.cipher_suites: []
#xpack.monitoring.elasticsearch.sniffing: false
#xpack.monitoring.collection.interval: 10s
#xpack.monitoring.collection.pipeline.details.enabled: true
#
# X-Pack Management
# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
#xpack.management.enabled: false
#xpack.management.pipeline.id: ["main", "apache_logs"]
#xpack.management.elasticsearch.username: logstash_admin_user
#xpack.management.elasticsearch.password: password
#xpack.management.elasticsearch.proxy: ["http://proxy:port"]
#xpack.management.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.management.elasticsearch.cloud_id: management_cluster_id:xxxxxxxxxx
#xpack.management.elasticsearch.cloud_auth: logstash_admin_user:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.management.elasticsearch.api_key: "id:api_key"
#xpack.management.elasticsearch.ssl.ca_trusted_fingerprint: xxxxxxxxxx
#xpack.management.elasticsearch.ssl.certificate_authority: "/path/to/ca.crt"
#xpack.management.elasticsearch.ssl.truststore.path: /path/to/file
#xpack.management.elasticsearch.ssl.truststore.password: password
# use either keystore.path/keystore.password or certificate/key configurations
#xpack.management.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.management.elasticsearch.ssl.keystore.password: password
#xpack.management.elasticsearch.ssl.certificate: /path/to/file
#xpack.management.elasticsearch.ssl.key: /path/to/certificate_key_file
#xpack.management.elasticsearch.ssl.cipher_suites: []
#xpack.management.elasticsearch.ssl.verification_mode: full
#xpack.management.elasticsearch.sniffing: false
#xpack.management.logstash.poll_interval: 5s
# X-Pack GeoIP Database Management
# https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html#plugins-filters-geoip-manage_update
#xpack.geoip.downloader.enabled: true
#xpack.geoip.downloader.endpoint: "https://geoip.elastic.co/v1/database"
3. pipelines.yml configuration
# List of pipelines to be loaded by Logstash
#
# This document must be a list of dictionaries/hashes, where the keys/values are pipeline settings.
# Default values for omitted settings are read from the `logstash.yml` file.
# When declaring multiple pipelines, each MUST have its own `pipeline.id`.
#
# Example of two pipelines:
#
# - pipeline.id: test
# pipeline.workers: 1
# pipeline.batch.size: 1
# config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
# - pipeline.id: another_test
# queue.type: persisted
# path.config: "/tmp/logstash/*.config"
#
# Available options:
#
# # name of the pipeline
# pipeline.id: mylogs
#
# # The configuration string to be used by this pipeline
# config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
#
# # The path from where to read the configuration text
# path.config: "/etc/conf.d/logstash/myconfig.cfg"
#
# # How many worker threads execute the Filters+Outputs stage of the pipeline
# pipeline.workers: 1 (actually defaults to number of CPUs)
#
# # How many events to retrieve from inputs before sending to filters+workers
# pipeline.batch.size: 125
#
# # How long to wait in milliseconds while polling for the next event
# # before dispatching an undersized batch to filters+outputs
# pipeline.batch.delay: 50
#
# Set the pipeline event ordering. Options are "auto" (the default), "true" # # or "false".
# "auto" automatically enables ordering if the 'pipeline.workers' setting
# is also set to '1', and disables otherwise.
# "true" enforces ordering on a pipeline and prevents logstash from starting
# a pipeline with multiple workers allocated.
# "false" disable any extra processing necessary for preserving ordering.
#
# pipeline.ordered: auto
#
# # Internal queuing model, "memory" for legacy in-memory based queuing and
# # "persisted" for disk-based acked queueing. Defaults is memory
# queue.type: memory
#
# # If using queue.type: persisted, the page data files size. The queue data consists of
# # append-only data files separated into pages. Default is 64mb
# queue.page_capacity: 64mb
#
# # If using queue.type: persisted, the maximum number of unread events in the queue.
# # Default is 0 (unlimited)
# queue.max_events: 0
#
# # If using queue.type: persisted, the total capacity of the queue in number of bytes.
# # Default is 1024mb or 1gb
# queue.max_bytes: 1024mb
#
# # If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# # Default is 1024, 0 for unlimited
# queue.checkpoint.acks: 1024
#
# # If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# # Default is 1024, 0 for unlimited
# queue.checkpoint.writes: 1024
#
# # If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# # Default is 1000, 0 for no periodic checkpoint.
# queue.checkpoint.interval: 1000
#
# # Enable Dead Letter Queueing for this pipeline.
# dead_letter_queue.enable: false
#
# If using dead_letter_queue.enable: true, the maximum size of dead letter queue for this pipeline. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb
#
# If using dead_letter_queue.enable: true, the interval in milliseconds where if no further events eligible for the DLQ
# have been created, a dead letter queue file will be written. A low value here will mean that more, smaller, queue files
# may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and
# being available to be read by the dead_letter_queue input when items are are written infrequently.
# Default is 5000.
#
# dead_letter_queue.flush_interval: 5000
# If using dead_letter_queue.enable: true, controls which entries should be dropped to avoid exceeding the size limit.
# Set the value to `drop_newer` (default) to stop accepting new events that would push the DLQ size over the limit.
# Set the value to `drop_older` to remove queue pages containing the oldest events to make space for new ones.
#
# dead_letter_queue.storage_policy: drop_newer
# If using dead_letter_queue.enable: true, the interval that events have to be considered valid. After the interval has
# expired the events could be automatically deleted from the DLQ.
# The interval could be expressed in days, hours, minutes or seconds, using as postfix notation like 5d,
# to represent a five days interval.
# The available units are respectively d, h, m, s for day, hours, minutes and seconds.
# If not specified then the DLQ doesn't use any age policy for cleaning events.
#
# dead_letter_queue.retain.age: 1d
#
# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
IV. Sync MySQL data to Elasticsearch
Sync options:
1. Logstash
2. go-mysql-elasticsearch
3. canal (Alibaba)
Method 1: Logstash
1. Install the MySQL Connector/J driver (choose a version that matches your MySQL server)
[root@localhost home]# wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-j-8.0.33-1.el9.noarch.rpm
--2024-02-23 16:01:15-- https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-j-8.0.33-1.el9.noarch.rpm
Resolving downloads.mysql.com (downloads.mysql.com)... 23.66.135.36, 2600:1406:3c00:18b::2e31, 2600:1406:3c00:189::2e31
Connecting to downloads.mysql.com (downloads.mysql.com)|23.66.135.36|:443... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: https://cdn.mysql.com/archives/mysql-connector-java-8.0/mysql-connector-j-8.0.33-1.el9.noarch.rpm [following]
--2024-02-23 16:01:16-- https://cdn.mysql.com/archives/mysql-connector-java-8.0/mysql-connector-j-8.0.33-1.el9.noarch.rpm
Resolving cdn.mysql.com (cdn.mysql.com)... 23.42.93.135, 2600:1406:3a00:293::1d68, 2600:1406:3a00:282::1d68
Connecting to cdn.mysql.com (cdn.mysql.com)|23.42.93.135|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2425346 (2.3M) [application/x-redhat-package-manager]
Saving to: 'mysql-connector-j-8.0.33-1.el9.noarch.rpm'
mysql-connector-j-8.0.33-1.el9.noarch.rpm 100%[==========================================================================================================================================>] 2.31M 736KB/s in 3.2s
2024-02-23 16:01:21 (736 KB/s) - 'mysql-connector-j-8.0.33-1.el9.noarch.rpm' saved [2425346/2425346]
[root@localhost home]# rpm -ivh mysql-connector-j-8.0.33-1.el9.noarch.rpm #install
warning: mysql-connector-j-8.0.33-1.el9.noarch.rpm: Header V4 RSA/SHA256 Signature, key ID 3a79bd29: NOKEY
error: Failed dependencies:
	java-headless >= 1:1.8.0 is needed by mysql-connector-j-1:8.0.33-1.el9.noarch
[root@localhost home]# yum -y install java-headless #install the missing java-headless dependency
[root@localhost home]# rpm -ivh mysql-connector-j-8.0.33-1.el9.noarch.rpm
warning: mysql-connector-j-8.0.33-1.el9.noarch.rpm: Header V4 RSA/SHA256 Signature, key ID 3a79bd29: NOKEY
Verifying... ################################# [100%]
Preparing... ################################# [100%]
Updating / installing...
1:mysql-connector-j-1:8.0.33-1.el9 ################################# [100%]
#Installation complete; by default the jar is placed under /usr/share/java/mysql-connector-java.jar
2. Configure logstash.conf
#single table
input {
  jdbc {
    # MySQL connection string
    jdbc_connection_string => "jdbc:mysql://192.168.243.134:3306/wine"
    # credentials
    jdbc_user => "root"
    jdbc_password => "root"
    # JDBC driver class
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # path to the JDBC driver jar
    jdbc_driver_library => "/usr/share/java/mysql-connector-j.jar"
    # enable paging
    jdbc_paging_enabled => true
    jdbc_page_size => "1000"
    # whether to wipe the last_run_metadata_path record; if true, every run starts from scratch and re-reads all rows
    clean_run => false
    # track the value of a specific column instead of the last run time; when true, tracking_column must name the column,
    # otherwise the timestamp of the last run is tracked
    use_column_value => true
    # column to track when use_column_value is true; it must be monotonically increasing (e.g. an auto-increment id or a modification time)
    tracking_column => "unix_ts_in_secs"
    # record the last tracking_column value into the file given by last_run_metadata_path
    record_last_run => "true"
    # type of the tracked column
    tracking_column_type => "numeric"
    # polling schedule (cron-like; here every 5 seconds)
    schedule => "*/5 * * * * *"
    # query to run
    statement => "SELECT * FROM wine_address"
    # tag used to route events in the output section
    type => "es_table"
  }
}
filter {
  ruby {
    # copy the timestamp column (if the query returns one) into @timestamp
    code => "event.set('@timestamp',event.get('timestamp'))"
  }
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs","timestamp"]
  }
}
output {
  if [type]=="es_table" {
    elasticsearch {
      hosts => ["192.168.243.134:9200"]     # ES host and port
      index => "es_table_idx"               # index name
      document_id => "%{[@metadata][_id]}"
    }
  }
}
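As written, the statement re-reads the whole table on every run even though a tracking column is configured. For a true incremental sync the query usually references the stored :sql_last_value; a sketch, assuming wine_address has a numeric add_time column (it appears in the INSERT example further down):
statement => "SELECT *, add_time AS unix_ts_in_secs FROM wine_address WHERE add_time > :sql_last_value ORDER BY add_time ASC"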
#multiple tables
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.243.134:3306/wine"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_driver_library => "/usr/share/java/mysql-connector-j.jar"
    jdbc_paging_enabled => true
    jdbc_page_size => "1000"
    clean_run => false
    use_column_value => true
    tracking_column => "unix_ts_in_secs"
    record_last_run => "true"
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT * FROM wine_address"
    type => "es_table"
  }
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.243.134:3306/wine"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_driver_library => "/usr/share/java/mysql-connector-j.jar"
    jdbc_paging_enabled => true
    jdbc_page_size => "1000"
    clean_run => false
    use_column_value => true
    tracking_column => "unix_ts_in_secs"
    record_last_run => "true"
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT * FROM wine_area"
    type => "es_table1"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs","timestamp"]
  }
}
output {
  if [type]=="es_table" {
    elasticsearch {
      hosts => ["192.168.243.134:9200"]
      index => "es_table_idx"
      document_id => "%{address_id}"
    }
  }
  if [type]=="es_table1" {
    elasticsearch {
      hosts => ["192.168.243.134:9200"]
      index => "es_table_idx1"
      document_id => "%{area_id}"
    }
  }
}
3. Start Logstash
[root@localhost logstash-8.12.2]# ./bin/logstash -f /home/logstash-8.12.2/config/logstash.conf &
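Once it is running, the Logstash monitoring API (port 9600 by default, see the API settings above) can confirm the pipeline was loaded:
[root@localhost logstash-8.12.2]# curl -s 'http://127.0.0.1:9600/_node/pipelines?pretty' #lists the main pipeline with its workers and batch size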
4. Insert test data
INSERT INTO `wine`.`wine_address` ( `address_id`, `member_id`, `area_id`, `city_id`, `province_id`, `area_info`, `address`, `mob_phone`, `reciver_name`, `is_default`, `dis_mode`, `add_time` )
VALUES( 5, 5, 5, 5, 5, '测试', '测试', 14512456789, '10', 0, '1', 0 );
5. Check the data in Elasticsearch
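If the schedule has fired at least once, the rows should be visible in the target index (index name taken from the output section above):
[root@localhost ~]# curl 'http://192.168.243.134:9200/es_table_idx/_search?pretty' #hits.total should match the wine_address row count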
Method 2: go-mysql-elasticsearch
1. Install the Go environment
[root@localhost /]# wget https://golang.google.cn/dl/go1.15.4.linux-amd64.tar.gz #download
[root@localhost /]# tar -zxvf go1.15.4.linux-amd64.tar.gz #extract
[root@localhost /]# mv go /usr/local/
[root@localhost river]# vim /etc/profile #add the environment variables
export GOROOT=/usr/local/go
export GOPATH=/root/go
export PATH=$PATH:$GOROOT/bin:$GOPATH/bin
[root@localhost /]# source /etc/profile #reload the profile
2. Download go-mysql-elasticsearch
[root@localhost /] go get github.com/siddontang/go-mysql-elasticsearch #download
[root@localhost river]# cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch
[root@localhost go-mysql-elasticsearch]# make #build; on success a go-mysql-elasticsearch binary is generated under go-mysql-elasticsearch/bin
3. Configuration ($GOPATH/src/github.com/siddontang/go-mysql-elasticsearch/etc/river.toml)
my_addr = "192.168.243.134:3306" #MySQL address
my_user = "root" #MySQL user
my_pass = "123456" #MySQL password
es_addr = "192.168.243.134:9200" #Elasticsearch address
es_user = "" #Elasticsearch user
es_pass = "" #Elasticsearch password
data_dir = "/root/go/src/github.com/siddontang/go-mysql-elasticsearch/data" #directory for storing sync position data
stat_addr = "192.168.243.134:12800" #internal stats address and port
stat_path = "/metrics"
server_id = 1001
flavor = "mysql"
mysqldump = "mysqldump "
bulk_size = 128
flush_bulk_time = "200ms"
skip_no_pk_table = false
[[source]]
schema = "wine" #数据库名称
tables = ["wine_role"] #数据表名称
[[rule]]
schema = "wine" #数据库名称
table = "wine_role" #数据表名称
index = "" #生成es数据索引名称,对应schema
type = "" #生成es数据类型,对应table
filter = ["id", "name"] #只同步的数据字段
4. Start
# Note: the project README states it supports MySQL < 8.0 and Elasticsearch < 6.0
[root@localhost go-mysql-elasticsearch]# ./bin/go-mysql-elasticsearch -config=./etc/river.toml
5. Check the Elasticsearch data (for example with the Chrome elasticsearch-head extension)
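Without the head plugin, a plain query also works; the index name here assumes the rule's index field above was filled in as wine_role:
[root@localhost ~]# curl 'http://192.168.243.134:9200/wine_role/_search?pretty'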
Method 3: canal (https://github.com/alibaba/canal/releases)
1. Download and install canal
#download version 1.1.7, which supports MySQL 8.0 (my MySQL is 8.0.36)
[root@localhost home]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz #the client-side adapter; think of it as the canal client
[root@localhost home]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.admin-1.1.7.tar.gz #the canal admin web UI
[root@localhost home]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz #the canal server
#create the directories
[root@localhost home]# mkdir canal
[root@localhost home]# cd canal
[root@localhost canal]# mkdir canal-adapter
[root@localhost canal]# mkdir canal-admin
[root@localhost canal]# mkdir canal-server
#extract
[root@localhost home]# tar -zxvf canal.adapter-1.1.7.tar.gz -C ./canal/canal-adapter/
[root@localhost home]# tar -zxvf canal.admin-1.1.7.tar.gz -C ./canal/canal-admin/
[root@localhost home]# tar -zxvf canal.deployer-1.1.7.tar.gz -C ./canal/canal-server/
#adjust permissions
[root@localhost home]# chmod -R 777 canal
2. Configuration
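Before editing the canal configs, MySQL itself must write row-based binlogs and canal needs an account allowed to read them. A minimal sketch (this walkthrough later connects as root, so the dedicated canal account is optional):
#/etc/my.cnf
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
#in MySQL, create a replication account for canal
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;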
#canal-server/conf/canal.properties
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip = 127.0.0.1
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf
# sasl demo
# kafka.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \\n username=\"alice\" \\npassword="alice-secret\";
# kafka.sasl.mechanism = SCRAM-SHA-512
# kafka.security.protocol = SASL_PLAINTEXT
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
##################################################
######### Pulsar #############
##################################################
pulsarmq.serverUrl =
pulsarmq.roleToken =
pulsarmq.topicTenantPrefix =
/home/canal/canal-server/conf/example/instance.properties
#canal-server/conf/example
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=200
# enable gtid use true/false
canal.instance.gtidon=false
# position info
#MySQL address and port
canal.instance.master.address=127.0.0.1:3306
#binlog file to start syncing from; use the file name queried from your own server
canal.instance.master.journal.name=mysql-bin.000045
#binlog position to start syncing from
canal.instance.master.position=237
#timestamp (in milliseconds) to start syncing from
canal.instance.master.timestamp=1709112558000
#GTID to start syncing from
canal.instance.master.gtid=
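# Hint: the journal name / position above can be read from MySQL with
#   SHOW MASTER STATUS;
# (use the File and Position columns); if the position settings are left
# empty, canal falls back to the current binlog position.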
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
#database user
canal.instance.dbUsername=root
#database password
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
#exclude MySQL's internal slave_* tables from syncing
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.on=false
#################################################
#/home/canal/canal-adapter/conf/application.yml
server:
  port: 8081 #adapter port
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #how the adapter reads from canal: tcp, kafka, rocketMQ or rabbitMQ
  flatMessage: true #flat-message switch: deliver data as JSON strings; only effective in kafka/rocketMQ mode
  zookeeperHosts: #ZooKeeper address in cluster mode; ignored when canalServerHost is configured
  syncBatchSize: 1000 #batch size
  retries: -1 #retry count; -1 means keep retrying (block) indefinitely
  timeout: #timeout for fetching data
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 192.168.243.134:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
  srcDataSources: #source data sources
    defaultDS:
      url: jdbc:mysql://192.168.243.134:3306/wine?useUnicode=true
      username: root
      password: 123456
  canalAdapters: #sync destinations
  - instance: example # canal instance name or mq topic name (matches the canal destination or the mq topic)
    groups: #adapter groups
    - groupId: g1 #group id
      outerAdapters: #adapter list
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#          druid.stat.enable: false
#          druid.stat.slowSqlMillis: 1000
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es8 #I am running Elasticsearch 8
        hosts: http://192.168.243.134:9200 # 127.0.0.1:9200 for rest mode; in rest mode prefix the address with http, in transport mode use port 9300
        properties:
          mode: rest # or transport
          # security.auth: test:123456 # only used for rest mode
          cluster.name: elasticsearch #cluster name
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
#      - name: phoenix
#        key: phoenix
#        properties:
#          jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
#          jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
#          jdbc.username:
#          jdbc.password:
#/home/canal/canal-adapter/conf/es8/mytest_user.yml
dataSourceKey: defaultDS #key of the source data source, matching an entry under srcDataSources in application.yml
destination: example
groupId: g1 #group id
esMapping:
  _index: mytest_user #index name
  _id: _id #document _id; if this is not configured, pk below must be set, otherwise ES assigns the id automatically
  _type: _doc #type
  upsert: true #insert the document if it does not exist yet
#  pk: id
  sql: "select a.album_id as _id, a.album_name, a.class_name, a.mark,a.goods_sku,a.add_time,a.merchant_id
        from wine_album a"
#  objFields:
#    _labels: array:;
  etlCondition: "where a.add_time>={}"
  commitBatch: 3000 #batch commit size
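After the adapter is running, canal-adapter exposes a small REST API on its port (8081 above) that can trigger a full import for this mapping; the path below is an assumption based on the adapter name es8 and the mapping file name:
[root@localhost ~]# curl -X POST http://127.0.0.1:8081/etl/es8/mytest_user.yml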
3. Start
[root@localhost canal-server]# ./bin/startup.sh
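The adapter has to be started as well; assuming the default package layout, it ships the same startup script:
[root@localhost canal-adapter]# ./bin/startup.sh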
4. Check the logs
#server log (/home/canal/canal-server/logs)
[root@localhost canal]# tail -5f canal.log
2024-02-28 14:47:50.369 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2024-02-28 14:47:50.375 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2024-02-28 14:47:50.383 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2024-02-28 14:47:50.414 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[127.0.0.1(127.0.0.1):11111]
2024-02-28 14:47:51.318 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
#example instance log
[root@localhost example]# tail -5f example.log
2024-02-28 14:47:51.284 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2024-02-28 14:47:51.289 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2024-02-28 14:47:51.342 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2024-02-28 14:47:51.342 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2024-02-28 14:47:51.749 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000044,position=4,serverId=1,gtid=<null>,timestamp=1709092749000] cost : 403ms , the next step is binlog dump
#adapter log (/home/canal/canal-adapter/logs/adapter)
[root@localhost logs]# tail -10f ./adapter/adapter.log
2024-02-28 18:56:58.798 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2024-02-28 18:56:59.017 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ...
2024-02-28 18:56:59.028 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2024-02-28 18:56:59.242 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es8 succeed
2024-02-28 18:56:59.248 [main] INFO c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /home/canal/canal-adapter/plugin
2024-02-28 18:56:59.267 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
2024-02-28 18:56:59.267 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2024-02-28 18:56:59.267 [Thread-3] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2024-02-28 18:56:59.273 [main] INFO c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 2.381 seconds (JVM running for 2.951)
2024-02-28 18:56:59.356 [Thread-3] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
2024-02-28 18:58:32.556 [pool-3-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"album_id":4,"album_name":"2","class_name":6,"mark":"2","goods_sku":"2","add_time":2,"merchant_id":2}],"database":"wine","destination":"example","es":1709117912000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["album_id"],"sql":"","table":"wine_album","ts":1709117912427,"type":"INSERT"}
2024-02-28 18:58:32.805 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"album_id":4,"album_name":"2","class_name":6,"mark":"2","goods_sku":"2","add_time":2,"merchant_id":2}],"database":"wine","destination":"example","es":1709117912000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["album_id"],"sql":"","table":"wine_album","ts":1709117912427,"type":"INSERT"}
Affected indexes: mytest_user
5. Check the index data in Elasticsearch
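With the mapping above, a quick query shows the synced rows:
[root@localhost ~]# curl 'http://192.168.243.134:9200/mytest_user/_search?pretty' #the wine_album row inserted earlier should appear as a document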