Troubleshooting
Overview
This section provides solutions for common issues encountered when implementing and running the automated ingestion and schema mapping pipeline.
Common Issues
1. S3 and Airflow Issues
Issue: S3 Sensor Not Triggering
Symptoms:
DAG not starting when files are uploaded
S3KeySensor task stuck in running state
No detection of new files
Solutions:
# Check S3 connection in Airflow
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python import PythonOperator

def test_s3_connection():
    hook = S3Hook(aws_conn_id='aws_default')
    try:
        # Test bucket access
        hook.list_keys(bucket_name='your-bucket', prefix='incoming/')
        print("S3 connection successful")
    except Exception as e:
        print(f"S3 connection failed: {e}")

# Add to your DAG for debugging
test_connection = PythonOperator(
    task_id='test_s3_connection',
    python_callable=test_s3_connection,
    dag=dag
)
Checklist:
Verify the aws_default connection exists in Airflow (Admin > Connections) and holds valid credentials.
Confirm the bucket name and key prefix used by the sensor match where files are actually uploaded.
Check that the IAM role or user has s3:ListBucket and s3:GetObject permissions on the bucket.
Review the sensor's poke_interval and timeout so it polls often enough and does not give up too early.
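If the connection test passes but the sensor still never fires, review the sensor arguments themselves. The sketch below is a minimal S3KeySensor configuration; the bucket, key pattern, and timings are placeholders to adapt to your pipeline.

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_s3_file = S3KeySensor(
    task_id='wait_for_s3_file',
    bucket_name='your-bucket',      # must match the upload bucket exactly
    bucket_key='incoming/*.csv',    # wildcard patterns require wildcard_match=True
    wildcard_match=True,
    aws_conn_id='aws_default',
    poke_interval=60,               # seconds between checks
    timeout=60 * 60,                # give up after 1 hour
    mode='reschedule',              # release the worker slot between pokes
    dag=dag
)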
Issue: Schema Extraction Fails
Symptoms:
Task fails with pandas or CSV parsing errors
Memory issues with large files
Encoding problems
Solutions:
def robust_schema_extraction(**context):
    """Enhanced schema extraction with error handling"""
    import io
    import logging
    import boto3
    import pandas as pd
    import chardet

    s3_client = boto3.client('s3')
    file_key = context['task_instance'].xcom_pull(
        task_ids='wait_for_s3_file',
        key='s3_file_key'
    )
    try:
        # Download the full object into memory
        response = s3_client.get_object(Bucket=S3_BUCKET, Key=file_key)
        raw_data = response['Body'].read()
        # Detect encoding
        encoding = chardet.detect(raw_data)['encoding']
        # Parse CSV with robust options
        df = pd.read_csv(
            io.BytesIO(raw_data),
            encoding=encoding,
            encoding_errors='replace',
            dtype=str,   # Read all as strings initially
            nrows=1000,  # Sample first 1000 rows for schema
            na_values=['', 'NULL', 'null', 'N/A', 'n/a']
        )
        # Continue with schema extraction...
    except pd.errors.EmptyDataError:
        logging.error(f"File {file_key} is empty")
        raise
    except pd.errors.ParserError as e:
        logging.error(f"CSV parsing error: {e}")
        # Try alternative parsing
        df = pd.read_csv(
            io.BytesIO(raw_data),
            sep=None,  # Auto-detect separator
            engine='python',
            encoding=encoding
        )
    except MemoryError:
        logging.error("Memory error - file too large")
        # Fall back to chunked processing
        reader = pd.read_csv(
            io.BytesIO(raw_data),
            chunksize=1000,
            encoding=encoding
        )
        df = next(reader)  # Use the first chunk for schema
    return extract_schema_info(df, file_key)
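extract_schema_info comes from the main pipeline DAG. If you need to debug this step in isolation, a minimal stand-in that matches the shape the mapping validation later in this guide expects (column entries with name and dtype) might look like this; field names beyond those are illustrative.

def extract_schema_info(df, file_key):
    """Build a simple schema dict from a sampled DataFrame (illustrative only)."""
    return {
        'file_key': file_key,
        'row_sample_size': len(df),
        'columns': [
            {
                'name': col,
                'dtype': str(df[col].dtype),
                'null_count': int(df[col].isna().sum()),
                'sample_values': df[col].dropna().head(3).tolist()
            }
            for col in df.columns
        ]
    }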
2. Chicory Agent Issues
Issue: Agent API Timeouts
Symptoms:
Requests timeout after 60 seconds
Agent responses are slow
Intermittent connectivity issues
Solutions:
import logging
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_resilient_session():
    """Create HTTP session with retry logic"""
    session = requests.Session()
    # Retry strategy (use method_whitelist instead of allowed_methods on urllib3 < 1.26)
    retry_strategy = Retry(
        total=3,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["HEAD", "GET", "POST"],
        backoff_factor=1
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

def call_chicory_agent_with_retry(schema_data, target_standards):
    """Call Chicory agent with retry logic"""
    session = create_resilient_session()
    headers = {
        "Authorization": f"Bearer {CHICORY_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "messages": [{"role": "user", "content": create_prompt(schema_data, target_standards)}],
        "temperature": 0.1,
        "max_tokens": 2000
    }
    try:
        response = session.post(
            "https://api.chicory.ai/v1/agents/schema_mapper_agent/chat",
            headers=headers,
            json=payload,
            timeout=120  # Increased timeout
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.Timeout:
        logging.error("Request timed out - try breaking down the schema")
        raise
    except requests.exceptions.RequestException as e:
        logging.error(f"Request failed: {e}")
        raise
Issue: Poor Quality Agent Responses
Symptoms:
Inconsistent mapping quality
Missing required fields in responses
Incorrect data type mappings
Solutions:
import logging

# Allowed warehouse types for generated mappings; defined in the pipeline config,
# shown here for completeness (adjust to your warehouse's type system)
VALID_TARGET_TYPES = {'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'TIMESTAMP', 'DATE', 'NUMERIC'}

def validate_and_enhance_mapping(mapping, source_schema):
    """Validate and enhance agent-generated mapping"""
    # Ensure all source columns are mapped
    source_columns = {col['name'] for col in source_schema['columns']}
    mapped_columns = {cm['source_column'] for cm in mapping['column_mappings']}
    missing_columns = source_columns - mapped_columns
    if missing_columns:
        logging.warning(f"Missing mappings for columns: {missing_columns}")
        # Add basic mappings for missing columns
        for col_name in missing_columns:
            source_col = next(col for col in source_schema['columns'] if col['name'] == col_name)
            basic_mapping = {
                'source_column': col_name,
                'target_column': col_name.lower().replace(' ', '_'),
                'source_type': source_col['dtype'],
                'target_type': infer_target_type(source_col['dtype']),
                'transformation': f'CAST({{{col_name}}} AS STRING)',
                'confidence': 0.5
            }
            mapping['column_mappings'].append(basic_mapping)
    # Validate data types
    for col_mapping in mapping['column_mappings']:
        if col_mapping['target_type'] not in VALID_TARGET_TYPES:
            logging.warning(f"Invalid target type: {col_mapping['target_type']}")
            col_mapping['target_type'] = 'STRING'  # Default fallback
    return mapping

def infer_target_type(source_type):
    """Infer target type from source type"""
    type_mapping = {
        'int64': 'INTEGER',
        'float64': 'FLOAT',
        'bool': 'BOOLEAN',
        'datetime64[ns]': 'TIMESTAMP',
        'object': 'STRING'
    }
    return type_mapping.get(source_type, 'STRING')
3. GitHub Actions Issues
Issue: Workflow Not Triggering
Symptoms:
GitHub Actions workflow doesn't start
No workflow runs showing in GitHub
API calls fail with 404
Solutions:
Check Workflow File Location:
# Ensure workflow files are in correct location
ls -la .github/workflows/
# Should show: schema-mapping.yml, dbt-generation.yml
Validate Workflow Syntax:
# Use GitHub CLI to validate
gh workflow list
gh workflow view schema-mapping.yml
Test API Trigger:
import requests

def test_workflow_trigger():
    """Test GitHub workflow trigger API"""
    url = f"https://api.github.com/repos/{GITHUB_REPO}/actions/workflows/schema-mapping.yml/dispatches"
    headers = {
        'Authorization': f'token {GITHUB_TOKEN}',
        'Accept': 'application/vnd.github.v3+json'
    }
    payload = {
        'ref': 'main',
        'inputs': {
            'source_system': 'test',
            'table_name': 'test',
            'schema_json': '{}',
            's3_file_path': 'test'
        }
    }
    response = requests.post(url, headers=headers, json=payload)
    if response.status_code == 204:
        print("Workflow triggered successfully")
    else:
        print(f"Failed: {response.status_code} - {response.text}")
        # Common issues and solutions
        if response.status_code == 404:
            print("Check: Repo name, workflow file name, branch name")
        elif response.status_code == 422:
            print("Check: Workflow inputs, branch exists")
        elif response.status_code == 401:
            print("Check: GitHub token permissions")

test_workflow_trigger()
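A 422 from the dispatches endpoint usually means the payload's inputs do not match what the workflow declares. For reference, a workflow_dispatch trigger that would accept the payload above could look like the sketch below; adapt the input names to your actual workflow file.

# .github/workflows/schema-mapping.yml (trigger section only)
on:
  workflow_dispatch:
    inputs:
      source_system:
        required: true
        type: string
      table_name:
        required: true
        type: string
      schema_json:
        required: true
        type: string
      s3_file_path:
        required: true
        type: string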
Issue: dbt Model Generation Fails
Symptoms:
dbt compilation errors
Invalid SQL syntax in generated models
Missing source references
Solutions:
import re
import logging

def validate_generated_sql(sql_content, table_name):
    """Validate generated SQL syntax"""
    import sqlparse
    try:
        # Parse SQL to check syntax
        parsed = sqlparse.parse(sql_content)
        if not parsed:
            raise ValueError("Empty or invalid SQL")
        # Check for required elements
        sql_lower = sql_content.lower()
        required_elements = [
            'select',
            'from',
            '{{',  # dbt syntax
            table_name.lower()
        ]
        missing_elements = [elem for elem in required_elements if elem not in sql_lower]
        if missing_elements:
            raise ValueError(f"Missing required elements: {missing_elements}")
        return True
    except Exception as e:
        logging.error(f"SQL validation failed: {e}")
        return False

def fix_common_sql_issues(sql_content):
    """Fix common SQL generation issues"""
    fixes = [
        # Fix missing spaces in dbt source() calls
        (r'{{source\(', '{{ source('),
        (r'\)}}', ') }}'),
        # Wrap bare single-brace column references, leaving existing {{ ... }} untouched
        (r'(?<!{){([^{}]+)}(?!})', r'{{ \1 }}'),
        # Fix data types
        ('VARCHAR', 'STRING'),
        ('INTEGER', 'INT64'),
        # Fix common syntax errors
        (',,', ','),
        (',\n)', '\n)'),
    ]
    for pattern, replacement in fixes:
        sql_content = re.sub(pattern, replacement, sql_content)
    return sql_content
4. dbt Issues
Issue: dbt Models Don't Compile
Symptoms:
dbt compile fails
Missing dependencies
Invalid references
Solutions:
# dbt_project.yml - ensure proper configuration
name: 'analytics_dbt'
version: '1.0.0'

model-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  analytics_dbt:
    auto_generated:
      +materialized: table
      +tags: ['auto-generated']

# Add to models/_sources.yml
sources:
  - name: raw_data
    description: Raw data from various source systems
    tables:
      - name: customers
        description: Customer data from CRM
      - name: orders
        description: Order data from e-commerce platform
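If compilation fails on missing source references, confirm that each generated model points at a source that actually exists in _sources.yml. A minimal model consistent with the sources above; the file name and column list are illustrative only.

-- models/auto_generated/stg_raw_data__customers.sql
select
    cast(customer_id as string) as customer_id,
    lower(email) as email,
    created_at
from {{ source('raw_data', 'customers') }}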
Debugging Steps:
# Check dbt configuration
dbt debug
# Parse project without running
dbt parse
# Compile specific model
dbt compile --select dim_customer
# Check dependencies
dbt list --models +dim_customer
# Run with verbose output
dbt run --select dim_customer --full-refresh --debug
Issue: Test Failures
Symptoms:
Data quality tests fail
Relationship tests fail
Custom tests error out
Solutions:
-- Create more robust tests
-- tests/generic/test_email_format.sql
{% test valid_email_format(model, column_name) %}
    select {{ column_name }}
    from {{ model }}
    where {{ column_name }} is not null
      and not regexp_contains({{ column_name }}, r'^[^@]+@[^@]+\.[^@]+$')
{% endtest %}

-- tests/generic/test_reasonable_date.sql
{% test reasonable_date_range(model, column_name, start_date='1900-01-01', end_date=None) %}
    {% if end_date is none %}
        {% set end_date = modules.datetime.date.today() %}
    {% endif %}
    select {{ column_name }}
    from {{ model }}
    where {{ column_name }} < '{{ start_date }}'
       or {{ column_name }} > '{{ end_date }}'
{% endtest %}
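Once defined, these generic tests are applied in a model's schema file like any built-in test. A sketch of the usage; the model and column names here are examples.

# models/auto_generated/schema.yml
version: 2
models:
  - name: dim_customer
    columns:
      - name: email
        tests:
          - valid_email_format
      - name: created_at
        tests:
          - reasonable_date_range:
              start_date: '2000-01-01'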
5. Performance Issues
Issue: Slow Pipeline Execution
Symptoms:
Long processing times
Memory issues
Timeout errors
Solutions:
Optimize Airflow Configuration:
# In airflow.cfg
[core]
executor = CeleryExecutor  # For parallel processing
parallelism = 32
dag_concurrency = 16  # renamed max_active_tasks_per_dag in Airflow 2.2+
max_active_runs_per_dag = 16

[celery]
worker_concurrency = 16
Implement Parallel Processing:
import asyncio

async def process_multiple_files(file_list):
    """Process multiple files concurrently"""
    async def process_single_file(file_path):
        # Process individual file
        return await extract_and_map_schema(file_path)

    # Process files in parallel
    tasks = [process_single_file(file_path) for file_path in file_list]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results
Optimize Large File Processing:
import pandas as pd

def process_large_csv_in_chunks(file_path, chunk_size=10000):
    """Process large CSV files in chunks"""
    chunk_schemas = []
    for chunk_df in pd.read_csv(file_path, chunksize=chunk_size):
        chunk_schema = extract_chunk_schema(chunk_df)
        chunk_schemas.append(chunk_schema)
    # Merge chunk schemas
    consolidated_schema = merge_schemas(chunk_schemas)
    return consolidated_schema
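extract_chunk_schema and merge_schemas are helpers assumed from the main pipeline. If you need a stand-in while debugging, a minimal merge that unions column names across chunks and falls back to a generic type on disagreement could look like this sketch.

def merge_schemas(chunk_schemas):
    """Union column names across chunks; mark dtype as 'object' when chunks disagree (illustrative only)."""
    merged = {}
    for schema in chunk_schemas:
        for col in schema['columns']:
            existing = merged.get(col['name'])
            if existing is None:
                merged[col['name']] = dict(col)
            elif existing['dtype'] != col['dtype']:
                existing['dtype'] = 'object'  # mixed types across chunks
    return {'columns': list(merged.values())}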
Monitoring and Alerting
1. Set Up Comprehensive Monitoring
# monitoring/pipeline_monitor.py
import logging
import boto3
from datetime import datetime, timedelta

def monitor_pipeline_health():
    """Monitor pipeline health and send alerts"""
    checks = {
        's3_file_processing': check_s3_processing_rate(),
        'airflow_dag_success': check_airflow_dag_health(),
        'github_actions': check_github_actions_health(),
        'dbt_model_freshness': check_dbt_model_freshness(),
        'chicory_agent_performance': check_agent_response_times()
    }
    failed_checks = {k: v for k, v in checks.items() if not v['status']}
    if failed_checks:
        send_alert(failed_checks)
    return checks

def check_s3_processing_rate():
    """Check S3 file processing rate"""
    s3_client = boto3.client('s3')
    # Check for files older than 1 hour in incoming/
    response = s3_client.list_objects_v2(
        Bucket=S3_BUCKET,
        Prefix='incoming/'
    )
    old_files = []
    cutoff_time = datetime.now() - timedelta(hours=1)
    for obj in response.get('Contents', []):
        if obj['LastModified'].replace(tzinfo=None) < cutoff_time:
            old_files.append(obj['Key'])
    return {
        'status': len(old_files) == 0,
        'message': f'{len(old_files)} files stuck in processing',
        'details': old_files
    }

def send_alert(failed_checks):
    """Send alert for failed checks"""
    import json
    # Send to Slack, email, or monitoring system
    alert_message = {
        'timestamp': datetime.now().isoformat(),
        'alert_type': 'pipeline_health_check',
        'failed_checks': failed_checks,
        'runbook': 'https://docs.example.com/runbook/pipeline-troubleshooting'
    }
    # Example: Send to CloudWatch
    cloudwatch = boto3.client('cloudwatch')
    cloudwatch.put_metric_data(
        Namespace='Pipeline/Health',
        MetricData=[
            {
                'MetricName': 'FailedChecks',
                'Value': len(failed_checks),
                'Unit': 'Count'
            }
        ]
    )
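To keep these checks running continuously, one option is to schedule monitor_pipeline_health from a small Airflow DAG. The sketch below assumes the monitoring module is importable on the workers; the schedule and dag_id are placeholders.

# monitoring/health_check_dag.py
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from pipeline_monitor import monitor_pipeline_health

with DAG(
    dag_id='pipeline_health_check',
    start_date=datetime(2024, 1, 1),
    schedule_interval='*/15 * * * *',  # run the health checks every 15 minutes
    catchup=False,
) as dag:
    run_health_checks = PythonOperator(
        task_id='run_health_checks',
        python_callable=monitor_pipeline_health,
    )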
2. Create Debugging Scripts
# debug/pipeline_debugger.py
def debug_pipeline_step(step_name, **kwargs):
    """Debug specific pipeline steps"""
    debuggers = {
        's3_detection': debug_s3_file_detection,
        'schema_extraction': debug_schema_extraction,
        'mapping_generation': debug_mapping_generation,
        'dbt_generation': debug_dbt_generation
    }
    if step_name in debuggers:
        return debuggers[step_name](**kwargs)
    else:
        raise ValueError(f"Unknown debug step: {step_name}")

def debug_s3_file_detection(bucket_name, prefix):
    """Debug S3 file detection issues"""
    import boto3
    s3_client = boto3.client('s3')
    print(f"Debugging S3 detection for bucket: {bucket_name}, prefix: {prefix}")
    try:
        response = s3_client.list_objects_v2(
            Bucket=bucket_name,
            Prefix=prefix
        )
        if 'Contents' in response:
            print(f"Found {len(response['Contents'])} objects")
            for obj in response['Contents'][:10]:  # Show first 10
                print(f"  - {obj['Key']} (Modified: {obj['LastModified']})")
        else:
            print("No objects found")
    except Exception as e:
        print(f"Error accessing S3: {e}")
        print("Check: AWS credentials, bucket name, permissions")

# Usage: python debug/pipeline_debugger.py s3_detection --bucket=my-bucket --prefix=incoming/
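The usage line above assumes a small CLI entry point at the bottom of debug/pipeline_debugger.py; a sketch that wires the --bucket and --prefix flags into the s3_detection step follows.

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Debug individual pipeline steps')
    parser.add_argument('step', help='Step to debug, e.g. s3_detection')
    parser.add_argument('--bucket', dest='bucket_name')
    parser.add_argument('--prefix', dest='prefix')
    args = parser.parse_args()

    # Pass only the flags that were provided to the selected debugger
    kwargs = {k: v for k, v in vars(args).items() if k != 'step' and v is not None}
    debug_pipeline_step(args.step, **kwargs)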
Emergency Procedures
1. Pipeline Failure Recovery
#!/bin/bash
# emergency/recover_pipeline.sh
echo "Starting pipeline recovery procedure..."

# 1. Stop all running workflows
echo "Stopping active workflows..."
gh workflow disable schema-mapping.yml
gh workflow disable dbt-generation.yml

# 2. Clear stuck files
echo "Moving stuck files..."
aws s3 mv s3://your-bucket/incoming/ s3://your-bucket/recovery/ --recursive

# 3. Reset Airflow DAG state (clear recent task instances so they can re-run)
echo "Resetting Airflow DAG..."
airflow tasks clear --start-date "$(date -d '1 hour ago' '+%Y-%m-%dT%H:%M:%S')" --yes automated_csv_ingestion

# 4. Validate system health
echo "Validating system health..."
python monitoring/pipeline_monitor.py

# 5. Re-enable workflows
echo "Re-enabling workflows..."
gh workflow enable schema-mapping.yml
gh workflow enable dbt-generation.yml

echo "Recovery complete. Monitor logs for issues."
2. Data Quality Issues
# emergency/data_quality_fix.py
def fix_data_quality_issues(table_name):
    """Fix common data quality issues"""
    fixes = [
        fix_null_values,
        fix_duplicate_records,
        fix_data_type_issues,
        fix_constraint_violations
    ]
    for fix_function in fixes:
        try:
            fix_function(table_name)
            print(f"Applied {fix_function.__name__} to {table_name}")
        except Exception as e:
            print(f"Failed to apply {fix_function.__name__}: {e}")

def fix_null_values(table_name):
    """Fix null values in critical columns"""
    # Implementation depends on your data warehouse
    pass

def fix_duplicate_records(table_name):
    """Remove duplicate records"""
    # Implementation depends on your data warehouse
    pass
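As the comments note, the actual fixes are warehouse-specific. As one illustration, fix_duplicate_records could issue a ROW_NUMBER()-based dedup statement; the sketch below uses BigQuery-style SQL, and the key and ordering columns are assumptions to replace with your own.

def fix_duplicate_records(table_name, warehouse_client=None):
    """Keep the most recent row per key (sketch; assumes 'customer_id' and 'loaded_at' columns)."""
    dedup_sql = f"""
        CREATE OR REPLACE TABLE {table_name} AS
        SELECT * EXCEPT(row_num) FROM (
            SELECT *, ROW_NUMBER() OVER (
                PARTITION BY customer_id ORDER BY loaded_at DESC
            ) AS row_num
            FROM {table_name}
        )
        WHERE row_num = 1
    """
    if warehouse_client is not None:
        warehouse_client.query(dedup_sql)  # e.g. a BigQuery client
    return dedup_sql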
Getting Help
1. Log Analysis
# Check Airflow task logs (path layout varies by Airflow version; this is the 2.3+ default)
ls $AIRFLOW_HOME/logs/dag_id=automated_csv_ingestion/
cat $AIRFLOW_HOME/logs/dag_id=automated_csv_ingestion/run_id=*/task_id=extract_csv_schema/attempt=1.log

# Check GitHub Actions logs
gh run list --workflow=schema-mapping.yml
gh run view <run_id>

# Check dbt logs
grep ERROR logs/dbt.log
2. Support Channels
Chicory AI Support: [email protected]
GitHub Issues: Create issue in your repository
Internal Documentation: Update troubleshooting docs with new solutions
3. Escalation Process
Level 1: Check common issues in this guide
Level 2: Run debugging scripts and check logs
Level 3: Contact system administrators
Level 4: Engage vendor support (Chicory, cloud providers)
This concludes the Automated Ingestion & Schema Mapping cookbook. For additional support, refer to the Chicory AI documentation or contact your system administrator.