이 글은 Martin Kleppmann의 데이터 중심 애플리케이션 설계를 읽고 기억하고자 적는 게시글입니다.
10. 일괄 처리
데이터를 처리하는 방법은 크게 3가지로 나눌 수 있습니다.
- 서비스 ( 온라인 시스템 )
- 고객의 요청에 대해서 빠른 응답을 주기 위한 데이터 처리 방식입니다. 예를들면 배달의 민족 애플리케이션의 응답 등이 있습니다.
- 가능한 빠른 응답이 필요합니다.
- 일괄 처리 시스템 ( 오프라인 시스템 )
- 매우 큰 입력을 받아 처리하는 방식으로 주로 반복적인 스케줄링으로 움직입니다
- 주요한 지표로 처리량을 사용하고, 이 뜻은 어느 정도의 양의 데이터를 처리하는데 사용 된 시간을 나타냅니다.
- 스트림 처리 시스템 ( 준실시간 시스템 )
- 스트림 처리는 일괄 처리 시스템의 방식과 유사하게 요청이 아닌 입력 값을 기반으로 동작합니다.
- 일괄 처리와 차이점으로 데이터를 입력 받는 순간 처리가 시작됩니다.
일괄 처리는 일반적으로 가장 안전한 방식의 데이터 처리라고 볼 수 있습니다. 실제로 회사에서 대량의 데이터를 다루는데에 있어서 필수적이기 때문에 자주 사용하는 방식 중 하나 입니다.
유닉스 도구로 일괄처리
간단한 예시와 유닉스 도구만으로도 좋은 일괄처리가 된다고 말하고 있지만, 기존에도 사용한 경험이 많기 때문에 유용하다라는 사실만 알고 좋은 명령어에 대한 내용만을 적어보겠습니다.
- awk - 조건자로 문장을 분할하여 특정 필드($n)를 출력 할 수 있습니다.
- sort - 특정 조건에 맞는 정보를 알파벳 순으로 정렬합니다.
- uniq - 인접한 두 줄이 같은지를 판별하고 중복을 제거합니다.
- sed - 원본 파일에 영향을 주지 않고 라인 단위로 데이터를 출력하는 명령어 입니다.
사용하면 단일 프로세스 내에서 매우 빠른 결과를 출력해주는 것을 볼 수 있습니다. 개인적으로 쉘 스크립트는 거의 모든 언어 중 간단한 작업에 대해서는 가장 빠른게 아닌가 조심스럽게 말해봅니다.
하지만 이해하기도 어렵습니다. 특히 명령어 파이프가 길어지면 그보다 이해하기 어려운 라인은 없을 것처럼 어렵습니다.
유닉스의 철학
유닉스를 사용하는 개발자와 사용자 사이에서 가지고 있는 철학은 다음과 같습니다.
- 각 프로그램은 한가지의 일만 한다.
- 모든 프로그램의 출력은 다른 프로그램의 입력으로 쓰일 수 있다고 생각한다.
- 소프트웨어를 빠르게 써볼 수 있게 설계하고 구축한다.
- 프로그래밍 작업을 줄이려면 도구를 써라 ?
여기서 개인적으로 가장 유닉스스러운 철학은 1번과 2번이라고 생각합니다. 앞으로 Hadoop에 대해서 알아 볼 텐데 그때 이러한 철학을 어느정도 보존한 모습을 볼 수 있습니다.
Hadoop
하둡에 관련한 내용은 별도의 포스트를 참고해주시면 될 것 같습니다.
아까전의 유닉스 도구를 이용한 일괄처리가 단일 노드였다면, 다중 노드로 일괄 처리를 하는 도구를 알아봐야 합니다. 지금은 Spark, Flink등 다양한 도구의 발전으로 인해 핵심으로 사용되지는 않지만 ( 아마 HIVE도 Spark 위에 띄운다면 ) 기본적으로 분산 처리라면 맵리듀스를 떠올릴 것 같습니다.
위의 그림을 보면 Map-Reduce의 동작원리를 볼 수 있습니다. 이러한 동작원리은 1가지의 일을 해결하기 위한 Task임과 mapping, shuffle, reduce와 같은 모든 동작은 이전의 결과를 인풋으로 받음을 이해하면 이 과정이 유닉스 파이프와 유사함을 알 수 있습니다. 이러한 이유로 저자는 유닉스 파이프라인과 맵리듀스를 함께 말하는 듯 합니다.
하지만 Map-Reduce는 적어도 2가지는 확실하게 설계를 해야합니다. 바로 Splitting된 데이터 중 키-값 형식으로 변경해주는 Mapper와 정렬 된 키-값 형식으로 변경된 데이터를 기반으로 어떤 연산을 수행 할 것인지(어떤 값을 리턴 할 것인지)를 정하는 Reducer입니다.
전체 데이터를 나누고, 모아서(map), 연산하기 좋게 정렬한뒤(sort, shuffle), 연산한다(reduce)가 Map-Reduce의 동작원리 입니다.
여기서 데이터 셋이 클 경우에 일반적인 정렬 알고리즘으로 한 장비에서 모두 정렬하기에는 어렵기 때문에 키의 해시값을 이용해서 파티셔닝을 하고 그 정보를 매퍼의 로컬디스크에 기록합니다. 기록이 완료되면 리듀서에게 완료되었음을 알리고, 리듀서는 담당하는 파티션에 해당하는 키-값 쌍의 파일을 다운받아 이어서 작업을 진행합니다. 이것이 셔플입니다.
Map-Reduce 작업을 한번 수행해서 해결할 수 있는 문제는 상당히 제한적이고, 더 상세한 문제를 해결하기 위해서는 여러 개의 워크플로를 이어야합니다. 마치 유닉스 연산자를 파이프라인으로 엮어 하나의 Input데이터를 연산한 결과를 연이어 연산하는 것과 같지만, 문제는 Map-Reduce는 파일을 기반으로 쓰고 읽어서 연산을 처리하는 방식입니다. 이 경우는 유닉스처럼 이어서한다기보다 쓰고 읽기를 반복한다고 말할 수 있죠.
리듀스 사이드 조인과 그룹화
데이터 셋은 관계를 갖는 경우가 많습니다. RDB는 외래키, Document기반 DB는 문서참조, 그래프 모델에서는 간선이 그런 관계입니다. 이런 관계는 조인의 사용을 발생시키고, 아무리 비정규화를 해도 아예 없애기는 쉽지 않습니다. 인덱스를 사용하면 어느정도는 줄일 수 있지만, Map-Reduce는 적어도 그런 색인 개념이 명확하지 않아서 좀 어려운 문제입니다.
맵리듀스에서 입력 파일 전체 내용을 읽어서 사용하는 방식을 full tabel scan이라고 부릅니다. 많은 레코드의 집계연산에는 유리하지만 부분적인 데이터를 사용 할 때는 유용하다고 보기 어렵습니다.
책에서는 일괄 처리 맥락에서 조인은 데이터 셋내의 모든 연관 관계를 다룬다는 뜻이고 특정 사용자의 데이터만 찾는다면 색인을 사용하는 편이 효율적이라고 말합니다. 곧 하게 될 MariaDB, MongoDB, Elasticsearch의 비교 리포트를 적는데 좋은 정보로 사용 될 것 같습니다.
맵리듀스에서의 조인
책에서는 이벤트 로그와 유저 데이터베이스를 조인해서 데이터를 얻는 방법을 예제로 보여줍니다. 정확히 제가 해야 할 일이네요.
이벤트 로그에는 누가 했는지가 포함되어 있으며, 유저의 정보를 담는 데이터베이스는 누구의 정보인지가 포함되어 있기 때문에 조인을 수행해서 누가 어떤 이벤트를 발생 시켰는지를 알 수 있습니다.
이러한 연산을 할 때 모든 이벤트에 대해서 누구의 정보인지를 붙이는건 상당히 큰 성능 낭비를 발생 시키기 때문에, 사용자 데이터베이스를 이벤트가 저장 된 스토리지에 저장하는 것이 좋다고 합니다. 이유는 같은 장비에서 연산을 하기 위해서입니다.
“일괄 처리에서 처리량을 높이기 위해서는 가능한 한 장비에서 연산을 수행해야 한다”는 저자의 말에 따라 이 경우 연산 시간을 확실히 줄일 수 있습니다.
Map-Reduce를 사용해서 이벤트를 ID 기반으로 키-값 형식으로 변환하고 유저 정보또한 모아서 두 정보를 키-값으로 묶어서 Reduce하는 듯합니다.
위에서 데이터를 Reduce 하기 전에 데이터를 셔플링하는 과정에서 어떤 키를 기준으로 그룹화 하고 그 키를 따로 비교해서 정렬하는 방식인 보조 정렬을 사용 할 수 있습니다 이 문제에서는 유저의 ID 키를 그룹화해서 이벤트 시간을 비교하여 정렬 할 수 있죠. 이렇게 여러 데이터를 한 장비에서 연산하여 리듀서가 한 레코드에 대해 한 번 연산하도록 하는 방법이 정렬 병합 조인입니다.
그룹화도 조인과 비슷하게 그룹화 하고자 하는 값을 키로 선정하면 결국 같은 파티션에 정렬 된 상태로 모이게 되고 결과적으로 그룹화 할 수 있습니다.
쏠림 현상
서비스 측면에서의 엔지니어링 경험이 없기 때문에, 상대적으로 겪어보지 못한 문제인 쏠림에 대해서도 이 책에서는 말하고 있습니다.
특정 인기인의 인스타 게시글에 대한 반응, 특정 유튜버의 실시간 반응에 대한 값등은 일반적으로 매우 큰 데이터 부하를 갖습니다. 이 경우에도 일반적으로 유저의 ID가 키로 사용되기에 이 문제는 곧 특정 ID에 대한 비정상적인 부하로 볼 수 있습니다.
인스타를 사용하는 수 천만명 중 이러한 부하를 가진 사람은 기껏해야 몇 만정도이기에 전체에 비하면 부분적이죠. 이런 키를 가리켜 핫 키라고 부릅니다. 그리고 키에 따른 리듀서를 사용해서 핫 키와 대응되는 파티션을 처리하는 리듀싱하는 것을 핫스팟이라고 합니다.
이런 핫스팟 문제를 해결하기 위해 몇 가지 방법을 사용 할 수 있는데, 첫번째 방법인 쏠린 조인 메서드는 샘플링을 통해 어떤 키가 핫 키인지 알아내고, 임의의 리듀서에게 핫 키 레코드를 보낸 뒤 조인 할 입력을 핫키가 전송 된 리듀서에게 모두 복제하는 방식이 있습니다. 이 경우에는 조인 할 입력을 복제하면서 비용이 들지만, 병렬화 효과를 최대화 하는 장점이 있습니다.
두번째 Hive에서는 메타데이터에 핫 키를 지정하고, 관련된 레코드를 별도 파일에 저장합니다. 해당 레코드를 임의의 리듀서로 보내고 각 리듀서는 전달받은 핫 키 레코드의 일부를 그룹화 하고 키 별로 집계하여 간소화 한 값을 출력합니다. 마지막으로 이전 단계에서 나온 키 별 값들을 결합하여 하나의 값으로 만드는 방법이 있습니다. 이 방법은 Hive에서 사용하는 방법이라고 하네요
맵 사이드 조인
이전의 조인 방식은 리듀스 사이드 조인으로 매퍼의 결과 값을 기반으로 리듀서로 복제한 데이터를 이용하여 조인을 수행하는 방식입니다. 이 경우 각기 다른 매퍼를 구현해야 하지만, 유연한 데이터 처리와 대용량 데이터도 분산 환경에서 효과적으로 처리 할 수 있습니다.
하지만 입력 데이터에 대해 가정 할 수 있다면 맵 사이드 조인을 고려하는 것도 좋습니다.
브로드캐스트 해시 조인
브로드캐스트 해시 조인은 메모리에 조인 할 (적은 양의 경우만)데이터를 해시테이블에 캐싱해놓는 방식입니다. 그러면 대상이 되는 키를 읽으면서 해시테이블을 통해 간단히 조인 할 수 있습니다.
임팔라나 하이브에서 이런 방식을 사용하며, 로컬디스크에 읽기 전용 색인으로 저장해서 사용한다고 합니다. mapper를 수행하는 서버의 형태는 아래와 같이 세팅되겠네요.
broadcast_join.png
파티션 해시 조인 ( 하이브의 버킷 맵 조인 )
위에서 사용한 해시 테이블을 이용한 캐싱을 파티션 단위로 바꾸면 파티션 해시 조인이 됩니다. 이 경우에는 파티션 단위로 조회 할 데이터가 선택적으로 바뀌기 때문에 조금 더 빠르게 문제를 해결 할 수 있습니다.
예를들면 십진수로 끝나는 마지막 값이 3인 유저 데이터만을 해시 테이블로 가져오고 이벤트 데이터를 읽을 때 마찬가지로 마지막 값이 3인 유저의 이벤트만을 가져오면 더욱 메모리를 아끼면서 조인을 수행 할 수 있습니다.
맵 사이드 병합 조인
맵 사이드 병합 조인에서의 조건은 키를 기준으로 정렬이 되어 있다면입니다. 키를 기준으로 정렬이 되어 있다면 우리는 마치 병합정렬을 할 때처럼 양쪽의 데이터를 오름차순으로 읽어나가면서 같은 키를 모아서 사용 할 수 있습니다.
이건 마치 리듀스가 수행하는 것과 비슷하게 보입니다.
맵 사이드 조인을 사용하는 맵리듀스 워크플로
리듀스 사이드 조인의 경우에는 조인을 하려는 키를 기반으로 파티셔닝되고 정렬되어 그 값을 이용해서 조인이 이루어집니다.
맵 사이드 조인의 경우, 상대적으로 큰 조인의 입력( 이벤트 데이터 )의 블록(데이터 노드 내)마다 맵 태스크가 동작합니다.
맵 사이드 조인을 사용 할 때는 상대적으로 적은 크기의 데이터( 브로드캐스트에서 인메모리에 담길만한)의 크기와 정렬( 맵 사이드 병합의 전제조건 )여부와 입력 데이터의 파티셔닝( 파티션 해시 조인의 기준 )이라는 제약이 붙습니다.
따라서 조인을 수행 할 때는 분산 파일 시스템 내의 물리적 레이아웃을 파악하는 것은 중요하다고 합니다. 파티션 수와 어떤 키를 기준으로 파티셔닝 되는지, 정렬 되어 있는지를 알아보라고 합니다.
일괄 처리 워크플로의 출력
데이터베이스의 질의의 경우 트랜잭션 처리(OLTP)와 분석 목적(일반적으로 OLAP)로 나눕니다. 일반적으로 OLTP는 색인을 사용해 사용자에게 소량의 레코드만 특정 키로 조회하는 것에 자주 사용되는 말입니다.
분석 목적은 대량의 레코드를 스캔해 그룹화 및 집계 연산을 수행하는 것에 사용되고 그 결과는 주로 보고서로 나오게 됩니다.
그 중에서도 일괄처리는 일반적으로 분석 목적에 더 맞는 경우가 많습니다.
검색 엔진
주로 검색 엔진의 경우에는 일괄 처리와 잘 어울립니다. 특히 루씬 색인을 구축하는데에는 맵리듀스를 사용하는게 좋은 방법 중 하나라고 합니다.
루씬 전문 색인의 경우에는 문서 기반 파티셔닝을 기반으로 특정 키워드를 조회해서 해당 키워드가 포함 된 문서의 ID 목록을 찾아서 관련성을 순위 매기는 방식으로 동작합니다.
만약 특정 조건의 문서 집합에서 검색을 한다면 매퍼가 해당 문서 집합을 파티셔닝하고 각 리듀서가 해당 파티션에 대한 색인을 구축하여 분산 파일 시스템에 저장하면 됩니다.
색인 문서 집합이 바뀐다면 전체 문서 집합을 대상으로 주기적으로 전체 색인 워크플로를 재수행하고 수행이 끝나면 이전 색인 파일을 새로 생성된 색인으로 바꾸는게 직관적이라고 합니다.
다른 방법으로는 증분 색인을 구축하는 것도 가능한데, 루씬은 세그먼트 파일을 새로 기록하고 백그라운드에서 증분식으로 부분 파일을 병합하고 압축한다고 합니다.
일괄 처리의 출력으로 키-값 저장
많은 이야기를 하고 있지만 요약해보면 추천 시스템이나 머신러닝과 같은 서비스 측면에서의 일괄 처리는 키-값의 읽기 전용 파일로 출력해서 기존 파일을 변경보다 새로운 파일을 생성 후 참조하는 방식이 좋다고 말하는 듯 합니다.
일단 추천 시스템의 경우에는 다음과 같은 정보가 출력 될 것입니다.
- A 유저에게 15, 24, 39, 852, 2135번 상품이 추천된다.
- B 유저에게 52, 62, 42, 199, 583번 상품이 추천된다.
그리고 이러한 정보를 모으면 결국 하나의 데이터베이스와 같은 형태가 됩니다. 이러한 정보는 유저의 액션(메인 페이지에 접근)에 맞춰서 요청하고 그 당시의 추천 목록이 출력되게 됩니다.
그러면 다음과 같은 질문이 나올 수 있습니다.
“유저 단위로 분석한 정보가 맵리듀스에서 출력되면 안되나요?”
위험한 생각이 될 수 있는게, 일반적인 일괄 처리의 처리량에 비해 매우 느린 성능을 보일 것 이고, 데이터베이스의 과부화가 발생할 확률이 높습니다.
효율적인 키-값 데이터베이스가 존재하고 맵리듀스 또한 키-값을 기반으로 데이터를 처리하기 때문에 이러한 방식으로 새로운 데이터 파일을 만들어내고, 만들어내는 동안은 이전 데이터 파일을 기반으로 서비스를 지속하는 방향으로 보통 추천 시스템을 만들어나가고 있습니다.
일괄 처리 출력에 대한 철학
맵리듀스와 유닉스는 프로그램이 입력을 읽어 출력을 내놓는다는 철학이 동일합니다. 매퍼의 (셔플 된) 데이터를 리듀서가 읽는 것처럼, 혹은 유닉스가 파이프에 맞추어 이전 결과를 입력으로 사용하는 것과 같은 것이 하나의 철학이라고 합니다.
다음과 같은 출력을 지원합니다.
- 코드에 버그가 있어서 출력이 오염되면 이전 버전의 코드로 재수행합니다. 이러한 방식을 인적 내결함성이라고 합니다.
- 쉽게 되돌릴 수 있는 속성을 갖습니다. 비가역성 최소화라고 합니다.
- 맵이나 리듀스의 태스크가 실패하면 재실행하여 일시적 문제의 경우 올바른 동작이 수행되도록 합니다.
- 입출력 디렉토리를 설정 할 수 있어서 역할을 분리 할 수 있습니다.
맵리듀스를 넘어
맵리듀스는 좋은 데이터 처리 방식을 사용한다고 생각하지만, 그럼에도 아쉬운 부분들이 여럿 있습니다. 이런 점을 한 번 알아보겠습니다.
중간 상태
여기서 중간 상태라는건 각 작업이 끝난 뒤 결과를 디스크에 저장(구체화) 해 놓는 파일입니다.
맵리듀스의 모든 작업은 다른 작업과 독립적입니다. 그렇기 때문에 첫 번째 작업의 결과를 두 번째 작업의 입력으로 사용하기 위해서는 두 경로를 일치시켜줘야 합니다.
이러한 점은 장점으로 활용 될 수 있는데, 바로 첫 번째 작업의 출력이 여러 작업의 입력 값으로 사용된다면 경로만 맞추어 놓는다면 같은 중간 상태 파일을 가지고 연산을 이어나갈 수 있습니다.
중간 상태를 저장하는 방법은 매우 효과적으로 보일 수 있지만, 몇 가지 단점이 있습니다.
- 이전 작업이 모두 끝나야 다음 작업을 실행 할 수 있습니다. 입력 값으로 디스크에 있는 파일을 읽기 때문에 이전 작업의 모든 결과가 디스크에 저장되어 있어야만 하기 때문입니다. 이에비해 유닉스 파이프는 인메모리 방식을 이용해서 출력이 나올 때 즉시 다음 작업의 입력으로 사용됩니다.
- 매퍼가 이전 작업의 리듀서의 결과와 중복 된 일을 할 수 있습니다. 예를들면 매퍼가 이전 작업의 리듀서 결과와 동일한 키로 데이터를 모아준다면 결국 매퍼의 사용 의미가 없습니다. ( 이 경우에는 매퍼 단계를 끼워넣지 않을 수 있다고 합니다. )
- 중간 상태 파일 또한 복제의 대상이 되어 임시데이터의 과잉 조치로 생각됩니다.
데이터플로 엔진 ( Spark, Flink … )
데이터플로 엔진은 위에서 알아 본 맵리듀스의 중간 상태에 대한 문제점을 해결하면서 더 효율적인 문제 해결을 하는 엔진들입니다. 예를 들면 Spark, Flink 등이 이러한 엔진에 속합니다.
데이터플로 엔진도 맵리듀스처럼 어떠한 사용자 함수를 각 레코드마다 별도로 동작하지만, 입력을 파티셔닝하여 병렬화하고 각 함수의 출력을 다른 함수의 입력으로 사용하기 위해 네트워크로 복사합니다. 결국 이전 작업인 map이 모두 끝나서 셔플 된 상태로 디스크에 저장되어야만 reduce가 동작하는 방식과 달리, 각각의 출력 자체가 네트워크를 통해 입력으로 들어오고 그 값을 파티셔닝해서 병렬화 된 상태에서의 작업을 개별적으로 수행합니다.
그러면 우리는 map과 reduce라는 일련의 프로세스를 적용 할 필요없이 다음 작업이 어떤 작업인지만 알고 연산을 이어 갈 수 있습니다. 어차피 각 데이터는 연이어 어떠한 연산을 수행 할 것이고, 이게 map인지 reduce인지 큰 상관이 없기 때문입니다.
데이터플로 엔진에서 이러한 연산을 연산자(Operator)라고 합니다. 연산을 이어나가기 위해서는 연산자의 출력과 또 다른 연산자의 입력을 연결해야 하는데, 이러한 방식은 여러 가지 선택지가 있습니다.
- 레코드를 키로 재파티셔닝하여 정렬하는 방법
- 맵리듀스의 셔플과 비슷하여 정렬 병합 조인과 그룹화가 가능
- 파티셔닝을 하지만 정렬하지 않는 방법
- 어차피 파티션 해시 조인을 하면 파티션 단위로 나뉘고 정렬이 의미가 없는데, 이 경우 별도의 작업 없이 파티션 해시 조인을 수행 할 수 있음
- 브로드캐스트 해시 조인시 연산자의 출력을 모든 파티션으로 전달
이러한 데이터플로 엔진은 맵리듀스와 비교하면 몇 가지 장점이 있습니다.
- 정렬과 같은 비싼 연산은 필요 할 때만 수행
- 필요없는 맵 태스크가 없다. 맵은 파티션이 변하지 않기 때문에 연산자로 수행 가능
- 워크플로에 모든 조인과 의존 관계가 명시적으로 선언되어 있어서 지역성 최적화가 가능
- 예를들어 데이터 생산 태스크와 소비 태스크를 같은 장비에 배치해서 네트워크 IO 절약 가능
- 중간 상태를 매 작업마다 복제 파일로 생성하는게 아닌 최신의 결과를 읽어오고 결과 테이블을 업데이트 하는 방식으로 이전 중간 상태를 제거하기 때문에 효과적입니다.
- 각 연산자들은 입력이 준비되면 즉시 실행 할 수 있습니다. 즉 이전의 중간 단계가 모두 끝나기를 기다리지 않아도 됩니다.
- 새로운 연산자를 실행 할 때 이미 사용되는 JVM을 재활용 할 수 있습니다. 그에비해 맵리듀스는 각 태스크마다 JVM을 구동해야 합니다.
데이터플로 엔진을 적용 할 때 연산자가 맵과 리듀스를 일반화 한 것이라 간단한 설정 만으로도 스파크나 테즈로 전환 할 수 있습니다.
내결함성
맵리듀스는 중간 상태를 HDFS(디스크)에 남기면서 내결함성을 보장했습니다. 실패하면 저장 된 중간 상태를 이용하여 다시 연산을 수행하면 됩니다. 문제는 데이터플로 엔진은 이러한 중간 상태를 디스크에 모든 중간 상태를 남기지 않기 때문에 다른 방식을 사용합니다.
데이터플로 엔진은 장애가 발생하여 연산에 실패하면 아직 유효한 데이터부터 다시 연산을 수행하고, 만약 이 마저도 모두 잃었다면 HDFS내 원본 데이터를 이용하여 연산을 다시 수행(재연산)합니다.
그런데 재연산을 하려면 결국 어떻게 연산이 되는지 추적이 되어야 합니다. 마찬가지로 어떤 파티션을 사용했는지도 알아야합니다. Spark에서는 이러한 재연산을 위해 RDD 추상화를 사용하여 연산의 순서와 파티션 정보를 추적합니다. 비슷하게 플링크도 연산자 상태에 체크포인트를 남겨서 실패 한 연산자를 재개 할 수 있습니다.
여기서 중요한 점은 “해당 연산이 결정적인가?” 파악하는 점 입니다. 결정적이라는 말은 “같은 입력 데이터가 주어졌을 때 연산자들이 항상 동일한 출력을 생산하는가?”로써 만약 다운스트림(다음 연산)으로 데이터를 보냈을 때 일부가 유실되었다면 이 질문은 매우 중요합니다.
만약 결정적이지 않은 연산이라면 재실행 할 경우 다운스트림 연산자들이 이전의 데이터와 새로 얻게 된 결과 사이의 모순을 가지기 때문에 아예 다운스트림 연산들을 죽이고 신규 데이터를 기준으로 다시 수행하는 것이 좋습니다.
이렇듯 모순을 해결하기 위해서 가능한 연산자를 결정적으로 만드는 것이 좋지만 비결정적인 연산이 생기는 상황이 많습니다. 예를 들면 다음과 같은 상황이 있습니다.
- 해시값을 사용하고 원소를 탐색 할 때
- 확률 통계 알고리즘이 임의의 값에 의존 할 때
- ML/DL의 랜덤 초기 값
- Clustering의 랜덤 초기 위치
- 시스템 시계 사용
- 외부 데이터 출처
- third-party의 API 등
이러한 문제를 해결 할 때는 특정한 시드 넘버를 사용하거나 혹은 initialize 함수를 사용하여 가능한 결정적으로 만드는게 좋습니다.
구체화에 대한 논의
데이터플로 엔진은 유닉스 파이프와 비슷하게 동작합니다. 플링크는 연산자의 출력을 다른 연산자에게 전달하고 이렇게 입력을 처리하기 전에 이전 입력이 완료되기를 기다리지 않습니다.
정렬 연산자는 예외가 될 수 있는게, 입력의 순서가 달라질 수 있기 때문에 전체 입력을 소비하여 일시적이라도 상태를 누적(중간 단계를 저장)해야 합니다. 하지만 다른 부분은 파이프라인 방식을 사용해서 해결 가능 합니다.
데이터플로 엔진도 결국 결과를 남깁니다. 하지만 맵리듀스처럼 모든 중간 단계를 남기는 것이 아닌 입력 데이터와 최종 결과 데이터만을 남기기 때문에 맵리듀스가 가진 디스크IO의 문제를 어느정도 해결 할 수 있습니다.
그래프와 반복 처리
그래프 형 모델은 한 번에 하나의 간선을 순회하는 방식으로 특정 정보를 전달하기 위해 인접한 정점을 조인하면서 특정 조건을 만족할 때 까지 반복합니다. 이런 알고리즘을 이행적 폐쇄( 도달 할 수 있는지 없는지를 표현하는 방식 )라고 합니다.
이러한 동작은 맵리듀스로 동작시키면 각 단계를 일괄처리하게 되는데 그래프에 약간의 변경만 있어도 동일하게 전부 일괄처리하게 됩니다.
프리글 처리 모델
맵리듀스와 마찬가지로 한 정점에서 다른 정점으로 메시지를 보내지만, 프리글의 경우에는 정점이 반복에서 사용 한 메모리 상태를 기억하고 있어서 새로 들어온 메시지만 처리하고 아무 메시지도 받지 않은 그래프는 아무일도 하지 않습니다. 또한 정점 사이의 메시지는 내결함성과 지속성이 있으며 메시지 통신은 고정 된 횟수 안에 처리됩니다.
사실 그래프 형 모델을 사용 할 일이 없어서 크게 와닿지는 않지만, 앞으로 사용 될 때를 대비해서 약간만 알아두고 넘어가겠습니다.
고수준 API와 언어
맵리듀스의 방식을 따른 데이터플로 엔진들은 SQL과는 다른 형태를 지니는데, 맵리듀스의 경우에는 함수 콜백 개념으로 각 레코드(혹은 그룹)을 입력으로 사용자 정의 함수를 호출하기 때문에 이미 만들어진 파싱, 자연어 처리, 통계 알고리즘을 그대로 사용 할 수 있다는 장점이 있습니다.
컬럼 기반 저장 레이아웃을 사용하면 전체 레코드에 대해 특정 컬럼만을 이용한 파싱이나 함수 호출의 부하를 줄일 수있으며, 하이브, 스파크, 임팔라의 경우에는 벡터화를 수행하여 내부적으로 데이터를 반복해서 CPU 캐시가 잘되게 하거나 함수 호출을 피하는 방식을 사용합니다.
스파크는 JVM 바이트코드를 생성하고 임팔라는 LLVM을 사용해서 내부 루프의 원시 코드를 작성합니다.
참고
https://www.samsungsds.com/kr/insights/hadoop3-coding.html
https://jhnyang.tistory.com/287
https://jyoondev.tistory.com/54
https://spark-korea.github.io/docs/structured-streaming-programming-guide.html