通过Logstash将MySQL数据同步到ES

原理

通过 Logstash 将 MySQL 数据同步到 Elasticsearch,其核心原理可以概括为:"主动轮询"。它不像 Canal 那样监听 MySQL 的 binlog 变更日志,而是由 Logstash 按照设定好的时间间隔,主动向 MySQL 发起 SQL 查询,然后将查询到的结果写入 Elasticsearch。

实现方案

实现起来很简单,下面是一个典型的ELKElasticsearchLogstashKibana)部署方案:

version: '3.8'

services:
  es:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.16
    container_name: es
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    networks:
      - es-net

  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.16
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://es:9200
    depends_on:
      - es
    networks:
      - es-net

  logstash:
    image: docker.elastic.co/logstash/logstash:7.17.16
    container_name: logstash
    depends_on:
      - es
    ports:
      - "5044:5044"
      - "9600:9600"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      - ./mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar:/usr/share/logstash/mysql-connector-j-8.3.0.jar
    networks:
      - es-net

volumes:
  es_data:

networks:
  es-net:
    driver: bridge

通过这个配置文件,你就可以运行起来这三个服务了。同时你还需要把logstash.conf 和mysql-connector-j-8.3.0.jar 挂载进去。

mysql-connector-j-8.3.0 因为我这是8.2版本的MySQL,所以使用这个版本的驱动。logstash.conf 是主力配置文件。

Logstash配置

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://47.116.153.12:31218/go?useSSL=false&serverTimezone=Asia/Shanghai"
    # MySQL 连接地址(Go 数据库)

    jdbc_user => "canal"
    # MySQL 用户名

    jdbc_password => "canal"
    # MySQL 密码

    jdbc_driver_library => "/usr/share/logstash/mysql-connector-j-8.3.0.jar"
    # MySQL JDBC 驱动 jar 包路径(Logstash 需要)

    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # MySQL 8.x 驱动类

    statement => "
      SELECT id, name, description, max_users, background,
             is_private, password, created_by,
             created_at, updated_at, invitation_code
      FROM chat_rooms
      WHERE updated_at > :sql_last_value
        AND updated_at IS NOT NULL
      ORDER BY updated_at ASC
    "
    # 查询 SQL:
    # 作用:增量同步 chat_rooms 表
    # 条件:
    # - 只取 updated_at > 上次同步时间的数据(增量)
    # - updated_at 不为空
    # - 按更新时间升序排序

    use_column_value => true
    # 使用字段值做增量标记(而不是时间戳文件)

    tracking_column => "updated_at"
    # 用 updated_at 作为增量同步字段

    tracking_column_type => "timestamp"
    # tracking_column 类型是 timestamp(时间类型)

    schedule => "* * * * *"
    # 定时任务:每分钟执行一次(cron 表达式)

    clean_run => false
    # false = 不清空历史 offset(继续上次同步位置)

    last_run_metadata_path => "/usr/share/logstash/data/last_run"
    # 保存上次同步时间的位置文件(用于断点续传)
  }
}

output {
  elasticsearch {
    hosts => ["http://es:9200"]
    # ES 地址(Docker 内部服务名)

    index => "chat_rooms"
    # 写入 ES 的索引名称

    document_id => "%{id}"
    # 用 MySQL 的 id 作为 ES document_id(保证幂等,防重复)

  }

  stdout { codec => json }
  # 同时输出到控制台(调试用,JSON 格式)
}

注意事项

这种同步方式建议最好有一个自动更新的 update 字段并且带上索引,根据这个字段建立查询语句只同步最近更新的数据。

但是这种方式对真删不太友好,可以说是不支持,因为监听不到 delete 事件,所以使用这种方法做一下软删可以。

优点

  • 简单,上手快
  • 只需要 MySQL + ES + Logstash
  • 不依赖 binlog(不用改 MySQL 配置)
  • 稳定,适合小中型项目
  • SQL 可控(想查什么就查什么)
返回数据库分类