개성있는 개발자 되기

Debezium MySql CDC 카프카 커넥트 본문

Open Source/Kafka

Debezium MySql CDC 카프카 커넥트

정몽실이 2020. 3. 26. 09:12

Debezium MySQL CDC Connector

 

디비지움 커넥터는 카프카 개발자들이 만든 커넥터이다. MySql의 모든 Row-Level 변경사항을 모니터링하고 기록할 수 있다. MySql 서버에 접속한뒤, 일정하게 Database의 스탭샷을 읽어 들인다.

 

스냅샷을 읽은 후에는 MySQL 커밋된 변화를 감지하고 insert, update, delete 이벤트를 생성한다.

각각의 테이블에 일어나는 모든 이벤트들은 카프카 토픽으로 분산되서 쌓이고, application이랑 services 에서 이 토픽을 간편하게 컨슘할 수 있게 된다.

 

공식홈페이지 : https://www.confluent.io/hub/debezium/debezium-connector-mysql

 

Confluent: Apache Kafka & Event Streaming Platform for the Enterprise

Confluent is a fully managed Kafka service and enterprise stream processing platform. Real-time data streaming for AWS, GCP, Azure or serverless. Try free!

www.confluent.io


Debezium Connector 다운로드

 

1. 아래 링크에서 debezium Connector jar 파일들을 다운로드 받는다.

https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/

 

2. 위의 jar 파일들이 있는 경로를 connect-distributed.properties의 plugin.path로 설정해준다.

plugin.path=share/java,../plugin/connect

 

3. RestAPI로 커넥터를 등록한다.

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://카프카IP:8083/connectors/ \
    -d '{
      "name": "mysql-connector",
      "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "database.hostname": "mysql ip",
            "database.port": "3306",
            "database.user": "db",
            "database.password": "db",
            "database.server.id": "42",
            "database.server.name": "demo",
            "database.history.kafka.bootstrap.servers": "카프카 IP",
            "database.history.kafka.topic": "dbhistory.demo" , --
            "include.schema.changes": "true" ,
            "database.whitelist": "dev_itemmsadb" , -- dev_itemmsadb 데이터베이스만 Listen
            "database.serverTimezone": "Asia/Seoul" -- MySql locale이 KST로 되어 있기때문에, 명시해줘야 한다.
       }
    }'

정상적으로 등록되었는 지 확인. 등록되었다면 배열 리스트로 보여진다.

curl -X GET http://카프카IP:8083/connectors
["mysql-connector"]

관련 REST API 명령어는 아래 링크에서 확인 가능하다.

https://docs.confluent.io/3.2.0/connect/restapi.html#connect-userguide-rest

 

REST Interface — Confluent Platform

Version 3.2.0 Docs Kafka Connect » Reference » REST Interface View page source REST Interface Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors. By default this service runs on port 8083. When execu

docs.confluent.io


카프카 커넥터 실행

 

커넥터를 재실행한다.

bin/connect-distributed.sh config/connect-distributed.properties

 

실행하니, 에러가 났었다. 

java.lang.ClassNotFoundException: io.debezium.util.IoUtil
[2018-09-27 14:30:18,783] INFO WorkerSourceTask{id=mysql-cdc-0} Finished commitOffsets successfully in 7 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:496)  
[2018-09-27 14:30:18,785] ERROR WorkerSourceTask{id=mysql-cdc-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)  
org.apache.kafka.connect.errors.ConnectException: The replication sender thread cannot start in AUTO_POSITION mode: this server has GTID_MODE = OFF instead of ON. Error code: 1236; SQLSTATE: HY000.  
    at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:200)  
    at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:167)  
    at io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:957)  
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:921)  
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559)  
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793)  
    at java.lang.Thread.run(Thread.java:748)  
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: The replication sender thread cannot start in AUTO_POSITION mode: this server has GTID_MODE = OFF instead of ON.  
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:882)  
    ... 3 more  
[2018-09-27 14:30:18,789] ERROR WorkerSourceTask{id=mysql-cdc-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178) 

 

구글링을 해보니 Path 가 잘못되었으니 상위루트로 설정해보라는 거였음

Problem  

   plugin.path=/opt/kafka_2.11-2.0.0/connect/debezium-connector-mysql 

Please try this instead:  

    plugin.path=/opt/kafka_2.11-2.0.0/connect 


I.e. plug-in.path should point to the parent directory of individual connect plug-ins (directories or JARs). In your case it points to a specific plug-in's directory, so Connect tries to load plug-ins from each of the JARs contained within that directory instead of dealing the entire directory as a single plug-in, as it should. 

 

plugin Path를 루트로 변경해주니 문제해결  

참조 :  https://groups.google.com/forum/#!topic/debezium/6k_PVrWsdYI 

 

 

멀티 브로커 연결

 

카프카가 멀티 Broker로 클러스터가 구성되어 있고 커넥터가 이 클러스터를 읽도록 하고 싶으면

connector propoerties 파일의 bootstrap-server를 넣어주면 된다.

 

커넥터 실행 시, Error : NOT_ENOUGH_REPLICAS 에러 발생

2020-03-31 17:45:33,030] WARN [Producer clientId=producer-3] Got error produce response with correlation id 303 on topic-partition connect-configs-0, retrying (2147483346 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)

위 에러가 쉴틈없이 발생했다....

 

  •  브로커 설정을 살펴보자

    min.insync.replicas : 프로듀서가 acks=all로 설정하여 메시지를 보낼 때, write를 성공하기 위한 최소 복제본의 수를 의미

    우리는 min.insync.replicas = 2로 설정되어 있었음
  • connect-distributed.properties 를 살펴보자

    커넥터가 실행되면 connect-configs, connect-offsets, connect-status 토픽을 발생하는데, 커넥터 실행을 위한 설정 내용이라고 이해하면 되겠다.

    이들도 Topic이기 때문에, replicas factor와 partition이 있다. 나의 경우 이 토픽들의 Replica Factor 가 1로 설정되어 있었다.

결국, 커넥터가 실행될때 connect-configs, connect-offsets, connect-status 토픽의 메시지를 보고 어디부터 읽을 지 정하는데 해당 토픽에 메시지를 보낼때 min.insync.replicas가 2라서 2번의 응답을 받아야 하는데,  해당 토픽들은 Replicas가 1이라 2개의 응답을 보낼 수 없어서 Error: NOT_ENOUGH_REPLICAS 에러가 발생한 것이었다.

 

토픽의 Replication Factor를 브로커의 min.insync.replicas보다 크게 만들어줘야  위의 에러를 해결할 수 있다.


카프카 커넥터 LOG

 

커넥터 Work Processor 가 잘 Listen 하고 있는지 확인하기 위해 로깅이 필요했다.

이 또한 카프카가 설치되어 있다면 properties 파일이 디폴트로 저장되어 있기 때문에, 여기서 path만 추가해주면 된다.

 

connect-log4j.properties

log4j.rootLogger=INFO, stdout, connectAppender,file -- file 추가
log4j.appender.connectAppender.File=../logs-2/connect.log -- 로그를 저장할 Paht 추가

 


레퍼런스

 

debezium 중요 부분만 번역 및 요약

https://debezium.io/blog/2019/02/05/debezium-0-9-0-final-released/ (구글 번역기를 돌리고 중요한 부분만 발췌 및 정리) 직접 debezium을 사용해보고 테스트 한 후, 이 부분을 안 읽고 넘어가는게 좋은 것 같..

knight76.tistory.com

 

 

Debezium 0.9.0.Final Released

After some drinks to celebrate this release, the plan is to do a 0.9.1 release rather quickly (probably in two weeks from now), providing improvements and potential bug fixes to the features and changes done in 0.9. We’ll also begin the work on Debezium 0.

debezium.io

 

Deploying the MySQL connector :: Debezium Documentation

The MBean is debezium.mysql:type=connector-metrics,context=schema-history,server= .

debezium.io

 

 

| 카프카 커넥터 Properties 관련 설정 가이드 문서

https://docs.confluent.io/current/connect/userguide.html

 

Getting Started with Kafka Connect — Confluent Platform

Getting Started with Kafka Connect This document provides information about how to get started with Kafka Connect. You should read and understand Kafka Connect Concepts before getting started. The following topics are covered in this document: Deployment C

docs.confluent.io

 

Comments