AI & LLM Integration
Use clgraph's AI capabilities to generate documentation, build text-to-SQL systems, and create conversational data agents.
clgraph provides first-class LLM integration with schema-grounded context, ensuring AI tools have accurate information about your data pipeline.
The Challenge
AI applications on data pipelines often fail due to:
- LLM hallucinations: Models invent table and column names that don't exist
- Missing context: AI can't understand data relationships without lineage
- Manual documentation: Writing column descriptions is tedious, and they quickly go stale
- Complex queries: Users need SQL but don't know the schema
- Siloed AI tools: Each AI application requires custom integration
The Solution: Schema-Grounded AI
clgraph solves these problems by:
- Lineage context: AI sees how data flows through transformations
- Schema awareness: Tools automatically include real table/column names
- Metadata enrichment: PII flags, ownership, and descriptions inform AI responses
- Unified tools: Same tools work across agents, MCP, and direct API
Use Case 1: LLM-Powered Description Generation
Scenario: Auto-generate descriptions for columns that don't have them.
Setup with Ollama (Local, Free)
from clgraph import Pipeline
from langchain_ollama import ChatOllama
# Load your pipeline
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
# Connect to local Ollama
llm = ChatOllama(
    model="llama3:latest",  # or llama3.2, qwen3-coder:30b
    temperature=0.3,
)
pipeline.llm = llm
Setup with OpenAI
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(
    model="gpt-4",
    temperature=0,
)
pipeline.llm = llm
Generate Descriptions
# Generate descriptions for all columns without them
pipeline.generate_all_descriptions()
# Or generate for specific columns
from clgraph.column import generate_description
for col in pipeline.columns.values():
    if not col.description and col.table_name.startswith("mart_"):
        generate_description(col, pipeline.llm, pipeline)
        print(f"{col.full_name}: {col.description}")
Why lineage-grounded descriptions are better:
The LLM sees lineage context including:
- Source columns and their metadata
- Transformations (SUM, JOIN, CASE)
- Filter conditions (WHERE clauses)
- Aggregation logic (GROUP BY)
This produces descriptions that understand the data flow, not just the column name.
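To make this concrete, here is a minimal sketch of the kind of prompt such a generator might assemble from lineage context. `build_description_prompt` and its fields are illustrative, not clgraph's actual internals:

```python
# Hypothetical sketch: fold lineage context into a description prompt.
def build_description_prompt(column: str, sources: list[str],
                             transformation: str, filters: list[str]) -> str:
    """Assemble lineage context into a description-generation prompt."""
    lines = [
        f"Describe the column `{column}`.",
        f"Source columns: {', '.join(sources)}",
        f"Transformation: {transformation}",
    ]
    if filters:
        lines.append(f"Filters applied: {'; '.join(filters)}")
    lines.append("Write one sentence explaining what this column represents.")
    return "\n".join(lines)

prompt = build_description_prompt(
    column="mart_customer_ltv.revenue",
    sources=["orders.total_amount"],
    transformation="SUM(...) GROUP BY customer_id",
    filters=["status = 'completed'"],
)
print(prompt)
```

Because the prompt carries sources, transformations, and filters, the model describes what the column means in the pipeline rather than guessing from its name.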
Use Case 2: Text-to-SQL with Schema Grounding
Scenario: Generate SQL from natural language without hallucinations.
The Problem with Traditional Text-to-SQL
LLMs often hallucinate table and column names:
User: "Show me total revenue by customer"
LLM: SELECT customer, SUM(revenue) FROM sales...
Problem: There's no "sales" table or "revenue" column!
The Solution: GenerateSQLTool
clgraph provides a GenerateSQLTool that automatically includes your real schema:
from clgraph import Pipeline
from clgraph.tools import GenerateSQLTool
from langchain_openai import ChatOpenAI
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
llm = ChatOpenAI(model="gpt-4", temperature=0)
# Create the SQL generation tool
sql_tool = GenerateSQLTool(pipeline, llm)
# Generate SQL from natural language
result = sql_tool.run(question="What is total revenue by customer?")
if result.success:
    print(result.data["sql"])
    print(result.data["explanation"])
Two-Stage Generation
For large schemas, use the two-stage strategy to select relevant tables first:
result = sql_tool.run(
    question="What is total revenue by customer?",
    strategy="two_stage",  # Select tables first, then generate
    include_explanation=True,
)
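To illustrate what the first stage does, here is a toy, pure-Python sketch of table selection by keyword overlap. `select_relevant_tables` is hypothetical; the real tool uses the LLM to shortlist tables:

```python
# Illustrative stage 1: shortlist tables whose name/column tokens
# overlap with the question, so stage 2 sees a smaller schema.
def _tokens(name: str) -> set[str]:
    return set(name.lower().split("_"))

def select_relevant_tables(question: str, schema: dict[str, list[str]],
                           limit: int = 3) -> list[str]:
    """Rank tables by keyword overlap between the question and their schema."""
    words = set(question.lower().replace("?", " ").split())
    scored = []
    for table, columns in schema.items():
        vocab = _tokens(table)
        for col in columns:
            vocab |= _tokens(col)
        scored.append((len(words & vocab), table))
    scored.sort(key=lambda pair: -pair[0])
    return [table for score, table in scored[:limit] if score > 0]

schema = {
    "orders": ["order_id", "customer_id", "revenue"],
    "customers": ["customer_id", "name"],
    "inventory": ["sku", "quantity"],
}
tables = select_relevant_tables("What is total revenue by customer?", schema)
print(tables)  # → ['orders', 'customers']
```

Only the shortlisted tables' schemas are then sent to the generation prompt, which keeps large pipelines within the model's context window.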
Manual Context Building
For custom integrations, build the context yourself:
# Export the graph as context for the LLM
context = {
    "tables": list(pipeline.table_graph.tables.keys()),
    "columns_by_table": {},
}
for table_name in pipeline.table_graph.tables:
    context["columns_by_table"][table_name] = [
        col.column_name
        for col in pipeline.columns.values()
        if col.table_name == table_name
    ]
# The context can be included in an LLM prompt
print(f"Found {len(context['tables'])} tables")
print(f"Sample table columns: {list(context['columns_by_table'].keys())[:3]}")
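Once exported, the context can be folded into a text-to-SQL prompt for any LLM client. The prompt wording below is illustrative, with a hard-coded context standing in for the one built above:

```python
# Sketch: embed the exported schema context in a text-to-SQL prompt.
import json

context = {
    "tables": ["orders", "customers"],
    "columns_by_table": {
        "orders": ["order_id", "customer_id", "total_amount"],
        "customers": ["customer_id", "name"],
    },
}

prompt = (
    "You are a SQL assistant. Use ONLY the tables and columns below.\n"
    f"Schema:\n{json.dumps(context['columns_by_table'], indent=2)}\n"
    "Question: What is total revenue by customer?\n"
    "Return a single SQL query."
)
print(prompt)
```

Constraining the model to the serialized schema is what prevents it from inventing table or column names.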
Use Case 3: LineageAgent - Conversational Interface
Scenario: Ask questions about your data in natural language.
Basic Usage
from clgraph import Pipeline
from clgraph.agent import LineageAgent
from langchain_openai import ChatOpenAI
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
llm = ChatOpenAI(model="gpt-4", temperature=0)
# Create the agent
agent = LineageAgent(pipeline, llm=llm)
# Ask lineage questions
result = agent.query("Where does mart_customer_ltv.revenue come from?")
print(result.answer)
# Ask schema questions
result = agent.query("What tables are available?")
print(result.answer)
# Ask governance questions
result = agent.query("Which columns contain PII?")
print(result.answer)
# Generate SQL
result = agent.query("Write SQL to get monthly revenue by region")
print(result.data["sql"])
Question Types Supported
The LineageAgent automatically classifies and routes questions:
| Question Type | Example | Tool Used |
|---|---|---|
| Lineage (backward) | "Where does revenue come from?" | trace_backward |
| Lineage (forward) | "What depends on orders.amount?" | trace_forward |
| Schema (tables) | "What tables exist?" | list_tables |
| Schema (columns) | "What columns does orders have?" | get_table_schema |
| Schema (search) | "Find columns named 'revenue'" | search_columns |
| Governance (PII) | "Which columns are PII?" | find_pii_columns |
| Governance (owners) | "Who owns the orders table?" | get_owners |
| SQL (generate) | "Write SQL to..." | generate_sql |
| SQL (explain) | "Explain this query..." | explain_query |
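The routing above can be pictured with a toy keyword matcher. The real LineageAgent classifies questions with the LLM; this pure-Python version only illustrates the idea, and `route_question` is not part of clgraph's API:

```python
# Toy sketch: route a question to a tool name by keyword matching.
ROUTES = [
    (("where does", "come from"), "trace_backward"),
    (("depends on", "downstream"), "trace_forward"),
    (("what tables",), "list_tables"),
    (("pii", "sensitive"), "find_pii_columns"),
    (("who owns", "owner"), "get_owners"),
    (("write sql", "generate sql"), "generate_sql"),
]

def route_question(question: str) -> str:
    q = question.lower()
    for keywords, tool in ROUTES:
        if any(keyword in q for keyword in keywords):
            return tool
    return "search_columns"  # fallback: treat it as a schema search

print(route_question("Where does mart_customer_ltv.revenue come from?"))
# → trace_backward
```

An LLM-based classifier handles paraphrases that keyword matching misses, but the tool-dispatch structure is the same.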
Direct Tool Access
You can also run tools directly:
# Run a specific tool
result = agent.run_tool("trace_backward", table="mart_customer_ltv", column="revenue")
# List available tools
tools = agent.list_tools()
print(tools)
Use Case 4: SQL Explanation
Scenario: Understand what complex SQL queries do.
from clgraph.tools import ExplainQueryTool
explain_tool = ExplainQueryTool(pipeline, llm)
sql = """
SELECT
    c.customer_id,
    c.name,
    SUM(o.total_amount) AS lifetime_value
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
WHERE o.status = 'completed'
GROUP BY c.customer_id, c.name
HAVING SUM(o.total_amount) > 1000
"""
result = explain_tool.run(sql=sql, detail_level="detailed")
print(result.data["explanation"])
Detail Levels:
- brief: One-sentence summary
- normal: 2-3 sentence explanation
- detailed: Full breakdown of purpose, tables, joins, filters, and output
Use Case 5: MCP Server for External AI Agents
Scenario: Expose your pipeline to Claude, Cursor, or other MCP-compatible AI tools.
What is MCP?
The Model Context Protocol (MCP) is a standard for exposing tools and data to AI assistants. clgraph provides an MCP server that exposes lineage tools to any MCP-compatible client.
Running the MCP Server
From the command line (the same invocation used in the Claude Desktop config below):
python -m clgraph.mcp --pipeline /path/to/queries --dialect bigquery
Or programmatically:
from clgraph import Pipeline
from clgraph.mcp import run_mcp_server
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
run_mcp_server(pipeline) # Blocks until terminated
Available MCP Tools
The MCP server exposes all lineage tools:
| Tool | Description |
|---|---|
| trace_backward | Find source columns for a given column |
| trace_forward | Find downstream impacts of a column |
| list_tables | List all tables in the pipeline |
| get_table_schema | Get columns for a table |
| search_columns | Search for columns by name |
| find_pii_columns | Find PII-flagged columns |
| get_owners | Get ownership information |
| get_execution_order | Get query execution order |
MCP Resources
The server also exposes resources for direct data access:
- pipeline://schema - Full pipeline schema
- pipeline://tables - List of all tables
- pipeline://tables/{name} - Specific table details
Claude Desktop Configuration
Add to your Claude Desktop config:
{
  "mcpServers": {
    "clgraph": {
      "command": "python",
      "args": ["-m", "clgraph.mcp", "--pipeline", "/path/to/queries", "--dialect", "bigquery"]
    }
  }
}
Use Case 6: Building Custom AI Tools
Scenario: Create your own AI-powered data tools using the clgraph tools framework.
Tool Registry
The tool registry provides a unified interface for all tools:
from clgraph.tools import create_tool_registry
# Create registry (LLM is optional - only needed for SQL tools)
registry = create_tool_registry(pipeline, llm=None)
# Run tools by name - basic tools work without LLM
result = registry.run("list_tables")
print(f"Found {len(result.data)} tables") # result.data is a list of tables
print(f"Message: {result.message}")
# List available tools
print(f"Available tools: {registry.tool_names()[:5]}...")
Available Tool Categories
Lineage Tools (no LLM required):
- TraceBackwardTool - Find source columns
- TraceForwardTool - Find downstream impacts
- GetLineagePathTool - Find path between columns
- GetTableLineageTool - Get table-level dependencies
Schema Tools (no LLM required):
- ListTablesTool - List all tables
- GetTableSchemaTool - Get table columns
- SearchColumnsTool - Search columns by pattern
- GetRelationshipsTool - Get table relationships
- GetExecutionOrderTool - Get topological execution order
Governance Tools (no LLM required):
- FindPIIColumnsTool - Find PII columns
- GetOwnersTool - Get ownership info
- GetColumnsByTagTool - Filter by tags
- ListTagsTool - List all tags
- CheckDataQualityTool - Audit metadata quality
SQL Tools (require LLM):
- GenerateSQLTool - Natural language to SQL
- ExplainQueryTool - Explain SQL queries
Creating Custom Tools
Extend BaseTool or LLMTool:
from clgraph.tools import BaseTool, ParameterSpec, ParameterType, ToolResult
class MyCustomTool(BaseTool):
    name = "my_tool"
    description = "My custom lineage tool"

    @property
    def parameters(self):
        return {
            "table": ParameterSpec(
                name="table",
                type=ParameterType.STRING,
                description="Table to analyze",
                required=True,
            ),
        }

    def run(self, table: str) -> ToolResult:
        # Access pipeline via self.pipeline
        columns = list(self.pipeline.get_columns_by_table(table))
        return ToolResult.success_result(
            data={"column_count": len(columns)},
            message=f"Table {table} has {len(columns)} columns",
        )
# Test the custom tool
tool = MyCustomTool(pipeline)
print(f"Tool name: {tool.name}")
print(f"Tool description: {tool.description}")
Best Practices
1. Use Metadata for Better AI Results
The more metadata you provide, the better the AI responses:
# Add descriptions before using AI
for col in pipeline.columns.values():
    if col.table_name == "raw_customers":
        col.description = "Customer data from CRM"
        col.owner = "data-team"
# Now AI has context for better responses
result = agent.query("Where does customer data come from?")
2. Propagate Metadata First
Always propagate metadata before AI operations:
# Ensure metadata flows through lineage
pipeline.propagate_all_metadata()
# Now PII tracking is complete for AI queries
result = agent.query("Which columns contain sensitive data?")
3. Use Two-Stage for Large Schemas
For pipelines with many tables, use two-stage SQL generation:
result = sql_tool.run(
    question="Get revenue trends",
    strategy="two_stage",  # Selects relevant tables first
)
4. Validate AI-Generated SQL
Always review generated SQL before execution:
result = sql_tool.run(question="Delete all old orders")
# Check the generated SQL
print(result.data["sql"])
# NEVER execute destructive SQL without review!
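A minimal guard can screen generated statements before they reach a warehouse. Keyword screening is crude (a robust check would parse the statement, for example with a SQL parser); this sketch only blocks the obvious destructive verbs, and `is_safe_to_run` is illustrative, not a clgraph API:

```python
# Sketch: refuse to execute statements that begin with a destructive verb.
DESTRUCTIVE = ("DELETE", "DROP", "TRUNCATE", "UPDATE", "ALTER")

def is_safe_to_run(sql: str) -> bool:
    """Return False if the statement starts with a destructive verb."""
    stripped = sql.strip()
    if not stripped:
        return False
    first_word = stripped.split(None, 1)[0].upper()
    return first_word not in DESTRUCTIVE

assert is_safe_to_run("SELECT * FROM orders")
assert not is_safe_to_run("DELETE FROM orders WHERE order_date < '2020-01-01'")
```

Pair a guard like this with human review; it catches accidents, not adversarial or multi-statement SQL.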
Key Benefits
| Traditional Approach | clgraph AI Integration |
|---|---|
| Manual documentation | Auto-generated with lineage context |
| Hallucinated table names | Schema-grounded SQL generation |
| Custom integrations per tool | Unified tool framework |
| Siloed AI applications | MCP server for any AI client |
| No lineage awareness | AI understands data flow |
Next Steps
- Data Catalog & Governance - Set up metadata for better AI results
- SQL Debugging & Lineage - Understand lineage before AI queries
- Multi-Environment Execution - Execute AI-generated SQL safely