꾸준함이 중요한 Lion2me의 기술블로그

Debezium 구축기

21 Apr 2023
DE

CDC를 이용한 FCM(FireBase Cloud Message)시스템 자동화 프로세스 개발기

0. CDC는 무엇인가?

CDC는 Change Database Capture의 약자로 데이터베이스의 변경 사항을 실시간으로 얻어오는 시스템을 말합니다. 여기서 변경사항을 가져온다는 말은 곧 특정 데이터의 변경에 대해서 말하며, 쿼리 자체가 아님을 주의해야 합니다.

대부분의 데이터베이스 시스템은 변경 사항을 저장하는 로그가 있습니다. 특히나 관계형 데이터베이스는 Redo를 위해서라도 거의 무조건적으로 ( bulk를 위해 트랜잭션을 포기하고 밀어넣는 등의 방향성을 제외하면 ) 로그를 쌓게 됩니다. 저는 mariaDB의 데이터를 캡쳐하고 싶었고, 앞으로 MongoDB로 확장 할 수 있다는 생각으로 개발을 시작했습니다.

MariaDB는 DML로 변경 된 데이터에 대해서 binlog라는 형태로 로그를 쌓습니다. 하지만 실제로 binlog를 cat해보면 가관인데, 다음과 같은 화면을 보실 수 있으십니다.

[binlog 이미지]

bin은 말 그대로 binary이고, 이 정보는 적절하게 파싱하여 확인해야 합니다. 일일이 구축하기는 어렵다고 생각하여 공개되어 있는 CDC 플랫폼을 활용하기로했고 저는 debezium을 선택했습니다.

1. 왜 debezium을 사용했는가?

“RabbitMQ를 쓰고 있는데 왜 Debezium을 쓰게 된거야!!!!” 라고 스스로 화를 냈었는데, 가장 중요한 점은 구축과 확장입니다. 상당수의 CDC 프로젝트는 Debezium과 마찬가지로 KAfka 환경에서의 플러그인 개념으로 개발 되어 있었고, 그렇지 않은 애플리케이션은 다양한 데이터베이스를 지원하지 않는 문제가 있었습니다. 거의 RAW한 레벨로 구축 된 시스템도 있었는데 관리면에서 debezium on kafka를 생각하게 되었습니다.

무엇보다 “서버는 충분하다 하고 싶은대로 해봐라”는 감사한 계시가 떨어져서 주요 메시지 브로커는 카프카로 전환하여 구축 할 예정이라 debezium을 활용하기로 했습니다.

하지만 이번 FCM은 지연 전송이 필요하고 Kafka는 지연 전송이 어렵기 때문에 RabbitMQ도 stand-alone으로 하나 띄워서 사용 할 예정입니다.

2. debezium을 사용하려면?

debezium은 일종의 kafka connect를 이용한 하나의 플러그인 개념입니다. 즉 기존의 Connect 기능에 추가 기능을 제공하기 때문에 kafka 서버와 kafka를 오케스트레이션해주는 Zookeeper 그리고 debezium connect 서버를 열어야합니다. 저는 1차적으로 FCM 서비스를 실제로 구축가능 한 지에 대한 연구를 위해 Kafka를 single로 열었고, 주키퍼도 별도의 docker image로 하나 열었습니다.

2-1. 공개 된 docker_compose.yml

싱글 서버에 테스트라 컨슈머가 1개라 복제나 HA 설정은 제외하고 곧바로 debezium으로 connect를 생성했습니다.

version: '3'

services:
  zookeeper:
    image: debezium/zookeeper:2.1
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888

  kafka:
    image: debezium/kafka:2.1
    ports:
      - 9092:9092
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - ADVERTISED_HOST_NAME=domain
    volumes:
      - ./kafka-config:/kafka/config
    links:
      - zookeeper

  debezium:
    image: debezium/connect:2.1
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=debezium_connect_config
      - OFFSET_STORAGE_TOPIC=debezium_connect_offsets
      - STATUS_STORAGE_TOPIC=debezium_connect_statuses
    links:
      - kafka
    volumes:
      - ./debezium/mysql-connector-java-8.0.16.jar:/kafka/connect/debezium-mysql-connector/mysql-connector-java-8.0.16.jar
      - ./debezium/debezium.properties:/kafka/connect/debezium-mysql-connector/debezium.properties

docker-compose 파일 예시입니다.

volumes로 debezium connect 이미지의 위치에 설치한 mysql-connector를 공유했지만 사실 debezium/kafka 이미지에 대부분의 connector는 이미 있기 때문에 별도로 설치하고 공유하지 않아도 괜찮습니다. 만약 없다면 개별적으로 설치해서 경로에 jar파일을 넣어주시면 됩니다.

2-2. debezium Connector 추가

docker-compose 설정에 맞게 docker container를 생성하셨다면 debezium connect 서버로 다음의 POST 요청을 보내시면 됩니다.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" {localhost}:8083/connectors/ -d '{ "name":"idb-connector", "config":{   "connector.class":"io.debezium.connector.mysql.MySqlConnector",  "tasks.max":"1",  "database.hostname":"{DB 서버 도메인}",  "database.port":"{DB 서버 포트}",  "database.user":"{DB 유저 이름}",  "database.password":"{DB 유저 비밀번호}",  "database.server.id":"124054",  "database.server.name":"idb",  "database.include.list":"{추적하고자 하는 Database 명을 , 구분}",  "database.history.kafka.bootstrap.servers":"{localhost}:9092",  "database.history.kafka.topic":"schema-changes.idb", "schema.history.internal.kafka.bootstrap.servers":"{localhost}:9092", "schema.history.internal.kafka.topic":"schema-changes-history", "topic.prefix":"idb"}}'

위처럼 POST Request를 전송하면 kafka는 mysql의 binlog를 추적하여 변경 사항을 캡쳐합니다.

2-3. 읽어들인 내용 읽기

캡처한 변경 사항의 topic 이름은 “{database}.{table}”입니다. 실제로 kafka cli로 이것저것 뒤져보면 내용들을 알 수 있습니다.

kafka-topics.sh --bootstrap-server {도메인:포트} --list

위 명령을 입력해보면 topic list를 보실 수 있습니다. 성공적으로 Connection이 등록되었다면 자동으로 해당 topic이 생겼을 것 입니다.

kafka-console-consumer.sh --bootstrap-server {도메인:포트} --topic {토픽} --from-beginning

위 명령은 데이터를 처음 위치부터 읽을 수 있습니다. 이 경우에는 소비자 오프셋과 상관없이 데이터를 읽어들이기 때문에 변경사항이 많은 경우 –from-beginning을 빼주세요.

여기까지 제대로 동작한다면 이제 debezium 설정은 끝났습니다.

각 속성에 대해서 공부 한 내용도 조금 있지만 대략적인건 Chat GPT에게 양보하겠습니다.

주의 할 점이 몇가지 있습니다.

  • debezium은 jdbc를 기반으로 데이터베이스에 접근합니다.
    • JDBC는 5.1버전 부터 KST Timestamp를 제대로 읽지 못합니다.
  • debezium은 각 레코드 단위에서의 변경 사항을 기록합니다.
    • Bulk insert 시, “어떤 동작을 수행했나?”가 아닌 “어떤 데이터가 변경(입력)되었나”를 중점으로 보기 때문에 트랜잭션 단위의 추적을 하려면 general.log를 봐야 합니다.

사용이 정말 편하게 되어 있어서 적용하는데에 큰 어려움은 없습니다.


추가적으로 Binlog를 얻기 위해서는 connection을 연결하는 계정이 그에 맞는 권한이 필요합니다.

필요한 권한은 다음과 같습니다. ( GPT가 잘 알려줍니다만 )

  • REPLICATION SLAVE
  • REPLICATION CLIENT
  • SELECT
  • SHOW VIEW
  • RELOAD

GPT가 잘 알려주지 못하는 부분으로 MYSQL의 Timezone에 대한 이슈는 현재 해결중입니다. 기본적으로 설정을 System으로 정하고 System default를 KST로 설정해놓은 현재 서버에서 이 문제를 해결하면 추가로 적도록 하겠습니다.