본문 바로가기

DevOps/#Continuous Testing

[Data Pipeline] Apache Airflow 기반의 데이터 파이프라인 (2)

목차

1. Apache Airflow 살펴보기

Airflow 소개
파이썬 코드로 유연한 파이프라인 정의

Airflow를 사용하면 파이프라인이나, 워크플로우 태스크를 방향성 비순환 그래프(DAG)로 정의할 수 있다. 더불어, Airflow는 파이썬 스크립트로 DAG의 구조를 정의하고 구성하는데, 일반적으로 DAG 파일 안에는 주어진 DAG에 대한 태스크 집합과 태스크 간의 의존성을 기술하고, Airflow는 이 DAG의 구조를 식별하기 위해 코드를 Parsing 한다. 이 외에도, DAG 파일 안에는 Airflow의 실행 방법과 시간 등을 정의한 추가 메타데이터가 포함될 수 있다.

Airflow DAG를 Python 코드로 정의함으로써 추후 외부의 데이터베이스, 빅데이터 기술 및 클라우드 서비스를 포함한 시스템에서 태스크를 실행할 수 있는 확장 기능을 추가로 개발할 수 있다.

파이프라인 스케줄링 및 실행

DAG로 파이프라인 구조를 정의한 후 DAG의 실행 주기를 정의할 수 있다. 개념적으로 Airflow는 다음 세 가지 주요 구성요소를 갖고 있다.

  • Airflow 스케줄러 - DAG를 분석하고 현재 시점에서 DAG의 스케줄이 지난 경우 Airflow 워커에 DAG의 태스크를 예약한다.
  • Airflow 워커 - 예약된 태스크를 선택하고 실행한다.
  • Airflow 웹 서버 - 스케줄러에서 분석한 DAG를 시각화하고 DAG 실행과 결과를 확인할 수 있는 주요 인터페이스를 제공한다.

이 중 Airflow 스케줄러는 다음 단계를 통해 작업을 진행한다.

  1. 사용자가 DAG 워크플로우를 작성하면, 스케줄러는 DAG 파일을 분석하고 각 DAG 태스크, 의존성 및 예약 주기를 확인한다.
  2. 마지막 DAG까지 내용을 확인한 후 DAG 예약 주기가 경과했는지 확인한다. 예약 주기가 현재 시간 이전이라면 실행되도록 예약한다.
  3. 예약된 각 태스크에 대해 스케줄러는 해당 태스크의 의존성(=업스트림 태스크)을 확인한다. 의존성 태스크가 완료되지 않았다면 실행 대기열에 추가한다.
  4. 스케줄러는 1단계로 다시 돌아간 후 새로운 루프를 잠시 동안 대기한다.

이후 태스크가 실행 대기열에 추가되면 Airflow 워커의 풀(Pool) 워커가 태스크를 선ㅌ개하고 실행한다. 이때 실행은 병렬로 수행, 실행 결과는 지속 추적된다. 이때의 모든 결과는 Airflow 메타스토어로 전달되어, 사용자가 웹 인터페이스로 태스크 진행 상황을 추적하고 로그를 확인할 수 있다.

모니터링과 실패 처리

Airflow는 DAG를 확인하고 실행 결과에 대해 모니터링 할 수 있는 UI를 제공한다. 이 기능(뷰)는 DAG의 구조와 각 DAG의 실행 결과를 확인하는 데 유용하게 쓰일 수 있다.

이러한 그래프 뷰 외에도, 특정 DAG에 대한 모든 실행 현황과 기록을 Tree View를 통해 확인할 수 있다. 이 Tree View에서는 특정 모델의 DAG 다중 실행 결과를 확인할 수 있으며, 단일 DAG의 실행 상태와 최근 DAG의 실행 상태를 확인할 수 있다. 또한, 태스크 별 자세한 정보와 재실행이 필요한 태스크의 상태를 초기화할 수 있다.

기본적으로 Airflow는 태스크 실패 시 재시도(재실행 시간 간격 설정 가능)할 수 있기 때문에 오류 발생 시 태스크를 복구할 수 있다. Airflow 태스크가 실패하면 해당 값을 기록하고 사용자에게 실패를 통보한다. 트리 뷰에서 개별 태스크 결과를 삭제하고 종속된 태스크를 모두 재실행할 수 있다.

점진적 로딩 및 백필

Airflow의 스케줄 기능을 활용하여 DAG에서 정의된 특정 시점에 작업을 트리거하거나, 최종 시점 및 다음 스케줄 주기를 알 수 있다.이러한 특성은 데이터 파이프라인이 점진적으로 실행될 수 있도록 구성할 수 있도록 하며, 매번 전체 데이터 세트를 처리할 필요 없이, 해당 시간 슬롯(델타 데이터)에 대해서만 처리할 수 있다. 

이는 특히, 대규모 데이터 세트를 처리하는 경우, 기존 결과에 대한 태스크 전체를 다시 수행하는 것을 방지하여 많은 시간과 비용을 절감할 수 있다. 이는 백필 개념과 결합하여 새로 생성한 DAG를 과거 시점 및 기간에 대해 실행할 수 있다. 이를 통해 과거 특정 기간에 대해 DAG를 실행하여 새로운 데이터 세트를 쉽게 생성할 수 있으며, 과거 실행 결과를 삭제하고, 코드를 변경한 후에 과거 태스크를 재실행할 수 있어, 전체 데이터 세트를 간단하게 재구성해 처리할 수 있다.

언제 Airflow를 사용해야 할까
Airflow를 선택하는 이유
  • 파이썬 코드를 이용해 파이프라인을 구현할 수 있기 때문에 파이썬 언어에서 구현할 수 있는 대부분의 방법을 사용하여 복잡한 커스텀 파이프라인을 만들 수 있다.
  • 파이썬 기반의 Airflow는 쉽게 확장 가능하고, 다양한 시스템과 통합이 가능하다. 실제, Airflow 커뮤니티에는 다양한 유형의 데이터베이스, 클라우드 서비스 등과 통합할 수 있는 애드온이 많이 존대한다.
  • 스케줄링 기법으로 파이프라인을 정기적으로 실행하고 점진적(증분) 처리를 통해, 전체 파이프라인을 재실행할 필요가 없는 효율적인 파이프라인 구축이 가능하다.
  • 백필 기능을 사용하여 과거 데이터를 손쉽게 재처리할 수 있기 때문에 코드를 변경한 후 재생성이 필요한 데이터 재처리가 가능하다.
  • Airflow의 웹 인터페이스로 파이프라인 실행 결과를 모니터링할 수 있고 오류를 디버깅하기 위한 뷰를 제공한다.
  • 오픈 소스로 구성되어 특정 벤더에 종속되지 않고 Airflow를 사용할 수 있다.
Airflow가 적합하지 않은 경우
  • 반복적이거나, 배치 태스크(batch-oriented task)를 실행하는 기능에 초점이 맞추어져 있어 스트리밍(실시간 데이터 처리) 워크플로우 및 해당 파이프라인 처리에 적합하지 않을 수 있다.
  • 추가 및 삭제 태스크가 빈번한 동적 파이프라인의 경우 적합하지 않을 수 있다. Airflow는 동적 태스크를 구현할 수 있으나, 웹 인터페이스는 DAG의 가장 최근 실행 버전에 대한 정의만 표현흐므로, 실행되는 동안 구조가 변경되지 않는 파이프라인에 더 적합하다.
  • 파이썬 언어로 DAG를 구현하므로, 파이썬 프로그래밍 경험이 없는 팀은 적합하지 않다. 이런 팀은 Azure Data Factory 와 같은 GUI 기반 툴이나, 다른 정적 워크플로우 정의가 가능한 솔루션을 선택하는 것이 좋다.
  • 파이썬 코드로 DAG를 작성하는 것은 파이프라인 규모가 커지면 복잡해질 수 있어, 장기적으로 DAG를 유지관리하기 위해서는 초기 사용 시점부터 엄격한 관리가 필요하다.

Airflow는 워크플로우 및 파이프라인 관리 플랫폼이며, 데이터 계보(lineage) 관리, 데이터 버전 관리와 같은 확장 기능은 제공하지 않으므로, 필요한 경우 특정 도구를 직접 통합해야 한다.