꾸준함이 중요한 Lion2me의 기술블로그

Airflow란

05 Jun 2022

Airflow

Airflow는 데이터 엔지니어링을 위한 워크플로우를 관리해주는 하나의 플랫폼입니다. 2016년 airbnb에서 만들어 낸 플랫폼이지만, 많은 장점이 있기 때문에 상당히 많은 기업에서 사용하고 있는 기술입니다.

이번에는 그 Airflow를 사용하기 전 어떤 구조로 이루어져 있으며, 어떻게 동작하는지를 알아보고 장점과 단점에 대해서 알아보고자 합니다.

현재 재직중인 회사의 스펙이라 더더욱 소중한 Airflow에 대한 포스팅을 시작합니다.

Airflow는 왜 사용하는가?

어떤 기술이든 사용하기 전에 고려해야 하는 점은 “왜 사용하는지?” 입니다.

제가 작업하는 환경은 Airflow와 함께 Shell script의 cron으로 스케줄링을 하는 혼종의 환경에서 작업하고 있습니다. 하지만 그렇기 때문에 Airflow를 사용하는 이유를 뼈저리게 깨닫게 되었습니다.

저는 이 왜 사용하는가?라는 질문에 crontab과 비교하면서 말씀드리고자 합니다.

1. 실패 복구 ( backfill )

crontab으로 동작시킨 코드를 보면 항상 포함 된 옵션이 있습니다. 바로 현재 날짜를 입력받는 파라미터입니다.

완벽한 코드를 구현하더라도, 심지어 완벽한 Dev Ops의 손을 거친 서버라도 언젠가는 제대로 동작하지 않을 수 있습니다. 그 경우 원하는 데이터만을 ETL 할 수 있는 방안이 필요한 것은 당연합니다.

crontab도 불가능하지는 않습니다. shell script를 통해 날짜 및 시간을 입력받은 뒤 통합 관리하는 코드에 그 값을 추가해주면 얼마든지 가능합니다만, 번거롭고 언제 어디서 문제가 발생했는지 명확하게 파악 할 필요가 있습니다.

airflow는 excution_date를 가지고 있습니다. 물론 이 값이 start_date라는 개념과 많은 혼동을 부르고 있지만, 그럼에도 airflow에서 특정 dag가 실행되어 영향을 미치는 시점에 대한 메타데이터를 가지고 있는 것은 큰 장점으로 작용합니다.

즉 우리는 Airflow UI를 통해 해당 날짜와 시간의 dag를 재시작해주면 됩니다.

2. 모니터링

실패 복구를 위해서는 어디서 문제가 발생했는지 알아야 한다고 했습니다. 하지만 crontab을 이용해서 코드를 실행시켰을 때는 이러한 부분을 알기 어렵습니다. 얼마나 알기 어렵냐면 Except코드에서 traceback을 켜놓고 그마저도 불안해서 DB에 해당 일자의 데이터가 잘 들어왔는지 뷰를 만들어놓기도 합니다.

개발 시간과 추적 시스템을 만드는 시간이 거의 동일하게 소요 된 것 같습니다.

하지만 Airflow를 사용하면 이러한 부분을 방지 할 수 있습니다. UI로 쉽게 어느 부분에서 문제가 발생했는지 알 수 있고, python 단에서 print를 찍는 것으로 (logger를 써야 하지만..) 문제를 파악 할 수도 있습니다.

그럼 우리는 문제가 발생 한 dag의 해당 위치만 다시 실행시키면 끝납니다.

3. 의존성 관리

의존성 관리도 쉽습니다.

shell script의 큰 문제점 중 하나인 위 코드가 에러가 나든 말든 앞은 실행된다.는 다른 프로그래밍 언어에서 찾아 볼 수 없는 획기적인 실행 방식으로 우리는 스케줄링을 눈으로 스케줄링해야하는 엽기적인 사건이 벌어지기도 합니다.

하지만 airflow는 위 코드에서 문제가 발생하면 이후 로직은 수행하지 않으며, Branching Operator는 문제가 발생했을 때 실행 할 task를 별도로 지정 할 수도 있습니다.

해당 task가 정상적으로 수행한다면 이전의 task는 안전하게 수행되었음을 알 수 있죠.

4. 확장성

Airflow는 분산 된 환경에서의 확장성에 장점이 있다고 합니다. 그런데 이 부분에 대해서는 아직 와닿는 개발 경험이 없기 때문에 알아만두겠습니다.

워크 플로우는 무엇인가?

워크플로우는 말그대로 Work의 Flow, 즉 의존성으로 연결 된 작업의 집합입니다. 그리고 Airflow에서는 이러한 워크플로우를 DAG라고 표현하고 있습니다.

DAG는 방향성이 있으며 순환하지 않는 그래프입니다. 그래프 자료구조를 배우면 조금 더 명확하게 이해 할 수 있습니다만, 위에서 말하는 의존성으로 연결 된 작업의 집합 이라는 말을 엮으면 쉽게 이해 할 수 있습니다.

A » B » C 는 A 작업이 끝난 후 B 작업을 실행하고, B 작업이 끝나면 C 작업을 실행한다고 이해 할 수 있습니다. 이러한 A 작업을 의존하는 B 작업, B 작업에 의존하는 C 작업이고, 이러한 플로우는 방향성이 있고, 순환하지 않으므로 DAG가 될 수 있습니다.

Airflow는 무엇으로 이루어져 있는가?

1. 웹 서버

Airflow는 모니터링과 Connection 혹은 Variable을 설정하기 쉽도록 보여주는 UI를 지원합니다. 최고..

2. 스케줄러

워크플로우가 언제 실행되는지를 관리해주는 컴포넌트입니다.

3. Metastore

이 부분이 중요 할 수 있습니다.

Airflow는 Task간의 데이터 전송에서 사용되는 XCOM ( 별로 좋지는 않다고 하지만 )과 DAG의 상태 정보와 같은 정보를 담아야 합니다. 이러한 데이터를 담는 부분이 이 Metastore입니다.

4. Executor

Task가 어떻게 실행 될 것인가를 정의해주는 컴포넌트입니다.

5. Worker

Task가 실제로 실행되는 프로세스입니다.

Operator

하나의 작업을 정의하는데 사용되는 개념입니다.

Task는 Operator가 실행 된 인스턴스라고 할 수 있습니다.

Task = Operator’s Instance

1. Action Operator

실제 연산을 수행하는 Operator

2. Transfer Operator

데이터를 전송하는 Operator입니다. 예를 들면 hdfs에서 MariaDB로 데이터를 옮겨주는, 일종의 Migration과 같은 동작을 수핼해주는 Operator입니다.

3. Sensor Operator

Task를 언제 실행시킬지 정의하는 Operator입니다.

항상 트리거를 기다리고 있다가 조건이 충족되면 Task를 실행시키는 Operator입니다. 예를 들면 특정 파일이 특정 디렉토리에 도착 했을 떄 실행하는 등의 동작을 수행 할 수 있습니다.

그러면 Airflow의 동작 방식은?

사용자가 Dag를 생성하고 Airflow에서 수행하게 되면 다음과 같은 수행 방식을 갖습니다.

1. Dag를 작성하여 Workflow를 만듭니다.

Dag는 Task로 이루어져 있으며, Task는 Operator의 인스턴스입니다.

2. Dag를 실행시키면, Scheduler는 DagRun 오브젝트를 생성합니다.

여기서 DagRun 오브젝트는 Dag를 인스턴스화 한 것입니다.

Dag에 대한 정보는 MetaStore에 있습니다. 때문에 Scheduler는 DagRun 오브젝트를 만들 때 MetaStore를 참고합니다.

3. DagRun 오브젝트가 Task를 인스턴스로 만들고 스케줄러는 Task 인스턴스를 스케줄링합니다.

조금 모호 할 수 있지만, 중요한건 이렇게 실행 함으로서 각 DagRun 오브젝트가 가진 Task 오브젝트는 개별적인 객체임을 알 수 있습니다.

Task 인스턴스를 스케줄링 한다는 뜻은 실행하려는 Task 인스턴스를 Executor에게 전달한다는 말이 됩니다.

4. Worker가 Task를 실행시킨 뒤 “완료”상태로 변경합니다.