Skip to content

Bulk embed #403

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions graphiti_core/edges.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ async def get_by_group_ids(
async def get_by_node_uuid(cls, driver: AsyncDriver, node_uuid: str):
query: LiteralString = (
"""
MATCH (n:Entity {uuid: $node_uuid})-[e:RELATES_TO]-(m:Entity)
"""
MATCH (n:Entity {uuid: $node_uuid})-[e:RELATES_TO]-(m:Entity)
"""
+ ENTITY_EDGE_RETURN
)
records, _, _ = await driver.execute_query(
Expand Down Expand Up @@ -468,3 +468,9 @@ def get_community_edge_from_record(record: Any):
target_node_uuid=record['target_node_uuid'],
created_at=record['created_at'].to_native(),
)


async def create_entity_edge_embeddings(embedder: EmbedderClient, edges: list[EntityEdge]):
fact_embeddings = await embedder.create_batch([edge.fact for edge in edges])
for edge, fact_embedding in zip(edges, fact_embeddings, strict=True):
edge.fact_embedding = fact_embedding
3 changes: 3 additions & 0 deletions graphiti_core/embedder/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ async def create(
self, input_data: str | list[str] | Iterable[int] | Iterable[Iterable[int]]
) -> list[float]:
pass

async def create_batch(self, input_data_list: list[str]) -> list[list[float]]:
raise NotImplementedError()
10 changes: 10 additions & 0 deletions graphiti_core/embedder/gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,13 @@ async def create(
)

return result.embeddings[0].values

async def create_batch(self, input_data_list: list[str]) -> list[list[float]]:
# Generate embeddings
result = await self.client.aio.models.embed_content(
model=self.config.embedding_model or DEFAULT_EMBEDDING_MODEL,
contents=input_data_list,
config=types.EmbedContentConfig(output_dimensionality=self.config.embedding_dim),
)

return [embedding.values for embedding in result.embeddings]
6 changes: 6 additions & 0 deletions graphiti_core/embedder/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,9 @@ async def create(
input=input_data, model=self.config.embedding_model
)
return result.data[0].embedding[: self.config.embedding_dim]

async def create_batch(self, input_data_list: list[str]) -> list[list[float]]:
result = await self.client.embeddings.create(
input=input_data_list, model=self.config.embedding_model
)
return [embedding.embedding[: self.config.embedding_dim] for embedding in result.data]
7 changes: 7 additions & 0 deletions graphiti_core/embedder/voyage.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,10 @@ async def create(

result = await self.client.embed(input_list, model=self.config.embedding_model)
return [float(x) for x in result.embeddings[0][: self.config.embedding_dim]]

async def create_batch(self, input_data_list: list[str]) -> list[list[float]]:
result = await self.client.embed(input_data_list, model=self.config.embedding_model)
return [
[float(x) for x in embedding[: self.config.embedding_dim]]
for embedding in result.embeddings
]
10 changes: 8 additions & 2 deletions graphiti_core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ async def save(self, driver: AsyncDriver):
async def get_by_uuid(cls, driver: AsyncDriver, uuid: str):
query = (
"""
MATCH (n:Entity {uuid: $uuid})
"""
MATCH (n:Entity {uuid: $uuid})
"""
+ ENTITY_NODE_RETURN
)
records, _, _ = await driver.execute_query(
Expand Down Expand Up @@ -560,3 +560,9 @@ def get_community_node_from_record(record: Any) -> CommunityNode:
created_at=record['created_at'].to_native(),
summary=record['summary'],
)


async def create_entity_node_embeddings(embedder: EmbedderClient, nodes: list[EntityNode]):
name_embeddings = await embedder.create_batch([node.name for node in nodes])
for node, name_embedding in zip(nodes, name_embeddings, strict=True):
node.name_embedding = name_embedding
12 changes: 8 additions & 4 deletions graphiti_core/utils/maintenance/edge_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
from datetime import datetime
from time import time

from graphiti_core.edges import CommunityEdge, EntityEdge, EpisodicEdge
from graphiti_core.edges import (
CommunityEdge,
EntityEdge,
EpisodicEdge,
create_entity_edge_embeddings,
)
from graphiti_core.graphiti_types import GraphitiClients
from graphiti_core.helpers import MAX_REFLEXION_ITERATIONS, semaphore_gather
from graphiti_core.llm_client import LLMClient
Expand Down Expand Up @@ -152,8 +157,7 @@ async def extract_edges(
f'Created new edge: {edge.name} from (UUID: {edge.source_node_uuid}) to (UUID: {edge.target_node_uuid})'
)

# calculate embeddings
await semaphore_gather(*[edge.generate_embedding(embedder) for edge in edges])
await create_entity_edge_embeddings(embedder, edges)

logger.debug(f'Extracted edges: {[(e.name, e.uuid) for e in edges]}')

Expand Down Expand Up @@ -214,7 +218,7 @@ async def resolve_extracted_edges(
llm_client = clients.llm_client

related_edges_lists: list[list[EntityEdge]] = await get_relevant_edges(
driver, extracted_edges, SearchFilters(), 0.8
driver, extracted_edges, SearchFilters()
)

logger.debug(
Expand Down
6 changes: 3 additions & 3 deletions graphiti_core/utils/maintenance/node_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from graphiti_core.graphiti_types import GraphitiClients
from graphiti_core.helpers import MAX_REFLEXION_ITERATIONS, semaphore_gather
from graphiti_core.llm_client import LLMClient
from graphiti_core.nodes import EntityNode, EpisodeType, EpisodicNode
from graphiti_core.nodes import EntityNode, EpisodeType, EpisodicNode, create_entity_node_embeddings
from graphiti_core.prompts import prompt_library
from graphiti_core.prompts.dedupe_nodes import NodeDuplicate
from graphiti_core.prompts.extract_nodes import EntityClassification, ExtractedNodes, MissedEntities
Expand Down Expand Up @@ -211,7 +211,7 @@ async def extract_nodes(
extracted_nodes.append(new_node)
logger.debug(f'Created new node: {new_node.name} (UUID: {new_node.uuid})')

await semaphore_gather(*[node.generate_name_embedding(embedder) for node in extracted_nodes])
await create_entity_node_embeddings(embedder, extracted_nodes)

logger.debug(f'Extracted nodes: {[(n.name, n.uuid) for n in extracted_nodes]}')
return extracted_nodes
Expand Down Expand Up @@ -279,7 +279,7 @@ async def resolve_extracted_nodes(

# Find relevant nodes already in the graph
existing_nodes_lists: list[list[EntityNode]] = await get_relevant_nodes(
driver, extracted_nodes, SearchFilters(), 0.8
driver, extracted_nodes, SearchFilters()
)

uuid_map: dict[str, str] = {}
Expand Down