꾸준함이 중요한 Lion2me의 기술블로그

아파치 카프카

02 Oct 2022
DE

Apache Kafka

데이터 엔지니어링 관련 교육을 수강하면서 알게 된 데이터를 수집 및 처리하는 중간에서 확장성과 안정성을 보장해주는 카프카에 대한 글을 적고자 합니다.

Kafka란

카프카는 데이터 파이프라인 중 Producer에서 전송한 데이터를 Consumer가 원할 때 받을 수 있도록 도와주는 메세지 브로커입니다. 간단히 설명하면 과자가 많이 놓여있는 테이블에서 먹고싶은 사람이 찾아와 원하는 만큼 들고가는 모습을 상상하시면 쉬울 것 같습니다.

파티시에는 과자를 만들고 테이블에 놓으면, 먹고 싶은 사람은 테이블 위의 먹고 싶은 과자를 선택해서 원하는 만큼 들고가는 동작이죠.

카프카는 그 중 테이블의 역할을 하는 브로커입니다.

이제 이 예시를 이용해서 카프카의 동작 원리에 대해 알아보겠습니다.

Producer

처음의 예시에서 우리는 파티시에과자를 만든다고 했습니다. 여기서 파티시에에 해당하는 것이 바로 Producer입니다.

과자가 의미하는 것은 어느 형태의 데이터이고, 이 데이터는 로그, 영상이나 글과 같은 비정형 데이터 일 수도 있습니다. 마찬가지로 parquet형태이든, json형태이든 어떠한 방식으로든 부호화되어 있을수도 있죠.

Producer가 하는 일은 다양하지만, Kafka의 입장에서는 어떠한 데이터를 어떤 Topic으로 전달하려는 생산자정도의 의미를 갖습니다.

파티시에는 여러 종류의 과자(데이터)를 만듭니다. 테이블(Kafka)에 “A과자”라는 타이틀(토픽)을 달고 진열을 합니다.

Producer의 동작 과정

Kafka Producer는 Kafka Producer API 혹은 이 API로 구성된 프로그램을 말합니다. Fluentd나 Logstash와 같이 유명한 로그 수집기의 경우에는 이러한 API가 정의되어 있기 때문에 쉽게 사용 할 수 있습니다.

Kafka Producer가 Kafka에게 전달하는 메세지는 다음과 같습니다.

kafka_producer_data.png

  1. 토픽 : 어느 토픽인지
  2. 토픽 중 특정 파티션 위치 : 어떤 파티션을 이용할 지
  3. 메시지 생성 시간 : 생성 시간 ( 재전송과 같은 문제 발생시 순서 특정을 위함 혹은 스냅샷? )
  4. 메시지 키 : RA에 데이터를 보관 할 때 기준이 되는 키
  5. 메시지 값 : 데이터

이러한 값을 가지고 Kafka에 전달하기 위해서는 다음의 과정을 거치게 됩니다.

  1. 직렬화
  2. 파티셔닝
  3. 압축
  4. 레코드 모음
  5. Sender에서 브로커로 전달

1. 직렬화

직렬화는 일반적인 직렬화와 같습니다. 외부에서 데이터를 읽을 수 있도록 특정 형식으로 데이터를 변형시키는 동작입니다. String 직렬화나 바이트 형식의 직렬화를 사용한다고 합니다.

대상이 되는 개체는 메시지 키와 값입니다.

2. 파티셔닝

처음 파티셔닝을 들었을 때 “Producer가 파티셔닝을 왜?”라는 생각이 들었습니다. 하지만, Kafka에 대한 많은 자료를 찾아보니 중요한 이유가 포함되어 있었습니다.

일단 파티셔닝이 무엇인지 말씀드리면 다음과 같은 동작입니다.

Record Accumulator는 브로커로 보내기 전에 데이터를 저장해놓는 공간입니다. 즉 아직 브로커로 전달하지 않은 Producer내에 존재하는 Partition입니다.

이렇게 Record Accumulator를 써야하는 이유는 네트워크 IO의 최적화에 관련이 있습니다.

일단 우리는 통신을 하기 위해서는 TCP든 UDP는 통신 프로토콜을 이용해야 하는 것을 알고 있고, 일반적으로 제어에 용이한 프로토콜은 TCP임을 알고 있습니다. 즉 전송하는데에 있어서 많은 헤더가 포함된 패킷을 보내야 하는 것이죠.

그러면 1초에 50개의 데이터를 전송하고자 하는 Producer가 각 데이터마다 요청을 날리면 어떻게 될까요?

1초에 50개의 데이터에 각각의 데이터에는 헤더가 붙게되며, 수많은 Connection이 열리고 닫히거나, 혹은 계속되는 전송을 위해 계속 Connection이 열려 있어야 하는 문제가 발생 할 수 있습니다. 즉, 엄청난 오버헤드를 낳을 수 있죠.

그래서 Kafka는 어느정도의 데이터를 모아서 보내는 방식을 사용합니다. 물론 이 시간이 짧기 때문에 거의 실시간에 가까운 데이터를 전송하는 것으로 생각 할 수 있지만요. 이러한 이유로 Kafka는 Record Accumulator를 사용합니다.

그런데 한 가지 의문이 남습니다. 왜 굳이 내부를 파티셔닝으로 나눌까요?

이유는 효율과 순서입니다. 위에서 말한 것 처럼 TCP를 이용한 통신에는 헤더가 붙고 네트워크 통신 자체가 매우 무거운 작업이기 때문에 매 입력마다 실행하기에는 오버헤드가 큽니다. 그래서 통신을 수행 할 때 함께 전송 될 토픽에 대해서 특정 파티션의 크기가 어느정도 채워지게 되면 함께 전송 될 토픽의 특정 파티션도 한꺼번에 모아서 전송해버리는 거죠.

그러면 문제가 되는 부분이 있습니다. 바로 순서죠. Producer의 파티션 내의 데이터는 순서대로 Kafka에게 전달되리라는 보장이 없습니다. 왜냐하면 이전에 Producer가 요청한 데이터보다 이후의 데이터가 먼저 전달 될 수 있으니까요.

그래서 우리는 key를 이용해서 특정 파티션을 먼저 사용하도록 강제하는 방법을 사용합니다. 그렇게 되면 파티션의 내용을 전송하는 로직이 여러 번 수행되더라도 결국에는 동일한 순서를 유지 할 수 있습니다.

3. 압축

Producer는 결국 Kafka Broker에게 데이터를 전달해야하는 운명입니다. 그러면 데이터의 크기를 줄이고, 전송 효율을 최적화하면 당연히 좋은 결과를 얻을 수 있습니다.

다음은 Kafka에서 압축 할 수 있는 형태입니다.

Kafka Broker는 클러스터 내의 다른 브로커의 서버에 전달받은 데이터를 복제하는 동작도 수행하기 때문에 복제 효율성도 높여주기 때문에 압축 형식을 잘 선택하는 것은 매우 중요합니다.

4. Accumulate Record

Record들을 하나의 Batch로 모아주는 것을 Accumulate Record라고 합니다. producer의 batch size property에 따라서 얼마나 큰 사이즈로 모을 것 인지를 설정 할 수 있습니다.

이러한 방식을 이용하는 이유는 Partition에 정리해두었습니다.

5. Sender

Sender Threads는 실제로 Kafka 브로커에 전달하기 위한 thread입니다. batch로 모아놓은 데이터를 실제로 kafka로 전달하기 위해 동작합니다.

Accumulate Record 단계에서 우리는 Record들을 하나의 Batch로 묶어주는 동작을 수행했습니다. 하지만 Sender에서는 이러한 batch를 모아서 브로커에 보내기 위해 그룹화 합니다. 카프카 브로커의 설정값인 batch.size 와 linger.ms properties 의 값에 따라 둘 중 하나의 방법으로 모아져서 보내지게 됩니다.

추가적으로 Send하는 규칙 중 어디까지 확인 할 것인지에 대한 설정이 있습니다.

acks 옵션은 1,0,-1(all)의 설정값을 가지고 있는데

  • 1은 전송만 하고 확인하지 않는 설정입니다. 어느정도의 유실이 있더라도 빠른 전송을 원할 때 사용 할 수 있습니다.
  • 0은 리더 파티션에게 받았다는 ack를 받는 설정입니다. 어느정도의 안전성을 보장하지만, 만약 리더노드가 죽는다면 해당 데이터가 유실 될 가능성이 있습니다.
  • -1은 복제 노드에게까지 ack를 받는 설정 입니다. 복제 노드까지 해당 데이터를 가지고 있음을 보장받기 때문에 안전하지만 성능이 느립니다.

Kafka Broker

Kafka Broker는 Kafka를 구축한다고 했을 때 Broker를 구축한다고 말하는 것과 같습니다. 즉 Kafka가 곧 Broker인거죠. 사실 이 부분은 말하기 나름입니다만, 애초에 브로커이기 때문에 이렇게 나누어서 말하겠습니다.

Kafka 브로커는 우리가 Producer를 통해 받은 데이터를 원하는 Consumer에게 전달해주는 중간자 역할을 수행합니다. 그 Consumer는 특정 저장소가 될 수 있고, 다른 Kafka 브로커가 될 수 있으며 Spark나 Flink와 같은 분산 처리 프레임워크의 Input이 될 수 있습니다.

Zookeeper

Kafka를 공부하기 전에 Kafka Cluster를 관리하기 위해 현재 가장 많이 사용되고 있는 Zookeeper의 역할을 알고 갈 필요가 있습니다. Hadoop Cluster 관련한 그림에서 항상 등장했지만, 제대로 공부해 본 적이 없어서 잘 모르고 있지만, 이 부분은 나중에 제대로 알아보겠습니다.

Zookeeper는 Kafka Broker들을 Cluster로 묶어주는 역할을 하는 중요한 분산 코디네이터 입니다.

Kafka Broker는 기본적으로 다른 서버로 분산되어 있을 가능성이 높고, 서로 다른 주소를 가진 별도의 서비스입니다. 하지만 그렇다면 A Broker와 B Broker는 서로의 존재를 모르고, 파티셔닝이 불가능합니다.

이런 부분에서 Zookeeper는 Broker들에게 서로의 존재를 알려주고, 그 중 리더 파티션과 팔로워 파티션을 설정해주는 역할을 해줌으로써 고가용성이 포함 된 Kafka 서비스를 이용 할 수 있도록 도와줍니다.

Zookeeper의 역할은 다음과 같습니다.

  1. 파티션 수와 같은 토픽의 메타데이터 관리, 정보 공유
  2. 리더 파티션의 선출
  3. 브로커의 장애 감지 및 새로운 브로커 추가

요즘 들어서 K8s으로 대신하려는 움직임이 있지만, 아직까지는 주력으로 사용되는 코디네이터입니다.

Kafka Broker의 동작 과정

  1. 가장 먼저 Producer는 Kafka Cluster로 특정 토픽 데이터를 전송하고 Kafka Cluster에 있는 Kafka Broker 중 해당 토픽의 리더 파티션이 그 토픽 데이터를 받습니다.

  2. 그 후 리더 파티션은 팔로워 파티션들에게 기존에 Replication 설정에 맞춰서 복제 데이터를 전달합니다.

  3. Kafka Broker와 매칭되어 있는 Consumer는 해당 Broker가 가지고 있는 데이터를 받아옵니다.

Kafka Broker의 특징

Kafka Broker를 공부하면서 Kafka의 매력을 많이 알게 되었습니다. 사실 데이터를 전송하는데에 있어서 중간 메시지 브로커를 사용해야하나? 라고 생각하면 그 이유를 알기 어려웠습니다.

네트워크 IO를 최소화 하는게 낫지 않아? 라는게 저의 첫 생각이였습니다.

다양한 인메모리의 브로커들에도 해당하는 의문이지만, 이 의문을 해결하면서 동시에 Kafka만이 가진 특징에 대해서 공부해보겠습니다.

1. 디스크를 이용한 데이터 재입력

특정 데이터를 수집했을 때 해당 데이터를 유실하지 않기 위해서는 몇 가지의 방법이 있습니다. 가장 대표적인 방법은 디스크를 이용하는 방법입니다. 사실 큰 규모의 데이터를 디스크에 저장하고 다시 읽어오는 것은 당연하지만, 이 방법에는 문제가 있습니다.

내가… 어디까지.. 가져왔더라?

정말 무시하게 데이터를 저장하는 방법으로 일 단위 데이터를 저장하는 방법이 있습니다. 예를 들면 2022년 10월 23일의 데이터를 통째로 저장해놓으면 해당 날짜의 데이터에 유실이 발생하면 통째로 넣으면 됩니다.

이 과정에서 테라바이트 급 이상의 데이터라면 엄청난 오버헤드가 발생하는건 당연합니다. 그러면 어떻게 해결 할 수 있을까요?

이러한 문제를 해결하기 위해서 Kafka에서는 Offset이라는 개념을 사용합니다. 메모리 관련 학습을 할 때 자료형에 따라 Offset을 이용해 값을 읽어오던 바로 그 Offset입니다.

Consumer가 데이터를 가져올 때 “나 여기까지 가져왔어!”라는 Offset을 파티션에게 넘겨주는 것입니다. 그러면 파티션은 Consumer가 어디까지 가져왔는지 알기 때문에 이후의 데이터만 전송하다가, 이 과정에서 데이터가 제대로 전달 되지 않으면 특정 위치부터 새로 받아버리면, 쉽게 데이터를 가져 올 수 있죠.

물론 이 부분도 큰 오버헤드는 발생하지만, 전체 데이터를 가져오는 것 보다는 훨씬 나은 성과를 가져 올 것은 쉽게 알 수 있습니다.

2. 단일 리더 복제

Kafka는 이전에 공부 한 내용 중 단일 리더 복제방식을 도입하고 있습니다. 이 방식으로 Producer에게서 받은 데이터는 리더 파티션이 먼저 받아서 저장하고 난 뒤 팔로워 파티션에게 전송하여서 동일한 데이터를 가지고 있도록 유지하는 방식입니다.

Kafka는 서버의 안전성을 위해서 혹은 같은 토픽을 여러 Consumer에게 전송하기 위해서 이러한 파티션 차원의 단일 리더 복제를 사용하고 있고 관련한 대표적인 설정들이 있어서 쉽게 관리 할 수 있습니다.

  1. Producer에게 복제의 성공 유무를 리턴 ( Producer 쪽의 acks 속성 )
  2. Replicate 관련 옵션 ( 몇 개의 파티션에 복제 할 것인지 )

3. 1:1 Consumer 매칭

여기서 1:1은 Broker와 Consumer의 매칭이 아닌 파티션과의 매칭임을 알아야 합니다. 이렇게 전송하는 이유는 이전에 알아본 Offset입니다. 특정 Consumer가 데이터를 전송받을 때 어디까지 전송받았는지를 알고 있기 때문에 중간에 파티션이 바뀌면 정상적인 동작이 되지 않을 수 있습니다.

이러한 이유로 만약 특정 Consumer가 장애가 발생하여 데이터를 가져올 수 없게 되면, 매칭되었던 파티션은 다른 Consumer에게 붙게 됩니다. 마찬가지로 파티션의 개수가 Consumer보다 많으면 아무( 조건을 몰라서 ) Consumer와 매칭됩니다. 하지만 Consumer가 더 많다면, 파티션 수를 초과하는 Consumer는 유휴 자원이 됩니다.

Consumer의 확장이 예상된다면 파티션의 수를 여유있게 줘야 할 수 있지만, Kafka 파티션은 늘릴수는 있어도 줄일수는 없기 때문에 확장에 주의를 기울여야 합니다.

4. 그럼에도 좋은 효율

위의 3가지의 안전성을 위한 조치를 취함에도 불구하고 효율은 매우 좋은 Kafka의 비결은 적절한 배치와 부하가 생길 수 있는 부분은 Producer와 Consumer에게 넘기는 것입니다.

Kafka에는 효율에 관련이 있는 설정 값들이 있는데, 크게는 다음과 같습니다.

1. linger.ms

얼마나 배치로 묶어서 보낼 것인지에 대한 설정 값입니다. 실제로 많은 사용기에서 이 설정 값을 충분히 크게해서 약간의 지연시간을 가지지만, 확실히 높은 효율을 가졌다는 의견이 많습니다.

이 방식은 네이글 알고리즘과 유사한 방식으로 동작하는데, 데이터를 개별적으로 보내는건 TCP 헤더를 포함한 네트워크 IO 상에서 매우 비효율적입니다. 그래서 어느정도의 데이터는 묶어서 보내는 배치 방식을 적용하는거죠.

2. batchsize

어느정도의 크기까지 데이터를 모아서 보낼 것인지에 대한 부분입니다. linger.ms는 일정 시간이기 때문에 그 시간내에 데이터가 얼마나 쌓였는지는 신경쓰지 않지만, batch size 설정은 크기에 신경쓰고 시간에는 신경쓰지 않습니다.

둘 다 장단점이 있으니 필요에 따라 적용하면 될 것 같습니다. 참고로 둘 중에 하나만 사용하는 것으로 알고 있습니다.


Consumer

Consumer는 Kafka에서 데이터를 받아오는 곳입니다. 대체적인 설명은 다른 부분에 대해서 이야기 한 것 같아서 크게 이야기 할 부분은 없는 듯 합니다.

Consumer는 Kafka에서 데이터를 전달받으면, 얼마나 받았는지에 대한 정보 Offset으로 Kafka에 다시 알려줌으로써 다음 데이터를 받을 준비를 하고 있습니다.

주의 할 부분은 데이터가 순차적으로 입력되는지는 확실하지 않다는 점입니다. Consumer는 그저 Kafka에 등록된 Topic으로 저장된 파티션 내 데이터를 읽어오기 때문에 실제 데이터의 순차적인지는 확실하지 않습니다.

이 외에는 대부분의 설정이 Zookeeper 혹은 Kafka Broker에게 해당 Consumer의 상태를 알리기 위한 하트비트 정도입니다. 필요 할 때 보고 파악하면 될 것 같습니다.

참고

https://www.linkedin.com/pulse/kafka-producer-overview-sylvester-daniel%20/

https://always-kimkim.tistory.com/entry/kafka101-producer

https://dol9.tistory.com/277

https://forum.confluent.io/t/what-should-i-use-as-the-key-for-my-kafka-message/312

https://always-kimkim.tistory.com/entry/kafka101-broker

https://sungjk.github.io/2021/01/10/kafka-consumer.html