mysql同步ES操作流程
一、mysql开启log_bin
//查看数据库是否已经开启,具体开启方法见另一篇文章 show variables like 'log_bin';
二、ES和kibana的安装
官网下载就好,没什么说明,需要注意的是这两个系统需要同一版本,kibana的config里的kibana.yml是配置文件,需要注意下ES的链接配置,ES如果通过docker安装也可以见另一篇文章。
# =================== System: Elasticsearch =================== # The URLs of the Elasticsearch instances to use for all your queries. elasticsearch.hosts: ["http://localhost:9200"] # If your Elasticsearch is protected with basic authentication, these settings provide # the username and password that the Kibana server uses to perform maintenance on the Kibana # index at startup. Your Kibana users still need to authenticate with Elasticsearch, which # is proxied through the Kibana server. #elasticsearch.username: "kibana_system" #elasticsearch.password: "pass"
三、配置ES(数据类型)
1、Text(文本类型):
- Text:用于索引全文内容,会被分析器处理成单词。
- Keyword:用于索引结构化内容,如标签、状态等,不会被分析器处理,整体作为一个索引项。
2、Numeric(数值类型):
- Integer:整数类型,支持32位和64位。
- Long:长整数类型,64位有符号整数。
- Short:短整数类型,16位有符号整数。
- Byte:字节类型,8位有符号整数。
- Float:单精度浮点数类型。
- Double:双精度浮点数类型。
- Half Float:半精度浮点数类型。
- Scaled Float:根据指定的缩放因子存储浮点数。
3、Date(日期类型):
- Date:支持日期和时间类型的存储,可以配置格式化和解析规则。
4、Boolean(布尔类型):
- Boolean:存储true或false。
5、Binary(二进制类型):
- Binary:存储二进制数据,如图片、文件等。
6、
- Integer Range:整数范围。
- Float Range:浮点数范围。
- Long Range:长整数范围。
- Double Range:双精度浮点数范围。
- Date Range:日期范围。
7、Geo(地理位置类型):
- Geo-point:存储地理点的经纬度。
- Geo-shape:存储复杂的地理形状,如多边形、线条等。
8、Specialized(专用类型):
- IP:存储IPv4或IPv6地址。
- Completion:用于自动完成功能的特殊字段类型。
四、ES配置(创建索引)更详细内容见其他文档
PUT /a { "settings":{ "number_of_shards": 1, "number_of_replicas": 0 }, "mappings":{ "properties":{ "id":{"type":"integer"}, "username":{"type":"text"}, "phone":{"type":"text"} } } } }
五、配置canal.deployer监听log-bin日志
修改配置文件canal.deployer\conf\example\instance.properties
# position info #配置mysql数据库链接地址 canal.instance.master.address=127.0.0.1:3832 # username/password #配置用户名和密码 canal.instance.dbUsername=canal canal.instance.dbPassword=canal # table regex #配置数据库名.表名 #canal.instance.filter.regex=.*\\..* #所有表 canal.instance.filter.regex=a.u
有时候数据库结构变动导致报错,需要删除该目录下的meta.dat和h2.mv.db重新运行deployer
canal.deployer-1.1.7\bin里是启动文件,根据系统环境运行对应的方法,运行后查看日志logs\example\example.log确认运行成功。
六、配置canal.adapter把deployer监听结果同步到ES建立好的索引中
配置文档canal.adapter\conf\application.yml
这里注意mysql数据源的配置和ES的配置
server: port: 8088 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp #tcp kafka rocketMQ rabbitMQ flatMessage: true zookeeperHosts: syncBatchSize: 1000 retries: -1 timeout: accessKey: secretKey: consumerProperties: # canal tcp consumer canal.tcp.server.host: 127.0.0.1:11111 canal.tcp.zookeeper.hosts: canal.tcp.batch.size: 500 canal.tcp.username: canal.tcp.password: # kafka consumer kafka.bootstrap.servers: 127.0.0.1:9092 kafka.enable.auto.commit: false kafka.auto.commit.interval.ms: 1000 kafka.auto.offset.reset: latest kafka.request.timeout.ms: 40000 kafka.session.timeout.ms: 30000 kafka.isolation.level: read_committed kafka.max.poll.records: 1000 # rocketMQ consumer rocketmq.namespace: rocketmq.namesrv.addr: 127.0.0.1:9876 rocketmq.batch.size: 1000 rocketmq.enable.message.trace: false rocketmq.customized.trace.topic: rocketmq.access.channel: rocketmq.subscribe.filter: # rabbitMQ consumer rabbitmq.host: rabbitmq.virtual.host: rabbitmq.username: rabbitmq.password: rabbitmq.resource.ownerId: srcDataSources: defaultDS: url: jdbc:mysql://127.0.0.1:3832/a?useUnicode=true&useSSL=false username: canal password: canal canalAdapters: - instance: example # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger - name: es8 hosts: http://127.0.0.1:9200 properties: mode: rest # # security.auth: test:123456 # only used for rest mode cluster.name: elasticsearch
配置sql,canal.adapter\conf\es8对应的版本目录中,新建一个yml文件,需要注意的是sql中必须使用别名,否则就会莫名报错
dataSourceKey: defaultDS destination: example groupId: g1 esMapping: _index: a _type: _doc _id: id upsert: true sql: "select uu.id,uu.username,uu.phone from u uu" # etlCondition: "where user_id>0" commitBatch: 3000
六、其他说明
全量更新请求adapter接口,这里我把端口改为8088了,注意真实项目的端口是多少进行修改
curl http://127.0.0.1:8088/etl/es8/customer.yml-X POST
–查看bin_log日志是否开启,ON表示开启
show variables like ‘log_bin’;
–查看bin_log日志格式是否为ROW
show variables like ‘binlog_format’;
— 创建用户,赋予权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’localhost’ IDENTIFIED BY ‘canal’;
— 刷新权限
flush PRIVILEGES;
七、相关端口
ES:9200、9300
canal.deployer:11111
canal.adapter:8081
kibana:5601