[python] Apache Airflow 소개 및 실습하기(기초)

 

Apache Airflow는 배치 스케쥴링(파이프라인) 플랫폼입니다

실행할 Task(Operator)를 정의하고 순서에 등록 & 실행 & 모니터링할 수 있습니다

DAG(Directed Acyclic Graph, 비순환 방향 그래프)로 각 배치 스케쥴이 관리됩니다

DAG하위에는 고유한 여러 Task가 존재하며 순서를 갖습니다

Task는 BashOperator, PythonOperator 등 다양한 Operator를 지원합니다

DAG(Directed Acyclic Graph) ID ├── Task ID 1 ├── Task ID 2 ├── Task ID 3

[배치 스케쥴링할 때 고려해야 할 문제들]

일부 Task가 실패했습니다. 어떻게게 인지하고 재실행할 수 있을까요?

각 Task는 선행조건과 수행시간이 다릅니다. 어떤 순서가 최적일까요?

수행시간이 일부 Task는 짧고 일부 Taks는 깁니다. 어떤 이유일까요? 로그는 쉽게 찾아 비교할 수 있을까요?

여러 서버에 Task가 존재합니다. 어떻게 스케쥴링 할 수 있을까요?

과거 일부 Task가 실행이 누락되었습니다. 과거기준으로 다시 실행할 수 있을까요?

Airflow는 이 문제에 해답을 제시합니다

그럼 실습환경을 구축하고 실습코드를 만들어 스케쥴링해보겠습니다

1) 도커 이미지를 가져옵니다

docker pull jupyter/base-notebook:python-3.7.3

2) 도커 컨테이너를 실행합니다

docker run --name py3 -it -p 8881-8889:8881-8889 -v C:\notebooks\:/notebooks/ jupyter/base-notebook:python-3.7.3 bash

3) conda를 이용하여 필요한 모듈을 설치합니다

# conda 최신버전으로 업데이트한다 conda update -y conda # airflow 모듈을 설치한다(버전 1.10.3) conda install -y airflow # vim 모듈을 설치한다(소스코드 편집용) conda install -y vim

4) cfg(환경설정파일)과 기본데이터베이스(SQLite)를 초기화합니다 (SQLite이외 다른 DB 사용 가능)

airflow initdb

5) 사용자 환경변수에 airflow 경로를 추가하고 활성화합니다

echo 'export AIRFLOW_HOME=~/airflow' >> /home/jovyan/.profile echo 'export AIRFLOW_HOME=~/airflow' >> /home/jovyan/.bashrc source ~/.profile

6) DAG(Directed Acyclic Graph, 비순환 방향 그래프)파일을 저장할 디렉토리를 만듭니다

mkdir $AIRFLOW_HOME/dags

기본 폴더 구조는 아래와 같습니다

airflow ├── airflow.cfg <- airflow 환경설정 파일 ├── airflow.db <- 데이터베이스(SQLite)파일 ├── dags <- DAG들을 저장하는 디렉토리 │ └── my_first_dag.py <- DAG 정의 파이썬 파일 ├── logs <- 로그파일을 저장하는 디렉토리

airflow.cfg파일에 dags, logs폴더가 지정되어 있습니다.

dags_folder = /home/jovyan/airflow/dags

base_log_folder = /home/jobyan/airflow/logs

7) 설치된 airflow 버전을 확인합니다

airflow version

8) 등록된 DAG 목록을 조회합니다

 
airflow list_dags

9) dags 폴더에 실습할 my_first.py 을 만듭니다

cd $AIRFLOW_HOME/dags/ && vim my_first.py

[소스코드]

from airflow.models import DAG from airflow.utils.dates import days_ago from airflow.operators.bash_operator import BashOperator args = {'owner': 'jovyan', 'start_date': days_ago(n=1)} dag = DAG(dag_id='my_first_dag', default_args=args, schedule_interval='@daily') t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag) t2 = BashOperator(task_id='sleep', bash_command='sleep 3', dag=dag) t3 = BashOperator(task_id='print_whoami', bash_command='whoami', dag=dag) t1 >> t2 >> t3

10) 코드를 실행해봅니다(문법 오류 정도를 검증할 수 있습니다)

cd $AIRFLOW_HOME/dags/ && python my_first.py

11) 다시 등록된 DAG 목록을 조회합니다

airflow list_dags

my_first_dag 이름을 가진 DAG를 볼 수 있습니다

12) 새로만든 DAG의 Task를 조회합니다

airflow list_tasks my_first_dag

print_date, print_whoami, sleep 3가지 Task가 있습니다

명령어 두에 --tree를 붙이면 트리형태로 확인할 수 있습니다

13) 테스트로 Task 1개만 실행해봅니다

# [사용법] airflow test dag_id task_id execution_date airflow test my_first_dag print_date 2019-06-01T09:00:00

테스트 목적으로 Task 1개만 단독으로 실행되었습니다

14) airflow 스케쥴러를 실행합니다

airflow scheduler &

15) 스케쥴러를 컨트롤하고 관리할 웹서버를 실행합니다

airflow webserver -p 8882 &

16) 웹브라우저로 접속합니다

http://localhost:8882/admin/

17) DAGs 탭에서 만들었던 my_first_dag을 Off에서 On으로 활성화 합니다

활성화 후 우측의 Links에서 3번째 Graph View를 클릭합니다

18) Graph View를 통해 각 Operator의 순서와 상태를 확인할 수 있습니다

종료되면 진한 녹샌으로 변합니다. 더 자세한 로그를 확인하고자 Operator를 클릭합니다

19) Operator 팝업에서 "View Log"를 클릭합니다

20) 해당 Operator의 실행 차수별 로그를 확인할 수 있습니다

정상적으로 BashOperator인 print_date가 실행되었습니다

21) Gantt 차트를 확인해보겠습니다

print_date -> sleep -> print_whoami 순서로 진행되었고, sleep이 가장 오래 시간이 걸렸습니다

Airflow는 다양한 종류의 Operator를 지원합니다.

위 기초 실습에서는 이해를 돕기 위해 BashOperator 만을 사용했습니다.

끝.

[출처] https://m.blog.naver.com/wideeyed/221565240108

 

 

 

 

본 웹사이트는 광고를 포함하고 있습니다.
광고 클릭에서 발생하는 수익금은 모두 웹사이트 서버의 유지 및 관리, 그리고 기술 콘텐츠 향상을 위해 쓰여집니다.
번호 제목 글쓴이 날짜 조회 수
» [python] Apache Airflow 소개 및 실습하기(기초) file 졸리운_곰 2022.07.25 2
393 [anaconde3][python] Create environment for tensorflow 1.4 in Anaconda 3 졸리운_곰 2022.07.02 3
392 [python 인터넷] Python으로 XML-RPC 서버 구축 file 졸리운_곰 2022.06.28 1
391 [python 인터넷][네이버 뉴스 크롤링] python crawling - 네이버 뉴스 기사 크롤링 file 졸리운_곰 2022.06.28 4
390 [python RPA] Robotic Process Automation with Python file 졸리운_곰 2022.03.03 8
389 [python RPA] 3 WAYS TO DO RPA WITH PYTHON file 졸리운_곰 2022.03.03 2
388 [Python RPA] [Python] - 모듈탐구 pyautogui - Python 폴터가이스트 졸리운_곰 2022.03.03 6
387 [python][자료구조] python anaconda 에서 mysql 접속 졸리운_곰 2022.01.25 10
386 [python][anaconda] 파이썬3(python3) 설치하고 환경(env) 관리하기 - 아나콘다3(anaconda3)를 활용한 설치 file 졸리운_곰 2022.01.20 5
385 [python][인공지능] [TensorFlow] Anaconda 가상환경 이용하여 TensorFlow GPU 설치 졸리운_곰 2022.01.20 1
384 [python][anaconda] 파이선 아나콘다 최신 버전 업데이트하기 file 졸리운_곰 2022.01.20 3
383 [python 자료구조] 림코딩의 파이썬으로 csv 다루기 강좌 (읽기,쓰기,수정,추가) 졸리운_곰 2022.01.16 40
382 [Python 데이터분석][python 데이터분석 프로덕션] [Python] Docker를 사용한 Dash 웹앱 생성 file 졸리운_곰 2021.12.10 8
381 [python][flask] bitnami의 django 서버로 flask 서비스 file 졸리운_곰 2021.12.10 5
380 [python 인터넷] How to Select CheckBox and Radio Button in Selenium WebDriver 졸리운_곰 2021.11.26 8
379 [python 인터넷] [Python] Selenium 사용하기 (+PhantomJS) file 졸리운_곰 2021.11.26 3
378 [python 인터넷] Python Selenium(셀레니움) 크롬창 활성 탭 변경하기 file 졸리운_곰 2021.11.26 6
377 [python 인터넷 ,selenium] selenium iframe 처리하기 file 졸리운_곰 2021.11.26 9
376 [python 인터넷] Selenium에서 특정 element가 갑자기 클릭이 되지 않을 때 (python) 졸리운_곰 2021.11.26 4
375 [python 인터넷] Selenium Python button 클릭 file 졸리운_곰 2021.11.26 4
대표 김성준 주소 : 경기 용인 분당수지 U타워 등록번호 : 142-07-27414
통신판매업 신고 : 제2012-용인수지-0185호 출판업 신고 : 수지구청 제 123호 개인정보보호최고책임자 : 김성준 sjkim70@stechstar.com
대표전화 : 010-4589-2193 [fax] 02-6280-1294 COPYRIGHT(C) stechstar.com ALL RIGHTS RESERVED