# From App to Code

This guide shows how to achieve the same results as the Interactive Lineage App programmatically in Python.
## Try the App First

If you haven't already, try the Interactive Lineage App to visualize how SQL pipelines are analyzed. The examples below match those available in the app.
## Setting Up

Install the package (e.g. `pip install clgraph`, assuming the distribution name matches the import) and you're ready: the `Pipeline` class is your main entry point for all lineage analysis.
## Single Query Examples

### Simple SELECT

**In the App:** Load "1-Query: Simple SELECT"

**In Code:**
```python
from clgraph import Pipeline

sql = """
SELECT id, name, email
FROM users
WHERE active = true
"""

pipeline = Pipeline.from_sql_string(sql, dialect="bigquery")

# View the lineage
print(f"Tables: {list(pipeline.table_graph.tables.keys())}")
print(f"Columns: {len(pipeline.columns)}")

# Get column details
for col in pipeline.columns.values():
    print(f"  {col.table_name}.{col.column_name} ({col.node_type})")
```
### Query with JOIN

**In the App:** Load "1-Query: With JOIN"

**In Code:**
```python
from clgraph import Pipeline

sql = """
SELECT
    u.id,
    u.name,
    o.total,
    o.status
FROM users u
JOIN orders o ON u.id = o.user_id
"""

pipeline = Pipeline.from_sql_string(sql, dialect="bigquery")

# Trace where a column comes from
sources = pipeline.trace_column_backward("select_result", "total")
print("Sources for 'total' column:")
for source in sources:
    print(f"  <- {source.table_name}.{source.column_name}")
```
### Query with Aggregates

**In the App:** Load "1-Query: With Aggregates"

**In Code:**
```python
from clgraph import Pipeline

sql = """
SELECT
    user_id,
    COUNT(*) as order_count,
    SUM(total) as total_spent,
    AVG(total) as avg_order
FROM orders
GROUP BY user_id
"""

pipeline = Pipeline.from_sql_string(sql, dialect="bigquery")

# See the transformation types
for name, col in pipeline.columns.items():
    if col.layer == "output":
        print(f"{col.column_name}: {col.node_type}")

# Output shows different node types like:
#   user_id: base_column
#   order_count: aggregate
#   total_spent: aggregate
```
## Multi-Query Pipeline Examples

### 3-Layer Pipeline

**In the App:** Load "3-Query: 3-Layer Pipeline"

**In Code:**
```python
from clgraph import Pipeline

queries = [
    ("01_staging", """
        CREATE TABLE staging.user_orders AS
        SELECT
            user_id,
            order_id,
            amount,
            order_date
        FROM raw.orders
        WHERE status = 'completed'
    """),
    ("02_analytics", """
        CREATE TABLE analytics.user_metrics AS
        SELECT
            user_id,
            COUNT(*) as order_count,
            SUM(amount) as total_revenue,
            AVG(amount) as avg_order_value
        FROM staging.user_orders
        GROUP BY user_id
    """),
    ("03_reports", """
        CREATE TABLE reports.monthly_revenue AS
        SELECT
            DATE_TRUNC(uo.order_date, MONTH) as month,
            SUM(um.total_revenue) as revenue
        FROM analytics.user_metrics um
        JOIN staging.user_orders uo USING (user_id)
        GROUP BY month
    """),
]

pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# View table execution order
print("Execution order:")
for i, table in enumerate(pipeline.table_graph.topological_sort(), 1):
    print(f"  {i}. {table}")

# Trace a column across the entire pipeline
sources = pipeline.trace_column_backward("reports.monthly_revenue", "revenue")
print("\nSources for 'revenue':")
for source in sources:
    print(f"  <- {source.table_name}.{source.column_name}")
```
### SELECT * EXCEPT (Privacy)

**In the App:** Load "2-Query: SELECT * EXCEPT (Privacy)"

**In Code:**
```python
from clgraph import Pipeline

queries = [
    ("01_staging", """
        CREATE TABLE staging.orders AS
        SELECT
            order_id,
            user_id,
            amount,
            customer_email,
            customer_ssn,
            order_date
        FROM raw.orders
    """),
    ("02_clean", """
        CREATE TABLE analytics.clean_orders AS
        SELECT * EXCEPT (customer_email, customer_ssn)
        FROM staging.orders
    """),
]

pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# See which columns are in the final table
final_cols = [c for c in pipeline.columns.values()
              if c.table_name == "analytics.clean_orders"]
print("Columns in clean_orders:")
for col in final_cols:
    print(f"  {col.column_name}")
# Note: customer_email and customer_ssn are excluded!

# Verify sensitive columns were excluded
excluded = {"customer_email", "customer_ssn"}
included = {c.column_name for c in final_cols}
print(f"\nExcluded columns: {excluded - included}")
```
### SELECT * REPLACE (Transform)

**In the App:** Load "2-Query: SELECT * REPLACE (Transform)"

**In Code:**
```python
from clgraph import Pipeline

queries = [
    ("01_staging", """
        CREATE TABLE staging.orders AS
        SELECT
            order_id,
            user_id,
            amount,
            status,
            order_date
        FROM raw.orders
    """),
    ("02_normalize", """
        CREATE TABLE analytics.orders_normalized AS
        SELECT * REPLACE (
            UPPER(status) as status,
            ROUND(amount, 2) as amount
        )
        FROM staging.orders
    """),
]

pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# The replaced columns are tracked with their transformations
for name, col in pipeline.columns.items():
    if "orders_normalized" in name:
        print(f"{col.column_name}: {col.node_type}")
```
## Visualization

### Column Lineage Visualization

The app shows interactive diagrams. In code, you can create GraphViz visualizations:
```python
from clgraph import Pipeline, visualize_pipeline_lineage

# Create a simple pipeline
queries = [
    ("staging", "CREATE TABLE staging.orders AS SELECT id, amount FROM raw.orders"),
    ("analytics", "CREATE TABLE analytics.totals AS SELECT SUM(amount) as total FROM staging.orders"),
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Create column lineage visualization
dot = visualize_pipeline_lineage(pipeline.column_graph)

# Save to DOT file
with open("lineage.dot", "w") as f:
    f.write(dot.source)

# Or render directly (requires graphviz system package)
# dot.render("lineage", format="png", cleanup=True)

print(f"Created visualization with {len(dot.source)} characters")
```
### Table Dependencies Visualization
```python
from clgraph import Pipeline, visualize_table_dependencies, visualize_table_dependencies_with_levels

# Using the same pipeline from above
queries = [
    ("staging", "CREATE TABLE staging.orders AS SELECT id, amount FROM raw.orders"),
    ("analytics", "CREATE TABLE analytics.totals AS SELECT SUM(amount) as total FROM staging.orders"),
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Basic table dependency graph
dot = visualize_table_dependencies(pipeline.table_graph)
with open("tables.dot", "w") as f:
    f.write(dot.source)

# With execution levels (shows which tables can run in parallel)
dot_levels = visualize_table_dependencies_with_levels(pipeline.table_graph, pipeline)
with open("tables_levels.dot", "w") as f:
    f.write(dot_levels.source)

print("Created table dependency visualizations")
```
### Lineage Path Tracing
```python
from clgraph import Pipeline, visualize_lineage_path

queries = [
    ("staging", "CREATE TABLE staging.orders AS SELECT id, amount FROM raw.orders"),
    ("analytics", "CREATE TABLE analytics.totals AS SELECT SUM(amount) as total FROM staging.orders"),
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Trace backward from a column to its sources
nodes, edges = pipeline.trace_column_backward_full("analytics.totals", "total")
dot = visualize_lineage_path(nodes, edges, is_backward=True)
with open("backward_trace.dot", "w") as f:
    f.write(dot.source)

# Trace forward to see impact
nodes, edges = pipeline.trace_column_forward_full("raw.orders", "amount")
dot = visualize_lineage_path(nodes, edges, is_backward=False)
with open("forward_trace.dot", "w") as f:
    f.write(dot.source)

print("Created lineage path visualizations")
```
### Available Visualization Functions

| Function | Purpose |
|---|---|
| `visualize_pipeline_lineage()` | Multi-query column lineage |
| `visualize_column_lineage()` | Single-query column lineage |
| `visualize_table_dependencies()` | Table-level DAG |
| `visualize_table_dependencies_with_levels()` | Table DAG with execution levels |
| `visualize_lineage_path()` | Traced lineage path |
| `visualize_column_path()` | Path to specific column |
### Rendering DOT Files

Convert DOT files to images with the GraphViz CLI:

```bash
# PNG
dot -Tpng lineage.dot -o lineage.png

# SVG (recommended for web)
dot -Tsvg lineage.dot -o lineage.svg

# PDF
dot -Tpdf lineage.dot -o lineage.pdf
```
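If you'd rather keep rendering inside a script, the same CLI can be invoked from Python. This is a minimal stdlib-only sketch (the `render_dot` helper is ours, not part of clgraph); it degrades gracefully when GraphViz is not installed:

```python
import pathlib
import shutil
import subprocess

def render_dot(dot_path, fmt="png"):
    """Render a .dot file via the GraphViz CLI.

    Returns the output path, or None when the `dot` executable
    is not on PATH (i.e. GraphViz is not installed).
    """
    if shutil.which("dot") is None:
        return None
    dot_path = pathlib.Path(dot_path)
    out_path = dot_path.with_suffix("." + fmt)
    subprocess.run(
        ["dot", f"-T{fmt}", str(dot_path), "-o", str(out_path)],
        check=True,  # raise if GraphViz rejects the file
    )
    return out_path

# Smoke test with a trivial two-node graph.
sample = pathlib.Path("sample.dot")
sample.write_text("digraph { a -> b }")
result = render_dot(sample, fmt="svg")
print("rendered:", result)
```

`check=True` turns a GraphViz failure into a Python exception, which is usually what you want in a CI step.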
## Export to JSON/CSV

```python
from clgraph import Pipeline
from clgraph.export import JSONExporter, CSVExporter

# Using the same pipeline from above
queries = [
    ("staging", "CREATE TABLE staging.orders AS SELECT id, amount FROM raw.orders"),
    ("analytics", "CREATE TABLE analytics.totals AS SELECT SUM(amount) as total FROM staging.orders"),
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Export to JSON
JSONExporter.export_to_file(pipeline, "lineage.json")

# Export to CSV
CSVExporter.export_columns_to_file(pipeline, "columns.csv")

print("Exported lineage.json and columns.csv")
```
## Key Differences: App vs Code
| Feature | App | Code |
|---|---|---|
| Input | Paste SQL in text boxes | Load from files, strings, or lists |
| Visualization | Interactive diagrams | GraphViz DOT → PNG/SVG/PDF |
| Scale | Single pipeline | Batch process hundreds of files |
| Automation | Manual | CI/CD integration, scripts |
| Metadata | View only | Programmatic access, propagation |
| Export | View in browser | JSON, CSV, GraphViz DOT |
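The "Scale" row is where the code path pays off: instead of pasting SQL into text boxes, you can collect an entire directory of queries and feed it to the `Pipeline.from_tuples` constructor used throughout this guide. A stdlib-only sketch (the directory layout and `load_sql_directory` helper are assumptions for illustration):

```python
import pathlib
import tempfile

def load_sql_directory(sql_dir):
    """Build (name, sql) tuples from every .sql file in a directory.

    Sorting means numeric prefixes (01_, 02_, ...) define a stable order.
    """
    files = sorted(pathlib.Path(sql_dir).glob("*.sql"))
    return [(f.stem, f.read_text()) for f in files]

# Demo with a throwaway directory standing in for a real SQL repo.
with tempfile.TemporaryDirectory() as d:
    root = pathlib.Path(d)
    (root / "01_staging.sql").write_text(
        "CREATE TABLE staging.orders AS SELECT id, amount FROM raw.orders"
    )
    (root / "02_analytics.sql").write_text(
        "CREATE TABLE analytics.totals AS SELECT SUM(amount) as total FROM staging.orders"
    )
    queries = load_sql_directory(root)
    print([name for name, _ in queries])  # → ['01_staging', '02_analytics']

# The tuples feed straight into the constructor shown earlier:
# pipeline = Pipeline.from_tuples(queries, dialect="bigquery")
```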
## Next Steps
- API Reference - Full API documentation
- Pipeline Orchestration - Execute pipelines
- Metadata from Comments - Add PII tags, descriptions
- Examples - More code examples