
[Airflow] 6.1 센서를 사용한 폴링 조건
2024. 3. 26. 18:27
Airflow
이전의 방식: 미리 정의된 시간에 스케줄 간격에 따라 워크플로 시작 ① 주어진 간격으로 워크플로 실행 ② 다음 시간 계산 ③ 해당 날짜 및 시간에 첫 번째 태스크 시작 새로운 방식: 고정된 스케줄이 아닌 다른 방식으로 파이프라인 트리거 특정 이벤트(예: 새로운 파일 입력, HTTP 서비스 호출)로 파이프라인 실행 특정 태스크 수행 후 트리거 예: 공유 드라이브 내에 파일 업로드, 개발자가 코드를 리포지터리로 push, 새로운 데이터 입력 등 예시: 회사 간 공유 스토리지 시스템에 매일매일 데이터 덤프를 제공하는 경우 데이터가 비정기적으로 도착하므로 스케줄 간격 설정 불가 해당 워크플로를 위한 초기 로직 copy_to_raw_supermarket: 슈퍼마켓 1~4에서 제공한 데이터를 raw 스토리지에 복사..

[Airflow] 5.5 태스크 간 데이터 공유
2024. 3. 26. 18:16
Airflow
Airflow의 XCom(cross-communication)을 통해 태스크 간 데이터 공유 가능 XCom: 태스크 간 메시지를 교환하여 특정 상태 공유 XCom을 사용하여 데이터 공유하기 예시: Umbrella DAG의 train_model 태스크에서 훈련된 모델이 모델 레지스트리에 등록 훈련된 모델 배포를 위해 deploy_model 태스크에 모델 버전 식별자 전달 XCom을 통해 train_model과 deploy_model 작업 사이에 모델 식별자 공유 가능 train_model 태스크는 XCom에 모델 식별자 값 전송 → 다른 태스크에서 XCom 값을 사용할 수 있도록 함 Airflow 컨택스트의 xcom_push 메서드를 통해 값 게시 xcom_push를 사용해 명시적으로 XCom의 값을 게시..

[Airflow] 5.4 트리거 규칙에 대한 추가 정보
2024. 3. 23. 14:35
Airflow
브랜치 또는 조건부의 경우 Airflow 트리거 규칙에 의해 제어 태스크 실행 시기를 정확히 결정 트리거 규칙 이해하기 Airflow가 DAG 실행 내에서 작업을 실행하는 방법 Airflow는 DAG 실행 시 각 태스크를 지속적으로 확인하여 실행 여부 확인 태스크 실행이 가능할 시 실행 예약 즉, Airflow 내에 사용 가능한 실행 슬롯이 있을 경우 즉시 태스크 실행 Airflow의 태스크 실행 시기를 결정하기 위해 트리거 규칙 필요 트리거 규칙이란? 트리거 규칙: 태스크가 실행 준비 상태인지 파악하기 위한 필수 조건 default: all_success (태스크 실행을 위해 의존적인 태스크가 모두 성공적으로 완료되어야 함) 예시: Umbrella DAG DAG 시작 시, Airflow는 실행할 수 있..

[Airflow] 5.3 조건부 태스크
2024. 3. 23. 14:18
Airflow
DAG 내에서 특정 조건에 따라 태스크를 건너뛰는 방법 예시: Umbrella DAG 데이터 정제 코드 변경 시, 모델 배포 태스크 다시 진행 태스크 내에서 조건 가장 최근 실행된 DAG에 대해서만 모델을 배포하도록 DAG 변경 가장 최근 데이터 세트로 학습된 모델 중 특정 버전만 배포 PythonOperator를 통해 배포 구현 → 배포 함수 내에서 DAG의 실행 날짜 명시 태스크 내에서 조건 구현하기 def _deploy(**context): if context["execution_date"] == ...: deploy_model() deploy = PythonOperator( task_id = "deploy_model", python_callable = _deploy, ) deploy_model 태..

[Airflow] 5.2 브랜치하기
2024. 3. 23. 13:50
Airflow
시스템 내에 변경사항이 있더라도 이전 시스템과 새로운 시스템을 모두 활용하여 과거 데이터를 계속 이용하기 예시: ERP 시스템 전환 후에도 판매 데이터 수집 작업 계속 진행 태스크 내에서 브랜치하기 해당 태스크를 다시 작성하는 방법 실행 날짜 기준으로 판매 데이터 수집 및 처리 코드를 2개로 분리 정제 태스크 안에서 브랜치하기 def _clean_sales(**context): if context["execution_date"] < ERP_CHANGE_DATE: _clean_sales_old(**context) else _clean_sales_new(**context) ... clean_sales_data = PythonOperator( task_id = "clean_sales", python_calla..

[Airflow] 5.1 기본 의존성 유형
2024. 3. 22. 23:29
Airflow
Airflow에서 작업 의존성 정의하기 태스크 의존성 패턴의 종류를 먼저 알아보자 선형 체인(linear chain): 연속적으로 실행되는 작업 팬아웃/팬인(fan-out/fan-in): 하나의 태스크가 여러 다운스트림 태스크에 연결되거나 그 반대 선형 의존성 유형 이전의 예시는 주로 단일 선형 태스크 체인 이전 태스크의 결과 → 다음 태스크의 입력 다음 태스크로 이동 전에 각 태스크가 완료되어야 함 >>를 사용하여 태스크 간의 의존성 정의 태스크 간에 의존성 추가하기 download_launches >> get_pictures get_pictures >> notify download_launches >> get_pictures >> notify 작업의 의존성을 각각 또는 여러 개를 한 번에 설정 가능 ..

[Airflow] 4.1 Airflow로 처리할 데이터 검사하기
2024. 3. 19. 20:34
Airflow
예시: 감성 분석을 이용한 주식시장 예측 도구 StockSense 페이지 뷰 증가 = 긍정적인 감성 페이지 뷰 감소 = 부정적인 감성 긍정적인 감성의 경우 주가가 상승할 가능성이 높다는 것을 가정 증분 데이터를 적재하는 방법 결정하기 파이프라인 구축 전 접근 방식에 대한 기술적 계획이 중요 이후에 데이터를 다시 처리할 것인지 수신하는 데이터이 빈도, 크기, 형식, 소스 유형은 어떻게 되는지 데이터로 무엇을 구축할 것인지 일정한 URL 형식에 따라 데이터 일괄 다운로드 가능 파일 내용이 포함하는 요소 도메인 코드 페이지 제목 조회수 응답 크기(byte)

[Airflow] 3.5 과거 데이터 간격을 메꾸기 위해 백필 사용하기
2024. 3. 19. 19:58
Airflow
Airflow를 통해 스케줄 간격 정의 가능 임의의 시작 날짜로부터 정의 가능하므로, 과거 날짜로부터 과거 간격 정의도 가능 DAG의 과거 시점을 지정하여 실행 → 과거 데이터 세트 로드 또는 분석에 용이 이를 백필(backfilling)이라고 함 과거 시점의 작업 실행하기 Airflow는 아직 실행되지 않은 과거 스케줄 간격을 예약 및 실행 과거 시작 날짜 지정 후 해당 DAG 활성화 → 과거 시작 이후 현재 시간 이전의 모든 스케줄 간격 생성 DAG의 catchup 매개변수로 정의 catchup = False 시 비활성화 과거 시점의 태스크 실행을 피하기 위한 catchup 비활성화하기 dag = DAG( dag_id = "09_no_catchup", schedule_interval = "@daily..