Skip to content

Batch Insert doesn't work #348

Open
@gluonfield

Description

@gluonfield

I took the official Graphiti example and tried to insert a batch of episodes with `add_episode_bulk`. The operation failed with the following error:

Connection closed
Traceback (most recent call last):
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/index_test.py", line 247, in <module>
    asyncio.run(main())
  File "/Users/wpc/.pyenv/versions/3.11.9/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/wpc/.pyenv/versions/3.11.9/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/.pyenv/versions/3.11.9/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/index_test.py", line 132, in main
    await graphiti.add_episode_bulk(episodes_batch)
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/graphiti.py", line 602, in add_episode_bulk
    raise e
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/graphiti.py", line 566, in add_episode_bulk
    (nodes, uuid_map), extracted_edges_timestamped = await semaphore_gather(
                                                     ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/helpers.py", line 96, in semaphore_gather
    return await asyncio.gather(*(_wrap_coroutine(coroutine) for coroutine in coroutines))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/helpers.py", line 94, in _wrap_coroutine
    return await coroutine
           ^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/utils/bulk_utils.py", line 185, in dedupe_nodes_bulk
    compressed_nodes, compressed_map = await compress_nodes(llm_client, nodes, uuid_map)
                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/utils/bulk_utils.py", line 311, in compress_nodes
    results = await semaphore_gather(
              ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/helpers.py", line 96, in semaphore_gather
    return await asyncio.gather(*(_wrap_coroutine(coroutine) for coroutine in coroutines))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/helpers.py", line 94, in _wrap_coroutine
    return await coroutine
           ^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/utils/maintenance/node_operations.py", line 391, in dedupe_node_list
    llm_response = await llm_client.generate_response(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/llm_client/openai_client.py", line 139, in generate_response
    response = await self._generate_response(messages, response_model, max_tokens)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/llm_client/openai_client.py", line 104, in _generate_response
    response = await self.client.beta.chat.completions.parse(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/openai/resources/beta/chat/completions.py", line 458, in parse
    "response_format": _type_to_response_format(response_format),
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/openai/lib/_parsing/_completions.py", line 255, in type_to_response_format_param
    raise TypeError(f"Unsupported response_format type - {response_format}")
TypeError: Unsupported response_format type - None

The code that reproduces the error is essentially the official example:

import asyncio
import json
import logging
import os
from datetime import datetime, timezone
from logging import INFO

from dotenv import load_dotenv

from graphiti_core import Graphiti
from graphiti_core.graphiti import RawEpisode
from graphiti_core.nodes import EpisodeType
from graphiti_core.search.search_config_recipes import NODE_HYBRID_SEARCH_RRF

#################################################
# CONFIGURATION
#################################################
# Set up logging and environment variables for
# connecting to Neo4j database
#################################################

# Configure logging
logging.basicConfig(
    level=INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# Load variables from a local .env file (python-dotenv) so the environment
# lookups below can see them.
load_dotenv()

# Neo4j connection parameters
# Make sure Neo4j Desktop is running with a local DBMS started.
# Credentials come from the environment rather than hard-coded literals: a
# placeholder such as ``...`` (Ellipsis) is truthy, so it would silently
# bypass the validation below and be handed to Graphiti as a bogus value.
neo4j_uri = os.environ.get("NEO4J_URI")
neo4j_user = os.environ.get("NEO4J_USER")
neo4j_password = os.environ.get("NEO4J_PASSWORD")
if not neo4j_uri or not neo4j_user or not neo4j_password:
    raise ValueError("NEO4J_URI, NEO4J_USER, and NEO4J_PASSWORD must be set")


async def main():
    """Create Graphiti's indices, then ingest two sample text episodes in bulk.

    Uses the module-level Neo4j credentials to construct a ``Graphiti``
    client, rebuilds its indices and constraints, converts the in-line
    sample episodes into ``RawEpisode`` objects and submits them with a
    single ``add_episode_bulk`` call. The client is always closed in the
    ``finally`` clause, even when ingestion raises.

    NOTE(review): in the reported setup, ``add_episode_bulk`` failed inside
    graphiti_core (dedupe_node_list -> OpenAI ``parse``) with
    ``TypeError: Unsupported response_format type - None``; that error is
    raised by the library's LLM client, not by anything in this function.
    """
    #################################################
    # INITIALIZATION
    #################################################
    # Connect to Neo4j and set up Graphiti indices
    # before any other Graphiti functionality is used
    #################################################
    graphiti = Graphiti(neo4j_uri, neo4j_user, neo4j_password)

    try:
        # Build graphiti's indices/constraints. This only needs to happen once;
        # delete_existing=True presumably discards any existing ones first.
        await graphiti.build_indices_and_constraints(delete_existing=True)

        #################################################
        # ADDING EPISODES
        #################################################
        # Episodes are Graphiti's primary unit of
        # information: text or structured JSON that is
        # processed into entities and relationships.
        #################################################
        sample_episodes = [
            {
                "content": "Kamala Harris is the Attorney General of California. She was previously "
                "the district attorney for San Francisco.",
                "type": EpisodeType.text,
                "description": "podcast transcript",
            },
            {
                "content": "As AG, Harris was in office from January 3, 2011 – January 3, 2017",
                "type": EpisodeType.text,
                "description": "podcast transcript",
            },
        ]

        # One RawEpisode per sample (at most the first two); each one gets
        # its own UTC timestamp taken at construction time.
        episodes_batch: list[RawEpisode] = [
            RawEpisode(
                name=f"Freakonomics Radio {position}",
                content=sample["content"],
                source=sample["type"],
                source_description=sample["description"],
                reference_time=datetime.now(timezone.utc),
            )
            for position, sample in enumerate(sample_episodes[:2])
        ]

        print(episodes_batch)
        await graphiti.add_episode_bulk(episodes_batch)

    finally:
        #################################################
        # CLEANUP
        #################################################
        # Release the Neo4j connection whether or not
        # the work above succeeded
        #################################################
        await graphiti.close()
        print("\nConnection closed")


if __name__ == "__main__":
    # Drive the coroutine to completion on a fresh event loop.
    asyncio.run(main())

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions