
From App to Code

This guide shows how to achieve the same results as the Interactive Lineage App programmatically using Python.

Try the App First

If you haven't already, try the Interactive Lineage App to visualize how SQL pipelines are analyzed. The examples below match those available in the app.


Setting Up

from clgraph import Pipeline

That's it! The Pipeline class is your main entry point for all lineage analysis.
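
Every example below uses one of two constructors on Pipeline. As a quick orientation, here is a minimal sketch (the SQL is a placeholder):

from clgraph import Pipeline

# From a single SQL string (used by the single-query examples below)
pipeline = Pipeline.from_sql_string("SELECT id FROM users", dialect="bigquery")

# From an ordered list of (name, sql) tuples (used by the multi-query examples below);
# in practice the SQL strings are often read from .sql files on disk
queries = [("users", "SELECT id FROM users")]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")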


Single Query Examples

Simple SELECT

In the App: Load "1-Query: Simple SELECT"

In Code:

from clgraph import Pipeline

sql = """
SELECT id, name, email
FROM users
WHERE active = true
"""

pipeline = Pipeline.from_sql_string(sql, dialect="bigquery")

# View the lineage
print(f"Tables: {list(pipeline.table_graph.tables.keys())}")
print(f"Columns: {len(pipeline.columns)}")

# Get column details
for col in pipeline.columns.values():
    print(f"  {col.table_name}.{col.column_name} ({col.node_type})")

Query with JOIN

In the App: Load "1-Query: With JOIN"

In Code:

from clgraph import Pipeline

sql = """
SELECT
    u.id,
    u.name,
    o.total,
    o.status
FROM users u
JOIN orders o ON u.id = o.user_id
"""

pipeline = Pipeline.from_sql_string(sql, dialect="bigquery")

# Trace where a column comes from
sources = pipeline.trace_column_backward("select_result", "total")
print("Sources for 'total' column:")
for source in sources:
    print(f"  <- {source.table_name}.{source.column_name}")

Query with Aggregates

In the App: Load "1-Query: With Aggregates"

In Code:

from clgraph import Pipeline

sql = """
SELECT
    user_id,
    COUNT(*) as order_count,
    SUM(total) as total_spent,
    AVG(total) as avg_order
FROM orders
GROUP BY user_id
"""

pipeline = Pipeline.from_sql_string(sql, dialect="bigquery")

# See the transformation types
for name, col in pipeline.columns.items():
    if col.layer == "output":
        print(f"{col.column_name}: {col.node_type}")
        # Output shows different node types like:
        # user_id: base_column
        # order_count: aggregate
        # total_spent: aggregate

Multi-Query Pipeline Examples

3-Layer Pipeline

In the App: Load "3-Query: 3-Layer Pipeline"

In Code:

from clgraph import Pipeline

queries = [
    ("01_staging", """
        CREATE TABLE staging.user_orders AS
        SELECT
            user_id,
            order_id,
            amount,
            order_date
        FROM raw.orders
        WHERE status = 'completed'
    """),
    ("02_analytics", """
        CREATE TABLE analytics.user_metrics AS
        SELECT
            user_id,
            COUNT(*) as order_count,
            SUM(amount) as total_revenue,
            AVG(amount) as avg_order_value
        FROM staging.user_orders
        GROUP BY user_id
    """),
    ("03_reports", """
        CREATE TABLE reports.monthly_revenue AS
        SELECT
            DATE_TRUNC(uo.order_date, MONTH) as month,
            SUM(um.total_revenue) as revenue
        FROM analytics.user_metrics um
        JOIN staging.user_orders uo USING (user_id)
        GROUP BY month
    """),
]

pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# View table execution order
print("Execution order:")
for i, table in enumerate(pipeline.table_graph.topological_sort(), 1):
    print(f"  {i}. {table}")

# Trace a column across the entire pipeline
sources = pipeline.trace_column_backward("reports.monthly_revenue", "revenue")
print("\nSources for 'revenue':")
for source in sources:
    print(f"  <- {source.table_name}.{source.column_name}")

SELECT * EXCEPT (Privacy)

In the App: Load "2-Query: SELECT * EXCEPT (Privacy)"

In Code:

from clgraph import Pipeline

queries = [
    ("01_staging", """
        CREATE TABLE staging.orders AS
        SELECT
            order_id,
            user_id,
            amount,
            customer_email,
            customer_ssn,
            order_date
        FROM raw.orders
    """),
    ("02_clean", """
        CREATE TABLE analytics.clean_orders AS
        SELECT * EXCEPT (customer_email, customer_ssn)
        FROM staging.orders
    """),
]

pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# See which columns are in the final table
final_cols = [c for c in pipeline.columns.values()
              if c.table_name == "analytics.clean_orders"]
print("Columns in clean_orders:")
for col in final_cols:
    print(f"  {col.column_name}")
    # Note: customer_email and customer_ssn are excluded!

# Verify sensitive columns were excluded
excluded = {"customer_email", "customer_ssn"}
included = {c.column_name for c in final_cols}
print(f"\nExcluded columns: {excluded - included}")

SELECT * REPLACE (Transform)

In the App: Load "2-Query: SELECT * REPLACE (Transform)"

In Code:

from clgraph import Pipeline

queries = [
    ("01_staging", """
        CREATE TABLE staging.orders AS
        SELECT
            order_id,
            user_id,
            amount,
            status,
            order_date
        FROM raw.orders
    """),
    ("02_normalize", """
        CREATE TABLE analytics.orders_normalized AS
        SELECT * REPLACE (
            UPPER(status) as status,
            ROUND(amount, 2) as amount
        )
        FROM staging.orders
    """),
]

pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# The replaced columns are tracked with their transformations
for name, col in pipeline.columns.items():
    if "orders_normalized" in name:
        print(f"{col.column_name}: {col.node_type}")

Visualization

Column Lineage Visualization

The app shows interactive diagrams. In code, you can create GraphViz visualizations:

from clgraph import Pipeline, visualize_pipeline_lineage

# Create a simple pipeline
queries = [
    ("staging", "CREATE TABLE staging.orders AS SELECT id, amount FROM raw.orders"),
    ("analytics", "CREATE TABLE analytics.totals AS SELECT SUM(amount) as total FROM staging.orders"),
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Create column lineage visualization
dot = visualize_pipeline_lineage(pipeline.column_graph)

# Save to DOT file
with open("lineage.dot", "w") as f:
    f.write(dot.source)

# Or render directly (requires graphviz system package)
# dot.render("lineage", format="png", cleanup=True)

print(f"Created visualization with {len(dot.source)} characters")

Table Dependencies Visualization

from clgraph import Pipeline, visualize_table_dependencies, visualize_table_dependencies_with_levels

# Recreate the same pipeline as above
queries = [
    ("staging", "CREATE TABLE staging.orders AS SELECT id, amount FROM raw.orders"),
    ("analytics", "CREATE TABLE analytics.totals AS SELECT SUM(amount) as total FROM staging.orders"),
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Basic table dependency graph
dot = visualize_table_dependencies(pipeline.table_graph)
with open("tables.dot", "w") as f:
    f.write(dot.source)

# With execution levels (shows which tables can run in parallel)
dot_levels = visualize_table_dependencies_with_levels(pipeline.table_graph, pipeline)
with open("tables_levels.dot", "w") as f:
    f.write(dot_levels.source)

print("Created table dependency visualizations")

Lineage Path Tracing

from clgraph import Pipeline, visualize_lineage_path

queries = [
    ("staging", "CREATE TABLE staging.orders AS SELECT id, amount FROM raw.orders"),
    ("analytics", "CREATE TABLE analytics.totals AS SELECT SUM(amount) as total FROM staging.orders"),
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Trace backward from a column to its sources
nodes, edges = pipeline.trace_column_backward_full("analytics.totals", "total")
dot = visualize_lineage_path(nodes, edges, is_backward=True)
with open("backward_trace.dot", "w") as f:
    f.write(dot.source)

# Trace forward to see impact
nodes, edges = pipeline.trace_column_forward_full("raw.orders", "amount")
dot = visualize_lineage_path(nodes, edges, is_backward=False)
with open("forward_trace.dot", "w") as f:
    f.write(dot.source)

print("Created lineage path visualizations")

Available Visualization Functions

Function                                      Purpose
visualize_pipeline_lineage()                  Multi-query column lineage
visualize_column_lineage()                    Single-query column lineage
visualize_table_dependencies()                Table-level DAG
visualize_table_dependencies_with_levels()    Table DAG with execution levels
visualize_lineage_path()                      Traced lineage path
visualize_column_path()                       Path to specific column

Rendering DOT Files

Convert DOT to image using GraphViz CLI:

# PNG
dot -Tpng lineage.dot -o lineage.png

# SVG (recommended for web)
dot -Tsvg lineage.dot -o lineage.svg

# PDF
dot -Tpdf lineage.dot -o lineage.pdf
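
If you prefer to stay in Python, the graphviz package (pip install graphviz) can render the same DOT files. A minimal sketch, assuming the GraphViz binaries are installed and lineage.dot exists:

import graphviz

# Load an existing DOT file and render it to SVG (writes lineage.svg)
src = graphviz.Source.from_file("lineage.dot")
src.render("lineage", format="svg", cleanup=True)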

Export to JSON/CSV

from clgraph import Pipeline
from clgraph.export import JSONExporter, CSVExporter

# Recreate the same pipeline as above
queries = [
    ("staging", "CREATE TABLE staging.orders AS SELECT id, amount FROM raw.orders"),
    ("analytics", "CREATE TABLE analytics.totals AS SELECT SUM(amount) as total FROM staging.orders"),
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Export to JSON
JSONExporter.export_to_file(pipeline, "lineage.json")

# Export to CSV
CSVExporter.export_columns_to_file(pipeline, "columns.csv")

print("Exported lineage.json and columns.csv")

Key Differences: App vs Code

Feature          App                        Code
Input            Paste SQL in text boxes    Load from files, strings, or lists
Visualization    Interactive diagrams       GraphViz DOT → PNG/SVG/PDF
Scale            Single pipeline            Batch process hundreds of files
Automation       Manual                     CI/CD integration, scripts
Metadata         View only                  Programmatic access, propagation
Export           View in browser            JSON, CSV, GraphViz DOT
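
The Scale and Automation rows are where the code path pays off. Here is a sketch of batch-loading a directory of .sql files with the same from_tuples constructor (the sql/ directory and its numbered file names are assumptions):

from pathlib import Path
from clgraph import Pipeline

# Assumed layout: sql/01_staging.sql, sql/02_analytics.sql, ... named in execution order
queries = [(path.stem, path.read_text()) for path in sorted(Path("sql").glob("*.sql"))]

pipeline = Pipeline.from_tuples(queries, dialect="bigquery")
print(f"Analyzed {len(pipeline.table_graph.tables)} tables, {len(pipeline.columns)} columns")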

Next Steps