- Today
- Total
개성있는 개발자 되기
Debezium MySql CDC 카프카 커넥트 본문
Debezium MySQL CDC Connector
디비지움 커넥터는 카프카 개발자들이 만든 커넥터이다. MySql의 모든 Row-Level 변경사항을 모니터링하고 기록할 수 있다. MySql 서버에 접속한뒤, 일정하게 Database의 스탭샷을 읽어 들인다.
스냅샷을 읽은 후에는 MySQL 커밋된 변화를 감지하고 insert, update, delete 이벤트를 생성한다.
각각의 테이블에 일어나는 모든 이벤트들은 카프카 토픽으로 분산되서 쌓이고, application이랑 services 에서 이 토픽을 간편하게 컨슘할 수 있게 된다.
공식홈페이지 : https://www.confluent.io/hub/debezium/debezium-connector-mysql
Debezium Connector 다운로드
1. 아래 링크에서 debezium Connector jar 파일들을 다운로드 받는다.
https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/
2. 위의 jar 파일들이 있는 경로를 connect-distributed.properties의 plugin.path로 설정해준다.
plugin.path=share/java,../plugin/connect
3. RestAPI로 커넥터를 등록한다.
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://카프카IP:8083/connectors/ \
-d '{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql ip",
"database.port": "3306",
"database.user": "db",
"database.password": "db",
"database.server.id": "42",
"database.server.name": "demo",
"database.history.kafka.bootstrap.servers": "카프카 IP",
"database.history.kafka.topic": "dbhistory.demo" , --
"include.schema.changes": "true" ,
"database.whitelist": "dev_itemmsadb" , -- dev_itemmsadb 데이터베이스만 Listen
"database.serverTimezone": "Asia/Seoul" -- MySql locale이 KST로 되어 있기때문에, 명시해줘야 한다.
}
}'
정상적으로 등록되었는 지 확인. 등록되었다면 배열 리스트로 보여진다.
curl -X GET http://카프카IP:8083/connectors
["mysql-connector"]
관련 REST API 명령어는 아래 링크에서 확인 가능하다.
https://docs.confluent.io/3.2.0/connect/restapi.html#connect-userguide-rest
카프카 커넥터 실행
커넥터를 재실행한다.
bin/connect-distributed.sh config/connect-distributed.properties
실행하니, 에러가 났었다.
java.lang.ClassNotFoundException: io.debezium.util.IoUtil
[2018-09-27 14:30:18,783] INFO WorkerSourceTask{id=mysql-cdc-0} Finished commitOffsets successfully in 7 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:496)
[2018-09-27 14:30:18,785] ERROR WorkerSourceTask{id=mysql-cdc-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: The replication sender thread cannot start in AUTO_POSITION mode: this server has GTID_MODE = OFF instead of ON. Error code: 1236; SQLSTATE: HY000.
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:200)
at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:167)
at io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:957)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:921)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: The replication sender thread cannot start in AUTO_POSITION mode: this server has GTID_MODE = OFF instead of ON.
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:882)
... 3 more
[2018-09-27 14:30:18,789] ERROR WorkerSourceTask{id=mysql-cdc-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
구글링을 해보니 Path 가 잘못되었으니 상위루트로 설정해보라는 거였음
Problem
plugin.path=/opt/kafka_2.11-2.0.0/connect/debezium-connector-mysql
Please try this instead:
plugin.path=/opt/kafka_2.11-2.0.0/connect
I.e. plug-in.path should point to the parent directory of individual connect plug-ins (directories or JARs). In your case it points to a specific plug-in's directory, so Connect tries to load plug-ins from each of the JARs contained within that directory instead of dealing the entire directory as a single plug-in, as it should.
plugin Path를 루트로 변경해주니 문제해결
참조 : https://groups.google.com/forum/#!topic/debezium/6k_PVrWsdYI
멀티 브로커 연결
카프카가 멀티 Broker로 클러스터가 구성되어 있고 커넥터가 이 클러스터를 읽도록 하고 싶으면
connector propoerties 파일의 bootstrap-server를 넣어주면 된다.
커넥터 실행 시, Error : NOT_ENOUGH_REPLICAS 에러 발생
2020-03-31 17:45:33,030] WARN [Producer clientId=producer-3] Got error produce response with correlation id 303 on topic-partition connect-configs-0, retrying (2147483346 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)
위 에러가 쉴틈없이 발생했다....
- 브로커 설정을 살펴보자
min.insync.replicas : 프로듀서가 acks=all로 설정하여 메시지를 보낼 때, write를 성공하기 위한 최소 복제본의 수를 의미
우리는 min.insync.replicas = 2로 설정되어 있었음 - connect-distributed.properties 를 살펴보자
커넥터가 실행되면 connect-configs, connect-offsets, connect-status 토픽을 발생하는데, 커넥터 실행을 위한 설정 내용이라고 이해하면 되겠다.
이들도 Topic이기 때문에, replicas factor와 partition이 있다. 나의 경우 이 토픽들의 Replica Factor 가 1로 설정되어 있었다.
결국, 커넥터가 실행될때 connect-configs, connect-offsets, connect-status 토픽의 메시지를 보고 어디부터 읽을 지 정하는데 해당 토픽에 메시지를 보낼때 min.insync.replicas가 2라서 2번의 응답을 받아야 하는데, 해당 토픽들은 Replicas가 1이라 2개의 응답을 보낼 수 없어서 Error: NOT_ENOUGH_REPLICAS 에러가 발생한 것이었다.
토픽의 Replication Factor를 브로커의 min.insync.replicas보다 크게 만들어줘야 위의 에러를 해결할 수 있다.
카프카 커넥터 LOG
커넥터 Work Processor 가 잘 Listen 하고 있는지 확인하기 위해 로깅이 필요했다.
이 또한 카프카가 설치되어 있다면 properties 파일이 디폴트로 저장되어 있기 때문에, 여기서 path만 추가해주면 된다.
connect-log4j.properties
log4j.rootLogger=INFO, stdout, connectAppender,file -- file 추가
log4j.appender.connectAppender.File=../logs-2/connect.log -- 로그를 저장할 Paht 추가
레퍼런스
| 카프카 커넥터 Properties 관련 설정 가이드 문서
https://docs.confluent.io/current/connect/userguide.html
'Open Source > Kafka' 카테고리의 다른 글
카프카 이슈 - kafka-consumer-groups.sh --describe 에 정보가 없는 경우 (0) | 2020.05.20 |
---|---|
카프카 명령어 (0) | 2020.04.01 |
Debezium 카프카 커넥트 메시지 Transform (0) | 2020.03.31 |
Debezium Connector for MySQL (0) | 2020.03.25 |
카프카 커넥트 실행 (0) | 2020.03.25 |