Skip to content

Commit 238d0c7

Browse files
authored
feat: Support Redis Cluster (#35)
Add support for Redis Cluster in the Store implementations: Detect when the client is connected to a Redis Cluster or Enterprise node If using Cluster, use single-key commands instead of multi-key commands, including transactions. Allow users to override cluster detection. This is useful with something like the Redis Enterprise proxy, which can be used from a non-cluster-aware client. This is "Part One" because our lower-level dependency, RedisVL, also needs to be updated to support cluster-aware clients. However, with the changes in this PR, users who use Redis clusters through a proxy (with a non-cluster-aware client, e.g., Redis, not RedisCluster) can set cluster_mode=True so we change our multi-key command behavior.
1 parent ea9391d commit 238d0c7

File tree

12 files changed

+1066
-233
lines changed

12 files changed

+1066
-233
lines changed

langgraph/checkpoint/redis/aio.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,10 @@ async def __aexit__(
131131
) -> None:
132132
"""Async context manager exit."""
133133
if self._owns_its_client:
134-
await self._redis.aclose() # type: ignore[attr-defined]
135-
await self._redis.connection_pool.disconnect()
134+
await self._redis.aclose()
135+
coro = self._redis.connection_pool.disconnect()
136+
if coro:
137+
await coro
136138

137139
# Prevent RedisVL from attempting to close the client
138140
# on an event loop in a separate thread.

langgraph/checkpoint/redis/ashallow.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,10 @@ async def __aexit__(
134134
tb: Optional[TracebackType],
135135
) -> None:
136136
if self._owns_its_client:
137-
await self._redis.aclose() # type: ignore[attr-defined]
138-
await self._redis.connection_pool.disconnect()
137+
await self._redis.aclose()
138+
coro = self._redis.connection_pool.disconnect()
139+
if coro:
140+
await coro
139141

140142
# Prevent RedisVL from attempting to close the client
141143
# on an event loop in a separate thread.

langgraph/checkpoint/redis/base.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ def set_client_info(self) -> None:
154154

155155
try:
156156
# Try to use client_setinfo command if available
157-
self._redis.client_setinfo("LIB-NAME", __full_lib_name__) # type: ignore
157+
self._redis.client_setinfo("LIB-NAME", __full_lib_name__)
158158
except (ResponseError, AttributeError):
159159
# Fall back to a simple echo if client_setinfo is not available
160160
try:
@@ -174,7 +174,7 @@ async def aset_client_info(self) -> None:
174174

175175
try:
176176
# Try to use client_setinfo command if available
177-
await self._redis.client_setinfo("LIB-NAME", client_info) # type: ignore
177+
await self._redis.client_setinfo("LIB-NAME", client_info)
178178
except (ResponseError, AttributeError):
179179
# Fall back to a simple echo if client_setinfo is not available
180180
try:
@@ -468,17 +468,17 @@ def put_writes(
468468
# UPSERT case - only update specific fields
469469
if key_exists:
470470
# Update only channel, type, and blob fields
471-
pipeline.set(key, "$.channel", write_obj["channel"]) # type: ignore[arg-type]
472-
pipeline.set(key, "$.type", write_obj["type"]) # type: ignore[arg-type]
473-
pipeline.set(key, "$.blob", write_obj["blob"]) # type: ignore[arg-type]
471+
pipeline.set(key, "$.channel", write_obj["channel"])
472+
pipeline.set(key, "$.type", write_obj["type"])
473+
pipeline.set(key, "$.blob", write_obj["blob"])
474474
else:
475475
# For new records, set the complete object
476-
pipeline.set(key, "$", write_obj) # type: ignore[arg-type]
476+
pipeline.set(key, "$", write_obj)
477477
created_keys.append(key)
478478
else:
479479
# INSERT case - only insert if doesn't exist
480480
if not key_exists:
481-
pipeline.set(key, "$", write_obj) # type: ignore[arg-type]
481+
pipeline.set(key, "$", write_obj)
482482
created_keys.append(key)
483483

484484
pipeline.execute()

langgraph/checkpoint/redis/shallow.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -555,10 +555,10 @@ def put_writes(
555555
pipeline.set(key, "$.blob", write_obj["blob"])
556556
else:
557557
# For new records, set the complete object
558-
pipeline.set(key, "$", write_obj) # type: ignore[arg-type]
558+
pipeline.set(key, "$", write_obj)
559559
else:
560560
# INSERT case
561-
pipeline.set(key, "$", write_obj) # type: ignore[arg-type]
561+
pipeline.set(key, "$", write_obj)
562562

563563
pipeline.execute()
564564

0 commit comments

Comments
 (0)