Lineage Agent
The LineageAgent provides a natural language interface to pipeline lineage. Ask questions in plain English and get structured answers.
Overview
from clgraph.agent import LineageAgent
agent = LineageAgent(pipeline)
# Ask questions naturally
result = agent.query("Where does the revenue column come from?")
print(result.answer) # "Column revenue is derived from: raw.orders.amount"
result = agent.query("What tables are available?")
print(result.answer) # "Found 12 tables (3 source, 9 derived)"
Quick Start
Without LLM (Pattern Matching)
The agent works without an LLM by using pattern matching to route questions:
from clgraph.agent import LineageAgent
agent = LineageAgent(pipeline)
# Lineage questions
result = agent.query("Where does analytics.revenue.total come from?")
print(result.tool_used) # "trace_backward"
print(result.answer) # "Column is derived from: ..."
# Schema questions
result = agent.query("What columns does the orders table have?")
print(result.tool_used) # "get_table_schema"
# Search questions
result = agent.query("Find columns containing 'customer'")
print(result.tool_used) # "search_columns"
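To give a feel for what pattern-based routing can look like, here is a minimal sketch. It is not clgraph's actual implementation: the `ROUTES` table, the regular expressions, and the `route` function are assumptions made for illustration. Only the tool names are taken from this page.

```python
import re
from typing import Optional

# Hypothetical routing table: (pattern, tool name). The tool names appear in
# this documentation; the patterns are illustrative guesses, not clgraph's rules.
ROUTES = [
    (re.compile(r"\b(where does .+ come from|source of|trace back)\b", re.I), "trace_backward"),
    (re.compile(r"\b(what depends on|impact of|what uses)\b", re.I), "trace_forward"),
    (re.compile(r"\b(what columns does|schema of)\b", re.I), "get_table_schema"),
    (re.compile(r"\b(what tables|list all tables)\b", re.I), "list_tables"),
    (re.compile(r"\b(find columns|search for)\b", re.I), "search_columns"),
]


def route(question: str) -> Optional[str]:
    """Return the first tool whose pattern matches, or None if nothing matches."""
    for pattern, tool in ROUTES:
        if pattern.search(question):
            return tool
    return None


print(route("Where does analytics.revenue.total come from?"))  # trace_backward
print(route("Find columns containing 'customer'"))             # search_columns
```

In a scheme like this, a question that matches no pattern returns `None`, which is the kind of case an LLM fallback could handle.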
With LLM (Enhanced Capabilities)
Add an LLM for SQL generation and general questions:
from clgraph.agent import LineageAgent
from langchain_ollama import ChatOllama
llm = ChatOllama(model="llama3.2", temperature=0.3)
agent = LineageAgent(pipeline, llm=llm)
# SQL generation (requires LLM)
result = agent.query("Write SQL to get top customers by revenue")
print(result.data["sql"])
# Query explanation (requires LLM)
result = agent.query("Explain this query: SELECT ...")
print(result.answer)
Question Types
The agent automatically classifies questions and routes them to the appropriate tool:
| Question Type | Example Questions | Tool Used |
|---|---|---|
| Lineage Backward | "Where does X come from?", "Source of Y" | trace_backward |
| Lineage Forward | "What depends on X?", "Impact of Y" | trace_forward |
| Schema Tables | "What tables exist?", "List all tables" | list_tables |
| Schema Columns | "What columns does X have?", "Schema of Y" | get_table_schema |
| Schema Search | "Find columns named X", "Search for Y" | search_columns |
| Governance PII | "Which columns are PII?", "Sensitive data" | find_pii_columns |
| Governance Owner | "Who owns X?", "Ownership of Y" | get_owners |
| SQL Generate | "Write SQL to...", "Query to get..." | generate_sql |
| SQL Explain | "What does this query do?", "Explain SQL" | explain_query |
AgentResult
All queries return an AgentResult object:
@dataclass
class AgentResult:
    answer: str                      # Natural language answer
    question_type: QuestionType      # Detected question type
    tool_used: str | None            # Name of tool that was used
    tool_result: ToolResult | None   # Raw tool result
    data: dict | None                # Structured data
    error: str | None                # Error message if failed
Accessing Results
result = agent.query("Where does revenue come from?")
# Human-readable answer
print(result.answer)
# "Column analytics.revenue.total is derived from: raw.orders.amount"
# What type of question was detected
print(result.question_type)
# QuestionType.LINEAGE_BACKWARD
# Which tool handled it
print(result.tool_used)
# "trace_backward"
# Structured data for programmatic use
print(result.data)
# {"column": "analytics.revenue.total", "sources": ["raw.orders.amount"]}
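When results are logged or passed to other code, it can help to flatten them into plain dictionaries. The sketch below assumes only the `AgentResult` fields listed above. The `result_to_record` helper is hypothetical, and a `SimpleNamespace` stands in for a real result so the example runs without clgraph.

```python
from types import SimpleNamespace


def result_to_record(result) -> dict:
    """Flatten an AgentResult-like object into a plain dict for logging."""
    question_type = getattr(result, "question_type", None)
    return {
        "ok": getattr(result, "error", None) is None,
        # Use the enum's string value when available, else the raw attribute.
        "question_type": getattr(question_type, "value", question_type),
        "tool_used": getattr(result, "tool_used", None),
        "answer": getattr(result, "answer", ""),
        "data": getattr(result, "data", None) or {},
    }


# Stand-in for a real AgentResult (values copied from the example above).
fake = SimpleNamespace(
    answer="Column analytics.revenue.total is derived from: raw.orders.amount",
    question_type=SimpleNamespace(value="lineage_backward"),
    tool_used="trace_backward",
    data={"column": "analytics.revenue.total", "sources": ["raw.orders.amount"]},
    error=None,
)
record = result_to_record(fake)
print(record["tool_used"], record["data"]["sources"])
```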
QuestionType Enum
from clgraph.agent import QuestionType
class QuestionType(Enum):
    LINEAGE_BACKWARD = "lineage_backward"   # Where does X come from?
    LINEAGE_FORWARD = "lineage_forward"     # What depends on X?
    LINEAGE_PATH = "lineage_path"           # How does X relate to Y?
    SCHEMA_TABLES = "schema_tables"         # What tables exist?
    SCHEMA_COLUMNS = "schema_columns"       # What columns does X have?
    SCHEMA_SEARCH = "schema_search"         # Find columns matching X
    GOVERNANCE_PII = "governance_pii"       # Which columns are PII?
    GOVERNANCE_OWNER = "governance_owner"   # Who owns X?
    SQL_GENERATE = "sql_generate"           # Write SQL to...
    SQL_EXPLAIN = "sql_explain"             # What does this query do?
    GENERAL = "general"                     # General question
Example Queries
Lineage Questions
# Backward tracing
result = agent.query("Where does analytics.customer_metrics.lifetime_value come from?")
print(result.answer)
# "Column analytics.customer_metrics.lifetime_value is derived from: raw.orders.total_amount"
# Forward impact
result = agent.query("What depends on raw.orders.customer_id?")
print(result.answer)
# "Column raw.orders.customer_id impacts: analytics.customer_metrics.customer_id, ..."
# Using natural variations
agent.query("Source of the revenue column")
agent.query("Trace back customer_id")
agent.query("What uses the email column?")
agent.query("If I change order_date, what's affected?")
Schema Questions
# List tables
result = agent.query("What tables are available?")
print(result.data["tables"])
# Table schema
result = agent.query("What columns does marts.customer_360 have?")
for col in result.data["columns"]:
    print(f" {col['name']}: {col['description']}")
# Search
result = agent.query("Find columns containing 'revenue'")
print(f"Found {len(result.data['matches'])} matches")
Governance Questions
# PII detection
result = agent.query("Which columns contain PII?")
for col in result.data["pii_columns"]:
    print(f" {col['full_name']}: {col['tags']}")
# Ownership
result = agent.query("Who owns the customer_360 table?")
print(result.data["owner"])
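For a governance report, the PII list is often easier to read grouped by table. The following sketch assumes each entry in `result.data["pii_columns"]` has a dotted `full_name` ending in the column name, as the example above suggests. The `group_by_table` helper and the sample table names are hypothetical.

```python
from collections import defaultdict


def group_by_table(pii_columns):
    """Group column entries by table, splitting full_name at the last dot."""
    grouped = defaultdict(list)
    for col in pii_columns:
        table, _, column = col["full_name"].rpartition(".")
        grouped[table].append(column)
    return dict(grouped)


# Sample data in the assumed shape of result.data["pii_columns"].
sample = [
    {"full_name": "raw.customers.email", "tags": ["pii"]},
    {"full_name": "raw.customers.phone", "tags": ["pii"]},
    {"full_name": "marts.customer_360.email", "tags": ["pii"]},
]
print(group_by_table(sample))
# {'raw.customers': ['email', 'phone'], 'marts.customer_360': ['email']}
```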
SQL Questions (Requires LLM)
# Generate SQL
result = agent.query("Write SQL to get top 10 customers by lifetime value")
print(result.data["sql"])
# SELECT customer_id, lifetime_value
# FROM analytics.customer_metrics
# ORDER BY lifetime_value DESC
# LIMIT 10
# Explain SQL
sql = "SELECT user_id, SUM(amount) FROM orders GROUP BY 1"
result = agent.query(f"What does this query do? {sql}")
print(result.answer)
Available Tools
The agent has access to 16 tools:
| # | Tool Name | Description |
|---|---|---|
| 1 | trace_backward | Trace column to ultimate sources |
| 2 | trace_forward | Find downstream dependencies |
| 3 | get_lineage_path | Find path between two columns |
| 4 | get_table_lineage | Get lineage summary for a table |
| 5 | list_tables | List all tables in pipeline |
| 6 | get_table_schema | Get table columns and types |
| 7 | get_relationships | Get table dependencies |
| 8 | search_columns | Search columns by pattern |
| 9 | get_execution_order | Get query execution order |
| 10 | find_pii_columns | Find PII-flagged columns |
| 11 | get_owners | Get ownership information |
| 12 | get_columns_by_tag | Find columns by tag |
| 13 | list_tags | List all tags |
| 14 | check_data_quality | Check for quality issues |
| 15 | generate_sql | Generate SQL from question (LLM) |
| 16 | explain_query | Explain SQL query (LLM) |
Using with LangGraph
For more complex agent workflows, combine with LangGraph:
from typing import TypedDict
from langgraph.graph import StateGraph, END
# Assumes AgentResult is exported from clgraph.agent alongside LineageAgent
from clgraph.agent import LineageAgent, AgentResult
# Create lineage agent
lineage_agent = LineageAgent(pipeline, llm=llm)
# Define state
class State(TypedDict):
    question: str
    lineage_result: AgentResult
    final_answer: str
# Define nodes (each returns only the state keys it updates)
def query_lineage(state: State) -> dict:
    result = lineage_agent.query(state["question"])
    return {"lineage_result": result}
def format_answer(state: State) -> dict:
    result = state["lineage_result"]
    return {"final_answer": f"Based on lineage: {result.answer}"}
# Build graph
workflow = StateGraph(State)
workflow.add_node("query", query_lineage)
workflow.add_node("format", format_answer)
workflow.add_edge("query", "format")
workflow.add_edge("format", END)  # mark "format" as the final node
workflow.set_entry_point("query")
app = workflow.compile()
Error Handling
result = agent.query("Invalid question about nonexistent.column")
if result.error:
    print(f"Error: {result.error}")
else:
    print(result.answer)
# Check success via tool_result
if result.tool_result and not result.tool_result.success:
    print(f"Tool failed: {result.tool_result.message}")
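In application code, the two checks above can be folded into one helper that either returns the answer or raises. This is a sketch, not part of clgraph: `LineageQueryError` and `answer_or_raise` are hypothetical names, and `SimpleNamespace` objects stand in for real results.

```python
from types import SimpleNamespace


class LineageQueryError(RuntimeError):
    """Hypothetical exception type; not provided by clgraph."""


def answer_or_raise(result) -> str:
    """Return result.answer, raising if either failure signal is set."""
    if result.error:
        raise LineageQueryError(result.error)
    tool_result = result.tool_result
    if tool_result is not None and not tool_result.success:
        raise LineageQueryError(f"{result.tool_used} failed: {tool_result.message}")
    return result.answer


# Stand-in for a failed tool call, shaped like the fields used above.
failed = SimpleNamespace(
    answer="",
    error=None,
    tool_used="trace_backward",
    tool_result=SimpleNamespace(success=False, message="column not found"),
)
try:
    answer_or_raise(failed)
except LineageQueryError as exc:
    print(exc)  # trace_backward failed: column not found
```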
Best Practices
- Be Specific: Include table and column names when asking about specific data
# Good
agent.query("Where does analytics.revenue.total come from?")
# Less specific
agent.query("Where does revenue come from?")
- Use Natural Language: The agent understands various phrasings
# All equivalent
agent.query("Source of customer_id")
agent.query("Where does customer_id come from?")
agent.query("What feeds into customer_id?")
agent.query("Trace back customer_id")
- Check Question Type: Verify the agent understood your intent
result = agent.query("...")
if result.question_type == QuestionType.GENERAL:
    # Agent didn't recognize the pattern - rephrase
    pass
- Use Structured Data: For programmatic use, access result.data instead of parsing result.answer
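The question-type check and the rephrasing advice above can be combined into a small fallback loop. This sketch assumes only that `agent.query` returns an object whose `question_type.value` is `"general"` when no pattern was recognized, as the `QuestionType` enum suggests. `first_recognized` and `FakeAgent` are hypothetical and exist only so the example runs without clgraph.

```python
from types import SimpleNamespace


def first_recognized(agent, phrasings):
    """Try each phrasing in order; return the first result not classified as GENERAL."""
    result = None
    for question in phrasings:
        result = agent.query(question)
        if result.question_type.value != "general":
            return result
    # Every phrasing fell through to GENERAL (or the list was empty).
    return result


class FakeAgent:
    """Stand-in that only recognizes questions starting with 'Where does'."""

    def query(self, question):
        kind = "lineage_backward" if question.startswith("Where does") else "general"
        return SimpleNamespace(question_type=SimpleNamespace(value=kind), answer=question)


result = first_recognized(
    FakeAgent(),
    ["customer_id origin?", "Where does customer_id come from?"],
)
print(result.question_type.value)  # lineage_backward
```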