Skip to main content
Knowledge graphs store entities and their relationships as structured data; combining them with LLM reasoning enables powerful question-answering and discovery capabilities. This chapter covers building and querying knowledge graphs with AI.

Entity and Relationship Extraction

Basic Entity Extraction

from openai import OpenAI
from dataclasses import dataclass
import json


@dataclass
class Entity:
    """An entity extracted from text.

    ``properties`` may be omitted or passed as ``None``; in both cases the
    instance ends up with its own empty dict, so the attribute can always be
    read or updated without a ``None`` check.
    """
    name: str
    type: str
    properties: dict = None

    def __post_init__(self):
        # Normalise the None default here instead of using a mutable default,
        # so no two instances ever share one dict.
        if self.properties is None:
            self.properties = {}


@dataclass
class Relationship:
    """A typed, directed relationship between two entities.

    ``source`` and ``target`` hold entity names. ``properties`` may be omitted
    or passed as ``None``; either way the instance gets its own empty dict.
    """
    source: str
    target: str
    type: str
    properties: dict = None

    def __post_init__(self):
        # Normalise the None default so the attribute is always a dict and is
        # never shared between instances.
        if self.properties is None:
            self.properties = {}


class EntityExtractor:
    """Extract entities and relationships from text with an LLM.

    ``entity_types`` and ``relationship_types`` are ordinary instance
    attributes; their contents are listed in the prompt as the type names the
    model should use.
    """

    def __init__(self, model: str = "gpt-4o-mini"):
        """Create the OpenAI client and remember which chat model to call."""
        self.client = OpenAI()
        self.model = model
        self.entity_types = [
            "Person", "Organization", "Location", "Product",
            "Event", "Technology", "Concept"
        ]
        self.relationship_types = [
            "works_for", "located_in", "founded", "acquired",
            "partners_with", "uses", "created", "manages"
        ]

    def extract(self, text: str) -> dict:
        """Extract entities and relationships from text.

        Args:
            text: Passage to analyse; it is embedded verbatim in the prompt.

        Returns:
            ``{"entities": [Entity, ...], "relationships": [Relationship, ...]}``.

        Raises:
            json.JSONDecodeError: If the model reply is not valid JSON.
            KeyError: If an item lacks a required key (``name``/``type`` for
                entities, ``source``/``target``/``type`` for relationships).
        """
        prompt = f"""Extract entities and relationships from this text.

Entity types: {', '.join(self.entity_types)}
Relationship types: {', '.join(self.relationship_types)}

Text:
{text}

Return JSON:
{{
    "entities": [
        {{"name": "entity name", "type": "entity type", "properties": {{}}}}
    ],
    "relationships": [
        {{"source": "entity1", "target": "entity2", "type": "relationship type", "properties": {{}}}}
    ]
}}"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        data = json.loads(response.choices[0].message.content)

        # `x.get(key) or default` (rather than `.get(key, default)`) also
        # covers a key that is present but null in the model's JSON.
        entities = [
            Entity(
                name=e["name"],
                type=e["type"],
                properties=e.get("properties") or {}
            )
            for e in data.get("entities") or []
        ]

        relationships = [
            Relationship(
                source=r["source"],
                target=r["target"],
                type=r["type"],
                properties=r.get("properties") or {}
            )
            for r in data.get("relationships") or []
        ]

        return {
            "entities": entities,
            "relationships": relationships
        }


# Usage: run the extractor on a sample passage and list what it found
extractor = EntityExtractor()

text = """
Apple Inc., founded by Steve Jobs in Cupertino, California, 
acquired Beats Electronics in 2014. The company uses advanced 
machine learning technology in its products. Tim Cook, who 
previously worked at Compaq, now manages Apple's operations.
"""

result = extractor.extract(text)

# One indented line per entity: name followed by its type in parentheses.
print("Entities:")
for entity in result["entities"]:
    print("  {} ({})".format(entity.name, entity.type))

# One indented line per relationship, drawn as an arrow from source to target.
print("\nRelationships:")
for rel in result["relationships"]:
    print("  {} --[{}]--> {}".format(rel.source, rel.type, rel.target))

Coreference Resolution

from openai import OpenAI
import json


class CoreferenceResolver:
    """Replace pronouns and other references with the entities they denote."""

    def __init__(self, model: str = "gpt-4o-mini"):
        """Remember the chat model name and create the OpenAI client."""
        self.model = model
        self.client = OpenAI()

    def resolve(self, text: str) -> dict:
        """Ask the model to resolve coreferences in ``text``.

        Returns the model's JSON reply parsed into a dict; the prompt requests
        the keys ``resolved_text`` and ``resolutions``.
        """
        prompt = f"""Resolve all pronouns and references in this text.
For each reference (he, she, it, they, the company, etc.), identify what entity it refers to.

Text:
{text}

Return JSON:
{{
    "resolved_text": "text with references replaced by entity names",
    "resolutions": [
        {{"original": "pronoun/reference", "resolved_to": "entity name", "context": "surrounding words"}}
    ]
}}"""

        # Assemble the request first, then send it in a single call.
        request = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "response_format": {"type": "json_object"},
        }
        completion = self.client.chat.completions.create(**request)

        raw_reply = completion.choices[0].message.content
        return json.loads(raw_reply)


# Usage: resolve the references in a short passage and show each substitution
resolver = CoreferenceResolver()

text = """
Microsoft was founded by Bill Gates. He later stepped down as CEO, 
but the company continued to grow. They acquired LinkedIn in 2016.
"""

result = resolver.resolve(text)
print("Resolved text:\n{}".format(result["resolved_text"]))
print("\nResolutions:")
for r in result["resolutions"]:
    # Each resolution maps the original reference to the entity it stands for.
    print("  '{}' -> '{}'".format(r["original"], r["resolved_to"]))

Neo4j Integration

Building a Knowledge Graph

from neo4j import GraphDatabase
from openai import OpenAI
from dataclasses import dataclass
import json


class KnowledgeGraph:
    """Knowledge graph backed by Neo4j.

    Wraps a Neo4j driver for reads and writes and an OpenAI chat model for
    turning text into graph data and questions into Cypher. Instances can be
    used as context managers (``with KnowledgeGraph(...) as kg:``) so the
    driver is closed on exit.
    """

    def __init__(
        self,
        neo4j_uri: str,
        neo4j_user: str,
        neo4j_password: str,
        model: str = "gpt-4o-mini"
    ):
        """Open the Neo4j driver and create the OpenAI client."""
        self.driver = GraphDatabase.driver(
            neo4j_uri,
            auth=(neo4j_user, neo4j_password)
        )
        self.client = OpenAI()
        self.model = model

    def close(self):
        """Close database connection."""
        self.driver.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # never suppress the caller's exception

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return ``name`` as a backtick-quoted Cypher identifier.

        Labels and relationship types cannot be sent as query parameters, so
        they are spliced into the query text. Quoting them keeps names with
        spaces valid and stops model-supplied text from altering the query.
        """
        return "`" + str(name).replace("`", "``") + "`"

    @staticmethod
    def _strip_code_fences(text: str) -> str:
        """Remove Markdown code-fence markers and surrounding whitespace."""
        return text.replace("```cypher", "").replace("```", "").strip()

    def add_entity(self, name: str, entity_type: str, properties: dict = None):
        """Add (or update) an entity node.

        The node is merged on ``name`` under label ``entity_type``, and
        ``properties`` are added to it. The caller's dict is not modified.
        """
        # Work on a copy: the original code wrote "name" into the caller's dict.
        node_properties = dict(properties or {})
        node_properties["name"] = name

        query = f"""
        MERGE (n:{self._quote_identifier(entity_type)} {{name: $name}})
        SET n += $properties
        RETURN n
        """

        with self.driver.session() as session:
            session.run(query, name=name, properties=node_properties)

    def add_relationship(
        self,
        source: str,
        target: str,
        rel_type: str,
        properties: dict = None
    ):
        """Add a relationship between entities.

        Both endpoints are matched by ``name`` regardless of label; when
        either is missing the query matches nothing and nothing is created.
        """
        rel_properties = dict(properties or {})

        query = f"""
        MATCH (a {{name: $source}})
        MATCH (b {{name: $target}})
        MERGE (a)-[r:{self._quote_identifier(rel_type)}]->(b)
        SET r += $properties
        RETURN r
        """

        with self.driver.session() as session:
            session.run(
                query,
                source=source,
                target=target,
                properties=rel_properties
            )

    def ingest_text(self, text: str):
        """Extract entities and relationships from text and add to graph.

        Returns the parsed JSON dict produced by the model.

        Raises:
            json.JSONDecodeError: If the model reply is not valid JSON.
            KeyError: If an extracted item lacks a required key.
        """
        # Extract using LLM
        prompt = f"""Extract entities and relationships from this text.

Text:
{text}

Return JSON:
{{
    "entities": [
        {{"name": "entity", "type": "Person|Organization|Location|Concept", "properties": {{}}}}
    ],
    "relationships": [
        {{"source": "entity1", "target": "entity2", "type": "RELATIONSHIP_TYPE", "properties": {{}}}}
    ]
}}

Use UPPERCASE_WITH_UNDERSCORES for relationship types."""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        data = json.loads(response.choices[0].message.content)

        # Entities go in first so the relationship MATCH clauses can find them.
        # `or []` also covers a key that is present but null.
        for entity in data.get("entities") or []:
            self.add_entity(
                entity["name"],
                entity["type"],
                entity.get("properties")
            )

        for rel in data.get("relationships") or []:
            self.add_relationship(
                rel["source"],
                rel["target"],
                rel["type"],
                rel.get("properties")
            )

        return data

    def query_cypher(self, query: str, parameters: dict = None) -> list:
        """Execute a Cypher query and return each record as a dict.

        Args:
            query: Cypher text.
            parameters: Optional query parameters. Prefer these over
                formatting values into ``query``.
        """
        with self.driver.session() as session:
            result = session.run(query, parameters)
            return [record.data() for record in result]

    def natural_language_query(self, question: str) -> str:
        """Convert natural language to Cypher, execute it, and answer.

        WARNING: the generated Cypher is executed as-is, so it can modify or
        delete data. Run this with a read-only database user when the
        question comes from an untrusted source.
        """
        # Get schema context
        schema = self._get_schema()

        prompt = f"""Convert this question to a Cypher query.

Graph schema:
{schema}

Question: {question}

Return only the Cypher query, no explanation:"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        )

        # Models often wrap the query in a Markdown fence despite the prompt.
        cypher = self._strip_code_fences(response.choices[0].message.content)

        # Execute query
        results = self.query_cypher(cypher)

        # default=str keeps non-JSON values (e.g. temporal types) from raising;
        # the text is only shown to the model, so a string form is sufficient.
        answer_prompt = f"""Answer this question based on the query results.

Question: {question}
Results: {json.dumps(results, default=str)}

Answer concisely:"""

        answer = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": answer_prompt}]
        )

        return answer.choices[0].message.content

    def _get_schema(self) -> str:
        """Return node labels and relationship types as prompt text.

        Best-effort: on any failure the placeholder "Schema not available" is
        returned so query generation can still proceed.
        """
        try:
            with self.driver.session() as session:
                result = session.run("CALL db.labels()")
                labels = [r["label"] for r in result]

                result = session.run("CALL db.relationshipTypes()")
                rel_types = [r["relationshipType"] for r in result]

            return f"Node labels: {labels}\nRelationship types: {rel_types}"
        except Exception:
            return "Schema not available"


# Usage
kg = KnowledgeGraph(
    neo4j_uri="bolt://localhost:7687",
    neo4j_user="neo4j",
    neo4j_password="password"
)

# Ingest documents
text = """
OpenAI, based in San Francisco, developed GPT-4. Sam Altman leads OpenAI.
Microsoft invested $10 billion in OpenAI and integrated GPT-4 into Bing.
Google, OpenAI's competitor, developed Gemini in Mountain View.
"""

try:
    kg.ingest_text(text)

    # Query naturally
    answer = kg.natural_language_query("Which companies are in San Francisco?")
    print(answer)
finally:
    # Close the driver even when the LLM call or the database query fails,
    # so its connections are not leaked.
    kg.close()

GraphRAG Pattern

from openai import OpenAI
from dataclasses import dataclass
import json
from typing import Optional


@dataclass
class GraphContext:
    """Context retrieved from knowledge graph.

    A plain container of four lists; the dataclass does not constrain what
    the lists hold. The comments below describe how GraphRAG fills them.
    """
    entities: list  # values of the "n" and "m" columns of the neighbourhood query
    relationships: list  # values of the "r" column of the neighbourhood query
    paths: list  # rows returned by the shortest-path queries
    summaries: list  # GraphRAG._get_graph_context always passes an empty list


class GraphRAG:
    """RAG system combining knowledge graph with vector retrieval.

    ``kg_client`` must provide ``query_cypher(query) -> list[dict]`` and
    ``vector_store`` must provide ``similarity_search(query, k=...)`` returning
    objects with a ``page_content`` attribute.
    """

    def __init__(
        self,
        kg_client,  # Neo4j or similar
        vector_store,  # Vector database client
        model: str = "gpt-4o-mini"
    ):
        self.client = OpenAI()
        self.model = model
        self.kg = kg_client
        self.vector_store = vector_store

    @staticmethod
    def _cypher_literal(value) -> str:
        """Return ``value`` as a single-quoted, escaped Cypher string literal.

        ``query_cypher`` only accepts query text, so names have to be embedded
        in it. Escaping backslashes and quotes stops a name such as
        ``O'Reilly`` from breaking, or rewriting, the query.
        """
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    @staticmethod
    def _node_label(node) -> str:
        """Return a node's ``name`` when it is a dict that has one, else ``str(node)``."""
        if isinstance(node, dict) and node.get("name"):
            return str(node["name"])
        return str(node)

    def _extract_query_entities(self, query: str) -> list[str]:
        """Extract entity names from query."""
        prompt = f"""Extract entity names mentioned in this query.

Query: {query}

Return JSON: {{"entities": ["entity1", "entity2"]}}"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        data = json.loads(response.choices[0].message.content)
        # `or []` also covers an "entities" key that is present but null.
        return data.get("entities") or []

    def _get_graph_context(
        self,
        entities: list[str],
        max_hops: int = 2
    ) -> "GraphContext":
        """Retrieve context from knowledge graph.

        For every entity name this fetches the node and its direct neighbours
        (at most 50 rows). When several entities are given it also fetches
        shortest paths of up to ``max_hops`` hops between each ordered pair.
        """
        all_entities = []
        all_relationships = []
        all_paths = []

        for entity in entities:
            name_literal = self._cypher_literal(entity)

            # Get entity and its neighbors
            query = f"""
            MATCH (n {{name: {name_literal}}})
            OPTIONAL MATCH (n)-[r]-(m)
            RETURN n, r, m
            LIMIT 50
            """

            for result in self.kg.query_cypher(query):
                if result.get("n"):
                    all_entities.append(result["n"])
                if result.get("m"):
                    all_entities.append(result["m"])
                if result.get("r"):
                    all_relationships.append(result["r"])

            # Get paths between query entities
            if len(entities) > 1:
                for other_entity in entities:
                    if other_entity != entity:
                        other_literal = self._cypher_literal(other_entity)
                        path_query = f"""
                        MATCH path = shortestPath(
                            (a {{name: {name_literal}}})-[*..{int(max_hops)}]-(b {{name: {other_literal}}})
                        )
                        RETURN path
                        LIMIT 5
                        """
                        all_paths.extend(self.kg.query_cypher(path_query))

        return GraphContext(
            entities=all_entities,
            relationships=all_relationships,
            paths=all_paths,
            summaries=[]
        )

    def _get_vector_context(self, query: str, k: int = 5) -> list[str]:
        """Retrieve relevant documents from vector store."""
        results = self.vector_store.similarity_search(query, k=k)
        return [doc.page_content for doc in results]

    def _format_graph_context(self, context: "GraphContext") -> str:
        """Format graph context for prompt.

        Entities are listed once per distinct name. A relationship may be a
        dict (printed as-is) or a ``(start, type, end)`` triple, which is the
        shape the Neo4j driver's ``Record.data()`` gives relationships;
        triples are drawn as ``start -[TYPE]-> end``. Other values are skipped.
        """
        lines = ["Graph Knowledge:"]

        # Entities
        seen_entities = set()
        for entity in context.entities:
            if isinstance(entity, dict) and entity.get("name"):
                name = entity["name"]
                if name not in seen_entities:
                    seen_entities.add(name)
                    lines.append(f"- Entity: {name}")

        # Relationships
        for rel in context.relationships:
            if isinstance(rel, dict):
                lines.append(f"- Relationship: {rel}")
            elif isinstance(rel, (tuple, list)) and len(rel) == 3:
                # Previously these were dropped, so the prompt never showed
                # any relationship returned through Record.data().
                start, rel_type, end = rel
                lines.append(
                    f"- Relationship: {self._node_label(start)} "
                    f"-[{rel_type}]-> {self._node_label(end)}"
                )

        return "\n".join(lines)

    def query(self, question: str) -> dict:
        """Answer question using both graph and vector context.

        Returns a dict with keys ``answer``, ``entities_used``,
        ``graph_context`` and ``sources``.
        """
        # Extract entities from question
        entities = self._extract_query_entities(question)

        # Get graph context
        graph_context = self._get_graph_context(entities)
        graph_text = self._format_graph_context(graph_context)

        # Get vector context
        vector_context = self._get_vector_context(question)
        vector_text = "\n".join([f"Document: {doc}" for doc in vector_context])

        # Combine and answer
        prompt = f"""Answer this question using the provided context.

{graph_text}

Document Context:
{vector_text}

Question: {question}

Provide a comprehensive answer based on both the graph relationships and document content:"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        )

        return {
            "answer": response.choices[0].message.content,
            "entities_used": entities,
            "graph_context": graph_context,
            "sources": vector_context
        }

Entity Linking and Resolution

import json
from dataclasses import dataclass
from typing import Optional

from openai import OpenAI


@dataclass
class LinkedEntity:
    """An entity linked to canonical form.

    Pairs a mention as it appears in the text with the canonical name it was
    linked to, plus the type and confidence reported for that link.
    """
    mention: str  # the entity text as it appears in the input
    canonical_name: str  # canonical form chosen for the mention
    entity_type: str
    confidence: float  # the linking prompt asks for a value in 0-1; not validated here
    external_id: Optional[str] = None  # not set by EntityLinker.link_entities


class EntityLinker:
    """Link entity mentions to canonical entities.

    The knowledge base is a dict mapping a canonical name to
    ``{"aliases": [...], "type": ...}``; it is serialised into the linking
    prompt so the model can match mentions against it.
    """

    def __init__(
        self,
        knowledge_base: dict = None,
        model: str = "gpt-4o-mini"
    ):
        self.client = OpenAI()
        self.model = model
        # Test for None explicitly: `knowledge_base or {}` would silently
        # replace a caller-supplied *empty* dict with a fresh one, so entries
        # added later would never show up in the caller's dict.
        self.knowledge_base = {} if knowledge_base is None else knowledge_base

    @staticmethod
    def _to_confidence(value) -> float:
        """Coerce a model-supplied confidence to float.

        Returns 0.0 when the value is missing or not numeric, so one odd item
        does not abort the whole linking call.
        """
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    def add_to_kb(self, canonical: str, aliases: list[str], entity_type: str):
        """Add entity to knowledge base, replacing any entry with the same name."""
        self.knowledge_base[canonical] = {
            "aliases": aliases,
            "type": entity_type
        }

    def link_entities(self, text: str) -> "list[LinkedEntity]":
        """Link entity mentions in text to canonical forms.

        Raises:
            json.JSONDecodeError: If the model reply is not valid JSON.
            KeyError: If an item lacks ``mention``, ``canonical_name`` or
                ``entity_type``.
        """
        # Get KB context
        kb_context = json.dumps(self.knowledge_base, indent=2)

        prompt = f"""Link entity mentions in this text to their canonical forms.

Knowledge Base:
{kb_context}

Text:
{text}

For each entity mention, determine:
1. The canonical name from the knowledge base (if found)
2. The entity type
3. Confidence (0-1)

Return JSON:
{{
    "linked_entities": [
        {{
            "mention": "text as it appears",
            "canonical_name": "canonical form",
            "entity_type": "type",
            "confidence": 0.95
        }}
    ]
}}"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        data = json.loads(response.choices[0].message.content)

        return [
            LinkedEntity(
                mention=e["mention"],
                canonical_name=e["canonical_name"],
                entity_type=e["entity_type"],
                # The model may return the number as a string ("0.95").
                confidence=self._to_confidence(e.get("confidence"))
            )
            # `or []` also covers a key that is present but null.
            for e in data.get("linked_entities") or []
        ]

    def resolve_duplicates(self, entities: list[str]) -> dict:
        """Resolve duplicate entities to canonical forms.

        Returns the model's parsed JSON reply; the prompt requests a
        ``groups`` list of ``{"canonical", "variants", "type"}`` objects.
        """
        prompt = f"""These entity names may refer to the same things.
Group them by what they refer to and pick a canonical name for each group.

Entities: {entities}

Return JSON:
{{
    "groups": [
        {{
            "canonical": "canonical name",
            "variants": ["variant1", "variant2"],
            "type": "entity type"
        }}
    ]
}}"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        return json.loads(response.choices[0].message.content)


# Usage: seed a small knowledge base, then link the mentions in one sentence
linker = EntityLinker()

# Add known entities: (canonical name, aliases, entity type)
for canonical, aliases, entity_type in (
    ("Apple Inc.", ["Apple", "AAPL", "Apple Computer"], "Organization"),
    ("Microsoft Corporation", ["Microsoft", "MSFT", "MS"], "Organization"),
    ("Tim Cook", ["Timothy Cook", "Cook"], "Person"),
):
    linker.add_to_kb(canonical, aliases, entity_type)

text = "Cook announced that Apple would partner with MS on new features."
linked = linker.link_entities(text)

for entity in linked:
    # Mention, the canonical name it resolved to, and confidence as a percentage.
    print("'{}' -> '{}' ({:.0%})".format(
        entity.mention, entity.canonical_name, entity.confidence
    ))

Graph-Based Question Answering

from openai import OpenAI
import json


class GraphQA:
    """Question answering over knowledge graphs.

    ``kg_client`` must provide ``query_cypher(query) -> list``. When it also
    has a ``_get_schema()`` method, its output is passed to the Cypher
    generation prompt.
    """

    def __init__(
        self,
        kg_client,
        model: str = "gpt-4o-mini"
    ):
        self.client = OpenAI()
        self.model = model
        self.kg = kg_client

    @staticmethod
    def _strip_code_fences(text: str) -> str:
        """Remove Markdown code-fence markers and surrounding whitespace."""
        return text.replace("```cypher", "").replace("```", "").strip()

    @staticmethod
    def _prior_answers_context(dependencies, sub_answers: list) -> str:
        """Render the earlier answers a sub-question depends on.

        Only integer indices that point at an already-answered sub-question
        are used; anything else the model may emit (negative numbers, strings,
        forward references) is ignored. Returns "" when nothing usable remains.
        """
        if not isinstance(dependencies, (list, tuple)):
            return ""
        usable = [
            j for j in dependencies
            if isinstance(j, int) and 0 <= j < len(sub_answers)
        ]
        if not usable:
            return ""
        return "Prior answers:\n" + "\n".join(f"- {sub_answers[j]}" for j in usable)

    def decompose_question(self, question: str) -> dict:
        """Decompose complex question into sub-questions.

        Returns the model's parsed JSON reply: a dict for which the prompt
        requests the keys ``sub_questions`` and ``reasoning``.
        """
        prompt = f"""Decompose this question into simpler sub-questions that can be answered from a knowledge graph.

Question: {question}

Return JSON:
{{
    "sub_questions": [
        {{"question": "simple question", "depends_on": [indices of prior questions]}}
    ],
    "reasoning": "how sub-questions combine to answer original"
}}"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        return json.loads(response.choices[0].message.content)

    def generate_cypher(self, question: str, schema: str) -> str:
        """Generate Cypher query for a question, with code fences removed."""
        prompt = f"""Generate a Cypher query to answer this question.

Graph schema:
{schema}

Question: {question}

Return only the Cypher query:"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        )

        return self._strip_code_fences(response.choices[0].message.content)

    def answer_with_reasoning(self, question: str) -> dict:
        """Answer question with step-by-step reasoning.

        Decomposes the question, generates and runs one Cypher query per
        sub-question, then asks the model to synthesise a final answer. A
        failing sub-query is recorded under ``error`` and does not abort the
        run.

        WARNING: the generated Cypher is executed as-is; use a read-only
        database user when questions come from an untrusted source.

        Returns a dict with keys ``question``, ``decomposition``,
        ``sub_answers`` and ``final_answer``.
        """
        # Decompose question
        decomposition = self.decompose_question(question)

        # The schema does not depend on the sub-question, so fetch it once
        # instead of once per loop iteration.
        schema = self.kg._get_schema() if hasattr(self.kg, "_get_schema") else ""

        # Answer each sub-question
        sub_answers = []
        for sub_q in decomposition.get("sub_questions") or []:
            context = self._prior_answers_context(
                sub_q.get("depends_on") or [], sub_answers
            )

            # Generate and execute query
            cypher = self.generate_cypher(sub_q["question"] + "\n" + context, schema)

            try:
                results = self.kg.query_cypher(cypher)
                sub_answers.append({
                    "question": sub_q["question"],
                    "query": cypher,
                    "results": results
                })
            except Exception as e:
                # Deliberately broad: any failure of a generated query is
                # reported to the synthesis step rather than raised.
                sub_answers.append({
                    "question": sub_q["question"],
                    "error": str(e)
                })

        # default=str keeps non-JSON result values (e.g. temporal types) from
        # raising; the text is only shown to the model.
        synthesis_prompt = f"""Synthesize an answer to this question from the sub-question results.

Original question: {question}

Sub-question results:
{json.dumps(sub_answers, indent=2, default=str)}

Reasoning: {decomposition.get('reasoning', '')}

Provide a clear, comprehensive answer:"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": synthesis_prompt}]
        )

        return {
            "question": question,
            "decomposition": decomposition,
            "sub_answers": sub_answers,
            "final_answer": response.choices[0].message.content
        }


# Usage
# NOTE(review): `kg` is presumably the KnowledgeGraph built in the Neo4j
# example; that example ends with kg.close(), so confirm the driver is still
# open (or create a new instance) before running this.
qa = GraphQA(kg_client=kg)

result = qa.answer_with_reasoning(
    "Which companies that Microsoft invested in are based in San Francisco?"
)

# Echo the question alongside the synthesised answer.
print("Question: {}".format(result["question"]))
print("Answer: {}".format(result["final_answer"]))

Incremental Graph Building

from openai import OpenAI
import json
from datetime import datetime


class IncrementalGraphBuilder:
    """Build knowledge graph incrementally from documents.

    ``kg_client`` must provide ``query_cypher(query)``,
    ``add_entity(name=, entity_type=, properties=)`` and
    ``add_relationship(source=, target=, rel_type=, properties=)``.
    Processed document ids are remembered in memory only.
    """

    def __init__(
        self,
        kg_client,
        model: str = "gpt-4o-mini"
    ):
        self.client = OpenAI()
        self.model = model
        self.kg = kg_client
        self.processed_docs = set()

    @staticmethod
    def _cypher_literal(value) -> str:
        """Return ``value`` as a single-quoted, escaped Cypher string literal.

        ``query_cypher`` is called with query text only, so names have to be
        embedded in it; escaping backslashes and quotes keeps a name such as
        ``O'Brien`` from breaking, or rewriting, the query.
        """
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    def _get_existing_entities(self) -> list[str]:
        """Get list of existing entity names (at most 1000, nulls skipped)."""
        query = "MATCH (n) RETURN DISTINCT n.name as name LIMIT 1000"
        results = self.kg.query_cypher(query)
        return [r["name"] for r in results if r.get("name")]

    def process_document(
        self,
        doc_id: str,
        text: str,
        metadata: dict = None
    ) -> dict:
        """Process a document and update the graph.

        Args:
            doc_id: Identifier used to skip documents already processed by
                this instance; also stored on nodes and relationships as
                ``source_doc``.
            text: Document text, embedded verbatim in the prompt.
            metadata: Accepted for interface compatibility; currently unused.

        Returns:
            ``{"status": "already_processed"}`` for a repeated ``doc_id``,
            otherwise ``{"status": "processed", "entities_added": n,
            "relationships_added": m}``.
        """
        if doc_id in self.processed_docs:
            return {"status": "already_processed"}

        # Show the model only the first 50 existing names to bound prompt
        # size. This note used to sit inside the f-string and was sent to the
        # model as part of the prompt.
        existing_preview = self._get_existing_entities()[:50]

        prompt = f"""Extract entities and relationships from this document.
Link to existing entities where appropriate.

Existing entities in graph:
{existing_preview}

Document:
{text}

Return JSON:
{{
    "entities": [
        {{
            "name": "entity name",
            "type": "Person|Organization|Location|Concept|Event",
            "is_new": true/false,
            "properties": {{}},
            "linked_to_existing": "existing entity name or null"
        }}
    ],
    "relationships": [
        {{
            "source": "entity1",
            "target": "entity2", 
            "type": "RELATIONSHIP_TYPE",
            "properties": {{}}
        }}
    ]
}}"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        data = json.loads(response.choices[0].message.content)

        # Add entities. `stored_name` records which graph name each extracted
        # name was written under, so relationships can follow the same link.
        stored_name = {}
        entities_added = 0
        for entity in data.get("entities") or []:
            # Use linked name if exists
            name = entity.get("linked_to_existing") or entity["name"]
            stored_name[entity["name"]] = name

            self.kg.add_entity(
                name=name,
                entity_type=entity["type"],
                properties={
                    **(entity.get("properties") or {}),
                    "source_doc": doc_id,
                    "updated_at": datetime.now().isoformat()
                }
            )
            entities_added += 1

        # Add relationships. Endpoints are translated through `stored_name`;
        # without that, a relationship naming an entity that was linked to an
        # existing node would reference a name that is not in the graph.
        relationships_added = 0
        for rel in data.get("relationships") or []:
            self.kg.add_relationship(
                source=stored_name.get(rel["source"], rel["source"]),
                target=stored_name.get(rel["target"], rel["target"]),
                rel_type=rel["type"],
                properties={
                    **(rel.get("properties") or {}),
                    "source_doc": doc_id
                }
            )
            relationships_added += 1

        self.processed_docs.add(doc_id)

        return {
            "status": "processed",
            "entities_added": entities_added,
            "relationships_added": relationships_added
        }

    def merge_duplicate_entities(self):
        """Find and merge duplicate entities.

        Asks the model to group entity names, then for each duplicate
        re-points its relationships at the canonical node and deletes the
        duplicate. Re-pointed relationships are recreated with the single
        type ``MERGED_REL`` and without their properties. Failed merges are
        printed and skipped.

        Returns the list of merge groups proposed by the model.
        """
        # Get all entities
        query = "MATCH (n) RETURN n.name as name, labels(n) as labels"
        entities = self.kg.query_cypher(query)

        # Use LLM to find duplicates; nodes without a name cannot be matched
        # by name afterwards, so leave them out.
        entity_names = [e["name"] for e in entities if e.get("name")]

        prompt = f"""Identify duplicate entities that should be merged.

Entities: {entity_names}

Return JSON:
{{
    "merge_groups": [
        {{
            "canonical": "main entity name to keep",
            "duplicates": ["duplicate1", "duplicate2"]
        }}
    ]
}}"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        data = json.loads(response.choices[0].message.content)
        merge_groups = data.get("merge_groups") or []

        # Execute merges
        for group in merge_groups:
            canonical = group["canonical"]
            canon_literal = self._cypher_literal(canonical)
            for duplicate in group.get("duplicates") or []:
                if duplicate == canonical:
                    # Both MATCH clauses would bind the same node and the
                    # final DELETE would remove the canonical entity itself.
                    continue

                # Move relationships and delete duplicate
                merge_query = f"""
                MATCH (dup {{name: {self._cypher_literal(duplicate)}}})
                MATCH (canon {{name: {canon_literal}}})
                CALL {{
                    WITH dup, canon
                    MATCH (dup)-[r]->(target)
                    MERGE (canon)-[r2:MERGED_REL]->(target)
                    DELETE r
                }}
                CALL {{
                    WITH dup, canon
                    MATCH (source)-[r]->(dup)
                    MERGE (source)-[r2:MERGED_REL]->(canon)
                    DELETE r
                }}
                DELETE dup
                """
                try:
                    self.kg.query_cypher(merge_query)
                except Exception as e:
                    print(f"Merge error: {e}")

        return merge_groups
Knowledge Graph Best Practices
  • Define clear entity and relationship schemas upfront
  • Use entity linking to avoid duplicates
  • Combine graph queries with vector search for best results
  • Decompose complex questions into graph-traversable steps
  • Regularly merge and clean duplicate entities

Practice Exercise

Build a knowledge graph system that:
  1. Extracts entities and relationships from documents
  2. Links mentions to canonical entities
  3. Supports natural language queries
  4. Combines graph and vector retrieval
  5. Handles incremental updates
Focus on:
  • Accurate entity extraction
  • Proper relationship typing
  • Efficient graph queries
  • Clear answer synthesis from graph data