개성있는 개발자 되기

Debezium 카프카 커넥트 메시지 Transform 본문

Open Source/Kafka

Debezium 카프카 커넥트 메시지 Transform

정몽실이 2020. 3. 31. 15:34

커넥터에서 보내는 메시지는 Key와 Value로 구성되어 있다.

Key는 스키마 정보, Value는 변경된 실제 데이터로 보면된다.

 

| Key : 스키마 정보 - 스키마에 대한 변경이 없으면 null

{
  "schema": {
    "type": "struct",
    "name": "mysql-server-1.inventory.customers.Key",
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1001
  }
}

| Value : 실제 변경된 데이터

{
  "schema": { ... },
  "payload": {
    "before": {
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": {
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "0.10.0.Final",
      "name": "mysql-server-1",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 484,
      "row": 0,
      "thread": 7,
      "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
    },
    "op": "u",
    "ts_ms": 1465581029523
  }
}

 

이렇게 Debezoum은 굉장히 복잡한, 많은 정보를 발생하는데, Debezium의 UnwrapFromEnvelope single message transformation (SMT)로 간단한 메시지로 변형할 수 있다.

 

1. before, after 값을 제외하고 after 값만 보고자할 때

-- AS-IS
{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}
-- TO-BE
{
	"field1" : "newvalue1",
	"field2" : "newvalue2"
}
-- 해당 옵션을 커넥터에 추가
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",

 

2. 따로 값을 추가하고자 할때

"transforms": "unwrap" ,
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState" ,
"transforms.unwrap.add.fields": "op,table" ,	

아래 항목이 추가된다.

    "__op": "u",
    "__table": "item_kafka_lnkg_tgt"

3. 특정 Field를 추출 (테스트 못해봄)

{
  {
    "name": "elastic-sink",
    "config": {
      "connector.class":
          "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "tasks.max": "1",
      "topics": "customers",
      "connection.url": "http://elastic:9200",
      "transforms": "unwrap,key",
      "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",        (1)
      "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",(2)
      "transforms.key.field": "id",                                                 (2)
      "key.ignore": "false",                                                        (3)
      "type.name": "customer"                                                       (4)
    }
  }
}
  1. extracting only the new row’s state from Debezium’s change data message

  2. extracting the id field from the key struct, then the same key is used for the source and both destinations. This is to address the fact that the Elasticsearch connector only supports numeric types and string as keys. If we do not extract the id the messages will be filtered out by the connector because of unknown key type.

  3. use key from the event instead of generating a synthetic one

  4. type under which the events will be registered in Elasticsearch

카프카 커넥트의 Transform 옵션은 아래 페이지에서 자세하게 확인 가능하다.

https://docs.confluent.io/current/connect/transforms/extractfield.html

 

ExtractField — Confluent Platform

ExtractField The following provides usage information for the Apache Kafka® SMT org.apache.kafka.connect.transforms.ExtractField. Description ExtractField pulls a field out of a complex (non-primitive, Map or Struct) key or value and replaces the entire ke

docs.confluent.io

 

Comments