fix(store): honor limit parameter in Redis search operations (#30) #31

Merged · 1 commit · Apr 30, 2025
15 changes: 7 additions & 8 deletions langgraph/store/redis/__init__.py
@@ -367,7 +367,7 @@ def _batch_search_ops(
         query_vectors = dict(zip([idx for idx, _ in embedding_requests], vectors))
 
         # Process each search operation
-        for (idx, op), (query_str, params) in zip(search_ops, queries):
+        for (idx, op), (query_str, params, limit, offset) in zip(search_ops, queries):
             if op.query and idx in query_vectors:
                 # Vector similarity search
                 vector = query_vectors[idx]
@@ -376,7 +376,7 @@ def _batch_search_ops(
                     vector_field_name="embedding",
                     filter_expression=f"@prefix:{_namespace_to_text(op.namespace_prefix)}*",
                     return_fields=["prefix", "key", "vector_distance"],
-                    num_results=op.limit,
+                    num_results=limit,  # Use the user-specified limit
                 )
                 vector_results = self.vector_index.query(vector_query)
 
@@ -469,8 +469,10 @@ def _batch_search_ops(
                 results[idx] = items
             else:
                 # Regular search
-                query = Query(query_str)
-                # Get all potential matches for filtering
+                # Create a query with LIMIT and OFFSET parameters
+                query = Query(query_str).paging(offset, limit)
+
+                # Execute search with limit and offset applied by Redis
                 res = self.store_index.search(query)
                 items = []
                 refresh_keys = []  # Track keys that need TTL refreshed
@@ -505,10 +507,7 @@ def _batch_search_ops(
 
                     items.append(_row_to_search_item(_decode_ns(data["prefix"]), data))
 
-                # Apply pagination after filtering
-                if params:
-                    limit, offset = params
-                    items = items[offset : offset + limit]
+                # Note: Pagination is now handled by Redis, no need to slice items manually
 
                 # Refresh TTL if requested
                 if op.refresh_ttl and refresh_keys and self.ttl_config:
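For reference, a minimal sketch of the redis-py paging behavior this hunk leans on; the index name and filter string below are illustrative, not taken from the store:

```python
from redis import Redis
from redis.commands.search.query import Query

r = Redis()
ft = r.ft("store")  # hypothetical index name, for illustration only

# Query.paging(offset, num) renders as `FT.SEARCH ... LIMIT offset num`.
# Without an explicit LIMIT, RediSearch applies its default of `LIMIT 0 10`,
# which is exactly why searches were silently capped at 10 results (#30).
res = ft.search(Query("@prefix:test*").paging(0, 15))
print(res.total, len(res.docs))  # total matches vs. docs in this page
```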
17 changes: 8 additions & 9 deletions langgraph/store/redis/aio.py
@@ -648,7 +648,7 @@ async def _batch_search_ops(
         query_vectors = dict(zip([idx for idx, _ in embedding_requests], vectors))
 
         # Process each search operation
-        for (idx, op), (query_str, params) in zip(search_ops, queries):
+        for (idx, op), (query_str, params, limit, offset) in zip(search_ops, queries):
             if op.query and idx in query_vectors:
                 # Vector similarity search
                 vector = query_vectors[idx]
@@ -658,7 +658,7 @@ async def _batch_search_ops(
                         vector_field_name="embedding",
                         filter_expression=f"@prefix:{_namespace_to_text(op.namespace_prefix)}*",
                         return_fields=["prefix", "key", "vector_distance"],
-                        num_results=op.limit,
+                        num_results=limit,  # Use the user-specified limit
                     )
                 )
 
@@ -722,8 +722,10 @@ async def _batch_search_ops(
                 results[idx] = items
             else:
                 # Regular search
-                query = Query(query_str)
-                # Get all potential matches for filtering
+                # Create a query with LIMIT and OFFSET parameters
+                query = Query(query_str).paging(offset, limit)
+
+                # Execute search with limit and offset applied by Redis
                 res = await self.store_index.search(query)
                 items = []
 
@@ -746,12 +748,9 @@ async def _batch_search_ops(
                         continue
                     items.append(_row_to_search_item(_decode_ns(data["prefix"]), data))
 
-                # Apply pagination after filtering
-                if params:
-                    limit, offset = params
-                    items = items[offset : offset + limit]
+                # Note: Pagination is now handled by Redis, no need to slice items manually
 
-                results[idx] = items
+            results[idx] = items
 
     async def _batch_list_namespaces_ops(
         self,
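The async path mirrors the sync change, including the vector branch. A hedged sketch of the redisvl VectorQuery it builds (the vector values here are placeholders; redisvl's num_results defaults to 10, so the caller's limit has to be passed through explicitly):

```python
from redisvl.query import VectorQuery

vector_query = VectorQuery(
    vector=[0.1, 0.2, 0.3, 0.4],  # placeholder embedding
    vector_field_name="embedding",
    return_fields=["prefix", "key", "vector_distance"],
    num_results=15,  # the user-specified limit, not redisvl's default of 10
)
```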
8 changes: 5 additions & 3 deletions langgraph/store/redis/base.py
@@ -398,7 +398,7 @@ def _prepare_batch_PUT_queries(
     def _get_batch_search_queries(
         self,
         search_ops: Sequence[tuple[int, SearchOp]],
-    ) -> tuple[list[tuple[str, list]], list[tuple[int, str]]]:
+    ) -> tuple[list[tuple[str, list, int, int]], list[tuple[int, str]]]:
         """Convert search operations into Redis queries."""
         queries = []
         embedding_requests = []
@@ -413,8 +413,10 @@ def _get_batch_search_queries(
                 embedding_requests.append((idx, op.query))
 
             query = " ".join(filter_conditions) if filter_conditions else "*"
-            params = [op.limit, op.offset] if op.limit or op.offset else []
-            queries.append((query, params))
+            limit = op.limit if op.limit is not None else 10
+            offset = op.offset if op.offset is not None else 0
+            params = [limit, offset]
+            queries.append((query, params, limit, offset))
 
         return queries, embedding_requests
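The helper now normalizes paging before building each query tuple. A standalone rendition of that defaulting logic (the fallbacks of 10 and 0 are read off the hunk above):

```python
def normalize_paging(limit: int | None, offset: int | None) -> tuple[int, int]:
    """Limit falls back to 10, offset to 0, mirroring _get_batch_search_queries."""
    resolved_limit = limit if limit is not None else 10
    resolved_offset = offset if offset is not None else 0
    return resolved_limit, resolved_offset

assert normalize_paging(None, None) == (10, 0)
assert normalize_paging(15, 5) == (15, 5)
```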
73 changes: 73 additions & 0 deletions tests/test_async_search_limit.py
@@ -0,0 +1,73 @@
+"""Tests for AsyncRedisStore search limits."""
+
+from __future__ import annotations
+
+import pytest
+import pytest_asyncio
+
+from langgraph.store.redis import AsyncRedisStore
+
+
+@pytest_asyncio.fixture(scope="function")
+async def async_store(redis_url) -> AsyncRedisStore:
+    """Fixture to create an AsyncRedisStore."""
+    async with AsyncRedisStore(redis_url) as store:
+        await store.setup()  # Initialize indices
+        yield store
+
+
+@pytest.mark.asyncio
+async def test_async_search_with_larger_limit(async_store: AsyncRedisStore) -> None:
+    """Test async search with limit > 10."""
+    # Create 15 test documents
+    for i in range(15):
+        await async_store.aput(
+            ("test_namespace",), f"key{i}", {"data": f"value{i}", "index": i}
+        )
+
+    # Search with a limit of 15
+    results = await async_store.asearch(("test_namespace",), limit=15)
+
+    # Should return all 15 results
+    assert len(results) == 15, f"Expected 15 results, got {len(results)}"
+
+    # Verify we have all the items
+    result_keys = {item.key for item in results}
+    expected_keys = {f"key{i}" for i in range(15)}
+    assert result_keys == expected_keys
+
+
+@pytest.mark.asyncio
+async def test_async_vector_search_with_larger_limit(redis_url) -> None:
+    """Test async vector search with limit > 10."""
+    from tests.embed_test_utils import CharacterEmbeddings
+
+    # Create vector store with embeddings
+    embeddings = CharacterEmbeddings(dims=4)
+    index_config = {
+        "dims": embeddings.dims,
+        "embed": embeddings,
+        "distance_type": "cosine",
+        "fields": ["text"],
+    }
+
+    async with AsyncRedisStore(redis_url, index=index_config) as store:
+        await store.setup()
+
+        # Create 15 test documents
+        for i in range(15):
+            # Create documents with slightly different texts
+            await store.aput(
+                ("test_namespace",), f"key{i}", {"text": f"sample text {i}", "index": i}
+            )
+
+        # Search with a limit of 15
+        results = await store.asearch(("test_namespace",), query="sample", limit=15)
+
+        # Should return all 15 results
+        assert len(results) == 15, f"Expected 15 results, got {len(results)}"
+
+        # Verify we have all the items
+        result_keys = {item.key for item in results}
+        expected_keys = {f"key{i}" for i in range(15)}
+        assert result_keys == expected_keys
68 changes: 68 additions & 0 deletions tests/test_search_limit.py
@@ -0,0 +1,68 @@
+"""Tests for RedisStore search limits."""
+
+from __future__ import annotations
+
+import pytest
+
+from langgraph.store.redis import RedisStore
+
+
+@pytest.fixture(scope="function")
+def store(redis_url) -> RedisStore:
+    """Fixture to create a Redis store."""
+    with RedisStore.from_conn_string(redis_url) as store:
+        store.setup()  # Initialize indices
+        yield store
+
+
+def test_search_with_larger_limit(store: RedisStore) -> None:
+    """Test search with limit > 10."""
+    # Create 15 test documents
+    for i in range(15):
+        store.put(("test_namespace",), f"key{i}", {"data": f"value{i}", "index": i})
+
+    # Search with a limit of 15
+    results = store.search(("test_namespace",), limit=15)
+
+    # Should return all 15 results
+    assert len(results) == 15, f"Expected 15 results, got {len(results)}"
+
+    # Verify we have all the items
+    result_keys = {item.key for item in results}
+    expected_keys = {f"key{i}" for i in range(15)}
+    assert result_keys == expected_keys
+
+
+def test_vector_search_with_larger_limit(redis_url) -> None:
+    """Test vector search with limit > 10."""
+    from tests.embed_test_utils import CharacterEmbeddings
+
+    # Create vector store with embeddings
+    embeddings = CharacterEmbeddings(dims=4)
+    index_config = {
+        "dims": embeddings.dims,
+        "embed": embeddings,
+        "distance_type": "cosine",
+        "fields": ["text"],
+    }
+
+    with RedisStore.from_conn_string(redis_url, index=index_config) as store:
+        store.setup()
+
+        # Create 15 test documents
+        for i in range(15):
+            # Create documents with slightly different texts
+            store.put(
+                ("test_namespace",), f"key{i}", {"text": f"sample text {i}", "index": i}
+            )
+
+        # Search with a limit of 15
+        results = store.search(("test_namespace",), query="sample", limit=15)
+
+        # Should return all 15 results
+        assert len(results) == 15, f"Expected 15 results, got {len(results)}"
+
+        # Verify we have all the items
+        result_keys = {item.key for item in results}
+        expected_keys = {f"key{i}" for i in range(15)}
+        assert result_keys == expected_keys
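Beyond raising the cap, the same change makes offset paging server-side. A quick sketch, not part of this PR's tests, of how that could read against a store populated like the one above:

```python
# Hypothetical follow-on check: LIMIT/OFFSET are applied by Redis, so
# consecutive pages come back sized by `limit` and disjoint.
page1 = store.search(("test_namespace",), limit=5, offset=0)
page2 = store.search(("test_namespace",), limit=5, offset=5)
assert len(page1) == 5 and len(page2) == 5
assert {item.key for item in page1}.isdisjoint({item.key for item in page2})
```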