mysql同步ES操作流程

作者: gavin 分类: ES 发布时间: 2024-07-10 10:17

一、mysql开启log_bin

//查看数据库是否已经开启,具体开启方法见另一篇文章
show variables like 'log_bin';

二、ES和kibana的安装

官网下载就好,没什么说明,需要注意的是这两个系统需要同一版本,kibana的config里的kibana.yml是配置文件,需要注意下ES的链接配置,ES如果通过docker安装也可以见另一篇文章。

# =================== System: Elasticsearch ===================
# The URLs of the Elasticsearch instances to use for all your queries.
elasticsearch.hosts: ["http://localhost:9200"]

# If your Elasticsearch is protected with basic authentication, these settings provide
# the username and password that the Kibana server uses to perform maintenance on the Kibana
# index at startup. Your Kibana users still need to authenticate with Elasticsearch, which
# is proxied through the Kibana server.
#elasticsearch.username: "kibana_system"
#elasticsearch.password: "pass"

三、配置ES(数据类型)

1、Text(文本类型)

  • Text:用于索引全文内容,会被分析器处理成单词。
  • Keyword:用于索引结构化内容,如标签、状态等,不会被分析器处理,整体作为一个索引项。

2、Numeric(数值类型)

  • Integer:整数类型,支持32位和64位。
  • Long:长整数类型,64位有符号整数。
  • Short:短整数类型,16位有符号整数。
  • Byte:字节类型,8位有符号整数。
  • Float:单精度浮点数类型。
  • Double:双精度浮点数类型。
  • Half Float:半精度浮点数类型。
  • Scaled Float:根据指定的缩放因子存储浮点数。

3、Date(日期类型)

  • Date:支持日期和时间类型的存储,可以配置格式化和解析规则。

4、Boolean(布尔类型)

  • Boolean:存储true或false。

5、Binary(二进制类型)

  • Binary:存储二进制数据,如图片、文件等。

6、Range(范围类型)

  • Integer Range:整数范围。
  • Float Range:浮点数范围。
  • Long Range:长整数范围。
  • Double Range:双精度浮点数范围。
  • Date Range:日期范围。

7、Geo(地理位置类型)

  • Geo-point:存储地理点的经纬度。
  • Geo-shape:存储复杂的地理形状,如多边形、线条等。

8、Specialized(专用类型)

  • IP:存储IPv4或IPv6地址。
  • Completion:用于自动完成功能的特殊字段类型。

四、ES配置(创建索引)更详细内容见其他文档

PUT /a
{
	"settings":{
		"number_of_shards": 1,
		"number_of_replicas": 0
	},
	"mappings":{
		"properties":{
		  "id":{"type":"integer"},
			"username":{"type":"text"},
			"phone":{"type":"text"}
		}
	}
	}
}

五、配置canal.deployer监听log-bin日志

修改配置文件canal.deployer\conf\example\instance.properties

# position info #配置mysql数据库链接地址
canal.instance.master.address=127.0.0.1:3832

# username/password #配置用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

# table regex #配置数据库名.表名
#canal.instance.filter.regex=.*\\..* #所有表
canal.instance.filter.regex=a.u

有时候数据库结构变动导致报错,需要删除该目录下的meta.dat和h2.mv.db重新运行deployer

canal.deployer-1.1.7\bin里是启动文件,根据系统环境运行对应的方法,运行后查看日志logs\example\example.log确认运行成功。

六、配置canal.adapter把deployer监听结果同步到ES建立好的索引中

配置文档canal.adapter\conf\application.yml

这里注意mysql数据源的配置和ES的配置

server:
  port: 8088
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3832/a?useUnicode=true&useSSL=false
      username: canal
      password: canal

  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger

      - name: es8
        hosts: http://127.0.0.1:9200 
        properties:
          mode: rest
#          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch

配置sql,canal.adapter\conf\es8对应的版本目录中,新建一个yml文件,需要注意的是sql中必须使用别名,否则就会莫名报错

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: a
  _type: _doc
  _id: id
  upsert: true
  sql: "select uu.id,uu.username,uu.phone from u uu"
#  etlCondition: "where user_id>0"
  commitBatch: 3000

六、其他说明

全量更新请求adapter接口,这里我把端口改为8088了,注意真实项目的端口是多少进行修改

curl http://127.0.0.1:8088/etl/es8/customer.yml-X POST

–查看bin_log日志是否开启,ON表示开启
show variables like ‘log_bin’;

–查看bin_log日志格式是否为ROW
show variables like ‘binlog_format’;

— 创建用户,赋予权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’localhost’ IDENTIFIED BY ‘canal’;

— 刷新权限
flush PRIVILEGES;

七、相关端口

ES:9200、9300

canal.deployer:11111

canal.adapter:8081

kibana:5601