꾸준함이 중요한 Lion2me의 기술블로그

Spark RDD

04 Apr 2022

Spark RDD

이전 포스팅에서는 Spark가 어떤 프레임워크인지 간략하게 적었습니다. RDD도 함께 적고 싶었지만, 이 부분은 상당히 길게 적게 될 것 같아 별도로 정리합니다.

RDD란

RDD는 Resilient Distributed Dataset의 약자로 회복성있는 분산 데이터 셋입니다. 여기서 회복성이 있다는 이야기는 회복 할 수 있도록 한다는 이야기이지 유연한 변경이 가능하다는 것을 이야기하지 않습니다.

여기서 회복이 가능하다는 이야기에 대해서 자세하게 말하면, 불변하기 때문에 추적 할 수 있고 회복이 가능하다 라고 말 할 수 있습니다.

RDD는 불변합니다. 이후에 알아 볼 Transformation(변형)을 수행하면 우리가 보기에는 마치 데이터가 필터링되거나 혹은 값의 변화가 생긴 것 처럼 보이지만, 실제로는 현재 RDD에서 새로운 RDD가 생겨난 개념이라고 할 수 있습니다.

만약 데이터를 처리하다가 에러가 발생하면 이전 RDD에 대한 메타데이터가 있기 때문에 추적 할 수 있고, 회복이 가능하게 되는 것 입니다.

RDD의 종류

RDD의 종류는 크게 두 종류로 볼 수 있습니다.

먼저 Single Value ( 단일 값 ) RDD로 RAW한 텍스트 데이터 형태의 경우라고 생각 할 수 있습니다. wordcount라는 RDD 예제 코드에서는 이 RDD를 사용하기도 합니다.

그리고 Key-Value RDD가 있습니다. NOSQL과 비슷한 형태이기 때문에 상대적으로 자주 다루어지는 개념이고, 편리한 개념입니다.

Key-Value는 Key를 기반으로 하는 연산을 수행 할 때 사용됩니다. 예를 들면 특정 유저의 정보를 다룰 때 고유 ID를 Key로 사용하고, 유저의 액션을 Value로 설정하면 유저의 선호 액션을 분석 할 수 있을 것 같습니다.

Single Value에 map 함수에 두 개의 값을 리턴하면 Key-Value RDD입니다.

Key-Value를 사용 할 때 굳이 두 값을 모두 사용하는 map함수를 사용 할 필요가 없이 Key가 고정적이라면 mapValues() 혹은 flatMapValues()를 사용 할 수 있습니다.

이러한 Key를 고정하는 연산의 경우에 Spark 내부에서 Partitioning을 하지 않는 장점이 있어서 성능상에도 큰 이점이 있습니다.

Spark 연산에서 RDD는 어떤 방식으로 동작하는지

Transformations

Transformations는 말 그대로 데이터를 변형하는 방식입니다. 하지만 위에서 이야기했던 내용처럼 새로운 RDD를 생성하는 것임을 알아야합니다.

일단 Spark가 인메모리 데이터 처리를 수행한다는 것을 생각하면 모든 RDD가 데이터를 담고 있을리는 없다고 예상 할 수 있습니다. 이 내용은 저의 이전 포스팅에 정리 한 적이 있습니다.

그러면 새로운 RDD를 생성하는 Transformation이 작동하는 방식은 어떻게 될까요?

바로 Action이 수행 될 때 동작합니다. 그 전까지는 그저 “나는 이런 동작을 할거야!”라는 계획만을 수립 한 채로 실제 Action이 이루어지기를 기다리고 있죠.

RDD는 여러 Transformation을 거쳐서 Action을 수행하게 되는데 그 과정을 연달아 적어놓은 뒤 연이어 수행을 하게 되면 우리가 원하는 결과를 얻을 수 있게 되는 방식입니다.

이러한 실행 방식을 Lazy 한 실행이라고 말하기도 합니다.

Transformation은 크게 두 가지로 나눌 수 있습니다.

Narrow Transformation

Narrow Transformation은 1:1 변환입니다. 예를들면 map 함수를 통해서 데이터를 각각 변환을 하게 되는 것을 생각 할 수 있습니다.

Spark는 노드 간 통신을 통해서 연산하는 것을 지원합니다. 하지만 모든 연산에서 노드 간 통신을 할 필요는 없죠. 각 노드에서 각각의 작업을 통해 연산에 필요 한 데이터만을 필터링 및 변환 한 뒤 최소의 통신으로 작업을 진행하면, 훨씬 좋은 성능적인 이득을 얻을 수 있습니다.

결국 Narrow Transformation은 이어서 설명 할 Wide Transformation보다 더욱 빠른 변환 방식이며, Wide Transformation을 수행하려 한다면, 그 전에 먼저 수행되어야 할 변환이기도 합니다.

대표적인 Narrow Transformation은 map(), mapPartition(), flatMap(), filter(), union() 이 있습니다.

Wide Transformation

Wide Transformation은 노드 간 통신을 통해 수행되는 변환입니다. 어떤 연산을 수행하는지는 Spark에서 알아서 정해주지만, 우리가 신경써야 하는 문제는 바로 통신 입니다.

통신이 들어가는 것은 곧 연산의 시간이 증가하는 것이고, 통신 할 데이터가 크면 클수록 그 오버헤드는 커지게 됩니다.

즉 이 부분을 최적화 하는 것이 중요합니다.

대표적인 Wide Transformation은 groupByKey(), aggregateByKey(), aggregate(), join(), repartition()이 있습니다.

이러한 Transformation이 잘 정리되어 있는 사이트가 있어 올려놓습니다. https://sparkbyexamples.com/apache-spark-rdd/spark-rdd-transformations/

Actions

Action은 실제로 우리가 어떠한 결과를 얻는 것입니다.

N개의 데이터를 실제로 추출 할 수 있고, 파일 형식으로 추출 할 수도 있으며 심지어는 메모리에 데이터를 영속화 하는 것 또한 Action에 속합니다.

Action은 명령과 동시에 실행되는 즉시 실행이라는 특징을 가지고 있습니다.

그리고 실제로 Action은 Worker 노드에서 실행됩니다.

자세한 함수는 다음에 알아보도록 하겠습니다.

왜 지연 연산을 사용하는가?

지연 연산을 사용하는 이유는 최적화와 안정성이라고 생각합니다.

Spark가 사용되기 전 Hadoop의 Map Reduce의 경우에는 분산 환경에서 동작하지만 각 동작을 즉시 실행하면서 중간 과정을 디스크에 저장했습니다.

이 당시에는 이 방식을 사용 할 수 밖에 없는 이유는 각 분산 환경에서 개발자가 직접 연산을 통합하기 때문에 변경된 정보를 저장해야 했기 때문입니다. 이렇게 안하면 장애가 발생 했을 때 복구하기 어렵죠.

그런데 또 다른 문제도 있습니다. 바로 Disk IO에 걸리는 시간입니다.

매 중간 결과에 대해 Disk 저장과 불러오는 과정의 반복이다보니 시간은 더욱 느릴 수 밖에 없습니다.

Spark는 이러한 문제를 단번에 해결할 수 있는 방안을 제시합니다. 지연 연산이 그 답이였죠.

Transformation을 실행 할 때 실제 동작하지 않고, 새로운 RDD를 쌓아가면서 연산의 순서를 메타데이터에 기록합니다. 그리고 Action이 수행되면 그 때 디스크에서 해당하는 데이터를 1번 불러오고 동작합니다.

즉 Spark의 DiskIO는 (최적의 경우) 1번이라고 말 할 수 있습니다.

그리고 모든 데이터를 덜컥 메모리에 올리지 않고, 동작하기로 약속 되어있는 Transformation을 수행하면서 데이터를 변환해나가기 시작합니다.

약속 되어 있는 동작을 수행하면 항상 같은 값이 나오는 것이 보장되어 있기 때문에 에러가 발생하면 바로 이전 과정에서 재실행을 할 수 있는 상태가 됩니다.

무엇을 메모리에 올려 놓을 것인가?

Map Reduce와의 가장 큰 차이점은 최적의 메모리 사용이라고 했습니다. 무엇을 메모리에 올려놓을지 선택하는 것이 중요하다는 것을 쉽게 알 수 있습니다.

Spark에서 중간 과정을 메모리에 올리는 방법은 크게 두 가지 함수가 있는데 바로 cache()와 persist()입니다.

메모리에 올라가있는 데이터는 여러 번의 작업을 수행해도 Transformation을 처음부터 수행하지 않기 때문에 반복되는 작업에서 필수적인 요소라고 생각 할 수 있습니다.

예를들면 머신러닝을 학습 할 때 동일한 데이터 셋에서 Train/Validation Set(k-fold 5)를 나누어서 모델을 테스팅한다고 하면, 총 5번 수행하게 됩니다.

수 TB의 데이터 중 Filtering -> mapping 과정을 거쳐 선별된 10GB의 데이터라고 한다면, 그 전체 과정을 5번 연속 수행하면 많은 시간이 소요 될 것임을 알 수 있습니다.

이 과정에서 Transformation이 끝난 시점의 RDD를 영속화(persist)한다면 어떨까요?

10GB의 데이터를 메모리에 올리게 되지만, 연산은 확연히 줄어드는 것을 알 수 있습니다.

이렇듯 어떤 데이터를 메모리에 적재 할 것인가에 대한 고민이 중요합니다.

cache와 persist의 차이

정확히 말하면 차이는 StorageLevel을 결정 할 수 있는가 없는가의 차이입니다.

persist의 경우에는 StorageLevel을 직접 결정 할 수 있기 때문에 필요에 따라 설정 할 수 있습니다. 하지만 cache의 경우에는 자동적으로 결정됩니다.

cache의 경우에는 RDD 수준에서 수행하면 MEMORY_ONLY를 사용하고 DF 수준에서 수행하면 MEMORY_AND_DISK를 사용하게 됩니다.

StorageLevel 부분은 더 자세하게 포스팅하도록 하겠습니다.

갑자기 궁금한 점, 만약 Transformation을 수행했는데 그 결과가 노드 전체에서 보관 할 수 있는 메모리의 총 량을 넘어가면? 스파크는 자동으로 디스크를 쓸 것인가?