GitHub Actions Workflow

Overview

This section covers setting up GitHub Actions workflows to automate the schema mapping and dbt model generation process. We'll create two workflows:

  1. Schema Mapping Workflow: Triggered by Airflow, generates the schema mapping and opens a pull request (a sketch of the dispatch call follows this list)

  2. dbt Generation Workflow: Triggered by PR merge, generates dbt artifacts
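
Airflow starts the first workflow through GitHub's workflow_dispatch REST endpoint. The sketch below shows roughly what that trigger call looks like; the repository name, branch, and token handling are placeholders, and in practice the call would live inside your Airflow DAG (for example in a PythonOperator).

import json
import os

import requests

# Placeholders -- point these at your repository and a token with "actions: write" permission.
GITHUB_TOKEN = os.environ["GITHUB_TOKEN"]
REPO = "your-org/your-dbt-repo"
WORKFLOW_FILE = "schema-mapping.yml"


def trigger_schema_mapping(source_system: str, table_name: str,
                           schema: dict, s3_file_path: str) -> None:
    """Fire the workflow_dispatch event that starts the schema mapping workflow."""
    url = f"https://api.github.com/repos/{REPO}/actions/workflows/{WORKFLOW_FILE}/dispatches"
    payload = {
        "ref": "main",
        "inputs": {
            "source_system": source_system,
            "table_name": table_name,
            "schema_json": json.dumps(schema),  # workflow_dispatch inputs must be strings
            "s3_file_path": s3_file_path,
        },
    }
    headers = {
        "Authorization": f"Bearer {GITHUB_TOKEN}",
        "Accept": "application/vnd.github+json",
    }
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()  # GitHub responds 204 No Content on success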

Schema Mapping Workflow

1. Workflow Configuration

Create .github/workflows/schema-mapping.yml:

name: Automated Schema Mapping

on:
  workflow_dispatch:
    inputs:
      source_system:
        description: 'Source system name'
        required: true
        type: string
      table_name:
        description: 'Source table name'
        required: true
        type: string
      schema_json:
        description: 'JSON schema information'
        required: true
        type: string
      s3_file_path:
        description: 'S3 file path'
        required: true
        type: string

env:
  CHICORY_API_KEY: ${{ secrets.CHICORY_API_KEY }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  AWS_REGION: us-east-1

jobs:
  schema-mapping:
    runs-on: ubuntu-latest
    permissions:
      contents: write
      pull-requests: write

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          token: ${{ secrets.GITHUB_TOKEN }}

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: |
          pip install requests boto3 pyyaml

      - name: Parse schema information
        id: parse-schema
        env:
          SCHEMA_JSON: ${{ github.event.inputs.schema_json }}
          SOURCE_SYSTEM: ${{ github.event.inputs.source_system }}
          TABLE_NAME: ${{ github.event.inputs.table_name }}
        run: |
          python - <<'PY'
          import json
          import os

          # Read inputs from the environment to avoid shell quoting and injection issues
          schema_data = json.loads(os.environ['SCHEMA_JSON'])

          # Expose values to later steps via GITHUB_OUTPUT
          with open(os.environ['GITHUB_OUTPUT'], 'a') as f:
              f.write(f"source_system={os.environ['SOURCE_SYSTEM']}\n")
              f.write(f"table_name={os.environ['TABLE_NAME']}\n")
              f.write(f"row_count={schema_data.get('row_count', 0)}\n")
              f.write(f"column_count={len(schema_data.get('columns', []))}\n")
          PY

      - name: Generate schema mapping
        id: generate-mapping
        env:
          SCHEMA_JSON: ${{ github.event.inputs.schema_json }}
        run: |
          python scripts/generate_schema_mapping.py \
            --source-schema "$SCHEMA_JSON" \
            --source-system '${{ github.event.inputs.source_system }}' \
            --table-name '${{ github.event.inputs.table_name }}' \
            --output-file mapping_result.json

      - name: Validate mapping result
        run: |
          python scripts/validate_mapping.py mapping_result.json

      - name: Create mapping directory
        run: |
          mkdir -p mappings/${{ steps.parse-schema.outputs.source_system }}
          cp mapping_result.json mappings/${{ steps.parse-schema.outputs.source_system }}/${{ steps.parse-schema.outputs.table_name }}_mapping.json

      - name: Generate mapping documentation
        run: |
          python scripts/generate_mapping_docs.py \
            --mapping-file mapping_result.json \
            --output-file mappings/${{ steps.parse-schema.outputs.source_system }}/${{ steps.parse-schema.outputs.table_name }}_mapping.md
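
      # Capture a short JSON summary as a step output so the PR body below can embed it
      # (shell command substitution does not run inside the create-pull-request "body" input).
      - name: Summarize mapping
        id: mapping-summary
        run: |
          {
            echo "summary<<EOF"
            jq '.mapping_metadata + {column_count: (.column_mappings | length)}' mapping_result.json
            echo "EOF"
          } >> "$GITHUB_OUTPUT"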

      - name: Create Pull Request
        uses: peter-evans/create-pull-request@v5
        with:
          token: ${{ secrets.GITHUB_TOKEN }}
          commit-message: |
            Add schema mapping for ${{ steps.parse-schema.outputs.source_system }}.${{ steps.parse-schema.outputs.table_name }}

            - Source: ${{ github.event.inputs.s3_file_path }}
            - Columns: ${{ steps.parse-schema.outputs.column_count }}
            - Rows: ${{ steps.parse-schema.outputs.row_count }}

            Auto-generated by Chicory AI schema mapping agent.
          title: 'Schema Mapping: ${{ steps.parse-schema.outputs.source_system }}.${{ steps.parse-schema.outputs.table_name }}'
          body: |
            ## Schema Mapping Summary

            **Source System:** ${{ steps.parse-schema.outputs.source_system }}
            **Table Name:** ${{ steps.parse-schema.outputs.table_name }}
            **S3 File:** `${{ github.event.inputs.s3_file_path }}`
            **Columns:** ${{ steps.parse-schema.outputs.column_count }}
            **Rows:** ${{ steps.parse-schema.outputs.row_count }}

            ### Changes
            - ✅ Generated schema mapping configuration
            - ✅ Created mapping documentation
            - ✅ Validated mapping structure

            ### Next Steps
            1. Review the generated mapping in `mappings/${{ steps.parse-schema.outputs.source_system }}/${{ steps.parse-schema.outputs.table_name }}_mapping.json`
            2. Validate business logic and transformations
            3. Merge this PR to trigger dbt model generation

            ### Mapping Overview
            ```json
            ${{ steps.mapping-summary.outputs.summary }}
            ```

            ---
            🤖 *This PR was automatically created by the Chicory AI schema mapping workflow.*
          branch: feature/schema-mapping-${{ steps.parse-schema.outputs.source_system }}-${{ steps.parse-schema.outputs.table_name }}
          delete-branch: true
          labels: |
            automated
            schema-mapping
            chicory-ai

2. Schema Mapping Script

Create scripts/generate_schema_mapping.py:

#!/usr/bin/env python3
"""
Generate schema mapping using Chicory AI agent
"""

import argparse
import json
import requests
import logging
import os
import sys
from typing import Dict, Any

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def load_target_standards() -> Dict[str, Any]:
    """Load target schema standards from configuration"""

    standards_file = os.path.join(os.path.dirname(__file__), 'target_standards.json')

    if os.path.exists(standards_file):
        with open(standards_file, 'r') as f:
            return json.load(f)

    # Default standards if file doesn't exist
    return {
        "naming_conventions": {
            "table_prefix": {
                "dimension": "dim_",
                "fact": "fact_",
                "staging": "stg_"
            },
            "column_case": "snake_case"
        },
        "data_types": {
            "string_default": "STRING",
            "integer_default": "INTEGER",
            "decimal_default": "NUMERIC(15,2)",
            "date_default": "DATE",
            "timestamp_default": "TIMESTAMP"
        }
    }

def call_chicory_agent(schema_data: Dict[str, Any], target_standards: Dict[str, Any]) -> Dict[str, Any]:
    """Call Chicory schema mapping agent"""

    api_key = os.environ.get('CHICORY_API_KEY')
    if not api_key:
        raise ValueError("CHICORY_API_KEY environment variable not set")

    # Agent configuration
    agent_endpoint = "https://api.chicory.ai/v1/agents/schema_mapper_agent/chat"

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    # Prepare the prompt
    prompt = f"""
    Map the following source schema to our target standards:

    SOURCE SCHEMA:
    {json.dumps(schema_data, indent=2)}

    TARGET STANDARDS:
    {json.dumps(target_standards, indent=2)}

    Generate a comprehensive schema mapping following dimensional modeling best practices.
    """

    payload = {
        "messages": [
            {
                "role": "user",
                "content": prompt
            }
        ],
        "temperature": 0.1,
        "max_tokens": 2000
    }

    logger.info("Calling Chicory schema mapping agent...")

    try:
        response = requests.post(agent_endpoint, headers=headers, json=payload, timeout=60)
        response.raise_for_status()

        result = response.json()
        content = result['choices'][0]['message']['content']

        # Parse JSON from response
        mapping = parse_mapping_from_response(content)

        logger.info(f"Successfully generated mapping for {mapping.get('mapping_metadata', {}).get('target_table', 'unknown table')}")
        return mapping

    except requests.exceptions.RequestException as e:
        logger.error(f"API request failed: {e}")
        raise
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse JSON response: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise

def parse_mapping_from_response(content: str) -> Dict[str, Any]:
    """Extract JSON mapping from agent response"""
    import re

    # Look for JSON code block
    json_match = re.search(r'```json\n(.*?)\n```', content, re.DOTALL)
    if json_match:
        return json.loads(json_match.group(1))

    # Look for plain JSON object
    json_match = re.search(r'\{.*\}', content, re.DOTALL)
    if json_match:
        return json.loads(json_match.group(0))

    # If no JSON found, create a basic mapping
    logger.warning("Could not parse JSON from response, creating basic mapping")
    return create_fallback_mapping(content)

def create_fallback_mapping(content: str) -> Dict[str, Any]:
    """Create a fallback mapping structure if parsing fails"""
    from datetime import datetime, timezone

    return {
        "mapping_metadata": {
            "mapping_version": "1.0",
            "created_at": datetime.now(timezone.utc).isoformat(),
            "mapping_confidence": 0.5,
            "notes": "Fallback mapping created due to parsing error"
        },
        "column_mappings": [],
        "recommendations": {
            "parsing_error": content[:500]
        }
    }

def validate_mapping(mapping: Dict[str, Any]) -> bool:
    """Validate the generated mapping structure"""
    required_keys = ['mapping_metadata', 'column_mappings']

    for key in required_keys:
        if key not in mapping:
            logger.error(f"Missing required key: {key}")
            return False

    # Validate column mappings structure
    for col_mapping in mapping.get('column_mappings', []):
        required_col_keys = ['source_column', 'target_column', 'source_type', 'target_type']
        for key in required_col_keys:
            if key not in col_mapping:
                logger.warning(f"Missing column mapping key: {key}")

    logger.info("Mapping validation passed")
    return True

def main():
    parser = argparse.ArgumentParser(description='Generate schema mapping using Chicory AI')
    parser.add_argument('--source-schema', required=True, help='Source schema JSON string')
    parser.add_argument('--source-system', required=True, help='Source system name')
    parser.add_argument('--table-name', required=True, help='Source table name')
    parser.add_argument('--output-file', required=True, help='Output mapping file')

    args = parser.parse_args()

    try:
        # Parse source schema
        source_schema = json.loads(args.source_schema)

        # Load target standards
        target_standards = load_target_standards()

        # Generate mapping
        mapping = call_chicory_agent(source_schema, target_standards)

        # Add run metadata (create the metadata block if the agent response omitted it)
        mapping.setdefault('mapping_metadata', {}).update({
            'source_system': args.source_system,
            'source_table': args.table_name,
            'generated_by': 'chicory-ai-github-action'
        })

        # Validate mapping
        if not validate_mapping(mapping):
            raise ValueError("Generated mapping failed validation")

        # Save mapping
        with open(args.output_file, 'w') as f:
            json.dump(mapping, f, indent=2)

        logger.info(f"Schema mapping saved to {args.output_file}")

    except Exception as e:
        logger.error(f"Schema mapping generation failed: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
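
Before wiring the script into the workflow, the JSON-extraction behaviour can be smoke-tested locally against a canned agent response. A minimal check, assuming the repository root is the working directory so that scripts/ can be put on the import path:

import sys

sys.path.insert(0, "scripts")  # make the script importable for a quick local check
from generate_schema_mapping import parse_mapping_from_response

# Canned response in the shape the agent is expected to produce.
sample_response = """Here is the mapping:
```json
{
  "mapping_metadata": {"source_table": "customers", "target_table": "stg_customers", "mapping_version": "1.0"},
  "column_mappings": [
    {"source_column": "CustomerID", "target_column": "customer_id",
     "source_type": "int", "target_type": "INTEGER"}
  ]
}
```"""

mapping = parse_mapping_from_response(sample_response)
assert mapping["mapping_metadata"]["target_table"] == "stg_customers"
print(f"Extracted {len(mapping['column_mappings'])} column mapping(s)")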

3. Validation Script

Create scripts/validate_mapping.py:

#!/usr/bin/env python3
"""
Validate generated schema mapping
"""

import argparse
import json
import logging
import re
import sys

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def validate_mapping_structure(mapping: dict) -> list:
    """Validate the structure of the mapping"""
    issues = []

    # Required top-level keys
    required_keys = ['mapping_metadata', 'column_mappings']
    for key in required_keys:
        if key not in mapping:
            issues.append(f"Missing required key: {key}")

    # Validate metadata
    if 'mapping_metadata' in mapping:
        metadata = mapping['mapping_metadata']
        required_metadata = ['source_table', 'target_table', 'mapping_version']

        for key in required_metadata:
            if key not in metadata:
                issues.append(f"Missing metadata key: {key}")

    # Validate column mappings
    if 'column_mappings' in mapping:
        for i, col_mapping in enumerate(mapping['column_mappings']):
            required_col_keys = ['source_column', 'target_column', 'source_type', 'target_type']

            for key in required_col_keys:
                if key not in col_mapping:
                    issues.append(f"Column mapping {i}: Missing key {key}")

    return issues

def validate_naming_conventions(mapping: dict) -> list:
    """Validate naming conventions in the mapping"""
    issues = []

    if 'column_mappings' not in mapping:
        return issues

    for i, col_mapping in enumerate(mapping['column_mappings']):
        target_column = col_mapping.get('target_column', '')

        # Check snake_case: lowercase letters, digits, and underscores, starting with a letter
        if not re.match(r'^[a-z][a-z0-9_]*$', target_column):
            issues.append(f"Column mapping {i}: target_column '{target_column}' should use snake_case")

        # Check for reserved words (basic check)
        reserved_words = ['select', 'from', 'where', 'order', 'group']
        if target_column.lower() in reserved_words:
            issues.append(f"Column mapping {i}: target_column '{target_column}' is a reserved word")

    return issues

def validate_data_types(mapping: dict) -> list:
    """Validate data type mappings"""
    issues = []

    valid_target_types = [
        'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'DATE', 'TIMESTAMP',
        'NUMERIC', 'JSON', 'ARRAY', 'STRUCT'
    ]

    if 'column_mappings' not in mapping:
        return issues

    for i, col_mapping in enumerate(mapping['column_mappings']):
        target_type = col_mapping.get('target_type', '')

        # Check if target type is valid (basic validation)
        base_type = target_type.split('(')[0]  # Handle NUMERIC(15,2) format
        if base_type not in valid_target_types:
            issues.append(f"Column mapping {i}: Unknown target_type '{target_type}'")

    return issues

def main():
    parser = argparse.ArgumentParser(description='Validate schema mapping')
    parser.add_argument('mapping_file', help='Path to mapping JSON file')

    args = parser.parse_args()

    try:
        # Load mapping
        with open(args.mapping_file, 'r') as f:
            mapping = json.load(f)

        # Run validations
        all_issues = []

        structure_issues = validate_mapping_structure(mapping)
        naming_issues = validate_naming_conventions(mapping)
        datatype_issues = validate_data_types(mapping)

        all_issues.extend(structure_issues)
        all_issues.extend(naming_issues)
        all_issues.extend(datatype_issues)

        # Report results
        if all_issues:
            logger.error(f"Validation failed with {len(all_issues)} issues:")
            for issue in all_issues:
                logger.error(f"  - {issue}")
            sys.exit(1)
        else:
            logger.info("Validation passed successfully")

            # Print summary
            metadata = mapping.get('mapping_metadata', {})
            columns = len(mapping.get('column_mappings', []))

            logger.info(f"Mapping Summary:")
            logger.info(f"  Source: {metadata.get('source_table', 'Unknown')}")
            logger.info(f"  Target: {metadata.get('target_table', 'Unknown')}")
            logger.info(f"  Columns: {columns}")
            logger.info(f"  Confidence: {metadata.get('mapping_confidence', 'Unknown')}")

    except FileNotFoundError:
        logger.error(f"Mapping file not found: {args.mapping_file}")
        sys.exit(1)
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in mapping file: {e}")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Validation error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
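
The individual validators can also be imported for ad-hoc checks while hand-editing a mapping. A minimal mapping document that clears all three checks looks roughly like this (the field values are illustrative):

import sys

sys.path.insert(0, "scripts")
from validate_mapping import (
    validate_mapping_structure,
    validate_naming_conventions,
    validate_data_types,
)

# Smallest shape that clears the structural checks; real mappings carry more detail.
mapping = {
    "mapping_metadata": {
        "source_table": "customers",
        "target_table": "stg_customers",
        "mapping_version": "1.0",
    },
    "column_mappings": [
        {
            "source_column": "CustomerID",
            "target_column": "customer_id",
            "source_type": "int",
            "target_type": "INTEGER",
        }
    ],
}

issues = (
    validate_mapping_structure(mapping)
    + validate_naming_conventions(mapping)
    + validate_data_types(mapping)
)
print("Issues:", issues or "none")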

dbt Generation Workflow

1. Workflow Configuration

Create .github/workflows/dbt-generation.yml:

name: Generate dbt Models

on:
  pull_request:
    types: [closed]
    branches: [main]
    paths:
      - 'mappings/**/*_mapping.json'

env:
  CHICORY_API_KEY: ${{ secrets.CHICORY_API_KEY }}

jobs:
  generate-dbt-models:
    if: github.event.pull_request.merged == true
    runs-on: ubuntu-latest
    permissions:
      contents: write
      pull-requests: write

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          token: ${{ secrets.GITHUB_TOKEN }}

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: |
          pip install requests pyyaml dbt-core dbt-bigquery

      - name: Detect changed mapping files
        id: detect-changes
        run: |
          # Get changed files from the merged PR
          CHANGED_FILES=$(gh api repos/${{ github.repository }}/pulls/${{ github.event.pull_request.number }}/files \
            --paginate --jq '.[] | select(.filename | test("mappings/.*_mapping\\.json$")) | .filename')

          if [ -z "$CHANGED_FILES" ]; then
            echo "No mapping files changed"
            echo "has_changes=false" >> $GITHUB_OUTPUT
            exit 0
          fi

          echo "Changed mapping files:"
          echo "$CHANGED_FILES"
          echo "has_changes=true" >> $GITHUB_OUTPUT
          echo "mapping_files<<EOF" >> $GITHUB_OUTPUT
          echo "$CHANGED_FILES" >> $GITHUB_OUTPUT
          echo "EOF" >> $GITHUB_OUTPUT
        env:
          GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}

      - name: Generate dbt artifacts
        if: steps.detect-changes.outputs.has_changes == 'true'
        run: |
          while IFS= read -r mapping_file; do
            if [ -n "$mapping_file" ]; then
              echo "Processing: $mapping_file"
              python scripts/generate_dbt_artifacts.py \
                --mapping-file "$mapping_file" \
                --output-dir models/
            fi
          done <<< "${{ steps.detect-changes.outputs.mapping_files }}"

      - name: Validate generated dbt models
        if: steps.detect-changes.outputs.has_changes == 'true'
        run: |
          # Parse the project to catch compilation errors in the generated models
          dbt parse --project-dir . --profiles-dir profiles/

      - name: Run dbt tests on generated models
        if: steps.detect-changes.outputs.has_changes == 'true'
        continue-on-error: true
        run: |
          dbt test --select +tag:auto-generated --project-dir . --profiles-dir profiles/

      - name: Create Pull Request
        if: steps.detect-changes.outputs.has_changes == 'true'
        uses: peter-evans/create-pull-request@v5
        with:
          token: ${{ secrets.GITHUB_TOKEN }}
          commit-message: |
            Generate dbt models from schema mappings

            Auto-generated dbt models and documentation from approved schema mappings.

            Files generated:
            ${{ steps.detect-changes.outputs.mapping_files }}
          title: 'dbt Models: Auto-generated from Schema Mappings'
          body: |
            ## dbt Model Generation

            This PR contains auto-generated dbt models and documentation based on the recently merged schema mappings.

            ### Generated Files
            ${{ steps.detect-changes.outputs.mapping_files }}

            ### Validation Results
            - ✅ dbt syntax validation passed
            - ✅ Model compilation successful
            - ⚠️ Tests may require review and adjustment

            ### Next Steps
            1. Review generated SQL for business logic accuracy
            2. Validate column descriptions and documentation
            3. Adjust any failing tests
            4. Merge to deploy new models

            ---
            🤖 *This PR was automatically created by the Chicory AI dbt generation workflow.*
          branch: feature/dbt-models-auto-generated
          delete-branch: true
          labels: |
            automated
            dbt-models
            chicory-ai
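
The Detect changed mapping files step asks the pull request files API for the merged PR's changed paths; note that this endpoint is paginated (30 files per page by default), which is why the gh api call passes --paginate. If you need the same list outside of Actions, for local debugging say, a rough equivalent against the REST API is sketched below. The repository name, token variable, and PR number are placeholders.

import os

import requests

# Placeholders -- set these for your repository and the merged PR you want to inspect.
GITHUB_TOKEN = os.environ["GITHUB_TOKEN"]
REPO = "your-org/your-dbt-repo"
PR_NUMBER = 42


def changed_mapping_files(repo: str, pr_number: int) -> list[str]:
    """Return mapping files touched by a PR, walking all pages of the files API."""
    headers = {
        "Authorization": f"Bearer {GITHUB_TOKEN}",
        "Accept": "application/vnd.github+json",
    }
    files, page = [], 1
    while True:
        resp = requests.get(
            f"https://api.github.com/repos/{repo}/pulls/{pr_number}/files",
            headers=headers,
            params={"per_page": 100, "page": page},
            timeout=30,
        )
        resp.raise_for_status()
        batch = resp.json()
        if not batch:
            break
        files.extend(
            f["filename"] for f in batch
            if f["filename"].startswith("mappings/") and f["filename"].endswith("_mapping.json")
        )
        page += 1
    return files


print(changed_mapping_files(REPO, PR_NUMBER))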

2. dbt Generation Script

Create scripts/generate_dbt_artifacts.py:

#!/usr/bin/env python3
"""
Generate dbt models and YAML documentation using Chicory AI agent
"""

import argparse
import json
import requests
import logging
import os
import re
import sys
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def call_chicory_dbt_agent(mapping: dict) -> dict:
    """Call Chicory dbt generation agent"""

    api_key = os.environ.get('CHICORY_API_KEY')
    if not api_key:
        raise ValueError("CHICORY_API_KEY environment variable not set")

    agent_endpoint = "https://api.chicory.ai/v1/agents/dbt_generator_agent/chat"

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    # Load dbt project configuration
    dbt_project_config = load_dbt_project_config()

    prompt = f"""
    Generate dbt model artifacts for the following schema mapping:

    SCHEMA MAPPING:
    {json.dumps(mapping, indent=2)}

    DBT PROJECT CONFIG:
    {json.dumps(dbt_project_config, indent=2)}

    Please generate:
    1. Complete dbt model SQL file
    2. Comprehensive YAML documentation with tests
    3. Any necessary macros or additional files

    Follow dbt best practices and ensure proper documentation, tests, and transformations.
    """

    payload = {
        "messages": [
            {
                "role": "user",
                "content": prompt
            }
        ],
        "temperature": 0.1,
        "max_tokens": 3000
    }

    logger.info("Calling Chicory dbt generation agent...")

    try:
        response = requests.post(agent_endpoint, headers=headers, json=payload, timeout=120)
        response.raise_for_status()

        result = response.json()
        content = result['choices'][0]['message']['content']

        # Parse artifacts from response
        artifacts = parse_dbt_artifacts(content)

        logger.info(f"Successfully generated dbt artifacts")
        return artifacts

    except Exception as e:
        logger.error(f"dbt generation failed: {e}")
        raise

def load_dbt_project_config() -> dict:
    """Load dbt project configuration"""

    config_file = 'dbt_project.yml'

    if os.path.exists(config_file):
        import yaml
        with open(config_file, 'r') as f:
            return yaml.safe_load(f)

    # Default configuration
    return {
        "name": "analytics_dbt",
        "version": "1.0.0",
        "profile": "analytics",
        "model-paths": ["models"],
        "source-paths": ["models"],
        "test-paths": ["tests"]
    }

def parse_dbt_artifacts(content: str) -> dict:
    """Extract dbt artifacts from agent response"""

    artifacts = {}

    # Extract SQL model
    sql_pattern = r'```sql\n(.*?)\n```'
    sql_match = re.search(sql_pattern, content, re.DOTALL | re.IGNORECASE)
    if sql_match:
        artifacts['sql_model'] = sql_match.group(1).strip()

    # Extract YAML documentation
    yaml_pattern = r'```yaml\n(.*?)\n```'
    yaml_match = re.search(yaml_pattern, content, re.DOTALL | re.IGNORECASE)
    if yaml_match:
        artifacts['yaml_doc'] = yaml_match.group(1).strip()

    # Extract macros if present
    macro_pattern = r'```macro\n(.*?)\n```'
    macro_match = re.search(macro_pattern, content, re.DOTALL | re.IGNORECASE)
    if macro_match:
        artifacts['macro'] = macro_match.group(1).strip()

    return artifacts

def write_dbt_files(artifacts: dict, mapping: dict, output_dir: str):
    """Write generated dbt files to disk"""

    metadata = mapping.get('mapping_metadata', {})
    target_table = metadata.get('target_table', 'unknown_table')

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Write SQL model
    if 'sql_model' in artifacts:
        model_file = output_path / f"{target_table}.sql"
        with open(model_file, 'w') as f:
            f.write(artifacts['sql_model'])
        logger.info(f"Created dbt model: {model_file}")

    # Write YAML documentation
    if 'yaml_doc' in artifacts:
        yaml_file = output_path / f"_{target_table}.yml"
        with open(yaml_file, 'w') as f:
            f.write(artifacts['yaml_doc'])
        logger.info(f"Created YAML documentation: {yaml_file}")

    # Write macro if present
    if 'macro' in artifacts:
        macro_dir = Path("macros")
        macro_dir.mkdir(exist_ok=True)
        macro_file = macro_dir / f"{target_table}_macros.sql"
        with open(macro_file, 'w') as f:
            f.write(artifacts['macro'])
        logger.info(f"Created macro: {macro_file}")

def main():
    parser = argparse.ArgumentParser(description='Generate dbt artifacts using Chicory AI')
    parser.add_argument('--mapping-file', required=True, help='Schema mapping JSON file')
    parser.add_argument('--output-dir', required=True, help='Output directory for dbt files')

    args = parser.parse_args()

    try:
        # Load mapping
        with open(args.mapping_file, 'r') as f:
            mapping = json.load(f)

        # Generate artifacts
        artifacts = call_chicory_dbt_agent(mapping)

        # Write files
        write_dbt_files(artifacts, mapping, args.output_dir)

        logger.info("dbt artifacts generated successfully")

    except Exception as e:
        logger.error(f"dbt generation failed: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
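
The parsing and file-writing helpers can be exercised without calling the agent by feeding them a stubbed response, which is useful when tweaking the extraction patterns. A rough local check, again assuming scripts/ is importable:

import sys

sys.path.insert(0, "scripts")
from generate_dbt_artifacts import parse_dbt_artifacts, write_dbt_files

# Stubbed agent response containing one SQL block and one YAML block.
stub_response = """```sql
select customer_id, customer_name from {{ source('crm', 'customers') }}
```

```yaml
version: 2
models:
  - name: stg_customers
```"""

artifacts = parse_dbt_artifacts(stub_response)
mapping = {"mapping_metadata": {"target_table": "stg_customers"}}
write_dbt_files(artifacts, mapping, "test_output/")
# Expected files:
#   test_output/stg_customers.sql
#   test_output/_stg_customers.yml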

Repository Setup

1. GitHub Secrets

Configure these secrets in your GitHub repository:

# Required secrets
CHICORY_API_KEY=your_chicory_api_key
AWS_ACCESS_KEY_ID=your_aws_access_key
AWS_SECRET_ACCESS_KEY=your_aws_secret_key

2. Target Standards Configuration

Create scripts/target_standards.json:

{
  "naming_conventions": {
    "table_prefix": {
      "dimension": "dim_",
      "fact": "fact_",
      "staging": "stg_",
      "intermediate": "int_"
    },
    "column_case": "snake_case",
    "reserved_suffixes": {
      "primary_key": "_sk",
      "business_key": "_bk",
      "foreign_key": "_fk",
      "date": "_date",
      "timestamp": "_ts"
    }
  },
  "data_types": {
    "string_default": "STRING",
    "integer_default": "INTEGER",
    "decimal_default": "NUMERIC(15,2)",
    "date_default": "DATE",
    "timestamp_default": "TIMESTAMP",
    "boolean_default": "BOOLEAN"
  },
  "standard_columns": {
    "audit_columns": [
      {"name": "created_at", "type": "TIMESTAMP"},
      {"name": "updated_at", "type": "TIMESTAMP"},
      {"name": "is_active", "type": "BOOLEAN"}
    ]
  }
}
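
The agent receives these standards verbatim in its prompt, but the naming rules can also be applied deterministically when spot-checking a mapping by hand. The helper below is purely illustrative (it is not part of the generated scripts) and shows how a raw source column name might be normalized under the configuration above:

import json
import re


def to_snake_case(name: str) -> str:
    """Normalize a raw source column name per the snake_case convention."""
    name = re.sub(r"[^\w]+", "_", name)                  # spaces and punctuation -> _
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)  # split camelCase boundaries
    return re.sub(r"_+", "_", name).strip("_").lower()


with open("scripts/target_standards.json") as f:
    standards = json.load(f)

suffixes = standards["naming_conventions"]["reserved_suffixes"]

print(to_snake_case("CustomerID"))                             # customer_id
print(to_snake_case("Signup Date"))                            # signup_date
print(to_snake_case("CustomerID") + suffixes["business_key"])  # customer_id_bk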

3. Directory Structure

Ensure your repository has the required structure:

your-dbt-repo/
├── .github/workflows/
│   ├── schema-mapping.yml
│   └── dbt-generation.yml
├── scripts/
│   ├── generate_schema_mapping.py
│   ├── validate_mapping.py
│   ├── generate_dbt_artifacts.py
│   └── target_standards.json
├── mappings/
│   └── [auto-generated mapping files]
├── models/
│   └── [auto-generated dbt models]
├── profiles/
│   └── profiles.yml
├── dbt_project.yml
└── README.md

Testing GitHub Actions

1. Local Testing

Test the scripts locally before deploying:

# Test schema mapping
python scripts/generate_schema_mapping.py \
  --source-schema '{"table_name": "test", "columns": [...]}' \
  --source-system "test_system" \
  --table-name "test_table" \
  --output-file test_mapping.json

# Test validation
python scripts/validate_mapping.py test_mapping.json

# Test dbt generation
python scripts/generate_dbt_artifacts.py \
  --mapping-file test_mapping.json \
  --output-dir test_output/

2. Workflow Testing

Use workflow dispatch to test:

# Trigger schema mapping workflow
gh workflow run schema-mapping.yml \
  -f source_system=test \
  -f table_name=customers \
  -f schema_json='{"table_name": "customers", "columns": [...]}' \
  -f s3_file_path=incoming/test_customers.csv

Next: dbt Model Generation
