Airflow DAG Configuration
Overview
This section covers creating the Airflow DAG that monitors S3 for new CSV files, extracts their schemas, and triggers the GitHub Actions workflow for automated schema mapping.
DAG Structure
Our DAG will consist of the following tasks (see the flow summary after this list):
S3 file detection: find the newest CSV file under the incoming/ prefix
Schema Extraction: Analyze CSV structure and data types
GitHub Trigger: Start the schema mapping workflow
File Management: Move processed files to appropriate folders
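At a glance: the tasks run as a linear chain (wait_for_s3_file → extract_csv_schema → trigger_github_workflow → move_processed_file), plus a handle_failure cleanup task that runs only if one of the first three tasks fails. The full dependency wiring appears in step 7 below.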
DAG Implementation
1. Main DAG File
Create automated_ingestion_dag.py in your Airflow DAGs folder:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
import pandas as pd
import boto3
import json
import logging
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'automated_csv_ingestion',
default_args=default_args,
description='Automated CSV ingestion and schema mapping',
schedule_interval=None,  # no schedule; runs are triggered externally or manually
catchup=False,
max_active_runs=1,
tags=['ingestion', 'schema-mapping', 'chicory']
)
# Configuration
# Values come from Airflow Variables (see "DAG Configuration" below).
# Note: Jinja templating ({{ var.value.* }}) is not rendered inside plain Python
# code, so the variables are resolved explicitly with Variable.get().
S3_BUCKET = Variable.get("AWS_S3_BUCKET")
S3_PREFIX = "incoming/"
GITHUB_TOKEN = Variable.get("GITHUB_TOKEN")
GITHUB_REPO = Variable.get("GITHUB_REPO")
CHICORY_API_KEY = Variable.get("CHICORY_API_KEY")
2. S3 File Sensor
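The task below uses a plain PythonOperator to list the incoming/ prefix and pick the most recently modified CSV. If you would rather have the DAG block until a file actually appears, the Amazon provider's S3KeySensor is an alternative; a minimal sketch (the connection ID and timing values are illustrative):
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# Sketch: wait up to an hour, checking every 5 minutes, for any CSV under incoming/.
wait_for_csv = S3KeySensor(
    task_id='wait_for_csv',
    bucket_name=S3_BUCKET,
    bucket_key=f"{S3_PREFIX}*.csv",
    wildcard_match=True,        # treat bucket_key as a Unix wildcard pattern
    aws_conn_id='aws_default',
    poke_interval=300,
    timeout=3600,
    dag=dag
)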
def get_latest_s3_file(**context):
"""Find the latest CSV file in the S3 bucket"""
s3_client = boto3.client('s3')
response = s3_client.list_objects_v2(
Bucket=S3_BUCKET,
Prefix=S3_PREFIX
)
if 'Contents' not in response:
raise ValueError("No files found in S3 bucket")
# Filter for CSV files and get the latest
csv_files = [
obj for obj in response['Contents']
if obj['Key'].endswith('.csv')
]
if not csv_files:
raise ValueError("No CSV files found in S3 bucket")
latest_file = max(csv_files, key=lambda x: x['LastModified'])
# Store file info in XCom
context['task_instance'].xcom_push(
key='s3_file_key',
value=latest_file['Key']
)
return latest_file['Key']
wait_for_file = PythonOperator(
task_id='wait_for_s3_file',
python_callable=get_latest_s3_file,
dag=dag
)
3. Schema Extraction
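The extraction task assumes incoming files follow a <source_system>_<table_name>.csv naming convention; for example, a file named salesforce_accounts.csv (illustrative) would yield source system "salesforce" and table "accounts", and any missing token falls back to "unknown".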
def extract_csv_schema(**context):
"""Extract schema information from the CSV file"""
s3_client = boto3.client('s3')
# Get file key from previous task
file_key = context['task_instance'].xcom_pull(
task_ids='wait_for_s3_file',
key='s3_file_key'
)
# Parse filename for metadata
filename = file_key.split('/')[-1]
parts = filename.replace('.csv', '').split('_')
source_system = parts[0] if len(parts) > 0 else 'unknown'
table_name = parts[1] if len(parts) > 1 else 'unknown'
try:
# Download and analyze CSV
response = s3_client.get_object(Bucket=S3_BUCKET, Key=file_key)
df = pd.read_csv(response['Body'])
# Extract schema information
schema_info = {
'source_system': source_system,
'table_name': table_name,
'filename': filename,
'file_path': file_key,
'row_count': len(df),
'columns': []
}
for col in df.columns:
col_info = {
'name': col,
'dtype': str(df[col].dtype),
'null_count': int(df[col].isnull().sum()),
'null_percentage': float(df[col].isnull().sum() / len(df) * 100),
'unique_count': int(df[col].nunique()),
'sample_values': df[col].dropna().head(5).tolist()
}
# Infer semantic type
if 'id' in col.lower():
col_info['semantic_type'] = 'identifier'
elif 'email' in col.lower():
col_info['semantic_type'] = 'email'
elif 'phone' in col.lower():
col_info['semantic_type'] = 'phone'
elif 'date' in col.lower() or 'time' in col.lower():
col_info['semantic_type'] = 'temporal'
elif df[col].dtype in ['float64', 'int64']:
col_info['semantic_type'] = 'numeric'
else:
col_info['semantic_type'] = 'text'
schema_info['columns'].append(col_info)
# Save schema to S3
schema_key = f"schemas/{source_system}_{table_name}_schema.json"
s3_client.put_object(
Bucket=S3_BUCKET,
Key=schema_key,
Body=json.dumps(schema_info, indent=2),
ContentType='application/json'
)
# Store in XCom for next task
context['task_instance'].xcom_push(
key='schema_info',
value=schema_info
)
logging.info(f"Schema extracted for {filename}: {len(schema_info['columns'])} columns")
return schema_info
except Exception as e:
logging.error(f"Failed to extract schema from {filename}: {str(e)}")
raise
extract_schema = PythonOperator(
task_id='extract_csv_schema',
python_callable=extract_csv_schema,
dag=dag
)
4. GitHub Actions Trigger
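The trigger task forwards the schema extracted above as a workflow_dispatch input. For reference, a trimmed, purely illustrative example of what the schema JSON (also written to schemas/<source>_<table>_schema.json) might contain:
# Illustrative example only; actual values depend on the uploaded CSV.
{
    "source_system": "salesforce",
    "table_name": "accounts",
    "filename": "salesforce_accounts.csv",
    "file_path": "incoming/salesforce_accounts.csv",
    "row_count": 1250,
    "columns": [
        {
            "name": "account_id",
            "dtype": "int64",
            "null_count": 0,
            "null_percentage": 0.0,
            "unique_count": 1250,
            "sample_values": [1001, 1002, 1003, 1004, 1005],
            "semantic_type": "identifier"
        }
    ]
}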
def trigger_github_workflow(**context):
"""Trigger GitHub Actions workflow with schema information"""
import requests
schema_info = context['task_instance'].xcom_pull(
task_ids='extract_csv_schema',
key='schema_info'
)
# GitHub workflow dispatch payload
payload = {
'ref': 'main',
'inputs': {
'source_system': schema_info['source_system'],
'table_name': schema_info['table_name'],
'schema_json': json.dumps(schema_info),
's3_file_path': schema_info['file_path']
}
}
# Trigger workflow
url = f"https://api.github.com/repos/{GITHUB_REPO}/actions/workflows/schema-mapping.yml/dispatches"
headers = {
'Authorization': f'token {GITHUB_TOKEN}',
'Accept': 'application/vnd.github.v3+json',
'Content-Type': 'application/json'
}
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 204:
logging.info(f"Successfully triggered GitHub workflow for {schema_info['filename']}")
return "SUCCESS"
else:
logging.error(f"Failed to trigger GitHub workflow: {response.status_code} - {response.text}")
raise Exception(f"GitHub workflow trigger failed: {response.status_code}")
trigger_workflow = PythonOperator(
task_id='trigger_github_workflow',
python_callable=trigger_github_workflow,
dag=dag
)
5. File Management
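Together, the file-management and error-handling tasks maintain a simple prefix layout in the ingestion bucket:
your-data-ingestion-bucket/
├── incoming/    # new CSVs land here and are picked up by wait_for_s3_file
├── schemas/     # schema JSON written by extract_csv_schema
├── processed/   # files moved here after a successful run
└── failed/      # files moved here by the failure handler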
def move_processed_file(**context):
"""Move processed file to processed folder"""
file_key = context['task_instance'].xcom_pull(
task_ids='wait_for_s3_file',
key='s3_file_key'
)
# Generate new key in processed folder
filename = file_key.split('/')[-1]
processed_key = f"processed/{filename}"
s3_client = boto3.client('s3')
# Copy file to processed folder
s3_client.copy_object(
Bucket=S3_BUCKET,
CopySource={'Bucket': S3_BUCKET, 'Key': file_key},
Key=processed_key
)
# Delete original file
s3_client.delete_object(Bucket=S3_BUCKET, Key=file_key)
logging.info(f"Moved {file_key} to {processed_key}")
return processed_key
move_file = PythonOperator(
task_id='move_processed_file',
python_callable=move_processed_file,
dag=dag
)
6. Error Handling
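The cleanup task below moves the offending file to the failed/ prefix when any upstream task fails. As a complementary option (a sketch, not part of the DAG above), Airflow can also invoke a callback on each task failure, which is handy for lightweight logging or alerting:
# Sketch: a per-task failure callback; Airflow passes it the same context dict
# that PythonOperator callables receive.
def log_task_failure(context):
    ti = context['task_instance']
    logging.error(f"Task {ti.task_id} failed in run {context['run_id']}")
# It could be applied to every task via default_args, e.g.:
# default_args['on_failure_callback'] = log_task_failure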
def handle_failure(**context):
"""Handle task failures by moving file to failed folder"""
try:
file_key = context['task_instance'].xcom_pull(
task_ids='wait_for_s3_file',
key='s3_file_key'
)
if file_key:
filename = file_key.split('/')[-1]
failed_key = f"failed/{filename}"
s3_client = boto3.client('s3')
# Copy to failed folder
s3_client.copy_object(
Bucket=S3_BUCKET,
CopySource={'Bucket': S3_BUCKET, 'Key': file_key},
Key=failed_key
)
# Delete original
s3_client.delete_object(Bucket=S3_BUCKET, Key=file_key)
logging.info(f"Moved failed file {file_key} to {failed_key}")
except Exception as e:
logging.error(f"Failed to handle error cleanup: {str(e)}")
failure_handler = PythonOperator(
task_id='handle_failure',
python_callable=handle_failure,
trigger_rule='one_failed',
dag=dag
)
7. Task Dependencies
# Define task dependencies
wait_for_file >> extract_schema >> trigger_workflow >> move_file
[wait_for_file, extract_schema, trigger_workflow] >> failure_handler
DAG Configuration
1. Airflow Variables
Set up these Airflow variables in the UI or CLI:
airflow variables set AWS_S3_BUCKET "your-data-ingestion-bucket"
airflow variables set GITHUB_TOKEN "ghp_your_github_token"
airflow variables set GITHUB_REPO "your-org/your-dbt-repo"
airflow variables set CHICORY_API_KEY "your_chicory_api_key"
2. Connections
Create AWS connection in Airflow:
airflow connections add 'aws_default' \
--conn-type 'aws' \
--conn-login 'your_access_key' \
--conn-password 'your_secret_key' \
--conn-extra '{"region_name": "us-east-1"}'
Testing the DAG
1. Manual Testing
# Test functions individually
from airflow.models import DagBag
dag_bag = DagBag()
dag = dag_bag.get_dag('automated_csv_ingestion')
# Test schema extraction
task = dag.get_task('extract_csv_schema')
# ... test execution
2. Integration Testing
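On Airflow 2.5+ you can also exercise the whole DAG in a single local process before involving the scheduler; continuing from the DagBag snippet above (this still requires valid AWS credentials and the Airflow Variables to be set):
# Runs every task sequentially in-process; useful for end-to-end debugging.
dag.test()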
Upload a test file and monitor DAG execution:
# Upload test file
aws s3 cp test_data.csv s3://your-data-ingestion-bucket/incoming/
# Monitor DAG
airflow dags list-runs -d automated_csv_ingestion
Monitoring and Alerting
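The tasks in this section are optional add-ons. Note that, as written below, metrics_task and slack_alert are defined but not attached to the task graph; to run them after a successful ingestion you would add something like move_file >> metrics_task >> slack_alert alongside the dependencies in step 7.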
1. CloudWatch Integration
# Add CloudWatch metrics
import boto3
def publish_metrics(**context):
cloudwatch = boto3.client('cloudwatch')
cloudwatch.put_metric_data(
Namespace='Airflow/Ingestion',
MetricData=[
{
'MetricName': 'FileProcessed',
'Value': 1,
'Unit': 'Count'
}
]
)
metrics_task = PythonOperator(
task_id='publish_metrics',
python_callable=publish_metrics,
dag=dag
)
2. Slack Notifications
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
slack_alert = SlackWebhookOperator(
task_id='slack_notification',
slack_webhook_conn_id='slack_default',
message='New CSV file processed successfully: {{ ti.xcom_pull(task_ids="wait_for_s3_file", key="s3_file_key") }}',
dag=dag
)
Next: Chicory Agent Creation