Template Variables
Use Jinja2-style template variables to parameterize your SQL queries for multi-environment pipelines, dynamic table names, and dbt-style integrations.
Overview
Template variables allow you to:
- Deploy the same SQL across multiple environments (dev, staging, prod)
- Parameterize table names by project, region, or dataset
- Integrate with dbt using compiled SQL output
- Support Airflow macros for dynamic table naming
- Validate SQL syntax before variable substitution
Basic Usage
Simple Variable Substitution
from clgraph import Pipeline
# SQL with template variables
queries = [
"""
CREATE TABLE {{env}}_staging.orders AS
SELECT order_id, customer_id, amount
FROM raw.orders
WHERE status = 'completed'
"""
]
# Build pipeline for production
prod_pipeline = Pipeline.from_sql_list(
queries,
dialect="bigquery",
template_context={"env": "prod"}
)
# Tables created: prod_staging.orders
print(prod_pipeline.table_graph.tables)
# Output: ['raw.orders', 'prod_staging.orders']
Multi-Environment Deployment
# Same SQL, different environments
queries = [
"""
CREATE TABLE {{env}}_staging.orders AS
SELECT order_id, customer_id, amount
FROM raw.orders
""",
"""
CREATE TABLE {{env}}_analytics.customer_metrics AS
SELECT customer_id, COUNT(*) as order_count
FROM {{env}}_staging.orders
GROUP BY customer_id
"""
]
# Development environment
dev_pipeline = Pipeline.from_sql_list(
queries,
dialect="bigquery",
template_context={"env": "dev"}
)
# Production environment
prod_pipeline = Pipeline.from_sql_list(
queries,
dialect="bigquery",
template_context={"env": "prod"}
)
# Different tables for each environment
print(dev_pipeline.table_graph.tables)
# Output: ['raw.orders', 'dev_staging.orders', 'dev_analytics.customer_metrics']
print(prod_pipeline.table_graph.tables)
# Output: ['raw.orders', 'prod_staging.orders', 'prod_analytics.customer_metrics']
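Since only the context changes between environments, you can also build every pipeline in one pass. A small convenience sketch (a plain dict comprehension, not a clgraph feature), reusing the queries list above:

# Build one pipeline per environment from the same SQL
pipelines = {
    env: Pipeline.from_sql_list(
        queries,
        dialect="bigquery",
        template_context={"env": env},
    )
    for env in ("dev", "prod")
}
print(pipelines["dev"].table_graph.tables)
# Output: ['raw.orders', 'dev_staging.orders', 'dev_analytics.customer_metrics']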
Input Formats
Template context can be provided in multiple formats:
Python Dictionary
pipeline = Pipeline.from_sql_list(
queries,
dialect="bigquery",
template_context={
"env": "prod",
"project": "my_company",
"region": "us_central"
}
)
YAML Configuration
import yaml
# Load from YAML file
with open("config.yaml", "r") as f:
template_context = yaml.safe_load(f)
pipeline = Pipeline.from_sql_list(
queries,
dialect="bigquery",
template_context=template_context
)
Example config.yaml:
env: production
project: analytics_pipeline
config:
region: us-central1
dataset: customer_data
start_date: "2025-01-01"  # quote dates so YAML loads them as strings, not date objects
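Nested YAML keys map directly to nested template variables: `region` above is referenced in SQL as {{config.region}} (see Nested Variables below).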
JSON Configuration
import json
# Load from JSON file
with open("config.json", "r") as f:
template_context = json.load(f)
pipeline = Pipeline.from_sql_list(
queries,
dialect="bigquery",
template_context=template_context
)
Template Syntax
Simple Variables
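A simple variable is a single top-level name substituted directly into the SQL, as in the earlier examples:

CREATE TABLE {{env}}_staging.orders AS
SELECT order_id, customer_id, amount
FROM raw.orders

With template_context={"env": "prod"}, this resolves to prod_staging.orders.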
Nested Variables
CREATE TABLE {{config.project}}.{{config.env}}_staging.orders AS
SELECT order_id, customer_id, amount
FROM {{config.source_db}}.raw_{{config.region}}.orders
WHERE created_at >= '{{config.start_date}}'
Template context:
template_context = {
"config": {
"project": "data_platform",
"env": "staging",
"source_db": "external_db",
"region": "us_central",
"start_date": "2025-01-01"
}
}
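Nested access is standard Jinja2 attribute/key lookup, so you can preview a rendered name with plain jinja2 (independent of clgraph):

from jinja2 import Template

# Render one templated table name with the context above
name = Template("{{config.source_db}}.raw_{{config.region}}.orders").render(**template_context)
print(name)
# Output: external_db.raw_us_central.orders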
Factory Method Support
All Pipeline factory methods support template variables:
from_sql_list()
queries = [
"CREATE TABLE {{project}}.staging.users AS SELECT * FROM raw.users",
"CREATE TABLE {{project}}.analytics.users AS SELECT * FROM {{project}}.staging.users"
]
pipeline = Pipeline.from_sql_list(
queries,
dialect="bigquery",
template_context={"project": "my_company"}
)
from_sql_files()
pipeline = Pipeline.from_sql_files(
"sql/",
dialect="bigquery",
template_context={"env": "prod", "project": "analytics"}
)
from_dict()
queries = {
"staging": "CREATE TABLE {{project}}.staging.orders AS SELECT * FROM raw.orders",
"analytics": "CREATE TABLE {{project}}.analytics.metrics AS SELECT * FROM {{project}}.staging.orders"
}
pipeline = Pipeline.from_dict(
queries,
dialect="bigquery",
template_context={"project": "my_company"}
)
from_tuples()
queries = [
("staging", "CREATE TABLE {{project}}.staging.orders AS SELECT * FROM raw.orders"),
("analytics", "CREATE TABLE {{project}}.analytics.metrics AS SELECT * FROM {{project}}.staging.orders")
]
pipeline = Pipeline.from_tuples(
queries,
dialect="bigquery",
template_context={"project": "my_company"}
)
from_sql_string()
sql = """
CREATE TABLE {{project}}.staging.orders AS
SELECT * FROM raw.orders;
CREATE TABLE {{project}}.analytics.metrics AS
SELECT * FROM {{project}}.staging.orders;
"""
pipeline = Pipeline.from_sql_string(
sql,
dialect="bigquery",
template_context={"project": "my_company"}
)
Real-World Examples
Example 1: Multi-Region Deployment
# SQL with region placeholders
queries = [
"""
CREATE TABLE {{region}}_analytics.customer_metrics AS
SELECT
customer_id,
COUNT(*) as order_count,
SUM(amount) as total_revenue
FROM {{region}}_raw.orders
GROUP BY customer_id
"""
]
# Deploy to US region
us_pipeline = Pipeline.from_sql_list(
queries,
dialect="bigquery",
template_context={"region": "us"}
)
# Deploy to EU region
eu_pipeline = Pipeline.from_sql_list(
queries,
dialect="bigquery",
template_context={"region": "eu"}
)
# Each region has independent tables
print(us_pipeline.table_graph.tables)
# Output: ['us_raw.orders', 'us_analytics.customer_metrics']
print(eu_pipeline.table_graph.tables)
# Output: ['eu_raw.orders', 'eu_analytics.customer_metrics']
Example 2: Project-Based Configuration
queries = {
"staging": """
CREATE TABLE {{project}}.staging.user_data AS
SELECT user_id, username, email, created_at
FROM {{project}}.raw.users
""",
"analytics": """
CREATE TABLE {{project}}.analytics.user_summary AS
SELECT
user_id,
username,
COUNT(*) as activity_count
FROM {{project}}.staging.user_data
JOIN {{project}}.raw.activities USING (user_id)
GROUP BY user_id, username
"""
}
pipeline = Pipeline.from_dict(
queries,
dialect="bigquery",
template_context={"project": "my_company"}
)
# All tables namespaced by project
print(pipeline.table_graph.tables)
# Output: [
# 'my_company.raw.users',
# 'my_company.staging.user_data',
# 'my_company.raw.activities',
# 'my_company.analytics.user_summary'
# ]
Example 3: CI/CD Pipeline
import os
import yaml
# Load environment-specific config
env = os.getenv("DEPLOY_ENV", "dev")
with open(f"config/{env}.yaml", "r") as f:
config = yaml.safe_load(f)
# Load SQL files
pipeline = Pipeline.from_sql_files(
"sql/",
dialect="bigquery",
template_context=config
)
# Validate before deployment
print(f"Deploying to {env} environment")
print(f"Tables to create: {len(pipeline.table_graph.tables)}")
# Execute pipeline (bigquery_executor is your configured query executor, defined elsewhere)
results = pipeline.run(executor=bigquery_executor)
print(f"✅ Deployed {len(results['completed'])} tables to {env}")
Column Lineage with Templates
Template variables work seamlessly with column lineage tracing:
queries = [
"""
CREATE TABLE {{env}}_staging.orders AS
SELECT order_id, customer_id, amount
FROM raw.orders
""",
"""
CREATE TABLE {{env}}_analytics.customer_metrics AS
SELECT
customer_id,
SUM(amount) as total_revenue
FROM {{env}}_staging.orders
GROUP BY customer_id
"""
]
pipeline = Pipeline.from_sql_list(
queries,
dialect="bigquery",
template_context={"env": "prod"}
)
# Trace column lineage (template variables resolved)
sources = pipeline.trace_column_backward(
"prod_analytics.customer_metrics",
"total_revenue"
)
print(f"Source columns for total_revenue:")
for source in sources:
print(f" - {source.table_name}.{source.column_name}")
# Output:
# - raw.orders.amount
Validation Without Context
You can validate SQL syntax without providing a template context; the placeholders are simply preserved in the parsed output:
queries = [
"CREATE TABLE {{env}}_staging.orders AS SELECT * FROM raw.orders"
]
# No template_context provided
pipeline = Pipeline.from_sql_list(
queries,
dialect="bigquery"
# NO template_context
)
# Template syntax preserved
print(pipeline.table_graph.tables)
# Output: ['raw.orders', '{{env}}_staging.orders']
This is useful for:
- SQL syntax validation before deployment (see the lint sketch below)
- Linting SQL files in CI/CD
- Testing SQL structure without specific environment values
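For example, a minimal CI lint script can parse every file without a context and fail the build on parse errors. A sketch, assuming SQL files live under sql/ and that clgraph raises an exception on unparseable SQL:

import glob
from clgraph import Pipeline

failures = []
for path in glob.glob("sql/**/*.sql", recursive=True):
    with open(path) as f:
        sql = f.read()
    try:
        # No template_context: placeholders are preserved, syntax is still checked
        Pipeline.from_sql_string(sql, dialect="bigquery")
    except Exception as exc:
        failures.append((path, exc))

for path, exc in failures:
    print(f"❌ {path}: {exc}")
if failures:
    raise SystemExit(f"{len(failures)} SQL file(s) failed to parse")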
Integration Examples
dbt Integration
Use dbt's compiled SQL output with clgraph:
# After running: dbt compile
pipeline = Pipeline.from_sql_files(
"target/compiled/my_project/models/",
dialect="bigquery",
template_context={
"env": "prod"
}
)
Note: dbt's {{ ref() }} and {{ source() }} functions are resolved during dbt compile. Use the compiled SQL with clgraph for lineage analysis.
Airflow Macros
Resolve Airflow macros by supplying their values as template variables:
# Before (Airflow template)
sql = """
CREATE TABLE {{ params.env }}_staging.orders AS
SELECT * FROM raw.orders
WHERE date = '{{ ds }}'
"""
# With clgraph (validate structure)
pipeline = Pipeline.from_sql_string(
sql,
dialect="bigquery",
template_context={
"params": {"env": "prod"},
"ds": "2025-01-01"
}
)
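Note: at runtime Airflow renders these macros itself; for lineage analysis you supply representative values (such as a sample ds) so the table names resolve concretely.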
Best Practices
1. Use Configuration Files
Store template variables in separate config files:
# config/production.yaml
env: production
project: analytics_platform
config:
region: us-central1
dataset: customer_data
retention_days: 90
import yaml
with open("config/production.yaml") as f:
config = yaml.safe_load(f)
pipeline = Pipeline.from_sql_files(
"sql/",
dialect="bigquery",
template_context=config
)
2. Consistent Naming Conventions
Use consistent variable names across your organization:
- `env` - Environment (dev, staging, prod)
- `project` - GCP project or namespace
- `region` - Geographic region
- `dataset` - BigQuery dataset or database schema
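One way to enforce these names is a small check before building. A minimal sketch (REQUIRED_VARS and check_context are illustrative helpers, not part of clgraph):

# Shared constants module, imported wherever pipelines are built
REQUIRED_VARS = {"env", "project", "region", "dataset"}

def check_context(template_context: dict) -> None:
    """Fail fast if an agreed-upon variable is missing from the context."""
    missing = REQUIRED_VARS - template_context.keys()
    if missing:
        raise ValueError(f"Missing template variables: {sorted(missing)}")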
3. Document Required Variables
Create a README documenting required variables:
## Required Template Variables
- `env` (string): Environment name (dev/staging/prod)
- `project` (string): GCP project ID
- `config.region` (string): Data region (us-central1, eu-west1)
- `config.start_date` (string): Processing start date (YYYY-MM-DD)
4. Validate Templates
Always validate template resolution before deployment:
# Check that all templates were resolved
for table in pipeline.table_graph.tables:
if "{{" in table or "}}" in table:
raise ValueError(f"Unresolved template in table: {table}")
Troubleshooting
Undefined Variables
If a template variable is not provided, Jinja2 will raise an error:
# ERROR: Missing 'project' variable
pipeline = Pipeline.from_sql_list(
["CREATE TABLE {{project}}.orders AS SELECT * FROM raw.orders"],
dialect="bigquery",
template_context={"env": "prod"} # 'project' missing!
)
Solution: Ensure all variables used in templates are defined in template_context.
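To catch this before building, you can list the variables a template actually references with Jinja2's meta API (plain jinja2, independent of clgraph):

from jinja2 import Environment, meta

jinja_env = Environment()
sql = "CREATE TABLE {{project}}.orders AS SELECT * FROM raw.orders"

# Parse the template and collect every variable it references
required = meta.find_undeclared_variables(jinja_env.parse(sql))
missing = required - {"env"}  # compare against the keys in template_context
if missing:
    raise ValueError(f"Missing template variables: {sorted(missing)}")
# Raises: Missing template variables: ['project']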
Syntax Errors
Template variables must use Jinja2 syntax: {{ variable }}
# ❌ Wrong
sql = "CREATE TABLE $env_staging.orders ..." # Shell-style
sql = "CREATE TABLE %env%_staging.orders ..." # Batch-style
# ✅ Correct
sql = "CREATE TABLE {{env}}_staging.orders ..." # Jinja2-style
API Reference
Pipeline Factory Methods
All factory methods accept template_context parameter:
Pipeline.from_sql_list(
queries: List[str],
dialect: str,
template_context: Optional[Dict[str, Any]] = None
)
Pipeline.from_sql_files(
sql_directory: str,
dialect: str,
template_context: Optional[Dict[str, Any]] = None
)
Pipeline.from_dict(
queries: Dict[str, str],
dialect: str,
template_context: Optional[Dict[str, Any]] = None
)
Pipeline.from_tuples(
queries: List[Tuple[str, str]],
dialect: str,
template_context: Optional[Dict[str, Any]] = None
)
Pipeline.from_sql_string(
sql: str,
dialect: str,
template_context: Optional[Dict[str, Any]] = None
)
Complete Example
See examples/template_variables_example.py for a comprehensive example with 7 different use cases.
Next Steps
- Quick Start - Get started in 5 minutes
- Examples - More real-world patterns
- Pipeline API - Full Pipeline reference