Skip to content

Commit f82a0de

Browse files
committed
Handling cursors separately
1 parent 6a8ce08 commit f82a0de

File tree

6 files changed

+80
-16
lines changed

6 files changed

+80
-16
lines changed

arangoasync/aql.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
AQLQueryTrackingSetError,
2424
AQLQueryValidateError,
2525
)
26-
from arangoasync.executor import ApiExecutor
26+
from arangoasync.executor import ApiExecutor, DefaultApiExecutor, NonAsyncExecutor
2727
from arangoasync.request import Method, Request
2828
from arangoasync.response import Response
2929
from arangoasync.result import Result
@@ -326,7 +326,14 @@ async def execute(
326326
def response_handler(resp: Response) -> Cursor:
327327
if not resp.is_success:
328328
raise AQLQueryExecuteError(resp, request)
329-
return Cursor(self._executor, self.deserializer.loads(resp.raw_body))
329+
if self._executor.context == "async":
330+
# We cannot have a cursor getting back async jobs
331+
executor: NonAsyncExecutor = DefaultApiExecutor(
332+
self._executor.connection
333+
)
334+
else:
335+
executor = cast(NonAsyncExecutor, self._executor)
336+
return Cursor(executor, self.deserializer.loads(resp.raw_body))
330337

331338
return await self._executor.execute(request, response_handler)
332339

arangoasync/cursor.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
CursorNextError,
1313
CursorStateError,
1414
)
15-
from arangoasync.executor import ApiExecutor
15+
from arangoasync.executor import NonAsyncExecutor
1616
from arangoasync.request import Method, Request
1717
from arangoasync.response import Response
1818
from arangoasync.serialization import Deserializer, Serializer
@@ -39,7 +39,7 @@ class Cursor:
3939
is created.
4040
"""
4141

42-
def __init__(self, executor: ApiExecutor, data: Json) -> None:
42+
def __init__(self, executor: NonAsyncExecutor, data: Json) -> None:
4343
self._executor = executor
4444
self._cached: Optional[bool] = None
4545
self._count: Optional[int] = None

arangoasync/database.py

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
"Database",
33
"StandardDatabase",
44
"TransactionDatabase",
5+
"AsyncDatabase",
56
]
67

78

arangoasync/executor.py

+31-9
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
__all__ = [
2+
"ApiExecutor",
3+
"DefaultApiExecutor",
4+
"NonAsyncExecutor",
5+
"TransactionApiExecutor",
6+
"AsyncApiExecutor",
7+
]
8+
19
from typing import Callable, Optional, TypeVar
210

311
from arangoasync.connection import Connection
@@ -11,10 +19,10 @@
1119
T = TypeVar("T")
1220

1321

14-
class DefaultApiExecutor:
15-
"""Default API executor.
22+
class ExecutorContext:
23+
"""Base class for API executors.
1624
17-
Responsible for executing requests and handling responses.
25+
Not to be exported publicly.
1826
1927
Args:
2028
connection: HTTP connection.
@@ -27,10 +35,6 @@ def __init__(self, connection: Connection) -> None:
2735
def connection(self) -> Connection:
2836
return self._conn
2937

30-
@property
31-
def context(self) -> str:
32-
return "default"
33-
3438
@property
3539
def db_name(self) -> str:
3640
return self._conn.db_name
@@ -49,6 +53,23 @@ def serialize(self, data: Json) -> str:
4953
def deserialize(self, data: bytes) -> Json:
5054
return self.deserializer.loads(data)
5155

56+
57+
class DefaultApiExecutor(ExecutorContext):
58+
"""Default API executor.
59+
60+
Responsible for executing requests and handling responses.
61+
62+
Args:
63+
connection: HTTP connection.
64+
"""
65+
66+
def __init__(self, connection: Connection) -> None:
67+
super().__init__(connection)
68+
69+
@property
70+
def context(self) -> str:
71+
return "default"
72+
5273
async def execute(
5374
self, request: Request, response_handler: Callable[[Response], T]
5475
) -> T:
@@ -62,7 +83,7 @@ async def execute(
6283
return response_handler(response)
6384

6485

65-
class TransactionApiExecutor(DefaultApiExecutor):
86+
class TransactionApiExecutor(ExecutorContext):
6687
"""Executes transaction API requests.
6788
6889
Args:
@@ -97,7 +118,7 @@ async def execute(
97118
return response_handler(response)
98119

99120

100-
class AsyncApiExecutor(DefaultApiExecutor):
121+
class AsyncApiExecutor(ExecutorContext):
101122
"""Executes asynchronous API requests (jobs).
102123
103124
Args:
@@ -144,3 +165,4 @@ async def execute(
144165

145166

146167
ApiExecutor = DefaultApiExecutor | TransactionApiExecutor | AsyncApiExecutor
168+
NonAsyncExecutor = DefaultApiExecutor | TransactionApiExecutor

arangoasync/job.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ async def result(self) -> T:
9797
Raises:
9898
ArangoError: If the job raised an exception or there was a problem with
9999
the request.
100-
AsyncJobResultError: If retrieval fails.
100+
AsyncJobResultError: If retrieval fails, because job no longer exists or
101+
is still pending.
101102
102103
References:
103104
- `get-the-results-of-an-async-job <https://docs.arangodb.com/stable/develop/http-api/jobs/#get-the-results-of-an-async-job>`__
@@ -112,10 +113,11 @@ async def result(self) -> T:
112113
# The job result is available on the server
113114
return self._response_handler(response)
114115

115-
# The job is not known (anymore).
116-
# We can tell the status from the HTTP status code.
117116
if response.status_code == 204:
117+
# The job is still in the pending queue or not yet finished.
118118
raise AsyncJobResultError(response, request, self._not_done())
119+
# The job is not known (anymore).
120+
# We can tell the status from the HTTP status code.
119121
if response.error_code == HTTP_NOT_FOUND:
120122
raise AsyncJobResultError(response, request, self._not_found())
121123
raise AsyncJobResultError(response, request)

tests/test_async.py

+32
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
AQLQueryExecuteError,
88
AsyncJobCancelError,
99
AsyncJobListError,
10+
AsyncJobResultError,
1011
)
1112

1213

@@ -101,3 +102,34 @@ async def test_async_result(db, bad_db, doc_col, docs):
101102
job4 = await aql.execute("RETURN 1")
102103
await job4.wait()
103104
await job4.clear()
105+
106+
# Attempt to get the result of a pending job
107+
job5 = await aql.execute("RETURN SLEEP(5)")
108+
time.sleep(1)
109+
with pytest.raises(AsyncJobResultError):
110+
_ = await job5.result()
111+
await job5.wait()
112+
113+
114+
@pytest.mark.asyncio
115+
async def test_async_cursor(db, doc_col, docs):
116+
# Insert some documents first
117+
await asyncio.gather(*(doc_col.insert(doc) for doc in docs))
118+
119+
async_db = db.begin_async_execution()
120+
aql = async_db.aql
121+
job = await aql.execute(
122+
f"FOR d IN {doc_col.name} SORT d._key RETURN d",
123+
count=True,
124+
batch_size=1,
125+
ttl=1000,
126+
)
127+
await job.wait()
128+
129+
# Get the cursor. Bear in mind that its underlying executor is async.
130+
doc_cnt = 0
131+
cursor = await job.result()
132+
async with cursor as ctx:
133+
async for _ in ctx:
134+
doc_cnt += 1
135+
assert doc_cnt == len(docs)

0 commit comments

Comments
 (0)