Entity and Relationship Extraction
Basic Entity Extraction
Copy
from openai import OpenAI
from dataclasses import dataclass
import json
@dataclass
class Entity:
    """An extracted entity.

    Holds the entity's name, its type label, and a dictionary of extra
    properties. ``properties`` is always a dict after construction.
    """

    name: str
    type: str
    # The default stays ``None`` in the signature so existing callers keep
    # working; ``__post_init__`` swaps it for a fresh dict per instance.
    properties: dict = None

    def __post_init__(self):
        """Replace a missing or ``None`` ``properties`` with an empty dict."""
        if self.properties is None:
            self.properties = {}
@dataclass
class Relationship:
    """A directed, typed relationship between two entities.

    ``source`` and ``target`` are entity names and ``type`` is the
    relationship label. ``properties`` is always a dict after construction.
    """

    source: str
    target: str
    type: str
    # The default stays ``None`` in the signature so existing callers keep
    # working; ``__post_init__`` swaps it for a fresh dict per instance.
    properties: dict = None

    def __post_init__(self):
        """Replace a missing or ``None`` ``properties`` with an empty dict."""
        if self.properties is None:
            self.properties = {}
class EntityExtractor:
    """Extract entities and relationships from text using a chat model.

    The model is prompted with the allowed entity and relationship type
    names and asked to answer with a JSON object, which is converted into
    ``Entity`` and ``Relationship`` instances.
    """

    def __init__(self, model: str = "gpt-4o-mini"):
        """Create the OpenAI client and the type vocabularies.

        Args:
            model: Chat model name passed to the completions API.
        """
        self.client = OpenAI()
        self.model = model
        self.entity_types = [
            "Person", "Organization", "Location", "Product",
            "Event", "Technology", "Concept",
        ]
        self.relationship_types = [
            "works_for", "located_in", "founded", "acquired",
            "partners_with", "uses", "created", "manages",
        ]

    def extract(self, text: str) -> dict:
        """Extract entities and relationships from ``text``.

        Args:
            text: Source text to analyse.

        Returns:
            A dict with keys ``"entities"`` (list of ``Entity``) and
            ``"relationships"`` (list of ``Relationship``).

        Raises:
            KeyError: If an item returned by the model lacks a required key
                (``name``/``type`` or ``source``/``target``/``type``).
            json.JSONDecodeError: If the response body is not valid JSON.
        """
        prompt = f"""Extract entities and relationships from this text.
Entity types: {', '.join(self.entity_types)}
Relationship types: {', '.join(self.relationship_types)}
Text:
{text}
Return JSON:
{{
"entities": [
{{"name": "entity name", "type": "entity type", "properties": {{}}}}
],
"relationships": [
{{"source": "entity1", "target": "entity2", "type": "relationship type", "properties": {{}}}}
]
}}"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )
        data = json.loads(response.choices[0].message.content)

        # The model may emit explicit JSON nulls; ``or`` maps both a missing
        # key and a null value to an empty container.
        entities = [
            Entity(
                name=item["name"],
                type=item["type"],
                properties=item.get("properties") or {},
            )
            for item in data.get("entities") or []
        ]
        relationships = [
            Relationship(
                source=item["source"],
                target=item["target"],
                type=item["type"],
                properties=item.get("properties") or {},
            )
            for item in data.get("relationships") or []
        ]
        return {
            "entities": entities,
            "relationships": relationships,
        }
# Usage: run the extractor over a short sample paragraph.
extractor = EntityExtractor()
text = """
Apple Inc., founded by Steve Jobs in Cupertino, California,
acquired Beats Electronics in 2014. The company uses advanced
machine learning technology in its products. Tim Cook, who
previously worked at Compaq, now manages Apple's operations.
"""
result = extractor.extract(text)

# One line per extracted entity: name followed by its type.
print("Entities:")
for found in result["entities"]:
    entity_line = f" {found.name} ({found.type})"
    print(entity_line)

# One line per relationship, drawn as an arrow from source to target.
print("\nRelationships:")
for link in result["relationships"]:
    arrow_line = f" {link.source} --[{link.type}]--> {link.target}"
    print(arrow_line)
Coreference Resolution
Copy
from openai import OpenAI
import json
class CoreferenceResolver:
    """Resolve pronouns and other references to the entities they denote."""

    def __init__(self, model: str = "gpt-4o-mini"):
        """Store the model name and create the OpenAI client.

        Args:
            model: Chat model name passed to the completions API.
        """
        self.client = OpenAI()
        self.model = model

    def resolve(self, text: str) -> dict:
        """Ask the model to resolve coreferences in ``text``.

        Args:
            text: Text containing pronouns or indirect references.

        Returns:
            The model's JSON answer parsed into a dict. The prompt requests
            the keys ``resolved_text`` and ``resolutions``; the response is
            returned as-is without validation.
        """
        prompt = f"""Resolve all pronouns and references in this text.
For each reference (he, she, it, they, the company, etc.), identify what entity it refers to.
Text:
{text}
Return JSON:
{{
"resolved_text": "text with references replaced by entity names",
"resolutions": [
{{"original": "pronoun/reference", "resolved_to": "entity name", "context": "surrounding words"}}
]
}}"""
        user_message = {"role": "user", "content": prompt}
        completion = self.client.chat.completions.create(
            model=self.model,
            messages=[user_message],
            response_format={"type": "json_object"},
        )
        raw_json = completion.choices[0].message.content
        return json.loads(raw_json)
# Usage: resolve the references in a three-sentence sample.
resolver = CoreferenceResolver()
text = """
Microsoft was founded by Bill Gates. He later stepped down as CEO,
but the company continued to grow. They acquired LinkedIn in 2016.
"""
result = resolver.resolve(text)

# Show the rewritten text, then each individual substitution.
resolved_text = result["resolved_text"]
print(f"Resolved text:\n{resolved_text}")
print("\nResolutions:")
for resolution in result["resolutions"]:
    original, target = resolution["original"], resolution["resolved_to"]
    print(f" '{original}' -> '{target}'")
Neo4j Integration
Building a Knowledge Graph
Copy
from neo4j import GraphDatabase
from openai import OpenAI
from dataclasses import dataclass
import json
class KnowledgeGraph:
    """Knowledge graph backed by Neo4j, populated and queried via an LLM.

    Instances own a Neo4j driver; call ``close()`` when done or use the
    instance as a context manager.
    """

    def __init__(
        self,
        neo4j_uri: str,
        neo4j_user: str,
        neo4j_password: str,
        model: str = "gpt-4o-mini"
    ):
        """Open the Neo4j driver and create the OpenAI client.

        Args:
            neo4j_uri: Connection URI passed to ``GraphDatabase.driver``.
            neo4j_user: Neo4j user name.
            neo4j_password: Neo4j password.
            model: Chat model name used for extraction and query generation.
        """
        self.driver = GraphDatabase.driver(
            neo4j_uri,
            auth=(neo4j_user, neo4j_password)
        )
        self.client = OpenAI()
        self.model = model

    def __enter__(self):
        """Return ``self`` so the graph can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the driver when leaving a ``with`` block."""
        self.close()

    def close(self):
        """Close database connection."""
        self.driver.close()

    @staticmethod
    def _escape_identifier(identifier: str) -> str:
        """Return ``identifier`` as a backtick-quoted Cypher name.

        Labels and relationship types cannot be passed as query parameters,
        so they are quoted instead; embedded backticks are doubled so the
        value cannot terminate the quoted name.

        Raises:
            ValueError: If ``identifier`` is not a non-blank string.
        """
        if not isinstance(identifier, str) or not identifier.strip():
            raise ValueError(f"Invalid Cypher identifier: {identifier!r}")
        return "`" + identifier.replace("`", "``") + "`"

    def add_entity(self, name: str, entity_type: str, properties: dict = None):
        """Add (or update) an entity node in the graph.

        The node is merged on ``name`` under the label ``entity_type`` and
        its properties are updated with ``properties``.

        Args:
            name: Entity name; also stored as the ``name`` property.
            entity_type: Node label. Treated as one literal label name.
            properties: Extra node properties. Not modified by this call.

        Raises:
            ValueError: If ``entity_type`` is not a non-blank string.
        """
        label = self._escape_identifier(entity_type)
        # Copy so the caller's dict is not mutated when "name" is injected.
        node_properties = dict(properties or {})
        node_properties["name"] = name
        query = (
            f"MERGE (n:{label} {{name: $name}})\n"
            "SET n += $properties\n"
            "RETURN n"
        )
        with self.driver.session() as session:
            session.run(query, name=name, properties=node_properties)

    def add_relationship(
        self,
        source: str,
        target: str,
        rel_type: str,
        properties: dict = None
    ):
        """Add (or update) a relationship between two existing entities.

        Both endpoints are matched by their ``name`` property; if either
        match finds nothing, the query creates no relationship.

        Args:
            source: Name of the start node.
            target: Name of the end node.
            rel_type: Relationship type. Treated as one literal type name.
            properties: Extra relationship properties.

        Raises:
            ValueError: If ``rel_type`` is not a non-blank string.
        """
        relationship = self._escape_identifier(rel_type)
        query = (
            "MATCH (a {name: $source})\n"
            "MATCH (b {name: $target})\n"
            f"MERGE (a)-[r:{relationship}]->(b)\n"
            "SET r += $properties\n"
            "RETURN r"
        )
        with self.driver.session() as session:
            session.run(
                query,
                source=source,
                target=target,
                properties=dict(properties or {})
            )

    def ingest_text(self, text: str):
        """Extract entities and relationships from text and add to graph.

        Args:
            text: Document text to analyse.

        Returns:
            The parsed JSON dict returned by the model.

        Raises:
            KeyError: If an extracted item lacks a required key.
        """
        # Extract using LLM
        prompt = f"""Extract entities and relationships from this text.
Text:
{text}
Return JSON:
{{
"entities": [
{{"name": "entity", "type": "Person|Organization|Location|Concept", "properties": {{}}}}
],
"relationships": [
{{"source": "entity1", "target": "entity2", "type": "RELATIONSHIP_TYPE", "properties": {{}}}}
]
}}
Use UPPERCASE_WITH_UNDERSCORES for relationship types."""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        data = json.loads(response.choices[0].message.content)

        # ``or`` covers both a missing key and an explicit JSON null.
        # Entities go first so relationship endpoints exist when matched.
        for entity in data.get("entities") or []:
            self.add_entity(
                entity["name"],
                entity["type"],
                entity.get("properties") or {}
            )
        for rel in data.get("relationships") or []:
            self.add_relationship(
                rel["source"],
                rel["target"],
                rel["type"],
                rel.get("properties") or {}
            )
        return data

    def query_cypher(self, query: str, parameters: dict = None) -> list:
        """Execute a Cypher query and return its records as dicts.

        Args:
            query: Cypher text.
            parameters: Optional query parameters; prefer these over
                formatting values into ``query``.

        Returns:
            One ``record.data()`` dict per result row.
        """
        with self.driver.session() as session:
            result = session.run(query, parameters or {})
            # Materialise inside the session; records are read lazily.
            return [record.data() for record in result]

    def natural_language_query(self, question: str) -> str:
        """Convert natural language to Cypher, execute it, and answer.

        Args:
            question: Question in natural language.

        Returns:
            The model's answer based on the query results.
        """
        # Get schema context
        schema = self._get_schema()
        prompt = f"""Convert this question to a Cypher query.
Graph schema:
{schema}
Question: {question}
Return only the Cypher query, no explanation:"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        )
        cypher = response.choices[0].message.content.strip()
        # Strip a Markdown code fence if the model wrapped the query in one.
        cypher = cypher.replace("```cypher", "").replace("```", "").strip()

        # NOTE(review): this runs model-generated Cypher verbatim, including
        # any write or delete clauses. Consider read-only credentials.
        results = self.query_cypher(cypher)

        # Generate natural language response. ``default=str`` keeps values
        # that are not JSON-serialisable from aborting prompt construction.
        answer_prompt = f"""Answer this question based on the query results.
Question: {question}
Results: {json.dumps(results, default=str)}
Answer concisely:"""
        answer = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": answer_prompt}]
        )
        return answer.choices[0].message.content

    def _get_schema(self) -> str:
        """Get graph schema for query generation.

        Returns:
            A two-line summary of node labels and relationship types, or
            ``"Schema not available"`` if the lookup fails.
        """
        try:
            with self.driver.session() as session:
                labels = [
                    record["label"]
                    for record in session.run("CALL db.labels()")
                ]
                rel_types = [
                    record["relationshipType"]
                    for record in session.run("CALL db.relationshipTypes()")
                ]
            return f"Node labels: {labels}\nRelationship types: {rel_types}"
        except Exception:
            # Best effort: query generation can proceed without a schema.
            return "Schema not available"
# Usage
kg = KnowledgeGraph(
    neo4j_uri="bolt://localhost:7687",
    neo4j_user="neo4j",
    neo4j_password="password"
)
try:
    # Ingest documents
    text = """
OpenAI, based in San Francisco, developed GPT-4. Sam Altman leads OpenAI.
Microsoft invested $10 billion in OpenAI and integrated GPT-4 into Bing.
Google, OpenAI's competitor, developed Gemini in Mountain View.
"""
    kg.ingest_text(text)
    # Query naturally
    answer = kg.natural_language_query("Which companies are in San Francisco?")
    print(answer)
finally:
    # Release the driver even when ingestion or querying raises.
    kg.close()
GraphRAG Pattern
Copy
from openai import OpenAI
from dataclasses import dataclass
import json
from typing import Optional
@dataclass
class GraphContext:
    """Bundle of context gathered from the knowledge graph for one query."""

    # Node records collected for the queried entities and their neighbours.
    entities: list
    # Relationship records collected alongside those nodes.
    relationships: list
    # Records from path queries between the queried entities.
    paths: list
    # Free-form summaries attached to the context.
    summaries: list
class GraphRAG:
    """RAG system combining knowledge graph with vector retrieval."""

    def __init__(
        self,
        kg_client,  # Neo4j or similar
        vector_store,  # Vector database client
        model: str = "gpt-4o-mini"
    ):
        """Store the retrieval backends and create the OpenAI client.

        Args:
            kg_client: Object exposing ``query_cypher(query)`` that returns
                a list of dict records.
            vector_store: Object exposing ``similarity_search(query, k=...)``
                whose results carry a ``page_content`` attribute.
            model: Chat model name used for extraction and answering.
        """
        self.client = OpenAI()
        self.model = model
        self.kg = kg_client
        self.vector_store = vector_store

    @staticmethod
    def _cypher_string(value) -> str:
        """Return ``value`` as a single-quoted, escaped Cypher string literal.

        Backslashes and single quotes are escaped so a name such as
        ``O'Brien`` cannot terminate the literal early.
        """
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    def _extract_query_entities(self, query: str) -> list[str]:
        """Ask the model which entity names ``query`` mentions.

        Returns:
            The ``entities`` list from the model's JSON answer, or an empty
            list when that key is missing or null.
        """
        prompt = f"""Extract entity names mentioned in this query.
Query: {query}
Return JSON: {{"entities": ["entity1", "entity2"]}}"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        data = json.loads(response.choices[0].message.content)
        return data.get("entities") or []

    def _get_graph_context(
        self,
        entities: list[str],
        max_hops: int = 2
    ) -> "GraphContext":
        """Retrieve context from knowledge graph.

        For each entity this fetches the node with its direct neighbours,
        and, when several entities are given, the shortest paths from it to
        every other entity.

        Args:
            entities: Entity names to look up by their ``name`` property.
            max_hops: Upper bound on path length for the path queries.

        Returns:
            A ``GraphContext`` with the collected entities, relationships
            and paths; ``summaries`` is left empty.
        """
        # Path lengths cannot be query parameters; force an int so the value
        # formatted into the pattern is always a plain number.
        hops = int(max_hops)
        all_entities = []
        all_relationships = []
        all_paths = []
        for entity in entities:
            # ``query_cypher`` takes only query text here, so the name is
            # embedded as an escaped literal rather than pasted in raw.
            name_literal = self._cypher_string(entity)
            neighbour_query = (
                f"\nMATCH (n {{name: {name_literal}}})\n"
                "OPTIONAL MATCH (n)-[r]-(m)\n"
                "RETURN n, r, m\n"
                "LIMIT 50\n"
            )
            for record in self.kg.query_cypher(neighbour_query):
                if record.get("n"):
                    all_entities.append(record["n"])
                if record.get("m"):
                    all_entities.append(record["m"])
                if record.get("r"):
                    all_relationships.append(record["r"])

            # Get paths between query entities
            if len(entities) > 1:
                for other_entity in entities:
                    if other_entity == entity:
                        continue
                    other_literal = self._cypher_string(other_entity)
                    path_query = (
                        "\nMATCH path = shortestPath(\n"
                        f"(a {{name: {name_literal}}})-[*..{hops}]-"
                        f"(b {{name: {other_literal}}})\n"
                        ")\n"
                        "RETURN path\n"
                        "LIMIT 5\n"
                    )
                    all_paths.extend(self.kg.query_cypher(path_query))
        return GraphContext(
            entities=all_entities,
            relationships=all_relationships,
            paths=all_paths,
            summaries=[]
        )

    def _get_vector_context(self, query: str, k: int = 5) -> list[str]:
        """Retrieve the text of the ``k`` most similar documents."""
        results = self.vector_store.similarity_search(query, k=k)
        return [doc.page_content for doc in results]

    def _format_graph_context(self, context: "GraphContext") -> str:
        """Format graph context for prompt.

        Lists each distinct entity name once, then every relationship that
        is a dict. Paths and summaries are not rendered.
        """
        lines = ["Graph Knowledge:"]
        # Entities
        seen_entities = set()
        for entity in context.entities:
            if isinstance(entity, dict) and entity.get("name"):
                name = entity["name"]
                if name not in seen_entities:
                    seen_entities.add(name)
                    lines.append(f"- Entity: {name}")
        # Relationships
        for rel in context.relationships:
            if isinstance(rel, dict):
                lines.append(f"- Relationship: {rel}")
        return "\n".join(lines)

    def query(self, question: str) -> dict:
        """Answer question using both graph and vector context.

        Returns:
            A dict with keys ``answer``, ``entities_used``,
            ``graph_context`` and ``sources``.
        """
        # Extract entities from question
        entities = self._extract_query_entities(question)
        # Get graph context
        graph_context = self._get_graph_context(entities)
        graph_text = self._format_graph_context(graph_context)
        # Get vector context
        vector_context = self._get_vector_context(question)
        vector_text = "\n".join(f"Document: {doc}" for doc in vector_context)
        # Combine and answer
        prompt = f"""Answer this question using the provided context.
{graph_text}
Document Context:
{vector_text}
Question: {question}
Provide a comprehensive answer based on both the graph relationships and document content:"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        )
        return {
            "answer": response.choices[0].message.content,
            "entities_used": entities,
            "graph_context": graph_context,
            "sources": vector_context
        }
Entity Linking and Resolution
Copy
import json
from dataclasses import dataclass
from typing import Optional

from openai import OpenAI
@dataclass
class LinkedEntity:
    """A text mention paired with the canonical entity it was linked to."""

    # The mention exactly as it appeared in the text.
    mention: str
    # Canonical name chosen for the mention.
    canonical_name: str
    # Type label of the entity.
    entity_type: str
    # Linking confidence; the linking prompt asks for a value in 0-1.
    confidence: float
    # Optional identifier in an external system; ``None`` when not set.
    external_id: Optional[str] = None
class EntityLinker:
    """Link entity mentions to canonical entities held in a knowledge base."""

    def __init__(
        self,
        knowledge_base: dict = None,
        model: str = "gpt-4o-mini"
    ):
        """Create the OpenAI client and set the starting knowledge base.

        Args:
            knowledge_base: Mapping of canonical name to entity details; an
                empty dict is used when omitted or empty.
            model: Chat model name passed to the completions API.
        """
        self.client = OpenAI()
        self.model = model
        self.knowledge_base = knowledge_base or {}

    def add_to_kb(self, canonical: str, aliases: list[str], entity_type: str):
        """Register (or overwrite) a canonical entity with its aliases."""
        entry = {
            "aliases": aliases,
            "type": entity_type
        }
        self.knowledge_base[canonical] = entry

    def link_entities(self, text: str) -> list[LinkedEntity]:
        """Link entity mentions in ``text`` to canonical forms.

        The whole knowledge base is serialised into the prompt, and each
        item of the model's ``linked_entities`` answer becomes a
        ``LinkedEntity``.

        Raises:
            KeyError: If a returned item lacks one of the four fields.
        """
        # Serialise the knowledge base so the model can see every alias.
        kb_json = json.dumps(self.knowledge_base, indent=2)
        prompt = f"""Link entity mentions in this text to their canonical forms.
Knowledge Base:
{kb_json}
Text:
{text}
For each entity mention, determine:
1. The canonical name from the knowledge base (if found)
2. The entity type
3. Confidence (0-1)
Return JSON:
{{
"linked_entities": [
{{
"mention": "text as it appears",
"canonical_name": "canonical form",
"entity_type": "type",
"confidence": 0.95
}}
]
}}"""
        completion = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        payload = json.loads(completion.choices[0].message.content)

        linked = []
        for item in payload.get("linked_entities", []):
            linked.append(
                LinkedEntity(
                    mention=item["mention"],
                    canonical_name=item["canonical_name"],
                    entity_type=item["entity_type"],
                    confidence=item["confidence"]
                )
            )
        return linked

    def resolve_duplicates(self, entities: list[str]) -> dict:
        """Group names that refer to the same thing under a canonical name.

        Returns:
            The model's JSON answer parsed into a dict; the prompt requests
            a ``groups`` list and the response is returned unvalidated.
        """
        prompt = f"""These entity names may refer to the same things.
Group them by what they refer to and pick a canonical name for each group.
Entities: {entities}
Return JSON:
{{
"groups": [
{{
"canonical": "canonical name",
"variants": ["variant1", "variant2"],
"type": "entity type"
}}
]
}}"""
        completion = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        raw_json = completion.choices[0].message.content
        return json.loads(raw_json)
# Usage
linker = EntityLinker()

# Seed the knowledge base: (canonical name, aliases, entity type).
known_entities = [
    ("Apple Inc.", ["Apple", "AAPL", "Apple Computer"], "Organization"),
    ("Microsoft Corporation", ["Microsoft", "MSFT", "MS"], "Organization"),
    ("Tim Cook", ["Timothy Cook", "Cook"], "Person"),
]
for canonical, aliases, entity_type in known_entities:
    linker.add_to_kb(canonical, aliases, entity_type)

# Every mention in this sentence is an alias of a seeded entity.
text = "Cook announced that Apple would partner with MS on new features."
linked = linker.link_entities(text)
for match in linked:
    print(f"'{match.mention}' -> '{match.canonical_name}' ({match.confidence:.0%})")
Graph-Based Question Answering
Copy
from openai import OpenAI
import json
class GraphQA:
    """Question answering over knowledge graphs."""

    def __init__(
        self,
        kg_client,
        model: str = "gpt-4o-mini"
    ):
        """Store the graph client and create the OpenAI client.

        Args:
            kg_client: Object exposing ``query_cypher(query)``; its
                ``_get_schema()`` is used when present.
            model: Chat model name passed to the completions API.
        """
        self.client = OpenAI()
        self.model = model
        self.kg = kg_client

    def decompose_question(self, question: str) -> dict:
        """Decompose complex question into sub-questions.

        Returns:
            The model's JSON answer parsed into a dict. The prompt requests
            the keys ``sub_questions`` and ``reasoning``.
        """
        prompt = f"""Decompose this question into simpler sub-questions that can be answered from a knowledge graph.
Question: {question}
Return JSON:
{{
"sub_questions": [
{{"question": "simple question", "depends_on": [indices of prior questions]}}
],
"reasoning": "how sub-questions combine to answer original"
}}"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        return json.loads(response.choices[0].message.content)

    def generate_cypher(self, question: str, schema: str) -> str:
        """Generate a Cypher query for ``question`` given ``schema``.

        Returns:
            The model's reply with any Markdown code fence removed.
        """
        prompt = f"""Generate a Cypher query to answer this question.
Graph schema:
{schema}
Question: {question}
Return only the Cypher query:"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        )
        query = response.choices[0].message.content.strip()
        return query.replace("```cypher", "").replace("```", "").strip()

    @staticmethod
    def _dependency_context(dependencies, sub_answers) -> str:
        """Render the prior answers that a sub-question depends on.

        Only integer indices that point at an already-answered sub-question
        are used. The indices come from the model, so negative, oversized
        or non-integer values are ignored rather than trusted.

        Returns:
            A ``"Prior answers:"`` block, or ``""`` when nothing applies.
        """
        valid = [
            index
            for index in dependencies or []
            if isinstance(index, int)
            and not isinstance(index, bool)
            and 0 <= index < len(sub_answers)
        ]
        if not valid:
            return ""
        return "Prior answers:\n" + "\n".join(
            f"- {sub_answers[index]}" for index in valid
        )

    def answer_with_reasoning(self, question: str) -> dict:
        """Answer question with step-by-step reasoning.

        The question is decomposed, each sub-question is turned into Cypher
        and executed, and the results are synthesised into a final answer.
        A sub-question whose query fails is recorded with an ``error`` entry
        instead of aborting the run.

        Returns:
            A dict with keys ``question``, ``decomposition``,
            ``sub_answers`` and ``final_answer``.
        """
        # Decompose question
        decomposition = self.decompose_question(question)

        # The schema does not depend on the sub-question, so fetch it once.
        schema = self.kg._get_schema() if hasattr(self.kg, "_get_schema") else ""

        # Answer each sub-question
        sub_answers = []
        for sub_q in decomposition.get("sub_questions") or []:
            context = self._dependency_context(
                sub_q.get("depends_on"), sub_answers
            )
            # Generate and execute query
            cypher = self.generate_cypher(sub_q["question"] + "\n" + context, schema)
            try:
                # NOTE(review): model-generated Cypher is run verbatim;
                # consider giving ``kg_client`` read-only credentials.
                results = self.kg.query_cypher(cypher)
                sub_answers.append({
                    "question": sub_q["question"],
                    "query": cypher,
                    "results": results
                })
            except Exception as e:
                sub_answers.append({
                    "question": sub_q["question"],
                    "error": str(e)
                })

        # Synthesize final answer. ``default=str`` keeps result values that
        # are not JSON-serialisable from aborting prompt construction.
        synthesis_prompt = f"""Synthesize an answer to this question from the sub-question results.
Original question: {question}
Sub-question results:
{json.dumps(sub_answers, indent=2, default=str)}
Reasoning: {decomposition.get('reasoning', '')}
Provide a clear, comprehensive answer:"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": synthesis_prompt}]
        )
        return {
            "question": question,
            "decomposition": decomposition,
            "sub_answers": sub_answers,
            "final_answer": response.choices[0].message.content
        }
# Usage
# NOTE(review): ``kg`` is not created in this snippet; presumably it is an
# open KnowledgeGraph from the Neo4j example - confirm it has not been closed.
qa = GraphQA(kg_client=kg)
question_text = "Which companies that Microsoft invested in are based in San Francisco?"
result = qa.answer_with_reasoning(question_text)

# Echo the question, then the synthesised answer.
asked = result["question"]
final_answer = result["final_answer"]
print(f"Question: {asked}")
print(f"Answer: {final_answer}")
Incremental Graph Building
Copy
from openai import OpenAI
import json
from datetime import datetime
class IncrementalGraphBuilder:
    """Build knowledge graph incrementally from documents."""

    def __init__(
        self,
        kg_client,
        model: str = "gpt-4o-mini"
    ):
        """Store the graph client and create the OpenAI client.

        Args:
            kg_client: Object exposing ``query_cypher``, ``add_entity`` and
                ``add_relationship``.
            model: Chat model name passed to the completions API.
        """
        self.client = OpenAI()
        self.model = model
        self.kg = kg_client
        # Ids of documents already ingested by this builder instance.
        self.processed_docs = set()

    @staticmethod
    def _cypher_string(value) -> str:
        """Return ``value`` as a single-quoted, escaped Cypher string literal.

        Backslashes and single quotes are escaped so a name containing a
        quote cannot terminate the literal early.
        """
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    def _get_existing_entities(self) -> list[str]:
        """Get the names of up to 1000 existing entities."""
        query = "MATCH (n) RETURN DISTINCT n.name as name LIMIT 1000"
        results = self.kg.query_cypher(query)
        return [r["name"] for r in results if r.get("name")]

    def process_document(
        self,
        doc_id: str,
        text: str,
        metadata: dict = None
    ) -> dict:
        """Process a document and update the graph.

        Args:
            doc_id: Identifier used to skip documents already processed and
                stored on created items as ``source_doc``.
            text: Document text to analyse.
            metadata: Accepted for interface compatibility; not used.

        Returns:
            ``{"status": "already_processed"}`` for a repeated ``doc_id``,
            otherwise a dict with ``status``, ``entities_added`` and
            ``relationships_added``. The counts are the number of items
            written, including ones merged into existing nodes.

        Raises:
            KeyError: If an extracted item lacks a required key.
        """
        if doc_id in self.processed_docs:
            return {"status": "already_processed"}

        # Get existing entities for context; only the first 50 names go into
        # the prompt to bound its size.
        existing = self._get_existing_entities()
        prompt = f"""Extract entities and relationships from this document.
Link to existing entities where appropriate.
Existing entities in graph:
{existing[:50]}
Document:
{text}
Return JSON:
{{
"entities": [
{{
"name": "entity name",
"type": "Person|Organization|Location|Concept|Event",
"is_new": true/false,
"properties": {{}},
"linked_to_existing": "existing entity name or null"
}}
],
"relationships": [
{{
"source": "entity1",
"target": "entity2",
"type": "RELATIONSHIP_TYPE",
"properties": {{}}
}}
]
}}"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        data = json.loads(response.choices[0].message.content)

        # Add entities. ``resolved_names`` maps each extracted name to the
        # name it was stored under, so relationships can follow the link.
        resolved_names = {}
        entities_added = 0
        for entity in data.get("entities") or []:
            # Use linked name if exists
            name = entity.get("linked_to_existing") or entity["name"]
            resolved_names[entity["name"]] = name
            self.kg.add_entity(
                name=name,
                entity_type=entity["type"],
                properties={
                    **(entity.get("properties") or {}),
                    "source_doc": doc_id,
                    "updated_at": datetime.now().isoformat()
                }
            )
            entities_added += 1

        # Add relationships, pointing endpoints at the stored entity names.
        relationships_added = 0
        for rel in data.get("relationships") or []:
            self.kg.add_relationship(
                source=resolved_names.get(rel["source"], rel["source"]),
                target=resolved_names.get(rel["target"], rel["target"]),
                rel_type=rel["type"],
                properties={
                    **(rel.get("properties") or {}),
                    "source_doc": doc_id
                }
            )
            relationships_added += 1

        self.processed_docs.add(doc_id)
        return {
            "status": "processed",
            "entities_added": entities_added,
            "relationships_added": relationships_added
        }

    def merge_duplicate_entities(self):
        """Find and merge duplicate entities.

        The model groups entity names; for each group, every duplicate's
        relationships are re-created on the canonical node and the duplicate
        is deleted. A failed merge is printed and skipped.

        NOTE: moved relationships are re-created with the single type
        ``MERGED_REL``, so their original types and properties are lost.

        Returns:
            The ``merge_groups`` list proposed by the model.
        """
        # Get all entities
        query = "MATCH (n) RETURN n.name as name, labels(n) as labels"
        entities = self.kg.query_cypher(query)

        # Use LLM to find duplicates; nodes without a name are left out.
        entity_names = [e["name"] for e in entities if e.get("name")]
        prompt = f"""Identify duplicate entities that should be merged.
Entities: {entity_names}
Return JSON:
{{
"merge_groups": [
{{
"canonical": "main entity name to keep",
"duplicates": ["duplicate1", "duplicate2"]
}}
]
}}"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        data = json.loads(response.choices[0].message.content)
        merge_groups = data.get("merge_groups") or []

        # Execute merges
        for group in merge_groups:
            canonical = group["canonical"]
            canon_literal = self._cypher_string(canonical)
            for duplicate in group.get("duplicates") or []:
                # Merging a name into itself would match the same node as
                # both ``dup`` and ``canon`` and then delete it.
                if duplicate == canonical:
                    continue
                dup_literal = self._cypher_string(duplicate)
                # Move relationships and delete duplicate
                merge_query = f"""
MATCH (dup {{name: {dup_literal}}})
MATCH (canon {{name: {canon_literal}}})
CALL {{
WITH dup, canon
MATCH (dup)-[r]->(target)
MERGE (canon)-[r2:MERGED_REL]->(target)
DELETE r
}}
CALL {{
WITH dup, canon
MATCH (source)-[r]->(dup)
MERGE (source)-[r2:MERGED_REL]->(canon)
DELETE r
}}
DELETE dup
"""
                try:
                    self.kg.query_cypher(merge_query)
                except Exception as e:
                    # Best effort: one failed merge must not stop the rest.
                    print(f"Merge error: {e}")
        return merge_groups
Knowledge Graph Best Practices
- Define clear entity and relationship schemas upfront
- Use entity linking to avoid duplicates
- Combine graph queries with vector search for best results
- Decompose complex questions into graph-traversable steps
- Regularly merge and clean duplicate entities
Practice Exercise
Build a knowledge graph system that:
- Extracts entities and relationships from documents
- Links mentions to canonical entities
- Supports natural language queries
- Combines graph and vector retrieval
- Handles incremental updates
- Accurate entity extraction
- Proper relationship typing
- Efficient graph queries
- Clear answer synthesis from graph data