Airflow DAG Configuration
Overview
This section covers creating the Airflow DAG that monitors S3 for new CSV files, extracts their schemas, and triggers the GitHub Actions workflow for automated schema mapping.
DAG Structure
The DAG consists of four stages:
S3 Key Sensor: Monitor for new CSV files
Schema Extraction: Analyze CSV structure and data types
GitHub Trigger: Start the schema mapping workflow
File Management: Move processed files to appropriate folders
DAG Implementation
1. Main DAG File
Create automated_ingestion_dag.py in your Airflow DAGs folder:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# The Amazon provider ships no "move" operator for S3 objects;
# a move is done as a copy followed by a delete.
from airflow.providers.amazon.aws.operators.s3 import (
    S3CopyObjectOperator,
    S3DeleteObjectsOperator,
)
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
import pandas as pd
import boto3
import json
import logging

# Defaults applied to every task in the DAG
default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'automated_csv_ingestion',
    default_args=default_args,
    description='Automated CSV ingestion and schema mapping',
    schedule_interval=None,  # No time-based schedule; runs are started by a trigger
    catchup=False,
    max_active_runs=1,  # Process one file at a time
    tags=['ingestion', 'schema-mapping', 'chicory'],
)

# Configuration
# The "{{ ... }}" values are Jinja templates. Airflow renders them only when
# they are passed to a templated operator field, not at module import time.
S3_BUCKET = "{{ var.value.AWS_S3_BUCKET }}"
S3_PREFIX = "incoming/"
GITHUB_TOKEN = "{{ var.value.GITHUB_TOKEN }}"
GITHUB_REPO = "{{ var.value.GITHUB_REPO }}"
CHICORY_API_KEY = "{{ var.value.CHICORY_API_KEY }}"
2. S3 File Sensor
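The sensor task is not shown in this section. The following is a minimal sketch, assuming the `incoming/` prefix and the `AWS_S3_BUCKET` variable from the configuration. The task name `wait_for_csv`, the connection id `aws_default`, and the timing values are illustrative assumptions, not values from the original. The Airflow import is deferred into the function so the settings can be inspected without Airflow installed.

```python
# Keyword arguments for S3KeySensor. Everything except the bucket variable
# and the incoming/ prefix is an assumption chosen for illustration.
SENSOR_KWARGS = {
    "task_id": "wait_for_csv",                        # hypothetical task name
    "bucket_name": "{{ var.value.AWS_S3_BUCKET }}",   # templated field, rendered at run time
    "bucket_key": "incoming/*.csv",
    "wildcard_match": True,       # interpret bucket_key as a Unix-style wildcard
    "aws_conn_id": "aws_default",
    "poke_interval": 60,          # seconds between checks
    "timeout": 60 * 60,           # give up after one hour
    "mode": "reschedule",         # free the worker slot between checks
}


def make_s3_sensor(dag):
    """Create the sensor task on the given DAG (requires the Amazon provider)."""
    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

    return S3KeySensor(dag=dag, **SENSOR_KWARGS)
```

Calling `make_s3_sensor(dag)` in the DAG file registers the task. `reschedule` mode suits a long wait because the sensor does not hold a worker slot while idle.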
3. Schema Extraction
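The extraction code is not shown in this section. As a rough sketch of what "analyze CSV structure and data types" can look like, the example below infers a type per column from the raw text. It uses the standard `csv` module rather than pandas so that it runs without extra dependencies. The function names and the type labels (`integer`, `float`, `string`) are assumptions.

```python
import csv
import io


def infer_type(values):
    """Return the narrowest type label that fits every non-empty value."""
    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return "string"
    for label, caster in (("integer", int), ("float", float)):
        try:
            for value in non_empty:
                caster(value)
            return label
        except ValueError:
            continue
    return "string"


def extract_schema(csv_text):
    """Describe each column: its name, inferred type, and whether it has blanks."""
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = list(reader)
    schema = []
    for column in reader.fieldnames or []:
        # Short rows yield None for missing cells; treat them as blanks.
        values = [(row.get(column) or "").strip() for row in rows]
        schema.append({
            "name": column,
            "type": infer_type(values),
            "nullable": any(v == "" for v in values),
        })
    return schema
```

In the DAG, a function like this would sit behind a `PythonOperator` that first downloads the object found by the sensor and then passes the resulting schema on to the next task.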
4. GitHub Actions Trigger
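The trigger code is not shown in this section. One common way to start a workflow from outside GitHub is the REST API's repository dispatch endpoint. The sketch below builds such a request without sending it. It assumes the workflow listens for a `repository_dispatch` event, that `GITHUB_REPO` has the form `owner/repo`, and that the event name `csv-schema-detected` and the payload layout are free to choose; all three are assumptions.

```python
import json
import urllib.request


def build_dispatch_request(repo, token, schema, source_key,
                           event_type="csv-schema-detected"):
    """Build (but do not send) a repository_dispatch request for owner/repo."""
    body = {
        # Hypothetical event name; it must match the workflow's trigger types.
        "event_type": event_type,
        "client_payload": {"source_key": source_key, "schema": schema},
    }
    return urllib.request.Request(
        url=f"https://api.github.com/repos/{repo}/dispatches",
        data=json.dumps(body).encode("utf-8"),
        method="POST",
        headers={
            "Accept": "application/vnd.github+json",
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )
```

Sending the request is a single call to `urllib.request.urlopen(request)`; GitHub answers with status 204 when the event is accepted. The same URL, headers, and body can be passed to the HTTP operator imported in the main DAG file.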
5. File Management
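The file management code is not shown in this section. S3 has no native move, so a move is a copy followed by a delete. The sketch below shows that pattern against a boto3-style S3 client passed in as a parameter. The destination folder names `processed/` and `failed/` are assumptions.

```python
def destination_key(source_key, succeeded, incoming_prefix="incoming/"):
    """Swap the incoming/ prefix for processed/ or failed/ (assumed folder names)."""
    if not source_key.startswith(incoming_prefix):
        raise ValueError(f"{source_key!r} is not under {incoming_prefix!r}")
    target_prefix = "processed/" if succeeded else "failed/"
    return target_prefix + source_key[len(incoming_prefix):]


def move_object(s3_client, bucket, source_key, succeeded):
    """Move an object by copying it to its destination, then deleting the original."""
    target_key = destination_key(source_key, succeeded)
    s3_client.copy_object(
        Bucket=bucket,
        Key=target_key,
        CopySource={"Bucket": bucket, "Key": source_key},
    )
    # Delete only after the copy call has returned without raising.
    s3_client.delete_object(Bucket=bucket, Key=source_key)
    return target_key
```

With real credentials the client would come from `boto3.client("s3")`. Taking the client as an argument keeps the function easy to exercise with a stub.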
6. Error Handling
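The error handling code is not shown in this section. Airflow can call a function whenever a task fails, passing it the task context as a dictionary. The sketch below is one possible callback that logs a one-line summary; the function names and the message format are assumptions.

```python
import logging

logger = logging.getLogger(__name__)


def format_failure(context):
    """Summarize a failed task from the context Airflow passes to callbacks."""
    ti = context["task_instance"]
    error = context.get("exception")
    return (
        f"Task {ti.dag_id}.{ti.task_id} failed "
        f"on attempt {ti.try_number}: {error!r}"
    )


def on_failure(context):
    """Failure callback: log the summary at ERROR level."""
    logger.error(format_failure(context))
```

To apply it to every task, add `'on_failure_callback': on_failure` to `default_args`. The callback runs in addition to the retry and email settings already configured there.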
7. Task Dependencies
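The dependency wiring is not shown in this section. Based on the four stages listed under DAG Structure, the tasks would run in a straight line. Airflow operators overload `>>` to mean "runs before", so the chain can be sketched as below. The parameter names are assumptions standing in for whatever the four task objects are called.

```python
def wire_pipeline(wait_for_file, extract_schema, trigger_github, move_file):
    """Chain the four tasks so each starts only after the previous one succeeds."""
    # a >> b sets b downstream of a and returns b, which lets the chain continue.
    wait_for_file >> extract_schema >> trigger_github >> move_file
    return move_file
```

In a DAG file the same line is usually written directly at module level after the tasks are defined; wrapping it in a function is only done here to keep the sketch self-contained.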
DAG Configuration
1. Airflow Variables
Set up these Airflow variables in the UI or CLI:
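The variable names the DAG reads through `var.value` are `AWS_S3_BUCKET`, `GITHUB_TOKEN`, `GITHUB_REPO`, and `CHICORY_API_KEY`. As a sketch of the CLI route, the snippet below writes them to a JSON file that `airflow variables import` can load. Every value is a placeholder, and the file name and the `owner/repo` form of `GITHUB_REPO` are assumptions.

```python
import json
from pathlib import Path

# Keys come from the DAG's var.value lookups; every value is a placeholder.
AIRFLOW_VARIABLES = {
    "AWS_S3_BUCKET": "<your-bucket-name>",
    "GITHUB_TOKEN": "<github-token>",
    "GITHUB_REPO": "<owner>/<repo>",
    "CHICORY_API_KEY": "<chicory-api-key>",
}


def write_variables_file(path="airflow_variables.json"):
    """Write the variables as JSON for `airflow variables import <path>`."""
    target = Path(path)
    target.write_text(json.dumps(AIRFLOW_VARIABLES, indent=2), encoding="utf-8")
    return target
```

After filling in real values, run `airflow variables import airflow_variables.json`. Keep the file out of version control, since it contains secrets.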
2. Connections
Create an AWS connection in Airflow:
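The connection details are not shown in this section. One way to define a connection without the UI is an `AIRFLOW_CONN_<CONN_ID>` environment variable. The sketch below builds such a variable in JSON form, assuming Airflow 2.3 or later (which accepts JSON-serialized connections) and the connection id `aws_default`; the function name is hypothetical.

```python
import json


def aws_connection_env(access_key_id, secret_access_key, region,
                       conn_id="aws_default"):
    """Return the (name, value) of an environment variable defining the connection."""
    value = json.dumps({
        "conn_type": "aws",
        "login": access_key_id,         # AWS access key id
        "password": secret_access_key,  # AWS secret access key
        "extra": {"region_name": region},
    })
    return f"AIRFLOW_CONN_{conn_id.upper()}", value
```

Export the returned name and value in the environment of the scheduler and workers. If Airflow runs on AWS with an instance or task role, the key fields can be left empty and the role's credentials are used instead.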
Testing the DAG
1. Manual Testing
2. Integration Testing
Upload a test file and monitor DAG execution:
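The upload step is not shown in this section. The sketch below generates a small CSV in memory and uploads it under the `incoming/` prefix through a boto3-style S3 client passed in as a parameter. The column names, the object key, and the function names are assumptions.

```python
import csv
import io


def sample_csv(rows=3):
    """Build a small CSV document in memory for a test upload."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["id", "name", "amount"])
    for i in range(1, rows + 1):
        writer.writerow([i, f"item-{i}", f"{i * 1.5:.2f}"])
    return buffer.getvalue()


def upload_test_file(s3_client, bucket, key="incoming/test_upload.csv"):
    """Upload the sample under incoming/ so the sensor can find it."""
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=sample_csv().encode("utf-8"),
    )
    return key
```

With real credentials, call `upload_test_file(boto3.client("s3"), "<your-bucket-name>")`, trigger the DAG, and then follow the run in the Airflow UI.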
Monitoring and Alerting
1. CloudWatch Integration
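The CloudWatch setup is not shown in this section. One simple form of integration is publishing a custom metric per processed file, which can then drive a CloudWatch alarm. The sketch below does this through a boto3-style CloudWatch client passed in as a parameter. The namespace, metric names, and dimension name are assumptions.

```python
def publish_ingestion_metric(cloudwatch_client, dag_id, succeeded,
                             namespace="Airflow/Ingestion"):
    """Publish a count of 1 to a success or failure metric for this DAG."""
    metric_name = "FilesProcessed" if succeeded else "FilesFailed"
    cloudwatch_client.put_metric_data(
        Namespace=namespace,  # hypothetical namespace
        MetricData=[{
            "MetricName": metric_name,
            "Dimensions": [{"Name": "DagId", "Value": dag_id}],
            "Value": 1,
            "Unit": "Count",
        }],
    )
    return metric_name
```

With real credentials the client would come from `boto3.client("cloudwatch")`. An alarm on `FilesFailed` greater than zero gives a basic failure alert.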
2. Slack Notifications
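The Slack setup is not shown in this section. A lightweight option is a Slack incoming webhook, which accepts a JSON body with a `text` field. The sketch below builds such a request without sending it. The message format and the function name are assumptions, and the webhook URL must come from your own Slack app.

```python
import json
import urllib.request


def build_slack_request(webhook_url, dag_id, task_id, error):
    """Build (but do not send) an incoming-webhook request describing a failure."""
    text = f":x: {dag_id}.{task_id} failed: {error}"
    return urllib.request.Request(
        url=webhook_url,
        data=json.dumps({"text": text}).encode("utf-8"),
        method="POST",
        headers={"Content-Type": "application/json"},
    )
```

Send it with `urllib.request.urlopen(request)`, for example from a task failure callback. Store the webhook URL as a secret rather than in the DAG file.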
Next: Chicory Agent Creation