[python] Apache Airflow 소개 및 실습하기(기초)

 

Apache Airflow는 배치 스케쥴링(파이프라인) 플랫폼입니다

실행할 Task(Operator)를 정의하고 순서에 등록 & 실행 & 모니터링할 수 있습니다

DAG(Directed Acyclic Graph, 비순환 방향 그래프)로 각 배치 스케쥴이 관리됩니다

DAG하위에는 고유한 여러 Task가 존재하며 순서를 갖습니다

Task는 BashOperator, PythonOperator 등 다양한 Operator를 지원합니다

DAG(Directed Acyclic Graph) ID ├── Task ID 1 ├── Task ID 2 ├── Task ID 3

[배치 스케쥴링할 때 고려해야 할 문제들]

일부 Task가 실패했습니다. 어떻게게 인지하고 재실행할 수 있을까요?

각 Task는 선행조건과 수행시간이 다릅니다. 어떤 순서가 최적일까요?

수행시간이 일부 Task는 짧고 일부 Taks는 깁니다. 어떤 이유일까요? 로그는 쉽게 찾아 비교할 수 있을까요?

여러 서버에 Task가 존재합니다. 어떻게 스케쥴링 할 수 있을까요?

과거 일부 Task가 실행이 누락되었습니다. 과거기준으로 다시 실행할 수 있을까요?

Airflow는 이 문제에 해답을 제시합니다

그럼 실습환경을 구축하고 실습코드를 만들어 스케쥴링해보겠습니다

1) 도커 이미지를 가져옵니다

docker pull jupyter/base-notebook:python-3.7.3

2) 도커 컨테이너를 실행합니다

docker run --name py3 -it -p 8881-8889:8881-8889 -v C:\notebooks\:/notebooks/ jupyter/base-notebook:python-3.7.3 bash

3) conda를 이용하여 필요한 모듈을 설치합니다

# conda 최신버전으로 업데이트한다 conda update -y conda # airflow 모듈을 설치한다(버전 1.10.3) conda install -y airflow # vim 모듈을 설치한다(소스코드 편집용) conda install -y vim

4) cfg(환경설정파일)과 기본데이터베이스(SQLite)를 초기화합니다 (SQLite이외 다른 DB 사용 가능)

airflow initdb

5) 사용자 환경변수에 airflow 경로를 추가하고 활성화합니다

echo 'export AIRFLOW_HOME=~/airflow' >> /home/jovyan/.profile echo 'export AIRFLOW_HOME=~/airflow' >> /home/jovyan/.bashrc source ~/.profile

6) DAG(Directed Acyclic Graph, 비순환 방향 그래프)파일을 저장할 디렉토리를 만듭니다

mkdir $AIRFLOW_HOME/dags

기본 폴더 구조는 아래와 같습니다

airflow ├── airflow.cfg <- airflow 환경설정 파일 ├── airflow.db <- 데이터베이스(SQLite)파일 ├── dags <- DAG들을 저장하는 디렉토리 │ └── my_first_dag.py <- DAG 정의 파이썬 파일 ├── logs <- 로그파일을 저장하는 디렉토리

airflow.cfg파일에 dags, logs폴더가 지정되어 있습니다.

dags_folder = /home/jovyan/airflow/dags

base_log_folder = /home/jobyan/airflow/logs

7) 설치된 airflow 버전을 확인합니다

airflow version

8) 등록된 DAG 목록을 조회합니다

 
airflow list_dags

9) dags 폴더에 실습할 my_first.py 을 만듭니다

cd $AIRFLOW_HOME/dags/ && vim my_first.py

[소스코드]

from airflow.models import DAG from airflow.utils.dates import days_ago from airflow.operators.bash_operator import BashOperator args = {'owner': 'jovyan', 'start_date': days_ago(n=1)} dag = DAG(dag_id='my_first_dag', default_args=args, schedule_interval='@daily') t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag) t2 = BashOperator(task_id='sleep', bash_command='sleep 3', dag=dag) t3 = BashOperator(task_id='print_whoami', bash_command='whoami', dag=dag) t1 >> t2 >> t3

10) 코드를 실행해봅니다(문법 오류 정도를 검증할 수 있습니다)

cd $AIRFLOW_HOME/dags/ && python my_first.py

경축! 아무것도 안하여 에스천사게임즈가 새로운 모습으로 재오픈 하였습니다.
어린이용이며, 설치가 필요없는 브라우저 게임입니다.
https://s1004games.com

11) 다시 등록된 DAG 목록을 조회합니다

airflow list_dags

my_first_dag 이름을 가진 DAG를 볼 수 있습니다

12) 새로만든 DAG의 Task를 조회합니다

airflow list_tasks my_first_dag

print_date, print_whoami, sleep 3가지 Task가 있습니다

명령어 두에 --tree를 붙이면 트리형태로 확인할 수 있습니다

13) 테스트로 Task 1개만 실행해봅니다

# [사용법] airflow test dag_id task_id execution_date airflow test my_first_dag print_date 2019-06-01T09:00:00

테스트 목적으로 Task 1개만 단독으로 실행되었습니다

14) airflow 스케쥴러를 실행합니다

airflow scheduler &

15) 스케쥴러를 컨트롤하고 관리할 웹서버를 실행합니다

airflow webserver -p 8882 &

16) 웹브라우저로 접속합니다

http://localhost:8882/admin/

17) DAGs 탭에서 만들었던 my_first_dag을 Off에서 On으로 활성화 합니다

활성화 후 우측의 Links에서 3번째 Graph View를 클릭합니다

18) Graph View를 통해 각 Operator의 순서와 상태를 확인할 수 있습니다

종료되면 진한 녹샌으로 변합니다. 더 자세한 로그를 확인하고자 Operator를 클릭합니다

19) Operator 팝업에서 "View Log"를 클릭합니다

20) 해당 Operator의 실행 차수별 로그를 확인할 수 있습니다

정상적으로 BashOperator인 print_date가 실행되었습니다

21) Gantt 차트를 확인해보겠습니다

print_date -> sleep -> print_whoami 순서로 진행되었고, sleep이 가장 오래 시간이 걸렸습니다

Airflow는 다양한 종류의 Operator를 지원합니다.

위 기초 실습에서는 이해를 돕기 위해 BashOperator 만을 사용했습니다.

끝.

[출처] https://m.blog.naver.com/wideeyed/221565240108

 

 

 

 

본 웹사이트는 광고를 포함하고 있습니다.
광고 클릭에서 발생하는 수익금은 모두 웹사이트 서버의 유지 및 관리, 그리고 기술 콘텐츠 향상을 위해 쓰여집니다.
대표 김성준 주소 : 경기 용인 분당수지 U타워 등록번호 : 142-07-27414
통신판매업 신고 : 제2012-용인수지-0185호 출판업 신고 : 수지구청 제 123호 개인정보보호최고책임자 : 김성준 sjkim70@stechstar.com
대표전화 : 010-4589-2193 [fax] 02-6280-1294 COPYRIGHT(C) stechstar.com ALL RIGHTS RESERVED