
Running Kafka Connect


Kafka Connect

 

Kafka Connect is included as part of Apache Kafka and provides a scalable and reliable way to move data between Kafka and data stores.

A REST API is also available for deploying and running connector plugins.

 

Kafka Connect runs as a set of worker processes. After installing connector plugins on the worker processes, you use the REST API to configure and manage connectors, each of which runs with a specific configuration.

 

There are two kinds of connectors: source connectors and sink connectors.

   - Source connector: reads data from a source system and provides it to the worker processes as Connect data objects.

   - Sink connector: receives Connect data objects from the worker processes and writes them to the target system.

 

  • Converters are used to store these data objects in Kafka in various formats (e.g., JSON).

 

http://kafka.apache.org/documentation/#connect 

 


https://docs.confluent.io/current/connect/index.html 

 



Running Kafka Connect

 

When using Connect in production, run the Connect worker processes on one or more separate servers.

Start the Kafka brokers on some servers and run the Connect worker processes on the other servers.

 

-- basic (distributed mode)
bin/connect-distributed.sh config/connect-distributed.properties
-- with source/sink connector configs (standalone mode)
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

connect-standalone.sh [worker config][source connector config][sink connector config]

 

connect-distributed.properties

 

  • bootstrap.servers - List of Kafka servers used to bootstrap connections to Kafka
  • key.converter - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.
  • value.converter - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.
  • group.id (default connect-cluster) - unique name for the cluster, used in forming the Connect cluster group; note that this must not conflict with consumer group IDs
  • config.storage.topic (default connect-configs) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated, compacted topic. You may need to manually create the topic to ensure the correct configuration as auto created topics may have multiple partitions or be automatically configured for deletion rather than compaction
  • offset.storage.topic (default connect-offsets) - topic to use for storing offsets; this topic should have many partitions, be replicated, and be configured for compaction
  • status.storage.topic (default connect-status) - topic to use for storing statuses; this topic can have multiple partitions, and should be replicated and configured for compaction
  • ★ rest.host.name, rest.port - the host and port the REST API listens on; set these if you need to override the defaults.
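
For reference, a minimal connect-distributed.properties could look like the sketch below. The bootstrap server address, topic names, and the replication factor of 1 are example values for a single-broker test setup, not production recommendations:

# connect-distributed.properties - minimal example (values are assumptions; adjust to your cluster)
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
# replication factor 1 is only acceptable on a single-broker test cluster
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
# REST API port (see rest.host.name, rest.port above)
rest.port=8083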

 

| Check that the Connect worker is currently up

curl -s localhost:8083
//
{"version":"2.4.0","commit":"77a89fcf8d7fa018","kafka_cluster_id":"1jIwhLS_RQ240eIpqpj4mw"}

 

| If you cannot reach the worker, check whether it is listening on port 8083

-- list the ports currently being listened on
netstat -lnp

 

| Available connector plugins

$ curl http://localhost:8083/connector-plugins
[{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.1.0.Final"},
{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.4.0"},
{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.4.0"},
{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]

 

| Other REST API endpoints

GET /connectors - return a list of active connectors
POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
GET /connectors/{name} - get information about a specific connector
GET /connectors/{name}/config - get the configuration parameters for a specific connector
PUT /connectors/{name}/config - update the configuration parameters for a specific connector
GET /connectors/{name}/status - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
POST /connectors/{name}/restart - restart a connector (typically because it has failed)
POST /connectors/{name}/tasks/{taskId}/restart - restart an individual task (typically because it has failed)
DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration
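
For illustration, a few of these endpoints as curl calls against a local worker; <connector-name> is a placeholder for a connector you have actually registered:

-- list active connectors
curl -s http://localhost:8083/connectors

-- pause / resume a connector
curl -s -X PUT http://localhost:8083/connectors/<connector-name>/pause
curl -s -X PUT http://localhost:8083/connectors/<connector-name>/resume

-- delete a connector
curl -s -X DELETE http://localhost:8083/connectors/<connector-name>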

 

| Kafka Connect: Distributed vs Standalone

 

Standalone mode - all connectors and tasks run in a single worker process. It is a good fit when a connector and its tasks need to run on a specific machine. Connectors can be started from the command line instead of through the REST API.

 

The standalone mode works perfectly for development and testing, as well as smaller setups. However, if we want to make full use of the distributed nature of Kafka, we have to launch Connect in distributed mode.
By doing so, connector settings and metadata are stored in Kafka topics instead of the file system. As a result, the worker nodes are really stateless.

According to the documentation, Standalone mode is fine for development and testing, but Distributed mode should be used when you need to process large volumes of data.
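
For reference, the connector property files passed to connect-standalone.sh in the example near the top of this post are the samples shipped with Kafka; a sketch of their contents is shown below (file names, paths, and the topic are the quickstart defaults and are assumptions for illustration):

# config/connect-file-source.properties (quickstart sample)
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

# config/connect-file-sink.properties (quickstart sample)
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test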


Registering a connector via the REST API

 

A connector can be registered by POSTing its configuration as JSON.

curl -i -X POST -H "Accept:application/json" \
    -H "Content-Type:application/json" http://<kafka-ip>:8083/connectors/ \
    -d '{
      "name": "mysql-connector",
      "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "database.hostname": "<mysql-ip>",
            "database.port": "3306",
            "database.user": "db",
            "database.password": "db",
            "database.server.id": "42",
            "database.server.name": "demo",
            "database.history.kafka.bootstrap.servers": "<kafka-ip>",
            "database.history.kafka.topic": "dbhistory.demo",
            "include.schema.changes": "true",
            "database.serverTimezone": "Asia/Seoul"
       }
    }'
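
If the request succeeds, the new connector can be verified with the status and config endpoints listed earlier (same placeholders as in the registration call):

-- check that mysql-connector and its task are RUNNING
curl -s http://<kafka-ip>:8083/connectors/mysql-connector/status

-- inspect the configuration it was registered with
curl -s http://<kafka-ip>:8083/connectors/mysql-connector/config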

Connector properties

The following configuration properties are required unless a default value is available.

https://debezium.io/documentation/reference/0.10/connectors/mysql.html#connector-properties

  • name - Unique name for the connector. Attempting to register again with the same name will fail. (This property is required by all Kafka Connect connectors.)
  • connector.class - The name of the Java class for the connector. Always use a value of io.debezium.connector.mysql.MySqlConnector for the MySQL connector.
  • tasks.max (default 1) - The maximum number of tasks that should be created for this connector. The MySQL connector always uses a single task and therefore does not use this value, so the default is always acceptable.
  • database.hostname - IP address or hostname of the MySQL database server.
  • database.port (default 3306) - Integer port number of the MySQL database server.
  • database.user - Name of the MySQL user to use when connecting to the MySQL database server.
  • database.password - Password to use when connecting to the MySQL database server.
  • database.server.name - Logical name that identifies and provides a namespace for the particular MySQL database server/cluster being monitored. The logical name should be unique across all other connectors, since it is used as a prefix for all Kafka topic names emanating from this connector.
  • database.server.id (default random) - A numeric ID of this database client, which must be unique across all currently-running database processes in the MySQL cluster. This connector joins the MySQL database cluster as another server (with this unique ID) so it can read the binlog. By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value.
  • database.history.kafka.topic - The full name of the Kafka topic where the connector will store the database schema history.
  • database.history.kafka.bootstrap.servers - A list of host/port pairs that the connector will use for establishing an initial connection to the Kafka cluster. This connection will be used for retrieving database schema history previously stored by the connector, and for writing each DDL statement read from the source database. This should point to the same Kafka cluster used by the Kafka Connect process.
  • database.whitelist (default empty string) - An optional comma-separated list of regular expressions that match database names to be monitored; any database name not included in the whitelist will be excluded from monitoring. By default all databases will be monitored. May not be used with database.blacklist.
  • database.blacklist (default empty string) - An optional comma-separated list of regular expressions that match database names to be excluded from monitoring; any database name not included in the blacklist will be monitored. May not be used with database.whitelist.
  • table.whitelist (default empty string) - An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be monitored; any table not included in the whitelist will be excluded from monitoring. Each identifier is of the form databaseName.tableName. By default the connector will monitor every non-system table in each monitored database. May not be used with table.blacklist.
  • table.blacklist (default empty string) - An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be excluded from monitoring; any table not included in the blacklist will be monitored. Each identifier is of the form databaseName.tableName. May not be used with table.whitelist.
  • column.blacklist (default empty string) - An optional comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form databaseName.tableName.columnName, or databaseName.schemaName.tableName.columnName.
  • column.truncate.to.length.chars (default n/a) - An optional comma-separated list of regular expressions that match the fully-qualified names of character-based columns whose values should be truncated in the change event message values if the field values are longer than the specified number of characters. Multiple properties with different lengths can be used in a single configuration, although in each the length must be a positive integer. Fully-qualified names for columns are of the form databaseName.tableName.columnName, or databaseName.schemaName.tableName.columnName.
  • column.mask.with.length.chars (default n/a) - An optional comma-separated list of regular expressions that match the fully-qualified names of character-based columns whose values should be replaced in the change event message values with a field value consisting of the specified number of asterisk (*) characters. Multiple properties with different lengths can be used in a single configuration, although in each the length must be a positive integer or zero. Fully-qualified names for columns are of the form databaseName.tableName.columnName, or databaseName.schemaName.tableName.columnName.
  • column.propagate.source.type (default n/a) - An optional comma-separated list of regular expressions that match the fully-qualified names of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages. The schema parameters __debezium.source.column.type, __debezium.source.column.length and __debezium.source.column.scale will be used to propagate the original type name and length (for variable-width types), respectively. Useful to properly size corresponding columns in sink databases. Fully-qualified names for columns are of the form databaseName.tableName.columnName, or databaseName.schemaName.tableName.columnName.
  • time.precision.mode (default adaptive_time_microseconds) - Time, date, and timestamps can be represented with different kinds of precision, including: adaptive_time_microseconds (the default) captures the date, datetime and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column's type, with the exception of TIME type fields, which are always captured as microseconds; adaptive (deprecated) captures the time and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column's type; or connect always represents time and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, which use millisecond precision regardless of the database columns' precision. See Temporal values.
  • decimal.handling.mode (default precise) - Specifies how the connector should handle values for DECIMAL and NUMERIC columns: precise (the default) represents them precisely using java.math.BigDecimal values represented in change events in a binary form; double represents them using double values, which may result in a loss of precision but will be far easier to use; the string option encodes values as formatted strings, which are easy to consume but lose the semantic information about the real type. See Decimal values.
  • bigint.unsigned.handling.mode (default long) - Specifies how BIGINT UNSIGNED columns should be represented in change events, including: precise uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect's org.apache.kafka.connect.data.Decimal type; long (the default) represents values using Java's long, which may not offer the precision but will be far easier to use in consumers. long is usually the preferable setting. Only when working with values larger than 2^63 should the precise setting be used, as those values cannot be conveyed using long. See Data types.
  • include.schema.changes (default true) - Boolean value that specifies whether the connector should publish changes in the database schema to a Kafka topic with the same name as the database server ID. Each schema change will be recorded using a key that contains the database name and whose value includes the DDL statement(s). This is independent of how the connector internally records database history. The default is true.
  • include.query (default false) - Boolean value that specifies whether the connector should include the original SQL query that generated the change event. Note: this option requires MySQL to be configured with the binlog_rows_query_log_events option set to ON. The query will not be present for events generated from the snapshot process. WARNING: enabling this option may expose tables or fields explicitly blacklisted or masked by including the original SQL statement in the change event. For this reason the option defaults to 'false'.
  • event.deserialization.failure.handling.mode (default fail) - Specifies how the connector should react to exceptions during deserialization of binlog events. fail will propagate the exception (indicating the problematic event and its binlog offset), causing the connector to stop. warn will cause the problematic event to be skipped and the problematic event and its binlog offset to be logged (make sure that the logger is set to the WARN or ERROR level). ignore will cause the problematic event to be skipped.
  • inconsistent.schema.handling.mode (default fail) - Specifies how the connector should react to binlog events that relate to tables that are not present in the internal schema representation (i.e. the internal representation is not consistent with the database). fail will throw an exception (indicating the problematic event and its binlog offset), causing the connector to stop. warn will cause the problematic event to be skipped and the problematic event and its binlog offset to be logged (make sure that the logger is set to the WARN or ERROR level). ignore will cause the problematic event to be skipped.
  • max.queue.size (default 8192) - Positive integer value that specifies the maximum size of the blocking queue into which change events read from the database log are placed before they are written to Kafka. This queue can provide backpressure to the binlog reader when, for example, writes to Kafka are slower or if Kafka is not available. Events that appear in the queue are not included in the offsets periodically recorded by this connector. Defaults to 8192, and should always be larger than the maximum batch size specified in the max.batch.size property.
  • max.batch.size (default 2048) - Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to 2048.
  • poll.interval.ms (default 1000) - Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 1000 milliseconds, or 1 second.
  • connect.timeout.ms (default 30000) - A positive integer value that specifies the maximum time in milliseconds this connector should wait after trying to connect to the MySQL database server before timing out. Defaults to 30 seconds.
  • gtid.source.includes - A comma-separated list of regular expressions that match source UUIDs in the GTID set used to find the binlog position in the MySQL server. Only the GTID ranges that have sources matching one of these include patterns will be used. May not be used with gtid.source.excludes.
  • gtid.source.excludes - A comma-separated list of regular expressions that match source UUIDs in the GTID set used to find the binlog position in the MySQL server. Only the GTID ranges that have sources matching none of these exclude patterns will be used. May not be used with gtid.source.includes.
  • gtid.new.channel.position (default latest) - When set to latest, when the connector sees a new GTID channel, it will start consuming from the last executed transaction in that GTID channel. If set to earliest, the connector starts reading that channel from the first available (not purged) GTID position. earliest is useful when you have an active-passive MySQL setup where Debezium is connected to the master; in this case, during failover the slave with the new UUID (and GTID channel) starts receiving writes before Debezium is connected, and these writes would be lost when using latest.
  • tombstones.on.delete (default true) - Controls whether a tombstone event should be generated after a delete event. When true, the delete operation is represented by a delete event and a subsequent tombstone event. When false, only a delete event is sent. Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted.
  • message.key.columns (default empty string) - A semicolon-separated list of regular expressions that match fully-qualified tables and columns to map to a primary key. Each item (regular expression) must match the <fully-qualified table>:<a comma-separated list of columns> format representing the custom key. Fully-qualified tables can be defined as DB_NAME.TABLE_NAME or SCHEMA_NAME.TABLE_NAME, depending on the specific connector.
