GitHub Actions Workflow
Overview
This section covers setting up GitHub Actions workflows to automate the schema mapping and dbt model generation process. We'll create two workflows:
Schema Mapping Workflow: Triggered by Airflow via workflow_dispatch; creates a schema mapping and raises a PR (a trigger sketch follows this list)
dbt Generation Workflow: Triggered when a mapping PR is merged; generates dbt model artifacts and documentation
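For context, the Airflow task only needs to send a `workflow_dispatch` event to the GitHub REST API. Below is a minimal sketch of that call; the owner, repository, and branch names are placeholders, and a real DAG may use a GitHub provider/hook instead of raw `requests`.

```python
import os
import requests

# Placeholder repository coordinates -- substitute your own.
OWNER, REPO = "your-org", "your-dbt-repo"
WORKFLOW_FILE = "schema-mapping.yml"

def trigger_schema_mapping(source_system: str, table_name: str,
                           schema_json: str, s3_file_path: str) -> None:
    """Send the workflow_dispatch event that kicks off the schema mapping workflow."""
    url = (f"https://api.github.com/repos/{OWNER}/{REPO}"
           f"/actions/workflows/{WORKFLOW_FILE}/dispatches")
    headers = {
        "Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}",
        "Accept": "application/vnd.github+json",
    }
    payload = {
        "ref": "main",  # branch containing the workflow file
        "inputs": {
            "source_system": source_system,
            "table_name": table_name,
            "schema_json": schema_json,      # JSON string; see the workflow inputs below
            "s3_file_path": s3_file_path,
        },
    }
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()  # GitHub responds with 204 No Content on success
```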
Schema Mapping Workflow
1. Workflow Configuration
Create .github/workflows/schema-mapping.yml:
name: Automated Schema Mapping
on:
workflow_dispatch:
inputs:
source_system:
description: 'Source system name'
required: true
type: string
table_name:
description: 'Source table name'
required: true
type: string
schema_json:
description: 'JSON schema information'
required: true
type: string
s3_file_path:
description: 'S3 file path'
required: true
type: string
env:
CHICORY_API_KEY: ${{ secrets.CHICORY_API_KEY }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AWS_REGION: us-east-1
jobs:
schema-mapping:
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
token: ${{ secrets.GITHUB_TOKEN }}
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: |
pip install requests boto3 pyyaml
- name: Parse schema information
id: parse-schema
run: |
python -c "
import json
import os
schema_data = json.loads('${{ github.event.inputs.schema_json }}')
# Set outputs
with open(os.environ['GITHUB_OUTPUT'], 'a') as f:
f.write(f'source_system=${{ github.event.inputs.source_system }}\\n')
f.write(f'table_name=${{ github.event.inputs.table_name }}\\n')
f.write(f'row_count={schema_data.get(\"row_count\", 0)}\\n')
f.write(f'column_count={len(schema_data.get(\"columns\", []))}\\n')
"
- name: Generate schema mapping
id: generate-mapping
run: |
python scripts/generate_schema_mapping.py \
--source-schema '${{ github.event.inputs.schema_json }}' \
--source-system '${{ github.event.inputs.source_system }}' \
--table-name '${{ github.event.inputs.table_name }}' \
--output-file mapping_result.json
- name: Validate mapping result
run: |
python scripts/validate_mapping.py mapping_result.json
- name: Create mapping directory
run: |
mkdir -p mappings/${{ steps.parse-schema.outputs.source_system }}
cp mapping_result.json mappings/${{ steps.parse-schema.outputs.source_system }}/${{ steps.parse-schema.outputs.table_name }}_mapping.json
- name: Generate mapping documentation
run: |
python scripts/generate_mapping_docs.py \
--mapping-file mapping_result.json \
--output-file mappings/${{ steps.parse-schema.outputs.source_system }}/${{ steps.parse-schema.outputs.table_name }}_mapping.md
- name: Create Pull Request
uses: peter-evans/create-pull-request@v5
with:
token: ${{ secrets.GITHUB_TOKEN }}
commit-message: |
Add schema mapping for ${{ steps.parse-schema.outputs.source_system }}.${{ steps.parse-schema.outputs.table_name }}
- Source: ${{ github.event.inputs.s3_file_path }}
- Columns: ${{ steps.parse-schema.outputs.column_count }}
- Rows: ${{ steps.parse-schema.outputs.row_count }}
Auto-generated by Chicory AI schema mapping agent.
title: 'Schema Mapping: ${{ steps.parse-schema.outputs.source_system }}.${{ steps.parse-schema.outputs.table_name }}'
body: |
## Schema Mapping Summary
**Source System:** ${{ steps.parse-schema.outputs.source_system }}
**Table Name:** ${{ steps.parse-schema.outputs.table_name }}
**S3 File:** `${{ github.event.inputs.s3_file_path }}`
**Columns:** ${{ steps.parse-schema.outputs.column_count }}
**Rows:** ${{ steps.parse-schema.outputs.row_count }}
### Changes
- ✅ Generated schema mapping configuration
- ✅ Created mapping documentation
- ✅ Validated mapping structure
### Next Steps
1. Review the generated mapping in `mappings/${{ steps.parse-schema.outputs.source_system }}/${{ steps.parse-schema.outputs.table_name }}_mapping.json`
2. Validate business logic and transformations
3. Merge this PR to trigger dbt model generation
### Mapping Overview
```json
$(cat mapping_result.json | jq '.mapping_metadata + {column_count: .column_mappings | length}')
```
---
🤖 *This PR was automatically created by the Chicory AI schema mapping workflow.*
branch: feature/schema-mapping-${{ steps.parse-schema.outputs.source_system }}-${{ steps.parse-schema.outputs.table_name }}
delete-branch: true
labels: |
automated
schema-mapping
chicory-ai
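The `Parse schema information` step above only relies on `row_count` and a `columns` list inside `schema_json`; everything else is passed through to the mapping agent. A minimal input that satisfies it might look like this sketch (extra fields such as types and nullability are illustrative):

```python
import json

# Illustrative schema_json payload for the workflow_dispatch input.
example_schema = {
    "table_name": "customers",
    "row_count": 12345,
    "columns": [
        {"name": "CustomerID", "type": "int", "nullable": False},
        {"name": "Customer Name", "type": "string", "nullable": True},
        {"name": "SignupDate", "type": "date", "nullable": True},
    ],
}

# The workflow input must be a single JSON string.
schema_json_input = json.dumps(example_schema)
print(schema_json_input)
```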
2. Schema Mapping Script
Create scripts/generate_schema_mapping.py:
#!/usr/bin/env python3
"""
Generate schema mapping using Chicory AI agent
"""
import argparse
import json
import requests
import logging
import os
from typing import Dict, Any
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def load_target_standards() -> Dict[str, Any]:
"""Load target schema standards from configuration"""
standards_file = os.path.join(os.path.dirname(__file__), 'target_standards.json')
if os.path.exists(standards_file):
with open(standards_file, 'r') as f:
return json.load(f)
# Default standards if file doesn't exist
return {
"naming_conventions": {
"table_prefix": {
"dimension": "dim_",
"fact": "fact_",
"staging": "stg_"
},
"column_case": "snake_case"
},
"data_types": {
"string_default": "STRING",
"integer_default": "INTEGER",
"decimal_default": "NUMERIC(15,2)",
"date_default": "DATE",
"timestamp_default": "TIMESTAMP"
}
}
def call_chicory_agent(schema_data: Dict[str, Any], target_standards: Dict[str, Any]) -> Dict[str, Any]:
"""Call Chicory schema mapping agent"""
api_key = os.environ.get('CHICORY_API_KEY')
if not api_key:
raise ValueError("CHICORY_API_KEY environment variable not set")
# Agent configuration
agent_endpoint = "https://api.chicory.ai/v1/agents/schema_mapper_agent/chat"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# Prepare the prompt
prompt = f"""
Map the following source schema to our target standards:
SOURCE SCHEMA:
{json.dumps(schema_data, indent=2)}
TARGET STANDARDS:
{json.dumps(target_standards, indent=2)}
Generate a comprehensive schema mapping following dimensional modeling best practices.
"""
payload = {
"messages": [
{
"role": "user",
"content": prompt
}
],
"temperature": 0.1,
"max_tokens": 2000
}
logger.info("Calling Chicory schema mapping agent...")
try:
response = requests.post(agent_endpoint, headers=headers, json=payload, timeout=60)
response.raise_for_status()
result = response.json()
content = result['choices'][0]['message']['content']
# Parse JSON from response
mapping = parse_mapping_from_response(content)
logger.info(f"Successfully generated mapping for {mapping.get('mapping_metadata', {}).get('target_table', 'unknown table')}")
return mapping
except requests.exceptions.RequestException as e:
logger.error(f"API request failed: {e}")
raise
except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON response: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise
def parse_mapping_from_response(content: str) -> Dict[str, Any]:
"""Extract JSON mapping from agent response"""
import re
# Look for JSON code block
json_match = re.search(r'```json\n(.*?)\n```', content, re.DOTALL)
if json_match:
return json.loads(json_match.group(1))
# Look for plain JSON object
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
return json.loads(json_match.group(0))
# If no JSON found, create a basic mapping
logger.warning("Could not parse JSON from response, creating basic mapping")
return create_fallback_mapping(content)
def create_fallback_mapping(content: str) -> Dict[str, Any]:
"""Create a fallback mapping structure if parsing fails"""
return {
"mapping_metadata": {
"mapping_version": "1.0",
"created_at": "2024-01-15T10:30:00Z",
"mapping_confidence": 0.5,
"notes": "Fallback mapping created due to parsing error"
},
"column_mappings": [],
"recommendations": {
"parsing_error": content[:500]
}
}
def validate_mapping(mapping: Dict[str, Any]) -> bool:
"""Validate the generated mapping structure"""
required_keys = ['mapping_metadata', 'column_mappings']
for key in required_keys:
if key not in mapping:
logger.error(f"Missing required key: {key}")
return False
# Validate column mappings structure
for col_mapping in mapping.get('column_mappings', []):
required_col_keys = ['source_column', 'target_column', 'source_type', 'target_type']
for key in required_col_keys:
if key not in col_mapping:
logger.warning(f"Missing column mapping key: {key}")
logger.info("Mapping validation passed")
return True
def main():
parser = argparse.ArgumentParser(description='Generate schema mapping using Chicory AI')
parser.add_argument('--source-schema', required=True, help='Source schema JSON string')
parser.add_argument('--source-system', required=True, help='Source system name')
parser.add_argument('--table-name', required=True, help='Source table name')
parser.add_argument('--output-file', required=True, help='Output mapping file')
args = parser.parse_args()
try:
# Parse source schema
source_schema = json.loads(args.source_schema)
# Load target standards
target_standards = load_target_standards()
# Generate mapping
mapping = call_chicory_agent(source_schema, target_standards)
# Add metadata
mapping['mapping_metadata'].update({
'source_system': args.source_system,
'source_table': args.table_name,
'generated_by': 'chicory-ai-github-action'
})
# Validate mapping
if not validate_mapping(mapping):
raise ValueError("Generated mapping failed validation")
# Save mapping
with open(args.output_file, 'w') as f:
json.dump(mapping, f, indent=2)
logger.info(f"Schema mapping saved to {args.output_file}")
except Exception as e:
logger.error(f"Schema mapping generation failed: {e}")
exit(1)
if __name__ == "__main__":
main()
3. Validation Script
Create scripts/validate_mapping.py:
#!/usr/bin/env python3
"""
Validate generated schema mapping
"""
import argparse
import json
import logging
import sys
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def validate_mapping_structure(mapping: dict) -> list:
"""Validate the structure of the mapping"""
issues = []
# Required top-level keys
required_keys = ['mapping_metadata', 'column_mappings']
for key in required_keys:
if key not in mapping:
issues.append(f"Missing required key: {key}")
# Validate metadata
if 'mapping_metadata' in mapping:
metadata = mapping['mapping_metadata']
required_metadata = ['source_table', 'target_table', 'mapping_version']
for key in required_metadata:
if key not in metadata:
issues.append(f"Missing metadata key: {key}")
# Validate column mappings
if 'column_mappings' in mapping:
for i, col_mapping in enumerate(mapping['column_mappings']):
required_col_keys = ['source_column', 'target_column', 'source_type', 'target_type']
for key in required_col_keys:
if key not in col_mapping:
issues.append(f"Column mapping {i}: Missing key {key}")
return issues
def validate_naming_conventions(mapping: dict) -> list:
"""Validate naming conventions in the mapping"""
issues = []
if 'column_mappings' not in mapping:
return issues
for i, col_mapping in enumerate(mapping['column_mappings']):
target_column = col_mapping.get('target_column', '')
# Check snake_case
if not target_column.islower() or ' ' in target_column:
issues.append(f"Column mapping {i}: target_column '{target_column}' should use snake_case")
# Check for reserved words (basic check)
reserved_words = ['select', 'from', 'where', 'order', 'group']
if target_column.lower() in reserved_words:
issues.append(f"Column mapping {i}: target_column '{target_column}' is a reserved word")
return issues
def validate_data_types(mapping: dict) -> list:
"""Validate data type mappings"""
issues = []
valid_target_types = [
'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'DATE', 'TIMESTAMP',
'NUMERIC', 'JSON', 'ARRAY', 'STRUCT'
]
if 'column_mappings' not in mapping:
return issues
for i, col_mapping in enumerate(mapping['column_mappings']):
target_type = col_mapping.get('target_type', '')
# Check if target type is valid (basic validation)
base_type = target_type.split('(')[0] # Handle NUMERIC(15,2) format
if base_type not in valid_target_types:
issues.append(f"Column mapping {i}: Unknown target_type '{target_type}'")
return issues
def main():
parser = argparse.ArgumentParser(description='Validate schema mapping')
parser.add_argument('mapping_file', help='Path to mapping JSON file')
args = parser.parse_args()
try:
# Load mapping
with open(args.mapping_file, 'r') as f:
mapping = json.load(f)
# Run validations
all_issues = []
structure_issues = validate_mapping_structure(mapping)
naming_issues = validate_naming_conventions(mapping)
datatype_issues = validate_data_types(mapping)
all_issues.extend(structure_issues)
all_issues.extend(naming_issues)
all_issues.extend(datatype_issues)
# Report results
if all_issues:
logger.error(f"Validation failed with {len(all_issues)} issues:")
for issue in all_issues:
logger.error(f" - {issue}")
sys.exit(1)
else:
logger.info("Validation passed successfully")
# Print summary
metadata = mapping.get('mapping_metadata', {})
columns = len(mapping.get('column_mappings', []))
logger.info(f"Mapping Summary:")
logger.info(f" Source: {metadata.get('source_table', 'Unknown')}")
logger.info(f" Target: {metadata.get('target_table', 'Unknown')}")
logger.info(f" Columns: {columns}")
logger.info(f" Confidence: {metadata.get('mapping_confidence', 'Unknown')}")
except FileNotFoundError:
logger.error(f"Mapping file not found: {args.mapping_file}")
sys.exit(1)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in mapping file: {e}")
sys.exit(1)
except Exception as e:
logger.error(f"Validation error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
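To make the expected mapping shape concrete, here is a small mapping dict (illustrative values) run through the three validators defined above. It assumes the snippet is executed from the `scripts/` directory, or that `validate_mapping.py` is otherwise importable.

```python
from validate_mapping import (
    validate_mapping_structure,
    validate_naming_conventions,
    validate_data_types,
)

# Illustrative mapping that satisfies the checks above.
example_mapping = {
    "mapping_metadata": {
        "source_table": "customers",
        "target_table": "dim_customer",
        "mapping_version": "1.0",
        "mapping_confidence": 0.92,
    },
    "column_mappings": [
        {
            "source_column": "CustomerID",
            "target_column": "customer_id",
            "source_type": "int",
            "target_type": "INTEGER",
        },
        {
            "source_column": "SignupDate",
            "target_column": "signup_date",
            "source_type": "date",
            "target_type": "DATE",
        },
    ],
}

issues = (
    validate_mapping_structure(example_mapping)
    + validate_naming_conventions(example_mapping)
    + validate_data_types(example_mapping)
)
assert issues == [], issues  # a well-formed mapping produces no issues
```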
dbt Generation Workflow
1. Workflow Configuration
Create .github/workflows/dbt-generation.yml:
name: Generate dbt Models
on:
pull_request:
types: [closed]
branches: [main]
paths:
- 'mappings/**/*_mapping.json'
env:
CHICORY_API_KEY: ${{ secrets.CHICORY_API_KEY }}
jobs:
generate-dbt-models:
if: github.event.pull_request.merged == true
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
token: ${{ secrets.GITHUB_TOKEN }}
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: |
pip install requests pyyaml dbt-core dbt-bigquery
- name: Detect changed mapping files
id: detect-changes
run: |
# Get changed files from the merged PR
CHANGED_FILES=$(gh api repos/${{ github.repository }}/pulls/${{ github.event.pull_request.number }}/files \
--jq '.[] | select(.filename | test("mappings/.*_mapping\\.json$")) | .filename')
if [ -z "$CHANGED_FILES" ]; then
echo "No mapping files changed"
echo "has_changes=false" >> $GITHUB_OUTPUT
exit 0
fi
echo "Changed mapping files:"
echo "$CHANGED_FILES"
echo "has_changes=true" >> $GITHUB_OUTPUT
echo "mapping_files<<EOF" >> $GITHUB_OUTPUT
echo "$CHANGED_FILES" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Generate dbt artifacts
if: steps.detect-changes.outputs.has_changes == 'true'
run: |
while IFS= read -r mapping_file; do
if [ -n "$mapping_file" ]; then
echo "Processing: $mapping_file"
python scripts/generate_dbt_artifacts.py \
--mapping-file "$mapping_file" \
--output-dir models/
fi
done <<< "${{ steps.detect-changes.outputs.mapping_files }}"
- name: Validate generated dbt models
if: steps.detect-changes.outputs.has_changes == 'true'
run: |
# Parse dbt models for syntax
dbt parse --project-dir . --profiles-dir profiles/
- name: Run dbt tests on generated models
if: steps.detect-changes.outputs.has_changes == 'true'
continue-on-error: true
run: |
dbt test --select +tag:auto-generated --project-dir . --profiles-dir profiles/
- name: Create Pull Request
if: steps.detect-changes.outputs.has_changes == 'true'
uses: peter-evans/create-pull-request@v5
with:
token: ${{ secrets.GITHUB_TOKEN }}
commit-message: |
Generate dbt models from schema mappings
Auto-generated dbt models and documentation from approved schema mappings.
Files generated:
${{ steps.detect-changes.outputs.mapping_files }}
title: 'dbt Models: Auto-generated from Schema Mappings'
body: |
## dbt Model Generation
This PR contains auto-generated dbt models and documentation based on the recently merged schema mappings.
### Generated Files
${{ steps.detect-changes.outputs.mapping_files }}
### Validation Results
- ✅ dbt syntax validation passed
- ✅ Model compilation successful
- ⚠️ Tests may require review and adjustment
### Next Steps
1. Review generated SQL for business logic accuracy
2. Validate column descriptions and documentation
3. Adjust any failing tests
4. Merge to deploy new models
---
🤖 *This PR was automatically created by the Chicory AI dbt generation workflow.*
branch: feature/dbt-models-auto-generated
delete-branch: true
labels: |
automated
dbt-models
chicory-ai
2. dbt Generation Script
Create scripts/generate_dbt_artifacts.py:
#!/usr/bin/env python3
"""
Generate dbt models and YAML documentation using Chicory AI agent
"""
import argparse
import json
import requests
import logging
import os
import re
from pathlib import Path
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def call_chicory_dbt_agent(mapping: dict) -> dict:
"""Call Chicory dbt generation agent"""
api_key = os.environ.get('CHICORY_API_KEY')
if not api_key:
raise ValueError("CHICORY_API_KEY environment variable not set")
agent_endpoint = "https://api.chicory.ai/v1/agents/dbt_generator_agent/chat"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# Load dbt project configuration
dbt_project_config = load_dbt_project_config()
prompt = f"""
Generate dbt model artifacts for the following schema mapping:
SCHEMA MAPPING:
{json.dumps(mapping, indent=2)}
DBT PROJECT CONFIG:
{json.dumps(dbt_project_config, indent=2)}
Please generate:
1. Complete dbt model SQL file
2. Comprehensive YAML documentation with tests
3. Any necessary macros or additional files
Follow dbt best practices and ensure proper documentation, tests, and transformations.
"""
payload = {
"messages": [
{
"role": "user",
"content": prompt
}
],
"temperature": 0.1,
"max_tokens": 3000
}
logger.info("Calling Chicory dbt generation agent...")
try:
response = requests.post(agent_endpoint, headers=headers, json=payload, timeout=120)
response.raise_for_status()
result = response.json()
content = result['choices'][0]['message']['content']
# Parse artifacts from response
artifacts = parse_dbt_artifacts(content)
logger.info(f"Successfully generated dbt artifacts")
return artifacts
except Exception as e:
logger.error(f"dbt generation failed: {e}")
raise
def load_dbt_project_config() -> dict:
"""Load dbt project configuration"""
config_file = 'dbt_project.yml'
if os.path.exists(config_file):
import yaml
with open(config_file, 'r') as f:
return yaml.safe_load(f)
# Default configuration
return {
"name": "analytics_dbt",
"version": "1.0.0",
"profile": "analytics",
"model-paths": ["models"],
"source-paths": ["models"],
"test-paths": ["tests"]
}
def parse_dbt_artifacts(content: str) -> dict:
"""Extract dbt artifacts from agent response"""
artifacts = {}
# Extract SQL model
sql_pattern = r'```sql\n(.*?)\n```'
sql_match = re.search(sql_pattern, content, re.DOTALL | re.IGNORECASE)
if sql_match:
artifacts['sql_model'] = sql_match.group(1).strip()
# Extract YAML documentation
yaml_pattern = r'```yaml\n(.*?)\n```'
yaml_match = re.search(yaml_pattern, content, re.DOTALL | re.IGNORECASE)
if yaml_match:
artifacts['yaml_doc'] = yaml_match.group(1).strip()
# Extract macros if present
macro_pattern = r'```macro\n(.*?)\n```'
macro_match = re.search(macro_pattern, content, re.DOTALL | re.IGNORECASE)
if macro_match:
artifacts['macro'] = macro_match.group(1).strip()
return artifacts
def write_dbt_files(artifacts: dict, mapping: dict, output_dir: str):
"""Write generated dbt files to disk"""
metadata = mapping.get('mapping_metadata', {})
target_table = metadata.get('target_table', 'unknown_table')
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# Write SQL model
if 'sql_model' in artifacts:
model_file = output_path / f"{target_table}.sql"
with open(model_file, 'w') as f:
f.write(artifacts['sql_model'])
logger.info(f"Created dbt model: {model_file}")
# Write YAML documentation
if 'yaml_doc' in artifacts:
yaml_file = output_path / f"_{target_table}.yml"
with open(yaml_file, 'w') as f:
f.write(artifacts['yaml_doc'])
logger.info(f"Created YAML documentation: {yaml_file}")
# Write macro if present
if 'macro' in artifacts:
macro_dir = Path("macros")
macro_dir.mkdir(exist_ok=True)
macro_file = macro_dir / f"{target_table}_macros.sql"
with open(macro_file, 'w') as f:
f.write(artifacts['macro'])
logger.info(f"Created macro: {macro_file}")
def main():
parser = argparse.ArgumentParser(description='Generate dbt artifacts using Chicory AI')
parser.add_argument('--mapping-file', required=True, help='Schema mapping JSON file')
parser.add_argument('--output-dir', required=True, help='Output directory for dbt files')
args = parser.parse_args()
try:
# Load mapping
with open(args.mapping_file, 'r') as f:
mapping = json.load(f)
# Generate artifacts
artifacts = call_chicory_dbt_agent(mapping)
# Write files
write_dbt_files(artifacts, mapping, args.output_dir)
logger.info("dbt artifacts generated successfully")
except Exception as e:
logger.error(f"dbt generation failed: {e}")
exit(1)
if __name__ == "__main__":
main()
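As a quick sanity check of the parsing logic, the snippet below feeds `parse_dbt_artifacts` a fabricated agent response containing one SQL block and one YAML block (run it from the `scripts/` directory, or with that directory on `PYTHONPATH`). The response text is invented purely to exercise the regexes.

```python
from generate_dbt_artifacts import parse_dbt_artifacts

FENCE = "`" * 3  # built dynamically so this example nests cleanly in documentation

# Fabricated agent response with one SQL block and one YAML block.
fake_response = (
    f"Here is the model:\n"
    f"{FENCE}sql\n"
    f"select customer_id, signup_date from {{{{ source('crm', 'customers') }}}}\n"
    f"{FENCE}\n"
    f"And the documentation:\n"
    f"{FENCE}yaml\n"
    f"version: 2\n"
    f"models:\n"
    f"  - name: dim_customer\n"
    f"{FENCE}\n"
)

artifacts = parse_dbt_artifacts(fake_response)
assert set(artifacts) == {"sql_model", "yaml_doc"}
print(artifacts["sql_model"])
```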
Repository Setup
1. GitHub Secrets
Configure these secrets in your GitHub repository:
# Required secrets
CHICORY_API_KEY=your_chicory_api_key
AWS_ACCESS_KEY_ID=your_aws_access_key
AWS_SECRET_ACCESS_KEY=your_aws_secret_key
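The simplest route is the repository Settings UI or `gh secret set`. If you prefer to script it, the sketch below uses the GitHub REST API, which requires encrypting each value with the repository's public key (the `PyNaCl` package provides the sealed-box encryption); the owner/repo names and the token variable are placeholders.

```python
import base64
import os
import requests
from nacl import encoding, public

OWNER, REPO = "your-org", "your-dbt-repo"   # placeholders
API = f"https://api.github.com/repos/{OWNER}/{REPO}/actions/secrets"
HEADERS = {
    "Authorization": f"Bearer {os.environ['GITHUB_ADMIN_TOKEN']}",  # token with repo admin rights
    "Accept": "application/vnd.github+json",
}

def encrypt_for_repo(public_key_b64: str, value: str) -> str:
    """Encrypt a secret value with the repository's public key (libsodium sealed box)."""
    key = public.PublicKey(public_key_b64.encode("utf-8"), encoding.Base64Encoder())
    sealed = public.SealedBox(key).encrypt(value.encode("utf-8"))
    return base64.b64encode(sealed).decode("utf-8")

def set_repo_secret(name: str, value: str) -> None:
    key_info = requests.get(f"{API}/public-key", headers=HEADERS, timeout=30).json()
    body = {
        "encrypted_value": encrypt_for_repo(key_info["key"], value),
        "key_id": key_info["key_id"],
    }
    resp = requests.put(f"{API}/{name}", headers=HEADERS, json=body, timeout=30)
    resp.raise_for_status()

for secret_name in ("CHICORY_API_KEY", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
    set_repo_secret(secret_name, os.environ[secret_name])
```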
2. Target Standards Configuration
Create scripts/target_standards.json:
{
"naming_conventions": {
"table_prefix": {
"dimension": "dim_",
"fact": "fact_",
"staging": "stg_",
"intermediate": "int_"
},
"column_case": "snake_case",
"reserved_suffixes": {
"primary_key": "_sk",
"business_key": "_bk",
"foreign_key": "_fk",
"date": "_date",
"timestamp": "_ts"
}
},
"data_types": {
"string_default": "STRING",
"integer_default": "INTEGER",
"decimal_default": "NUMERIC(15,2)",
"date_default": "DATE",
"timestamp_default": "TIMESTAMP",
"boolean_default": "BOOLEAN"
},
"standard_columns": {
"audit_columns": [
{"name": "created_at", "type": "TIMESTAMP"},
{"name": "updated_at", "type": "TIMESTAMP"},
{"name": "is_active", "type": "BOOLEAN"}
]
}
}
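The standards file is consumed by `load_target_standards()` above; how the conventions get applied is ultimately up to the mapping agent, but a rough illustration of the intent (a hypothetical helper, not part of the pipeline) is:

```python
import json
import re

# Run from the repository root so the relative path resolves.
with open("scripts/target_standards.json") as f:
    standards = json.load(f)

def to_snake_case(name: str) -> str:
    """Rough snake_case conversion matching the column_case convention."""
    name = re.sub(r"[\s\-]+", "_", name.strip())
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    return name.lower()

def suggest_target_name(source_column: str, role: str = "") -> str:
    """Apply column_case plus an optional reserved suffix (e.g. role='business_key')."""
    column = to_snake_case(source_column)
    suffix = standards["naming_conventions"]["reserved_suffixes"].get(role, "")
    return column if suffix and column.endswith(suffix) else column + suffix

print(suggest_target_name("Customer ID", role="business_key"))  # customer_id_bk
```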
3. Directory Structure
Ensure your repository has the required structure:
your-dbt-repo/
├── .github/workflows/
│ ├── schema-mapping.yml
│ └── dbt-generation.yml
├── scripts/
│ ├── generate_schema_mapping.py
│ ├── validate_mapping.py
│ ├── generate_dbt_artifacts.py
│ └── target_standards.json
├── mappings/
│ └── [auto-generated mapping files]
├── models/
│ └── [auto-generated dbt models]
├── dbt_project.yml
README.md
Testing GitHub Actions
1. Local Testing
Test the scripts locally before deploying:
# Test schema mapping
python scripts/generate_schema_mapping.py \
--source-schema '{"table_name": "test", "columns": [...]}' \
--source-system "test_system" \
--table-name "test_table" \
--output-file test_mapping.json
# Test validation
python scripts/validate_mapping.py test_mapping.json
# Test dbt generation
python scripts/generate_dbt_artifacts.py \
--mapping-file test_mapping.json \
--output-dir test_output/2. Workflow Testing
Use workflow dispatch to test:
# Trigger schema mapping workflow
gh workflow run schema-mapping.yml \
-f source_system=test \
-f table_name=customers \
-f schema_json='{"table_name": "customers", "columns": [...]}' \
-f s3_file_path=incoming/test_customers.csv
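The dispatched run appears asynchronously; you can confirm it started with `gh run list`, or poll the REST API as in this sketch (owner/repo are placeholders):

```python
import os
import requests

OWNER, REPO = "your-org", "your-dbt-repo"  # placeholders
url = f"https://api.github.com/repos/{OWNER}/{REPO}/actions/runs"
headers = {
    "Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}",
    "Accept": "application/vnd.github+json",
}
params = {"event": "workflow_dispatch", "per_page": 5}

runs = requests.get(url, headers=headers, params=params, timeout=30).json()["workflow_runs"]
for run in runs:
    # status is queued/in_progress/completed; conclusion is success/failure/None
    print(run["name"], run["status"], run["conclusion"])
```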