Airflow DAG Configuration

Overview

This section covers creating the Airflow DAG that monitors S3 for new CSV files, extracts their schemas, and triggers the GitHub Actions workflow for automated schema mapping.

DAG Structure

The DAG consists of four stages:

  1. S3 Key Sensor: Monitor for new CSV files

  2. Schema Extraction: Analyze CSV structure and data types

  3. GitHub Trigger: Start the schema mapping workflow

  4. File Management: Move processed files to appropriate folders

DAG Implementation

1. Main DAG File

Create automated_ingestion_dag.py in your Airflow DAGs folder:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
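# S3 has no native "move"; the Amazon provider ships copy and delete operators instead.
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator, S3DeleteObjectsOperator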
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
import pandas as pd
import boto3
import json
import logging

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'automated_csv_ingestion',
    default_args=default_args,
    description='Automated CSV ingestion and schema mapping',
    schedule_interval=None,  # No schedule: runs start manually or via the API; the S3 sensor is the first task
    catchup=False,
    max_active_runs=1,
    tags=['ingestion', 'schema-mapping', 'chicory']
)

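# Configuration
# These Jinja expressions are rendered only when passed to an operator's templated
# fields (for example S3KeySensor's bucket_name); elsewhere they stay literal strings.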
S3_BUCKET = "{{ var.value.AWS_S3_BUCKET }}"
S3_PREFIX = "incoming/"
GITHUB_TOKEN = "{{ var.value.GITHUB_TOKEN }}"
GITHUB_REPO = "{{ var.value.GITHUB_REPO }}"
CHICORY_API_KEY = "{{ var.value.CHICORY_API_KEY }}"

2. S3 File Sensor
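
The sensor task for this step is not shown here, so the following is a minimal sketch rather than the original implementation. It assumes the sensor watches the incoming/ prefix for any .csv object and uses the provider's default aws_default connection. The names csv_key_pattern, make_csv_sensor, and wait_for_csv, as well as the poke interval and timeout, are illustrative. The import sits inside the function only so the sketch stands alone; in automated_ingestion_dag.py the top-level S3KeySensor import would be used.

```python
import fnmatch


def csv_key_pattern(prefix: str) -> str:
    """Return a Unix-style wildcard pattern for CSV objects under `prefix`."""
    return prefix.rstrip("/") + "/*.csv"


def make_csv_sensor(dag, bucket_name: str, prefix: str = "incoming/"):
    """Create the sensor task that waits for a CSV file to appear (hypothetical helper)."""
    # Lazy import: keeps this sketch importable without Airflow installed.
    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

    return S3KeySensor(
        task_id="wait_for_csv",          # illustrative task name
        bucket_name=bucket_name,         # templated field, so a Jinja variable works here
        bucket_key=csv_key_pattern(prefix),
        wildcard_match=True,             # treat bucket_key as a Unix wildcard pattern
        aws_conn_id="aws_default",       # assumption: default AWS connection id
        poke_interval=60,                # check once a minute (assumed value)
        timeout=60 * 60,                 # give up after one hour (assumed value)
        mode="reschedule",               # free the worker slot between checks
        dag=dag,
    )


# Local illustration of which keys the pattern would accept.
pattern = csv_key_pattern("incoming/")
sample_keys = ["incoming/orders.csv", "incoming/readme.txt", "processed/orders.csv"]
matching = [key for key in sample_keys if fnmatch.fnmatchcase(key, pattern)]
```

In reschedule mode the sensor releases its worker slot between checks, which suits long waits for file arrivals.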

3. Schema Extraction
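
The extraction code for this step is not shown. The DAG file imports pandas, which suggests a pandas-based approach; the sketch below instead uses only the standard library csv module to keep the example dependency-free. The function names, the type labels (integer, float, timestamp, string), and the 1000-row sample size are assumptions for illustration.

```python
import csv
import io
from datetime import datetime


def _infer_type(values):
    """Return the narrowest type label that fits every non-empty value."""
    non_empty = [v for v in values if v.strip() != ""]
    if not non_empty:
        return "string"

    def all_parse(parser):
        # True when `parser` accepts every non-empty value.
        for value in non_empty:
            try:
                parser(value)
            except ValueError:
                return False
        return True

    if all_parse(int):
        return "integer"
    if all_parse(float):
        return "float"
    if all_parse(datetime.fromisoformat):
        return "timestamp"
    return "string"


def infer_schema(csv_text, sample_rows=1000):
    """Infer column names, types, and nullability from the first rows of a CSV."""
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = []
    for index, row in enumerate(reader):
        if index >= sample_rows:
            break
        rows.append(row)

    columns = []
    for name in reader.fieldnames or []:
        # Short rows yield None for missing cells; treat them as empty.
        values = [row[name] or "" for row in rows]
        columns.append({
            "name": name,
            "type": _infer_type(values),
            "nullable": any(v.strip() == "" for v in values),
        })
    return {"columns": columns, "sampled_rows": len(rows)}


sample = "id,amount,created_at,note\n1,9.50,2024-01-15,first\n2,12,2024-01-16,\n"
schema = infer_schema(sample)
```

Inside the DAG, a PythonOperator callable could read the object body from S3, pass the decoded text to infer_schema, and push the result to XCom for the trigger task.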

4. GitHub Actions Trigger
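
The trigger code is not shown, so this is a sketch under one key assumption: that the schema mapping workflow listens for a repository_dispatch event. GitHub's REST API accepts such events at POST /repos/{owner}/{repo}/dispatches. The event type schema-mapping, the payload field names, and the function name are hypothetical. The example builds the request without sending it.

```python
import json
import urllib.request


def build_dispatch_request(repo, token, schema, s3_key, event_type="schema-mapping"):
    """Build (but do not send) a repository_dispatch request for GitHub's REST API."""
    body = {
        "event_type": event_type,  # hypothetical event name the workflow listens for
        "client_payload": {"s3_key": s3_key, "schema": schema},
    }
    return urllib.request.Request(
        url=f"https://api.github.com/repos/{repo}/dispatches",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Accept": "application/vnd.github+json",
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_dispatch_request(
    repo="example-org/example-repo",  # placeholder for the GITHUB_REPO variable
    token="<token>",                  # placeholder for the GITHUB_TOKEN variable
    schema={"columns": []},
    s3_key="incoming/orders.csv",
)
# To send it: urllib.request.urlopen(request, timeout=30)
```

GitHub answers a successful dispatch with 204 No Content. The same URL, headers, and body could also be passed to the HTTP operator imported in the DAG file.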

5. File Management
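
The file management code is not shown. S3 has no native move operation, so a move is a copy followed by a delete. The sketch below assumes processed files go to processed/ and failed ones to failed/; both folder names and both function names are illustrative. boto3 is imported inside move_object so the key logic can be exercised without AWS access.

```python
import posixpath


def destination_key(source_key, succeeded):
    """Map an incoming key to its destination folder, keeping the file name."""
    folder = "processed" if succeeded else "failed"  # assumed folder names
    return f"{folder}/{posixpath.basename(source_key)}"


def move_object(bucket, source_key, dest_key):
    """Move an object within a bucket by copying it, then deleting the source."""
    import boto3  # lazy import: only needed when actually talking to S3

    s3 = boto3.client("s3")
    s3.copy_object(
        Bucket=bucket,
        Key=dest_key,
        CopySource={"Bucket": bucket, "Key": source_key},
    )
    # Delete only after the copy call has returned without raising.
    s3.delete_object(Bucket=bucket, Key=source_key)


processed = destination_key("incoming/orders.csv", succeeded=True)
failed = destination_key("incoming/orders.csv", succeeded=False)
```

Because the key keeps only the file name, two uploads with the same name would overwrite each other; adding a date or run id to the destination path is one way to avoid that.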

6. Error Handling
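
The error handling code is not shown, so its exact behavior is unknown. One common pattern, sketched here as an assumption, is an on_failure_callback that logs a summary of the failed task. Airflow passes the callback a context dictionary that includes task_instance, run_id, and exception. The function names are illustrative, and the SimpleNamespace object stands in for a real task instance.

```python
import logging
from types import SimpleNamespace

logger = logging.getLogger(__name__)


def format_failure(context):
    """Summarize a failed task from the Airflow callback context."""
    ti = context["task_instance"]
    return (
        f"Task {ti.dag_id}.{ti.task_id} failed on try {ti.try_number} "
        f"(run {context.get('run_id')}): {context.get('exception')}"
    )


def on_failure(context):
    """Failure callback: log the summary; alerting hooks could be added here."""
    logger.error(format_failure(context))


# Stand-in context for trying the formatter outside Airflow.
fake_context = {
    "task_instance": SimpleNamespace(
        dag_id="automated_csv_ingestion", task_id="extract_schema", try_number=2
    ),
    "run_id": "manual__example",
    "exception": ValueError("bad header"),
}
message = format_failure(fake_context)
```

Setting default_args['on_failure_callback'] = on_failure would apply the callback to every task in the DAG.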

7. Task Dependencies

DAG Configuration

1. Airflow Variables

Set up these Airflow variables in the UI or CLI:
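
The variable list is not shown, but the DAG file references four variables: AWS_S3_BUCKET, GITHUB_TOKEN, GITHUB_REPO, and CHICORY_API_KEY. As a sketch, the helper below (a hypothetical name) validates those four and renders them as JSON suitable for the airflow variables import command. All values are placeholders.

```python
import json

# Variable names referenced by the DAG file's Jinja expressions.
REQUIRED_VARIABLES = ("AWS_S3_BUCKET", "GITHUB_TOKEN", "GITHUB_REPO", "CHICORY_API_KEY")


def variables_json(values):
    """Return a JSON document with the required variables, failing on gaps."""
    missing = [name for name in REQUIRED_VARIABLES if not values.get(name)]
    if missing:
        raise ValueError(f"Missing Airflow variables: {', '.join(missing)}")
    return json.dumps({name: values[name] for name in REQUIRED_VARIABLES}, indent=2)


document = variables_json({
    "AWS_S3_BUCKET": "<your-bucket>",
    "GITHUB_TOKEN": "<your-github-token>",
    "GITHUB_REPO": "<owner>/<repo>",
    "CHICORY_API_KEY": "<your-chicory-api-key>",
})
# Write `document` to variables.json, then run: airflow variables import variables.json
```

Secrets such as the token and API key are often better kept in a secrets backend than in a file on disk.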

2. Connections

Create an AWS connection in Airflow:
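
The connection details are not shown. One option, sketched here, is to define the connection through an AIRFLOW_CONN_<CONN_ID> environment variable, which Airflow 2.3 and later accept in JSON form. The connection id aws_default is an assumption (it is the Amazon provider's default), the helper name is hypothetical, and the credentials are placeholders.

```python
import json


def aws_connection_env(access_key_id, secret_access_key, region_name, conn_id="aws_default"):
    """Return the (env var name, JSON value) pair that defines an AWS connection."""
    connection = {
        "conn_type": "aws",
        "login": access_key_id,         # AWS access key id
        "password": secret_access_key,  # AWS secret access key
        "extra": {"region_name": region_name},
    }
    return f"AIRFLOW_CONN_{conn_id.upper()}", json.dumps(connection)


env_name, env_value = aws_connection_env(
    access_key_id="<access-key-id>",
    secret_access_key="<secret-access-key>",
    region_name="us-east-1",  # placeholder region
)
# Export env_name=env_value in the scheduler and worker environments.
```

When Airflow runs on AWS with an attached IAM role, the login and password can usually be left empty so that boto3 resolves credentials itself.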

Testing the DAG

1. Manual Testing

2. Integration Testing

Upload a test file and monitor DAG execution:
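
The upload commands are not shown. As a sketch, the helpers below generate a small CSV in memory and upload it under the incoming/ prefix with boto3. The column names, the object key, and both function names are illustrative, and the upload assumes AWS credentials are available to boto3.

```python
import csv
import io


def build_test_csv(rows=3):
    """Return a small CSV document with integer, decimal, and date columns."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["id", "amount", "created_at"])
    for i in range(1, rows + 1):
        writer.writerow([i, f"{i * 10.5:.2f}", f"2024-01-{i:02d}"])
    return buffer.getvalue()


def upload_test_csv(bucket, key="incoming/test_orders.csv"):
    """Upload the generated CSV so the S3 sensor can pick it up."""
    import boto3  # lazy import: only needed for the real upload

    boto3.client("s3").put_object(
        Bucket=bucket,
        Key=key,
        Body=build_test_csv().encode("utf-8"),
    )
    return key


preview = build_test_csv()
```

After calling upload_test_csv with your bucket name, trigger automated_csv_ingestion and follow the run in the Airflow UI; the sensor task should succeed once it finds the object.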

Monitoring and Alerting

1. CloudWatch Integration
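
The CloudWatch setup is not shown. One possible approach, sketched here as an assumption, is to publish a custom metric per DAG run with boto3's put_metric_data. The namespace Airflow/Ingestion, the metric names, and the function names are hypothetical.

```python
def build_run_metric(dag_id, succeeded):
    """Build one CloudWatch metric datum describing the outcome of a DAG run."""
    return {
        "MetricName": "DagRunSuccess" if succeeded else "DagRunFailure",
        "Dimensions": [{"Name": "DagId", "Value": dag_id}],
        "Value": 1.0,
        "Unit": "Count",
    }


def publish_run_metric(dag_id, succeeded, namespace="Airflow/Ingestion"):
    """Send the metric datum to CloudWatch under the given namespace."""
    import boto3  # lazy import: only needed when publishing

    boto3.client("cloudwatch").put_metric_data(
        Namespace=namespace,
        MetricData=[build_run_metric(dag_id, succeeded)],
    )


datum = build_run_metric("automated_csv_ingestion", succeeded=False)
```

A CloudWatch alarm on the failure metric can then notify the team when a run fails.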

2. Slack Notifications
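
The Slack setup is not shown. A minimal sketch, assuming a Slack incoming webhook is used: such webhooks accept a JSON body with a text field. The webhook URL below is a placeholder, and the function name and message format are illustrative. The request is built but not sent.

```python
import json
import urllib.request


def build_slack_request(webhook_url, dag_id, task_id, error):
    """Build (but do not send) a Slack incoming-webhook notification."""
    payload = {"text": f"Airflow task {dag_id}.{task_id} failed: {error}"}
    return urllib.request.Request(
        url=webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


slack_request = build_slack_request(
    webhook_url="https://hooks.slack.com/services/<your-webhook-path>",  # placeholder
    dag_id="automated_csv_ingestion",
    task_id="extract_schema",
    error="bad header",
)
# To send it: urllib.request.urlopen(slack_request, timeout=10)
```

Calling this from a task failure callback would post one message per failed task; keep the webhook URL in an Airflow variable or connection rather than in the DAG file.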

