Skip to content

Commit 72b0a2b

Browse files
committed
Cursor functionality is complete. More tests to be added.
1 parent 51fca52 commit 72b0a2b

File tree

6 files changed

+806
-13
lines changed

6 files changed

+806
-13
lines changed

arangoasync/cursor.py

+244-5
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,262 @@
11
__all__ = ["Cursor"]
22

33

4+
from collections import deque
5+
from typing import Any, Deque, List, Optional
6+
7+
from arangoasync.errno import HTTP_NOT_FOUND
8+
from arangoasync.exceptions import (
9+
CursorCloseError,
10+
CursorCountError,
11+
CursorEmptyError,
12+
CursorNextError,
13+
CursorStateError,
14+
)
415
from arangoasync.executor import ApiExecutor
5-
from arangoasync.typings import Json
16+
from arangoasync.request import Method, Request
17+
from arangoasync.response import Response
18+
from arangoasync.serialization import Deserializer, Serializer
19+
from arangoasync.typings import (
20+
Json,
21+
Jsons,
22+
QueryExecutionExtra,
23+
QueryExecutionPlan,
24+
QueryExecutionProfile,
25+
QueryExecutionStats,
26+
)
627

728

829
class Cursor:
930
"""Cursor API wrapper.
1031
1132
Cursors fetch query results from ArangoDB server in batches. Cursor objects
1233
are *stateful* as they store the fetched items in-memory. They must not be
13-
shared across threads without proper locking mechanism.
34+
shared across threads without a proper locking mechanism.
1435
1536
Args:
1637
executor: Required to execute the API requests.
17-
data: Cursor initialization data.
38+
data: Cursor initialization data. Returned by the server when the query
39+
is created.
1840
"""
1941

2042
def __init__(self, executor: ApiExecutor, data: Json) -> None:
2143
self._executor = executor
22-
print(data)
23-
# TODO complete this
44+
self._cached: Optional[bool] = None
45+
self._count: Optional[int] = None
46+
self._extra = QueryExecutionExtra({})
47+
self._has_more: Optional[bool] = None
48+
self._id: Optional[str] = None
49+
self._next_batch_id: Optional[str] = None
50+
self._batch: Deque[Any] = deque()
51+
self._update(data)
52+
53+
async def __aiter__(self) -> "Cursor":
54+
return self
55+
56+
async def __anext__(self) -> Any:
57+
return await self.next()
58+
59+
async def __aenter__(self) -> "Cursor":
60+
return self
61+
62+
async def __aexit__(self, *_: Any) -> None:
63+
await self.close(ignore_missing=True)
64+
65+
def __len__(self) -> int:
66+
if self._count is None:
67+
raise CursorCountError("Cursor count not enabled")
68+
return self._count
69+
70+
def __repr__(self) -> str:
71+
return f"<Cursor {self._id}>" if self._id else "<Cursor>"
72+
73+
@property
74+
def cached(self) -> Optional[bool]:
75+
"""Whether the result was served from the query cache or not."""
76+
return self._cached
77+
78+
@property
79+
def count(self) -> Optional[int]:
80+
"""The total number of result documents available."""
81+
return self._count
82+
83+
@property
84+
def extra(self) -> QueryExecutionExtra:
85+
"""Extra information about the query execution."""
86+
return self._extra
87+
88+
@property
89+
def has_more(self) -> Optional[bool]:
90+
"""Whether there are more results available on the server."""
91+
return self._has_more
92+
93+
@property
94+
def id(self) -> Optional[str]:
95+
"""Cursor ID."""
96+
return self._id
97+
98+
@property
99+
def next_batch_id(self) -> Optional[str]:
100+
"""ID of the batch after current one."""
101+
return self._next_batch_id
102+
103+
@property
104+
def batch(self) -> Deque[Any]:
105+
"""Return the current batch of results."""
106+
return self._batch
107+
108+
@property
109+
def serializer(self) -> Serializer[Json]:
110+
"""Return the serializer."""
111+
return self._executor.serializer
112+
113+
@property
114+
def deserializer(self) -> Deserializer[Json, Jsons]:
115+
"""Return the deserializer."""
116+
return self._executor.deserializer
117+
118+
@property
119+
def statistics(self) -> QueryExecutionStats:
120+
"""Query statistics."""
121+
return self.extra.stats
122+
123+
@property
124+
def profile(self) -> QueryExecutionProfile:
125+
"""Query profiling information."""
126+
return self.extra.profile
127+
128+
@property
129+
def plan(self) -> QueryExecutionPlan:
130+
"""Execution plan for the query."""
131+
return self.extra.plan
132+
133+
@property
134+
def warnings(self) -> List[Json]:
135+
"""Warnings generated during query execution."""
136+
return self.extra.warnings
137+
138+
def empty(self) -> bool:
139+
"""Check if the current batch is empty."""
140+
return len(self._batch) == 0
141+
142+
async def next(self) -> Any:
143+
"""Retrieve and pop the next item.
144+
145+
If current batch is empty/depleted, an API request is automatically
146+
sent to fetch the next batch from the server and update the cursor.
147+
148+
Returns:
149+
Any: Next item.
150+
151+
Raises:
152+
StopAsyncIteration: If there are no more items to retrieve.
153+
CursorNextError: If the cursor failed to fetch the next batch.
154+
CursorStateError: If the cursor ID is not set.
155+
"""
156+
if self.empty():
157+
if not self.has_more:
158+
raise StopAsyncIteration
159+
await self.fetch()
160+
return self.pop()
161+
162+
def pop(self) -> Any:
163+
"""Pop the next item from the current batch.
164+
165+
If current batch is empty/depleted, an exception is raised. You must
166+
call :func:`arangoasync.cursor.Cursor.fetch` to manually fetch the next
167+
batch from server.
168+
169+
Returns:
170+
Any: Next item from the current batch.
171+
172+
Raises:
173+
CursorEmptyError: If the current batch is empty.
174+
"""
175+
try:
176+
return self._batch.popleft()
177+
except IndexError:
178+
raise CursorEmptyError("Current batch is empty")
179+
180+
async def fetch(self, batch_id: Optional[str] = None) -> List[Any]:
181+
"""Fetch the next batch from the server and update the cursor.
182+
183+
Args:
184+
batch_id (str | None): ID of the batch to fetch. If not set, the
185+
next batch after the current one is fetched.
186+
187+
Returns:
188+
List[Any]: New batch results.
189+
190+
Raises:
191+
CursorNextError: If the cursor is empty.
192+
CursorStateError: If the cursor ID is not set.
193+
194+
References:
195+
- `read-the-next-batch-from-a-cursor <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#read-the-next-batch-from-a-cursor>`__
196+
- `read-a-batch-from-the-cursor-again <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#read-a-batch-from-the-cursor-again>`__
197+
""" # noqa: E501
198+
if self._id is None:
199+
raise CursorStateError("Cursor ID is not set")
200+
201+
endpoint = f"/_api/cursor/{self._id}"
202+
if batch_id is not None:
203+
endpoint += f"/{batch_id}"
204+
205+
request = Request(
206+
method=Method.POST,
207+
endpoint=endpoint,
208+
)
209+
210+
def response_handler(resp: Response) -> List[Any]:
211+
if not resp.is_success:
212+
raise CursorNextError(resp, request)
213+
return self._update(self.deserializer.loads(resp.raw_body))
214+
215+
return await self._executor.execute(request, response_handler)
216+
217+
async def close(self, ignore_missing: bool = False) -> bool:
218+
"""Close the cursor and free any server resources associated with it.
219+
220+
Args:
221+
ignore_missing (bool): Do not raise an exception on missing cursor.
222+
223+
Returns:
224+
bool: `True` if the cursor was closed successfully. `False` if there
225+
was no cursor to close. If there is no cursor associated with the
226+
query, `False` is returned.
227+
228+
Raises:
229+
CursorCloseError: If the cursor failed to close.
230+
231+
References:
232+
- `delete-a-cursor <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#delete-a-cursor>`__
233+
""" # noqa: E501
234+
if self._id is None:
235+
return False
236+
237+
request = Request(
238+
method=Method.DELETE,
239+
endpoint=f"/_api/cursor/{self._id}",
240+
)
241+
242+
def response_handler(resp: Response) -> bool:
243+
if resp.is_success:
244+
return True
245+
if resp.status_code == HTTP_NOT_FOUND and ignore_missing:
246+
return False
247+
raise CursorCloseError(resp, request)
248+
249+
return await self._executor.execute(request, response_handler)
250+
251+
def _update(self, data: Json) -> List[Any]:
252+
"""Update the cursor with the new data."""
253+
if "id" in data:
254+
self._id = data.get("id")
255+
self._cached = data.get("cached")
256+
self._count = data.get("count")
257+
self._extra = QueryExecutionExtra(data.get("extra", dict()))
258+
self._has_more = data.get("hasMore")
259+
self._next_batch_id = data.get("nextBatchId")
260+
result: List[Any] = data.get("result", list())
261+
self._batch.extend(result)
262+
return result

arangoasync/exceptions.py

+20
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,26 @@ class ClientConnectionError(ArangoClientError):
103103
"""The request was unable to reach the server."""
104104

105105

106+
class CursorCloseError(ArangoServerError):
107+
"""Failed to delete the cursor result from server."""
108+
109+
110+
class CursorCountError(ArangoClientError, TypeError):
111+
"""The cursor count was not enabled."""
112+
113+
114+
class CursorEmptyError(ArangoClientError):
115+
"""The current batch in cursor was empty."""
116+
117+
118+
class CursorNextError(ArangoServerError):
119+
"""Failed to retrieve the next result batch from server."""
120+
121+
122+
class CursorStateError(ArangoClientError):
123+
"""The cursor object was in a bad state."""
124+
125+
106126
class DatabaseCreateError(ArangoServerError):
107127
"""Failed to create database."""
108128

0 commit comments

Comments
 (0)