原理
通过 Logstash 将 MySQL 数据同步到 Elasticsearch,其核心原理可以概括为:"主动轮询"。它不像 Canal 那样监听 MySQL 的 binlog 变更日志,而是由 Logstash 按照设定好的时间间隔,主动向 MySQL 发起 SQL 查询,然后将查询到的结果写入 Elasticsearch。
实现方案
实现起来很简单,下面是一个典型的ELK(Elasticsearch、Logstash、Kibana)部署方案:
version: '3.8'
services:
es:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.16
container_name: es
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- ES_JAVA_OPTS=-Xms512m -Xmx512m
ports:
- "9200:9200"
volumes:
- es_data:/usr/share/elasticsearch/data
networks:
- es-net
kibana:
image: docker.elastic.co/kibana/kibana:7.17.16
container_name: kibana
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://es:9200
depends_on:
- es
networks:
- es-net
logstash:
image: docker.elastic.co/logstash/logstash:7.17.16
container_name: logstash
depends_on:
- es
ports:
- "5044:5044"
- "9600:9600"
volumes:
- ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
- ./mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar:/usr/share/logstash/mysql-connector-j-8.3.0.jar
networks:
- es-net
volumes:
es_data:
networks:
es-net:
driver: bridge
通过这个配置文件,你就可以运行起来这三个服务了。同时你还需要把logstash.conf 和mysql-connector-j-8.3.0.jar 挂载进去。
mysql-connector-j-8.3.0 因为我这是8.2版本的MySQL,所以使用这个版本的驱动。logstash.conf 是主力配置文件。
Logstash配置
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://47.116.153.12:31218/go?useSSL=false&serverTimezone=Asia/Shanghai"
# MySQL 连接地址(Go 数据库)
jdbc_user => "canal"
# MySQL 用户名
jdbc_password => "canal"
# MySQL 密码
jdbc_driver_library => "/usr/share/logstash/mysql-connector-j-8.3.0.jar"
# MySQL JDBC 驱动 jar 包路径(Logstash 需要)
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# MySQL 8.x 驱动类
statement => "
SELECT id, name, description, max_users, background,
is_private, password, created_by,
created_at, updated_at, invitation_code
FROM chat_rooms
WHERE updated_at > :sql_last_value
AND updated_at IS NOT NULL
ORDER BY updated_at ASC
"
# 查询 SQL:
# 作用:增量同步 chat_rooms 表
# 条件:
# - 只取 updated_at > 上次同步时间的数据(增量)
# - updated_at 不为空
# - 按更新时间升序排序
use_column_value => true
# 使用字段值做增量标记(而不是时间戳文件)
tracking_column => "updated_at"
# 用 updated_at 作为增量同步字段
tracking_column_type => "timestamp"
# tracking_column 类型是 timestamp(时间类型)
schedule => "* * * * *"
# 定时任务:每分钟执行一次(cron 表达式)
clean_run => false
# false = 不清空历史 offset(继续上次同步位置)
last_run_metadata_path => "/usr/share/logstash/data/last_run"
# 保存上次同步时间的位置文件(用于断点续传)
}
}
output {
elasticsearch {
hosts => ["http://es:9200"]
# ES 地址(Docker 内部服务名)
index => "chat_rooms"
# 写入 ES 的索引名称
document_id => "%{id}"
# 用 MySQL 的 id 作为 ES document_id(保证幂等,防重复)
}
stdout { codec => json }
# 同时输出到控制台(调试用,JSON 格式)
}
注意事项
这种同步方式建议最好有一个自动更新的 update 字段并且带上索引,根据这个字段建立查询语句只同步最近更新的数据。
但是这种方式对真删不太友好,可以说是不支持,因为监听不到 delete 事件,所以使用这种方法做一下软删可以。
优点
- 简单,上手快
- 只需要 MySQL + ES + Logstash
- 不依赖 binlog(不用改 MySQL 配置)
- 稳定,适合小中型项目
- SQL 可控(想查什么就查什么)