꾸준함이 중요한 Lion2me의 기술블로그

데이터 중심 애플리케이션 설계 - 스트림 처리

18 Feb 2023
DE

이 글은 Martin Kleppmann의 데이터 중심 애플리케이션 설계를 읽고 기억하고자 적는 게시글입니다.

11. 스트림 처리

데이터를 처리하는 방법은 크게 3가지로 나눌 수 있습니다.

  1. 서비스 ( 온라인 시스템 )
    • 고객의 요청에 대해서 빠른 응답을 주기 위한 데이터 처리 방식입니다. 예를들면 배달의 민족 애플리케이션의 응답 등이 있습니다.
    • 가능한 빠른 응답이 필요합니다.
  2. 일괄 처리 시스템 ( 오프라인 시스템 )
    • 매우 큰 입력을 받아 처리하는 방식으로 주로 반복적인 스케줄링으로 움직입니다
    • 주요한 지표로 처리량을 사용하고, 이 뜻은 어느 정도의 양의 데이터를 처리하는데 사용 된 시간을 나타냅니다.
  3. 스트림 처리 시스템 ( 준실시간 시스템 )
    • 스트림 처리는 일괄 처리 시스템의 방식과 유사하게 요청이 아닌 입력 값을 기반으로 동작합니다.
    • 일괄 처리와 차이점으로 데이터를 입력 받는 순간 처리가 시작됩니다.

일괄 처리에서 정렬과 같은 연산을 하기 위해서는 출력을 하기 전에 모든 입력을 읽어야 합니다. 파일이나 그룹으로 묶여있는 데이터에게 이런 동작은 bulk 연산이 가능하고, 재입력 또한 가능하기 때문에 효과적입니다. 하지만 많은 경우 이러한 일괄 처리가 적절하지 않는 경우도 있습니다. 사용자는 데이터를 항상 생성 할 수 있기 때문입니다.

이러한 서비스를 일괄 처리로 해결하려면 매 시간 데이터를 모아서 특정 기간이 되었을 때(청크에 가득 차거나) 일괄적으로 처리하는 방식으로 동작해야 합니다. 하지만 사용자는 거의 실시간으로 서비스를 이용하고 싶기 때문에 이런 일괄처리가 동작하기 전까지 서비스를 이용 할 수 없게 되면 불편을 겪게 됩니다.

이런 문제를 해결하기 위해 시간을 매우 짧게 설정해서 1초마다 동작하도록 설계하거나 단순하게 이벤트가 발생할 때 마다 처리하는 방식을 스트림 처리라고 합니다.

이벤트 스트림 전송

일괄 처리 환경에서의 작업은 입출력이 ( 일반적으로 )파일입니다. 스트림에서의 입력은 하나의 레코드이며, 이 레코드는 ( 일반적으로 )타임스탬프를 포함 한 하나의 이벤트라고 볼 수 있습니다. 예를 들면 웹 로그같은 경우입니다.

여기서 이벤트는 텍스트 문자열이나 JSON, Binary 형태로 부호화됩니다. 그리고 이러한 부호화 된 이벤트를 파일의 끝이 이어쓰는 방식으로 저장하거나, RDB에 저장하거나, 문서 데이터베이스(Mongo, Elastic .. )에 기록하는 식으로 저장 할 수 있습니다. 뿐만 아니라 부호화가 되어 있기 때문에 다른 네트워크로 보내서 처리 할 수 있도록 할 수 있습니다.

일괄 처리는 파일을 기반으로 동작하기 때문에 이전 처리의 결과를 다음 처리의 입력으로 사용 할 수 있습니다. 이와 유사하게 스트림 처리는 여러 작업에 동시 사용을 지원하기 위해 생산자 - 소비자의 개념으로 스트림 시스템을 구축 할 수 있도록 동작합니다. 특정 레코드 그룹을 구분하기 위해 토픽이나 스트림으로 묶어서 사용 할 수 있습니다.

여기서 하나의 문제가 등장합니다. 바로 생산자와 소비자를 어떻게 연결 할 것인가?라는 질문입니다. 파일의 끝에 내용을 추가하면서 tail로 읽는 것도 하나의 방법이 될 수 있고, 데이터베이스에 적재하고 다시 읽으면서 처리를 할 수도 있습니다. 하지만 데이터스토어를 이러한 용도로 설계하지 않았다면, 데이터를 주기적으로 풀링(pulling)하는 것은 비용이 크고, 동작이 잦을수록 더욱 큰 오버헤드를 갖게 된다고 합니다.

메시징 시스템

위와 같이 새로운 이벤트를 소비자에게 알려주는 방법으로 메시징 시스템이 있습니다. 메시징 시스템의 가장 간단한 방법은 생산자와 소비자 사이에 유닉스 파이프나 TCP 연결 같은 직접 통신 채널을 사용하는 방법입니다. 그리고 그 이후에 나오는 메시징 시스템은 대부분 이 방법을 확장시킨 방법입니다. 유닉스 파이프와 TCP는 전송자 하나를 정확히 수신자 하나에 연결하는데에 비해 확장 된 메시징 시스템은 다수의 소비자 노드가 토픽 하나에서 메시지를 받아 갈 수 있도록 합니다.

발행(생산)/구독(소비)는 다양한 접근법을 사용하는데 다음과 같은 질문이 파생됩니다.

  1. 생산자가 소비자보다 메시지를 처리하는 속도보다 빠르게 미시지를 전송하면 어떻게 될 것인지?
    • 메시지를 버릴 수 있습니다.
    • 큐에 메시지를 버퍼링(대기) 할 수 있습니다.
    • 흐름제어하여 더 이상 보내지 않도록 합니다.
      • 유닉스 파이프나 TCP는 버퍼가 가득차면 소비 될 때까지 생산자를 막습니다.
  2. 노드가 죽거나, 일시적으로 오프라인이 되면 손실되는 메시지는 없는지?
    • 지속성을 위해서는 디스크에 저장하거나, 복제해야 합니다. 메시지를 잃어도 괜찮다면 같은 하드웨어에서 처리량을 높이고 지연 시간을 낮출 수 있습니다.

생산자에서 소비자로 메시지 직접 전달

직접 메시징 시스템은 설계 상황에서는 잘 동작하지만, 소비자 서버가 죽으면 전송 된 데이터는 처리되지 않고 네트워크 상에서 떠돌다 유실되게 됩니다.

메시지 브로커

직접 전달의 유실 문제를 해결하기 위해 메시지 브로커라는 개념이 등장했습니다. 간단히 말하면 메시지 스트림을 처리하는 데 최적화된 데이터베이스의 일종이라고 볼 수 있습니다. 메시지 브로커는 하나의 서버로 생산자와 소비자가 접속하여 생산자는 브로커에게 전송하고 소비자는 브로커에서 메시지를 읽어 전송받습니다.

브로커에게 데이터가 모이기 때문에 클라이언트의 상태에도 쉽게 대처 할 수 있습니다. 전송 받은 메시지를 메모리에 저장하거나, 장애에 대비해서 디스크에 저장 할 수 있고, 소비 속도가 느리면 큐에 계속해서 저장 할 수 있습니다.

소비자는 일반적으로 비동기로 동작하며, 생산자가 메시지를 보낼 때 생산자는 브로커가 버퍼에 메시지를 담았는지만 확인하고, 소비자가 처리하는 것까지 기다리지는 않습니다. 큐에 백로그가 있으면 소비가 늦어 질 수 있기 때문입니다.

메시지 브로커와 데이터베이스의 비교

생산자와 소비자를 연결하는데에 기능적인 부분은 데이터베이스도 가능하다고 말했었습니다. 마찬가지로 생산자가 적고 소비자가 쿼리하면 원하는 데이터를 가져 올 수 있음은 동일합니다만, 여기서 데이터베이스와 메시지 브로커의 차이를 알아보면 좋습니다.

  1. 데이터베이스는 삭제하지 않으면 영구적으로 데이터를 저장하고 메시지 브로커는 소비자가 전달 받으면 자동으로 삭제됩니다.
  2. 메시지 브로커는 메시지를 빨리 지우기 때문에 큐가 작습니다. 만약 소비자가 느려서 많은 메시지가 버퍼링하면 메시지를 디스크에 저장 할 수 있기 때문에 처리 시간이 길어지고 전체 처리량이 저하 될 수 있습니다.
  3. 데이터베이스는 보조 색인을 지원하고 메시지 브로커는 특정 패턴에 부합하는 토픽의 부분 집합을 구독하는 방식으로 데이터를 선택 할 수 있도록 합니다.
  4. 트랜잭션의 격리 수준에 따라 데이터를 변경할 때, 다른 변화가 동시에 발생하더라도 알아차리기 힘듭니다. ( 갱신 유실, 쓰기스큐 ) 하지만 메시지 브로커는 변경 내역이 순차적으로 들어오기에 알아차릴 수 있습니다.

복수 소비자

  1. 로드 밸런싱
    • 각 메시지를 소비자 중 하나로 전달하는 방식으로, 메시지를 처리하는 비용이 높은 처리를 병렬화 하기 위해 소비자를 늘릴 때 주로 사용합니다.
  2. 팬 아웃
    • 각 메시지를 모든 소비자에게 전달합니다. 여러 독립적인 작업을 하는 소비자가 경쟁없이 동일한 메시지를 받을 수 있습니다.

위 두 가지 패턴을 동시에 사용 할 수 있습니다.

확인 응답과 재전송

소비자는 언제라도 장애가 발생 할 수 있습니다. 브로커가 소비자에게 메시지를 전달해도 소비자가 메시지를 처리하지 못했거나, 중간에 장애가 생기면 메시지를 잃어버릴 수 있습니다.

이런 문제를 방지하고자 메시지 브로커는 확인 응답을 사용합니다. 소비자는 받은 메시지를 처리하면 메시지 브로커에게 알려 브로커가 큐에서 해당 메시지를 삭제 할 수 있도록 하고, 어디까지 읽었는지 체크 할 수 있습니다. 만약 확인 응답이 오지 않는다면 처리되지 않았다고 가정하고 다른 소비자에게 동일한 메시지를 보냅니다.

로드밸런싱을 사용하면서 확인 응답이 오지 않아 다른 소비자에게 동일한 메시지를 보내게 되면 한 가지 문제가 생기는데 바로 순서가 다르게 전달 될 수 있습니다.

위를 보면 3번 작업을 실행하던 중 장애가 발생하여 확인 응답을 보내지 못했고, 3번을 4번 작업 이후에 보내는 것을 볼 수 있습니다. 서로 독립 된 작업을 하는 큐라면 이렇게 보내는 순서가 달라도 서로의 작업에 영향은 없지만, 관련성 있는 작업을 수행한다면 원하지 않는 결과가 나올 수 있습니다.

파티셔닝된 로그

메시지 브로커는 메시지를 임시로 보관(심지어 디스크를 사용하더라도)하는 특징이 있기 때문에 소비자를 새로 등록하거나, 다시 읽기 시작하더라도 이전 메시지를 처리 할 수 없습니다. 이와 대조되는 데이터베이스는 영구적으로 데이터를 가지고 있기 때문에 언제든 읽을 수 있지만 지연시간이 깁니다.

이 두 가지의 특징을 모두 가질 수 있는 방법으로 로그 기반 메시지 브로커라는 개념이 등장했습니다.

로그를 사용한 메시지 저장소

여기서 로그는 추가(쓰기) 전용 로그로 생산자가 보낸 메시지는 로그의 끝에 추가되고 소비자는 순차적으로 읽어서 메시지를 받습니다. 유닉스의 tail -f라고 생각하면 편합니다.

로그 쓰기의 경우에는 디스크 IO의 처리량이 중요하기 때문에 파티셔닝을 통해 처리량을 높일 수 있습니다. 여기서 다른 파티션은 다른 장비에서 서비스 할 수 있습니다. 파티셔닝을 통해 각 파티션은 독립적으로 읽고 쓰기가 가능한 분리 된 로그가 되고, 토픽으로 파티션들의 그룹을 정하여 사용 할 수 있습니다.

각 파티션 내에서 브로커는 모든 메시지에 오프셋이라는 개념을 사용하여 단조 증가하는 순번을 부여합니다. 전체 메시지에 순서를 보장하기 위해 사용하지만 다른 파티션간의 순서를 보장하지는 않습니다.

카프카가 보통 이러한 방식을 사용하여 모든 메시지를 디스크에 저장하지만 여러 장비에 메시지를 파티셔닝하여 초당 수백만 개의 메시지를 처리 할 수 있으며, 복제를 통해 장애에 대비 할 수 있다고 합니다.

로그 방식과 전통적인 메시징 방식의 차이

로그 방식은 소비자가 서로 영향을 받지 않고 독립적으로 데이터를 전달 받을 수 있습니다. 또한 개별 메시지를 소비자 클라이언트에게 할당하지 않고, 소비자 그룹 간 로드밸런싱을 하기 위해 소비자 그룹의 노드들에게 전체 파티션을 할당 할 수 있습니다.

위의 문장은 헷갈릴 여지가 있어서 추가 설명을 하면, 컨슈머 그룹은 얼마든지 노드가 추가 제거 될 수 있기 때문에 컨슈머 그룹은 각 컨슈머들에게 작업을 균등하게 분배 할 수 있어야 합니다. 이렇게 균등하게 분배하는 것을 컨슈머 리밸런싱이라고 합니다.

컨슈머 리밸런싱컨슈머 그룹이 구독한 토픽의 파티션과 그룹 멤버들을 트래킹하기 위해 동작하며, 파티션이나 그룹 멤버에 변화가 있으면 작업을 균등하게 재분배하기 위해 동작합니다. 그룹 코디네이터라는 이름으로 각 컨슈머 그룹별로 카프카 클러스터 내의 브로커 중 하나에 위치합니다.

즉, 소비자 그룹 내의 노드들에게 해당 토픽을 저장한 파티션들의 데이터를 모두 전송함으로써 특정 데이터를 전달받은 노드의 상태 변화에 대해 안정성을 부여합니다.

하지만 이런 방식은 파티션과 소비자 간의 1:1 전달이 기본이라는 카프카의 특성상 동시에 전달하는 소비자 수가 파티션 수보다 적거나 같아야 한다는 단점이 있습니다. 유사한 이유로 Head Of Line 문제도 발생 할 수 있습니다.

그래서 저자는 굳이 순서를 보장 할 필요가 없다면, 굳이 로그 기반의 접근법을 사용 할 필요가 없다고 합니다.

소비자 오프셋

소비자 측면에서 데이터를 처리하다가 노드가 장애가 발생 한 경우, 소비자 그룹 내 다른 노드가 장애가 발생 한 노드의 파티션을 할당받고 마지막 기록 된 오프셋부터 메시지를 처리하기 시작합니다.

이 오프셋은 복제에서 리더노드의 스냅샷을 팔로워가 받아서 이후 변동 내역을 추적하는 로그와 유사합니다.

디스크 공간 사용

소비자의 처리속도가 생산자의 속도보다 늦으면 큐는 쌓이게 되어 있습니다. 디스크 공간은 유한하기 때문에 원형 버퍼를 사용해서 가득차면 오래 된 메시지 순서대로 지우는게 일반적입니다.

디스크에 저장하면 어느정도의 기간동안 데이터를 보관하여 안전하게 사용 할 수 있지만, 모든 메시지를 디스크에 기록하기 때문에 로그 처리량이 일정하지만 매우 느립니다.

그래서 메모리에 메시지를 기록하고 큐가 너무 커지면 디스크에 기록하는 메시징 시스템을 주로 사용합니다.

소비자가 생산자를 따라잡을 수 없을 때

기본적으로 로그 기반 접근법은 버퍼링 ( 처리가 늦어지면 버퍼에 보관하는 방식 )으로 볼 수 있습니다. 하지만 디스크의 보관 한도보다 넘어간 데이터는 다시 읽을 수 없는건 메시지 버리기( 처리가 늦어지면 삭제 ) 방식과 같습니다.

그런데 우리는 오프셋을 사용하기 때문에 소비자가 로그의 헤드에 비해 얼마나 뒤쳐졌는지 모니터링 할 수 있습니다. 그렇기 때문에 메시지가 버려지기 전에 먼저 소비자 처리 속도를 개선 할 수 있습니다.

무엇보다 어떤 소비자가 너무 뒤쳐져서 메시지를 잃기 시작해도, 다른 소비자는 다른 파티션에게서 전달받기 때문에 서비스 측면에서 치명적인 문제로 번지지 않습니다. 이 점은 개발이나 테스트, 디버깅 목적으로 실제 프로덕션 로그를 소비 할 수 있는 매우매우 좋은 점입니다.

오래된 메시지 재생

소비자의 출력을 제외하고, 메시지 처리의 유일한 부수 효과는 소비자 오프셋 이동입니다. 즉 소비자 오프셋을 처음으로 되돌리면 소비자는 처음부터 데이터를 처리합니다.

오래된 메시지를 재생하려면 다시 재생하고 싶은 시점의 스냅샷을 기반으로 소비자 오프셋을 되돌리면 끝입니다.

데이터베이스와 스트림

데이터베이스의 처리도 데이터 유실이 없는 처리 방안이 필요합니다. 많은 서비스들은 데이터베이스에서 조회하여 서비스를 제공하고, 다른 데이터 플랫폼에게도 동기화하는 등의 일은 자주 발생하는 일입니다.

여기서 데이터베이스가 수정 될 때 데이터베이스 기록 이벤트의 스트림으로 사용되는 복제 로그(팔로워들에게 쿼리 형태로 날려보내서 자신과 동일한 형태로 있게 하기 위한 등 .. )를 사용하는 등의 방법으로 문제를 해결하고 있습니다.

시스템 동기화 유지하기

일반적으로 데이터 저장과 질의, 처리 요구사항을 모두 만족하는 단일 시스템은 없습니다. 즉 몇 가지 기술의 조합으로 사용자에 대응하는 데이터 처리 플랫폼이나 분석을 위한 OLAP 데이터 처리 플랫폼을 구현합니다. 그리고 데이터베이스를 중간 저장소로 사용하려면 이러한 플랫폼들간의 동기화가 필수적입니다.

데이터베이스에 데이터가 수정되면 그 값을 검색 색인이나 데이터 웨어하우스에 ETL 과정을 통해 반영하는데, 이 때 데이터가 너무 많으면 덤프를 하는 것 자체가 큰 부하를 줄 수 있습니다.

위 문제를 해결하는 방안으로 데이터가 수정될 때 데이터베이스에 쓰는 것과 검색 색인에 적는 등의 동작을 동시에 수행하는 이중 기록이 있습니다만, 이 방법또한 갱신 중복과 유사한 문제가 발생 할 수 있습니다.

그러면 안정적으로 스트림 처리를 하기 어려 울 수 있습니다.

변경 데이터 캡쳐

위의 방법에서는 데이터베이스를 읽거나, 혹은 동시에 처리를 하는 방법으로 문제를 해결해보려 했지만, 아쉽게도 좋은 방안은 등장하지 않았습니다.

그러면 이야기가 나왔던 복제 로그를 사용해보면 어떨까요? 아쉽게도 데이터베이스의 복제 로그가 적히는 방법에 대해서는 문서로 제공하고 있지 않기 때문에 사용이 어려웠습니다.

하지만 데이터베이스에 기록하는 모든 데이터의 변화를 관찰해 다른 시스템으로 복제하는 변경 데이터 캡처(CDC)에 대한 관심이 높아지면서 해결방안이 하나 등장했습니다.

변경 데이터를 캡처 할 수 있다면 검색 색인이든 외부 처리 플랫폼이든 변경한 내역을 추적하면서 처리를 할 수 있습니다. 그림으로 보면 조금 더 이해가 잘 되었습니다.

CDC1.png

변경 데이터 캡처는 변경 사항을 캡처 할 데이터베이스 하나를 리더로 선정하고 나머지는 팔로워로 설정합니다. 그 후 변경 사항을 로그 기반 메시지 브로커를 통해서 메시지 순서를 유지한 채로 파생 데이터 시스템으로 전달합니다.

기본적으로 비동기로 동작하면서 불안정해보일 수 있지만, 데이터베이스가 트랜잭션(커밋)을 잘 지킨다면 안정성도 어느정도 보장됩니다. 그럼에도 복제 지연의 모든 문제가 발생하는 단점이 있기 때문에 조심스럽습니다.

초기 스냅숏

전문 색인등을 아예 새로 구축 할 때는 변경 사항만으로는 어려 울 수 있습니다. 그렇다고 모든 변경 로그(처음부터)를 가지고 있기에는 너무 클 것입니다. 그러면 변경되지 않은 정보에 대해서는 알 방법이 없기 때문에 결국 전체 데이터베이스의 복사본이 필요 한 순간이 생깁니다.

변경 로그의 위치나 오프셋이 대응되어있는 스냅숏이 필요합니다. 그래야 스냅숏으로 해당 지점의 데이터베이스를 구축을 한 뒤 변경 사항을 추적하면서 갱신 할 수 있기 때문입니다.

로그 컴팩션

이미 공부 한 적이 있는 로그 컴팩션또한 하나의 답이 될 수 있습니다. 애초에 들고있는 변경 사항이 현재까지 변경 된 데이터의 가장 최근 사항이라면 변경 된 데이터라면 일단 최근 변경 사항이 모두 모여있기 때문에 복구하는 것도 가능합니다.

로그 구조화 저장 엔진에서 사용한 툼스톤을 이용해서 모든 데이터가 기본키를 포함 한 형태로 컴팩션을 진행하면 가장 최신 정보를 유지하는 것이 가능합니다. 소비자가 처음부터 재구성해야 하는 일이 발생하면 컴팩션 되어 있는 로그에 오프셋을 0으로 두면 됩니다.

아파치 카프카의 경우 로그 컴팩션으로 동작한다고 합니다.

변경 스트림용 API 지원

여러 데이터베이스가 DCD 지원보다는 변경 스트림을 기본 인터페이스로 지원하고 있습니다. 다 알아보지는 않고, mongodb의 경우에는 oplog라는 변경사항을 알려주는데, 이 값을 사용 할 수 있습니다.

카프카 커넥트는 카프카를 광범위한 데이터 시스템용 변경 데이터 캡처 도구로 활용하고자 만들어지기도 했습니다. 카프카로 변경 데이터를 스트림하면 여러 데이터 시스템에 적용 가능하고, 스트림 처리 시스템에 이벤트 공급도 가능하다고 합니다.

이벤트 소싱

책의 내용은 제가 이해하기 어렵기 때문에 자체적으로 간추려서 정리하면 다음과 같습니다.

이벤트 단위로 불변 이벤트로 쌓아나가면서 최종 상태를 만들어나가는 방식으로 이해하고 있습니다.

이 방식의 장점은

  1. 어떤 상황이 발생 한 후, 이전 혹은 다음 액션을 추적하기가 쉽습니다. 즉 디버깅이 쉽습니다.
  2. 애플리케이션의 버그를 방지합니다
  3. 부수적으로 발생하는 이벤트에 독립적이기 때문에 쉽게 분리됩니다.

이러한 방식으로 동작하는 시스템은 데이터베이스든 로그 기반 메시지 브로커든 도구와 상관없이 구축 할 수 있습니다.

이벤트 로그에서 현재 상태 파싱하기

저는 이벤트 소싱을 요약하면서 최종 상태를 만들어나간다는 문장을 적었습니다. 사실 대부분의 시스템에서 원하는 것은 최종 상태입니다. 하지만, 이 최종 상태를 만드는 것은 쉽지 않습니다.

예를 들면, A 유저의 장바구니에 담긴 물건을 기반으로 구매 예상 품목을 추천해주는 서비스라면 A 유저의 장바구니에 대한 현재 상태가 필요합니다. 하지만, 이벤트 소싱은 (“물건을 봄”, “장바구니에 담음”, “장바구니에서 제외”) 와 같은 이벤트 단위로 저장되어 있습니다. 만약 3년전에 장바구니에 담은 물건을 알려면 어떻게 해야 할까요?

결국 모든 기간의 해당 유저의 전체 데이터를 봐야만 정답을 알 수 있습니다.

이벤트 로그는 기존의 상태를 표시하는 로그의 컴팩션은 사용 할 수 없습니다. 애초에 상태라는 값을 저장하지 않기 때문에 로그를 컴팩션하는 의미가 없습니다. 대부분의 이벤트 소싱을 지원하는 서비스는 자체적으로 스냅샷을 가지고 있지만, 이벤트 소싱의 본래의 목적은 모든 이벤트를 가지고 있어야 한다는 개념이기에 부족한 부분이 있습니다.

명령과 이벤트

이벤트 소싱의 철학은 이벤트와 명령을 구분해야 합니다. 사용자의 요청은 명령이고 이 시점에서는 명령이 실패 할 수 있습니다. 예를 들면 무결성을 위반하면 실패 할 수 있습니다. 애플리케이션은 명령이 실행가능 한지 확인하고 무결성이 검증되고 명령이 승인되면 명령은 지속성 있는 불변 이벤트로 바뀝니다.

이벤트는 생성 시점에 사실이 되고 사용자가 나중에 취소하는 명령을 수행하고 이벤트로 바뀌더라도 그 이전의 이벤트는 독립적이므로 여전히 사실입니다.

소비자는 받은 이벤트를 거절 할 수 없습니다. 이미 다른 소비자도 동일한 이벤트를 처리하고 있을테니까요. 따라서 명령이 유효한지 판단하는 것에 조심해야 합니다. 이 때 직렬성 트랜잭션등을 이용해서 유효성을 판단하거나 이벤트를 두 개로 나누어서 해당 이벤트의 유효성을 한 단계 추가하여 판단 할 수 있습니다.

불변성

불변성은 입력 파일에 손상을 주지 않고 기존 파일에 실험적인 처리를 자유롭게 할 수 있는 장점이 있습니다. 어떤 상태는 이전의 처리들의 결과이기 때문에 이벤트의 적분이 A부터 B까지의 상태로 생각 할 수 있고, 미분이 해당 지점의 상태 변화량으로 생각 할 수 있기에 더 다양하게 활용 가능합니다.

장바구니에 물건을 담는 시스템에서도 유저가 A 물건을 장바구니에 넣고 다시 취소했다는 정보 또한 유의미한 정보가 될 수 있기에, 더 세부적인 데이터 적재가 가능합니다.

동일한 이벤트 로그로 뷰를 여러개 만들기

CQRS ( 명령과 조회의 분리 )를 사용하면 쓰기와 읽기를 분리시키면서 원하는 형태로 시스템을 확장시키거나 다양한 읽기 형태를 지원 할 수 있습니다. 동일한 데이터이지만 분석의 요소는 다양하게 만들어 질 수 있고, 그러기위한 뷰도 다양하게 필요 할 수 있습니다.

누군가는 시계열로, 누군가는 어떤 이벤트에 대한 종합을 볼 수 있죠.

동시성 제어

이렇게 쓰기와 읽기를 분리시키면 아직 데이터가 제대로 쓰여지지 않은 테이블에 읽기 작업이 수행 될 수 있습니다. 이 경우에는 동시성 문제가 발생 할 수 있는데, 작업을 동기식으로 변경 할 수 있고 분산 트랜잭션을 구현해서 사용 할 수 있습니다.

그럼에도 불변성의 한계는 있다.

불변성은 많은 시스템에서 스냅숏을 지원하거나 다중 버전 등을 지원하기 위해 의존하고 있지만, 그럼에도 분명한 한계가 있습니다.

잦은 데이터 삭제 및 갱신을 하는 서비스의 경우에는 모든 이벤트가 남기 때문에 부하가 매우 커질 가능성이 높습니다. 이러한 시스템에서 영구적으로 모든 변화를 남기는 것은 매우 어려운 일이며 지나치게 커지거나 파편화 문제가 발생 할 수 있습니다. 결과적으로 가비지 컬렉션이나 컴팩션 성능이 매우 저하되고 운영에 어려움을 겪을 수 있습니다.

뿐만아니라 그럼에도 삭제해야 하는 일이 있을 수 있습니다. 최근에 있었던 보안 문제로 인해 데이터를 삭제해야 했던 것 처럼 이벤트 소싱이든 불변성을 가진 데이터에서 보안 문제로 데이터를 제거해야 할 수 도 있습니다.

스트림 처리

스트림 처리로 할 수 있는 일은 다음과 같습니다.

  1. 이벤트에서 데이터를 꺼내 데이터베이스나 캐시, 검색 색인 또는 저장소 시스템에 적재합니다.
  2. 이메일 경고나 푸시 알림과 같이 사용자에게 직접 전송 할 수 있습니다.
  3. 하나 이상의 입력 스트림을 처리해서 하나 이상의 출력 스트림을 생산하여 여러 파이프라인을 구축 할 수 있습니다.

앞으로의 내용은 3번인 여러 파이프라인은 구축하는 것에 대한 이야기입니다만, 사실 이 이야기는 배치 처리에서 다룬 적이 있습니다. 예를 들면 맵리듀스에서 중간 파일을 이용한 이후 파이프라인을 파생시키는 방법과 비슷하다고 생각 할 수 있습니다.

하지만 결정적으로 배치 처리와 크게 다른 점은 끝나지 않는 데이터 입력이라는 점 입니다. 배치처리에서는 실패하면 다시 돌리면 되었습니다. 설령 중간에 색인이 변경되거나 입력되는 값이 추가되어도 처음부터 돌리면(조금 오래 걸려도) 문제는 해결 할 수 있었습니다. 그러나 스트림에서는 3년간 실행하던 입력을 처음부터 재입력한다는 것은 무리가 있습니다.

스트림 처리의 사용

스트림 처리는 다양한 용도로 사용되어 왔습니다. 그러한 용도 중 모니터링의 경우에는 주식 거래 시스템에서는 가격의 변화를 감지해서 거래를 실행하고, 사기 거래 시스템에는 사용 패턴에 어긋나는 거래를 막아주는 등의 동작등을 해왔습니다.

CEP ( 복잡한 이벤트 처리 )

간단히 요약하면 지속적으로 데이터를 관측하다가 특정 패턴에 맞는 데이터가 읽어지면 이벤트 처리하는 방식입니다.

스트림 분석

이벤트에 대한 처리뿐만 아니라 스트림 분석을 통해 특정 유형의 이벤트 빈도 측정이나 특정 기간에 걸친 이동 평균, 이전 시간 간격과 현재 통계값의 비교 등의 동작을 할 수 있습니다. 그리고 스트림 분석은 항상 누락이 있거나 부정확하다고 하는데, 대체로 이런 말은 최적화 기법으로 확률적 알고리즘을 사용하기 때문이고 잘 활용 할 수 있다고 합니다.

구체화 뷰

스트림은 캐시, 검색 색인, 데이터 웨어하우스와 같은 파생 시스템이 원본 데이터의 최신 정보를 따라가게 하도록 사용 할 수 있는데, 이러한 예시가 구체화 뷰를 유지하는 것이라고 합니다. 질의의 효율성을 높이고 기반 데이터가 변경되면 뷰를 갱신합니다.

이벤트 소싱을 사용하는 애플리케이션 상태는 자체가 구체화 뷰라고 볼 수 있습니다. 여기서 구체적 뷰를 만드려면 잠재적으로 임의의 시간 범위에 발생한 모든 이벤트가 필요합니다.

스트림 상에서 검색하기

CEP 외에도 검색 질의와 같이 복잡한 기준을 기반으로 개별 이벤트를 검색해야 하는 경우도 있습니다. 아마 이러한 기능을 사용하게 될 프로덕트는 엘라스틱서치라는 생각이 들어서 적어보자면 엘라스틱 서치의 여과 기능이 이런 스트림 검색을 사용가능한 선택지라고 합니다.

일반적인 검색 엔진은 문서를 색인 한 후 질의를 실행하여 해당하는 정보를 얻어오는 동작을 수행하지만, 스트림 검색은 질의 자체를 먼저 저장해서 CEP처럼 질의를 지나가면서 검색한다고 합니다.

시간에 대한 추론

스트림 처리자는 종종 시간을 다뤄야 한다고 합니다. N분간의 분석과 같은 서비스는 전체 데이터를 읽는 배치보다 스트림 처리의 장점이 돋보이는데 이런 서비스는 상당히 구축이 어려운 부분이 있습니다.

일단 N분 이라는 지표 자체는 시간입니다. 하지만 이전의 포스트를 보더라도 우리는 시간이 얼마나 불안정한지 알고 있습니다. 최근 실무에서 사용 한 로그 수집기의 경우에도 국가 별 시간이 다르기에 네트워크를 통한 데이터 도달 시간을 예측하기 너무 어렵기(혹은 유실 가능성이 높기) 때문에 서버 기기 시간에 의존하지 않는 방향으로 설계했었는데, 스트림에서는 실질적인 이벤트와 관련이 있기 때문에 더 민감합니다.

대체로 국내 서비스의 경우 약간의 시간 차이는 충분히 무시 할 수 있는 수준이지만, 서버의 문제나 많은 통신을 통한 지연이 발생하면 말이 다릅니다.

그러면 어떤 시간을 쓸 것인가

스트림 처리의 특성 상 데이터는 지속적으로 들어오는 상태임을 가정하고 우리가 원하는 N분 마다 서비스를 제공하는 방식으로 동작한다고 했을 때 N분이 지연되어 다음 처리 시간에 도착 한 경우 어떻게 대처 할 것인지도 중요합니다.

가장 간단한 방법은 무시가 있습니다. 굳이 이전 처리 내용이 갱신 될 필요가 없는 서비스는 무시하면 충분합니다. 아니라면 이전 결과에 대해 업데이트를 할 수 있습니다. 대체로 모니터링은 추세를 확인하기 때문에 이런 지연에 대해서 무시를 하여 지연 자체를 감지 할 수 있고, 서비스 자체에 영향을 끼치는 모니터링이라면 업데이트를 통해 정상적인 서비스를 우선시 할 수 있습니다.

어떤 시간을 쓸 것인가는 사실 매우 어려운 문제입니다. 그래서 저자가 추천하는 방법은 다음의 정보를 모두 수집하는 것입니다.

  1. 이벤트가 발생 한 시간
    • 유저가 이벤트를 발생시킨 시간
  2. 이벤트를 서버로 보내는 시간
    • 네트워크 혹은 어떠한 방법으로 서버로 보내는 시점의 시간
  3. 서버에서 이벤트를 받은 시간
    • 서버에 이벤트가 도착 한 시점의 시간

이 3가지 시간 데이터를 모두 수집한다면 얻을 수 있는 큰 장점 중 하나는 시간간의 차이를 통해 정보를 얻을 수 있다는 점입니다.

서버 입장에서는 이벤트가 도착했을 때 이 3가지 시간이 모두 무시 할 만한 수준에 포함되어 있다면, 우리는 “이 데이터는 처리하면 되겠다!”라는 생각을 할 수 있고, 무시 할 수준이 아닌 차이라면 “우리 서버에 부하가 큰가?” 혹은 “사용자의 네트워크 상황이 안좋거나 무슨 문제가 있구나!”라는 판단으로 서비스를 거절 할 수 있습니다. 혹은 아예 거절이라기보다 해당 사용자의 이전 이후 요청이 없다는 조건하에 해당 이벤트를 처리 할 수도 있죠.

윈도우 유형

우리가 원하는 정보는 N분의 데이터를 이용 한 특정한 서비스를 제공하는 것입니다. 그러면 여기서 N분을 어떻게 판단할 것인지도 중요한 포인트입니다. 여기서 일반적으로 윈도우 방식을 사용하고 있습니다.

대부분의 윈도우는 다음과 같은 유형이 있습니다.

  1. 텀블링 윈도우
    • 고정 시간으로 N분 마다 구분 된 데이터를 처리합니다.
    • EX ) ( N = 5 ) 0~5분 / 5~10분 / 10~15분 …
  2. 홉핑 윈도우
    • 고정 시간으로 N분 마다 M분으로 구분 된 데이터를 처리합니다.
    • EX ) ( N = 1, M = 5 ) 0~5분 / 6~7분 / 7~8분 …
  3. 슬라이딩 윈도우
    • 유동적인 시간으로 N분의 데이터를 유지하면서 데이터를 처리합니다.
    • 고정 시간과는 다르게 지속적으로 변화하기 때문에 5분 내의 차이가 있는 데이터는 모두 같은 윈도우로 표현됩니다.
  4. 세션 윈도우
    • 시간의 기준이 아닌 세션의 기준으로 모든 데이터를 처리합니다.
    • EX ) 홈쇼핑의 접속 유저의 액션에 대한 처리 ( 살펴 본 상품에 대한 처리 등 )


      스트림 조인

배치 처리의 경우에는 조인 시 주요한 논점은 어떻게 대용량의 데이터를 효과적으로 조인 할 것인가 였습니다. 여기서는 맵사이드 조인을 통해 상대적으로 작은 크기를 메모리에 올리거나 브로드캐스팅 방식등을 이용해서 조인을 하거나, 리듀스 전 단계까지 별도로 얻어내서 리듀스 사이드 조인을 하는 방법을 공부했습니다.

하지만 스트림 조인은 상대적으로 적은 데이터지만 연속적으로 입력되는 데이터의 특징으로 인해 언제든 새로운 이벤트가 발생 할 수 있다는 어려운 부분이 있습니다.

스트림 스트림(윈도우) 조인

이 조인 방법은 스트림 데이터와 스트림 데이터를 조인하는 방법으로 생각하면 됩니다. 하지만 스트림 데이터는 연속적으로 입력되는 데이터 임을 알고 있기 때문에 어떤 이벤트를 사용 할 것인지 명확하게 정해야 할 필요성이 있습니다.

가장 명확한 기준은 윈도우를 사용하는 방법입니다. 책에 나온 예제가 너무 좋아서 동일하게 쇼핑 앱에서 CRT을 제공하는 서비스를 사용해보겠습니다.

유저가 검색 후 클릭(전환)하는 이벤트를 연속적으로 알아보고 싶을 때 두 이벤트의 발생 간격이 매우 길 수 있음에 주목했습니다. 3일 전에 검색을 하고 오늘 클릭을 한다고 가정했을 때, 원하는 정보를 알기 위해 윈도우를 무제한으로 늘리기에는 해당 유저가 다시는 사용하지 않을 수 있기에 위험 한 방법입니다. 이럴 때 유저의 세션ID를 기반으로 색인을 만들어놓고 이벤트가 들어 올 때마다 같은 세션 ID로 들어 온 이벤트를 확인하게 된다면 문제를 해결 할 수 있습니다.

간단하게 말하면 세션 윈도우로 구축 된 색인실시간 스트림 처리를 동시에 사용하여 두 데이터를 조인하는 방법입니다.

스트림 테이블 조인

이 조인 방법은 스트림 처리와 데이터베이스를 조인하는 방법입니다. 하지만, 스트림 처리는 자주 발생하는 이벤트이기 때문에 매번 이벤트 발생 시 마다 데이터베이스를 읽어오는 (주로 네트워크를 통해) 방법은 부하가 클 수 있습니다.

그렇기에 최신화가 가능하면서 가능한 안전성 확보를 위해 스냅샷 격리를 수행하는 데이터베이스를 동일한 서버에 복제해놓고 지속적인 업데이트를 하면서 사용하는 방법을 권장합니다. 혹은 순수하게 디스크를 이용해도 괜찮습니다.

테이블 테이블 조인

스트림 처리를 통해 실시간(혹은 마이크로 배치) 업데이트 되는 데이터베이스 테이블을 다른 테이블과 조인하는 방법도 있습니다.

내결함성

배치 처리의 내결함성의 해결 방안과는 다르게 스트림 처리의 내결함성을 유지하는 방법은 직관적이지 않을 수 있습니다. 배치처리는 특정 시점의 데이터를 기반으로 동일한 프로세스를 다시 실행하면 언제든 동일한 결과를 얻을 수 있지만, 스트림의 경우에는 끝 시점이 모호하기 때문에 어려 울 수 있습니다.

마이크로 일괄 처리와 체크포인트

전형적으로 스파크 스트리밍이 사용하는 방법은 1초 가량의 짧은 데이터를 일괄 처리함으로써 스트림 처리와 거의 동일하게 처리하는 방법입니다. 이 경우 일반적인 일괄 처리와 비슷하게 텀블링 윈도우를 지원하기 때문에 원하는 시점의 데이터를 배치 처리와 같이 재실행하는 것으로 내결함성을 유지 할 수 있습니다.

혹은 Flink에서 사용하는 방법으로 체크 포인트를 주기적으로 지속성 있는 저장소에 저장하면서 재실행 시 체크포인트부터 에러 발생 지점까지 데이터를 삭제하고 해당 체크포인트부터 다시 실행 할 수 있습니다.

참고

http://egloos.zum.com/killins/v/3025514

https://tigercoin.tistory.com/331

https://sabarada.tistory.com/231

https://always-kimkim.tistory.com/entry/kafka101-consumer-rebalance