Airflow DAG Configuration
Overview
DAG Structure
DAG Implementation
1. Main DAG File
```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.s3 import (
    S3CopyObjectOperator,
    S3DeleteObjectsOperator,
)
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
import pandas as pd
import boto3
import json
import logging

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'automated_csv_ingestion',
    default_args=default_args,
    description='Automated CSV ingestion and schema mapping',
    schedule_interval=None,  # Triggered by file sensor
    catchup=False,
    max_active_runs=1,
    tags=['ingestion', 'schema-mapping', 'chicory'],
)

# Configuration. Note: these Jinja templates are only rendered inside
# templated operator fields, not at module import time.
S3_BUCKET = "{{ var.value.AWS_S3_BUCKET }}"
S3_PREFIX = "incoming/"
GITHUB_TOKEN = "{{ var.value.GITHUB_TOKEN }}"
GITHUB_REPO = "{{ var.value.GITHUB_REPO }}"
CHICORY_API_KEY = "{{ var.value.CHICORY_API_KEY }}"
```

2. S3 File Sensor
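The sensor code itself is not captured on this page. As a minimal sketch, assuming the sensor watches the `incoming/` prefix for new CSV objects (the task id, polling intervals, and connection id are assumptions), continuing the DAG file above:

```python
# Wait for any CSV object to appear under incoming/ before the run proceeds.
wait_for_csv = S3KeySensor(
    task_id='wait_for_csv',
    bucket_name=S3_BUCKET,
    bucket_key=S3_PREFIX + '*.csv',
    wildcard_match=True,   # treat bucket_key as a Unix-style glob
    aws_conn_id='aws_default',
    poke_interval=60,      # re-check once a minute
    timeout=60 * 60,       # fail the task after an hour of waiting
    dag=dag,
)
```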
3. Schema Extraction
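The extraction code is likewise missing here. In the DAG this step would typically run inside a `PythonOperator` that downloads the object with boto3 and infers a rough schema from the header plus a sample of rows. The helper below is a hypothetical, dependency-free version of that inference, shown to illustrate the shape of the output:

```python
import csv
import io

def extract_schema(csv_text: str, sample_rows: int = 100) -> dict:
    """Infer column names and rough types from the first rows of a CSV.

    Every column starts as 'integer' and is demoted to 'string' the first
    time a sampled value fails to parse as an int.
    """
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    columns = {name: 'integer' for name in header}
    for i, row in enumerate(reader):
        if i >= sample_rows:
            break
        for name, value in zip(header, row):
            if columns[name] == 'integer':
                try:
                    int(value)
                except ValueError:
                    columns[name] = 'string'
    return {'columns': columns}
```

For example, `extract_schema("id,name\n1,alice\n2,bob")` yields `{'columns': {'id': 'integer', 'name': 'string'}}`.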
4. GitHub Actions Trigger
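The trigger code is not shown. One plausible shape uses the `repository_dispatch` endpoint of the GitHub REST API via `SimpleHttpOperator` (the connection id `github_api`, pointing at `https://api.github.com`, and the event type are assumptions), continuing the DAG file above:

```python
# Fire a repository_dispatch event so a GitHub Actions workflow picks up
# the newly ingested schema.
trigger_github_actions = SimpleHttpOperator(
    task_id='trigger_github_actions',
    http_conn_id='github_api',
    endpoint=f'repos/{GITHUB_REPO}/dispatches',
    method='POST',
    headers={
        'Authorization': f'token {GITHUB_TOKEN}',
        'Accept': 'application/vnd.github+json',
    },
    data=json.dumps({'event_type': 'csv_ingested'}),
    dag=dag,
)
```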
5. File Management
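The file-management code is missing from this page. S3 has no native move operation, so a common pattern is copy-then-delete into an archive prefix; a sketch using the Amazon provider's `S3CopyObjectOperator` and `S3DeleteObjectsOperator` (the object keys below are placeholders), continuing the DAG file above:

```python
# Copy the processed file into a dated processed/ prefix...
archive_file = S3CopyObjectOperator(
    task_id='archive_file',
    source_bucket_name=S3_BUCKET,
    source_bucket_key='incoming/data.csv',              # placeholder key
    dest_bucket_name=S3_BUCKET,
    dest_bucket_key='processed/{{ ds }}/data.csv',      # templated by run date
    dag=dag,
)

# ...then remove the original so the sensor does not re-trigger on it.
cleanup_incoming = S3DeleteObjectsOperator(
    task_id='cleanup_incoming',
    bucket=S3_BUCKET,
    keys='incoming/data.csv',                           # placeholder key
    dag=dag,
)
```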
6. Error Handling
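The error-handling code is not shown. Beyond the `retries` and `email_on_failure` settings in `default_args`, a common Airflow pattern is an `on_failure_callback`; the sketch below is a hypothetical callback that logs a structured failure record (callbacks receive the task context as a dict):

```python
import logging

def notify_failure(context):
    """on_failure_callback: log which task failed and on which attempt."""
    ti = context['task_instance']
    logging.error(
        'Task %s in DAG %s failed (try %s)',
        ti.task_id, ti.dag_id, ti.try_number,
    )

# Attach it DAG-wide via default_args, or per task:
# default_args['on_failure_callback'] = notify_failure
```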
7. Task Dependencies
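The dependency declaration is missing. Assuming task objects named `wait_for_csv`, `extract_schema_task`, `trigger_github_actions`, `archive_file`, and `cleanup_incoming` for the steps described above (these names are assumptions), the chain would read:

```python
# Sensor -> schema extraction -> GitHub trigger -> archive -> cleanup
wait_for_csv >> extract_schema_task >> trigger_github_actions >> archive_file >> cleanup_incoming
```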
DAG Configuration
1. Airflow Variables
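The variable names used by the DAG's Jinja templates can be set with the Airflow CLI (values below are placeholders; secrets are read from the environment rather than typed inline):

```shell
airflow variables set AWS_S3_BUCKET my-ingestion-bucket
airflow variables set GITHUB_REPO my-org/my-repo
airflow variables set GITHUB_TOKEN "$GITHUB_TOKEN"
airflow variables set CHICORY_API_KEY "$CHICORY_API_KEY"
```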
2. Connections
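The DAG needs an AWS connection for the S3 sensor/operators and an HTTP connection for the GitHub API call. A sketch with the Airflow CLI (the connection ids match those assumed in the snippets above; AWS credentials here are resolved from the environment or instance profile):

```shell
airflow connections add aws_default --conn-type aws
airflow connections add github_api --conn-type http \
    --conn-host https://api.github.com
```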
Testing the DAG
1. Manual Testing
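For manual testing, the standard Airflow workflow is to check that the file parses, then exercise individual tasks and finally the whole DAG in-process:

```shell
# Parse the DAG file; import errors surface immediately (path is an assumption)
python dags/automated_csv_ingestion.py

# Run a single task for a given logical date, without scheduler or DB state
airflow tasks test automated_csv_ingestion wait_for_csv 2024-01-01

# Run the entire DAG once, in-process
airflow dags test automated_csv_ingestion 2024-01-01
```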
2. Integration Testing
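An end-to-end check can drop a sample file into the watched prefix and trigger a real run (bucket name and file are placeholders):

```shell
# Upload a sample CSV where the S3KeySensor is watching
aws s3 cp sample.csv s3://my-ingestion-bucket/incoming/sample.csv

# Kick off a run and watch it pick the file up
airflow dags trigger automated_csv_ingestion
```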
Monitoring and Alerting
1. CloudWatch Integration
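The CloudWatch details are not captured here. One way to wire task logs into CloudWatch is Airflow's remote-logging configuration (the log-group ARN below is a placeholder); this is a config fragment, not a complete setup:

```ini
[logging]
remote_logging = True
remote_base_log_folder = cloudwatch://arn:aws:logs:us-east-1:123456789012:log-group:airflow-task-logs
```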
2. Slack Notifications
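The Slack wiring is not shown on this page. A hedged sketch, assuming the Slack provider is installed and a webhook connection named `slack_alerts` exists, is a failure callback built on `SlackWebhookHook`:

```python
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

def slack_failure_alert(context):
    """on_failure_callback that posts the failing DAG/task to Slack."""
    ti = context['task_instance']
    SlackWebhookHook(slack_webhook_conn_id='slack_alerts').send(
        text=f":red_circle: {ti.dag_id}.{ti.task_id} failed",
    )

# Attach via default_args['on_failure_callback'] = slack_failure_alert
```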