Skip to content

Commit 10ac7b7

Browse files
authored
Async Jobs Support (#39)
* Adding support for async jobs * New cursor test * Handling cursors separately
1 parent 6e89b40 commit 10ac7b7

12 files changed

+606
-30
lines changed

arangoasync/aql.py

+10-3
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323
AQLQueryTrackingSetError,
2424
AQLQueryValidateError,
2525
)
26-
from arangoasync.executor import ApiExecutor
26+
from arangoasync.executor import ApiExecutor, DefaultApiExecutor, NonAsyncExecutor
2727
from arangoasync.request import Method, Request
2828
from arangoasync.response import Response
29+
from arangoasync.result import Result
2930
from arangoasync.serialization import Deserializer, Serializer
3031
from arangoasync.typings import (
3132
Json,
@@ -34,7 +35,6 @@
3435
QueryExplainOptions,
3536
QueryProperties,
3637
QueryTrackingConfiguration,
37-
Result,
3838
)
3939

4040

@@ -326,7 +326,14 @@ async def execute(
326326
def response_handler(resp: Response) -> Cursor:
327327
if not resp.is_success:
328328
raise AQLQueryExecuteError(resp, request)
329-
return Cursor(self._executor, self.deserializer.loads(resp.raw_body))
329+
if self._executor.context == "async":
330+
# We cannot have a cursor getting back async jobs
331+
executor: NonAsyncExecutor = DefaultApiExecutor(
332+
self._executor.connection
333+
)
334+
else:
335+
executor = cast(NonAsyncExecutor, self._executor)
336+
return Cursor(executor, self.deserializer.loads(resp.raw_body))
330337

331338
return await self._executor.execute(request, response_handler)
332339

arangoasync/collection.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@
2323
from arangoasync.executor import ApiExecutor
2424
from arangoasync.request import Method, Request
2525
from arangoasync.response import Response
26+
from arangoasync.result import Result
2627
from arangoasync.serialization import Deserializer, Serializer
2728
from arangoasync.typings import (
2829
CollectionProperties,
2930
IndexProperties,
3031
Json,
3132
Jsons,
3233
Params,
33-
Result,
3434
)
3535

3636
T = TypeVar("T")

arangoasync/cursor.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
CursorNextError,
1313
CursorStateError,
1414
)
15-
from arangoasync.executor import ApiExecutor
15+
from arangoasync.executor import NonAsyncExecutor
1616
from arangoasync.request import Method, Request
1717
from arangoasync.response import Response
1818
from arangoasync.serialization import Deserializer, Serializer
@@ -39,7 +39,7 @@ class Cursor:
3939
is created.
4040
"""
4141

42-
def __init__(self, executor: ApiExecutor, data: Json) -> None:
42+
def __init__(self, executor: NonAsyncExecutor, data: Json) -> None:
4343
self._executor = executor
4444
self._cached: Optional[bool] = None
4545
self._count: Optional[int] = None

arangoasync/database.py

+114-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
"Database",
33
"StandardDatabase",
44
"TransactionDatabase",
5+
"AsyncDatabase",
56
]
67

78

@@ -13,6 +14,8 @@
1314
from arangoasync.connection import Connection
1415
from arangoasync.errno import HTTP_FORBIDDEN, HTTP_NOT_FOUND
1516
from arangoasync.exceptions import (
17+
AsyncJobClearError,
18+
AsyncJobListError,
1619
CollectionCreateError,
1720
CollectionDeleteError,
1821
CollectionListError,
@@ -41,9 +44,15 @@
4144
UserReplaceError,
4245
UserUpdateError,
4346
)
44-
from arangoasync.executor import ApiExecutor, DefaultApiExecutor, TransactionApiExecutor
47+
from arangoasync.executor import (
48+
ApiExecutor,
49+
AsyncApiExecutor,
50+
DefaultApiExecutor,
51+
TransactionApiExecutor,
52+
)
4553
from arangoasync.request import Method, Request
4654
from arangoasync.response import Response
55+
from arangoasync.result import Result
4756
from arangoasync.serialization import Deserializer, Serializer
4857
from arangoasync.typings import (
4958
CollectionInfo,
@@ -53,7 +62,6 @@
5362
Jsons,
5463
KeyOptions,
5564
Params,
56-
Result,
5765
ServerStatusInformation,
5866
UserInfo,
5967
)
@@ -1314,7 +1322,7 @@ def response_handler(resp: Response) -> str:
13141322
return cast(str, result["id"])
13151323

13161324
transaction_id = await self._executor.execute(request, response_handler)
1317-
return TransactionDatabase(self.connection, transaction_id)
1325+
return TransactionDatabase(self.connection, cast(str, transaction_id))
13181326

13191327
def fetch_transaction(self, transaction_id: str) -> "TransactionDatabase":
13201328
"""Fetch an existing transaction.
@@ -1328,6 +1336,86 @@ def fetch_transaction(self, transaction_id: str) -> "TransactionDatabase":
13281336
"""
13291337
return TransactionDatabase(self.connection, transaction_id)
13301338

1339+
def begin_async_execution(self, return_result: bool = True) -> "AsyncDatabase":
1340+
"""Begin async execution.
1341+
1342+
Args:
1343+
return_result (bool): If set to `True`, API executions return instances of
1344+
`arangoasync.job.AsyncJob`, which you can be used to retrieve
1345+
results from server once available. Otherwise, API executions
1346+
return `None` and no results are stored on server.
1347+
1348+
Returns:
1349+
AsyncDatabase: Database API wrapper tailored for async execution.
1350+
"""
1351+
return AsyncDatabase(self.connection, return_result)
1352+
1353+
async def async_jobs(
1354+
self, status: str, count: Optional[int] = None
1355+
) -> Result[List[str]]:
1356+
"""Return IDs of async jobs with given status.
1357+
1358+
Args:
1359+
status (str): Job status (e.g. "pending", "done").
1360+
count (int | None): Max number of job IDs to return.
1361+
1362+
Returns:
1363+
list: List of job IDs.
1364+
1365+
Raises:
1366+
AsyncJobListError: If retrieval fails.
1367+
1368+
References:
1369+
- `list-async-jobs-by-status-or-get-the-status-of-specific-job <https://docs.arangodb.com/stable/develop/http-api/jobs/#list-async-jobs-by-status-or-get-the-status-of-specific-job>`__
1370+
""" # noqa: E501
1371+
params: Params = {}
1372+
if count is not None:
1373+
params["count"] = count
1374+
1375+
request = Request(
1376+
method=Method.GET, endpoint=f"/_api/job/{status}", params=params
1377+
)
1378+
1379+
def response_handler(resp: Response) -> List[str]:
1380+
if resp.is_success:
1381+
return cast(List[str], self.deserializer.loads(resp.raw_body))
1382+
raise AsyncJobListError(resp, request)
1383+
1384+
return await self._executor.execute(request, response_handler)
1385+
1386+
async def clear_async_jobs(self, threshold: Optional[float] = None) -> None:
1387+
"""Clear async job results from the server.
1388+
1389+
Async jobs that are still queued or running are not stopped.
1390+
Clients can use this method to perform an eventual garbage
1391+
collection of job results.
1392+
1393+
Args:
1394+
threshold (float | None): If specified, only the job results created
1395+
prior to the threshold (a Unix timestamp) are deleted. Otherwise,
1396+
all job results are deleted.
1397+
1398+
Raises:
1399+
AsyncJobClearError: If the operation fails.
1400+
1401+
References:
1402+
- `delete-async-job-results <https://docs.arangodb.com/stable/develop/http-api/jobs/#delete-async-job-results>`__
1403+
""" # noqa: E501
1404+
if threshold is None:
1405+
request = Request(method=Method.DELETE, endpoint="/_api/job/all")
1406+
else:
1407+
request = Request(
1408+
method=Method.DELETE,
1409+
endpoint="/_api/job/expired",
1410+
params={"stamp": threshold},
1411+
)
1412+
1413+
def response_handler(resp: Response) -> None:
1414+
if not resp.is_success:
1415+
raise AsyncJobClearError(resp, request)
1416+
1417+
await self._executor.execute(request, response_handler)
1418+
13311419

13321420
class TransactionDatabase(Database):
13331421
"""Database API tailored specifically for
@@ -1420,3 +1508,26 @@ def response_handler(resp: Response) -> None:
14201508
raise TransactionAbortError(resp, request)
14211509

14221510
await self._standard_executor.execute(request, response_handler)
1511+
1512+
1513+
class AsyncDatabase(Database):
1514+
"""Database API wrapper tailored specifically for async execution.
1515+
1516+
See :func:`arangoasync.database.StandardDatabase.begin_async_execution`.
1517+
1518+
Args:
1519+
connection (Connection): HTTP connection.
1520+
return_result (bool): If set to `True`, API executions return instances of
1521+
:class:`arangoasync.job.AsyncJob`, which you can use to retrieve results
1522+
from server once available. If set to `False`, API executions return `None`
1523+
and no results are stored on server.
1524+
1525+
References:
1526+
- `jobs <https://docs.arangodb.com/stable/develop/http-api/jobs/>`__
1527+
""" # noqa: E501
1528+
1529+
def __init__(self, connection: Connection, return_result: bool) -> None:
1530+
super().__init__(executor=AsyncApiExecutor(connection, return_result))
1531+
1532+
def __repr__(self) -> str:
1533+
return f"<AsyncDatabase {self.name}>"

arangoasync/exceptions.py

+24
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,30 @@ class AQLQueryValidateError(ArangoServerError):
135135
"""Failed to parse and validate query."""
136136

137137

138+
class AsyncExecuteError(ArangoServerError):
139+
"""Failed to execute async API request."""
140+
141+
142+
class AsyncJobCancelError(ArangoServerError):
143+
"""Failed to cancel async job."""
144+
145+
146+
class AsyncJobClearError(ArangoServerError):
147+
"""Failed to clear async job results."""
148+
149+
150+
class AsyncJobListError(ArangoServerError):
151+
"""Failed to retrieve async jobs."""
152+
153+
154+
class AsyncJobResultError(ArangoServerError):
155+
"""Failed to retrieve async job result."""
156+
157+
158+
class AsyncJobStatusError(ArangoServerError):
159+
"""Failed to retrieve async job status."""
160+
161+
138162
class AuthHeaderError(ArangoClientError):
139163
"""The authentication header could not be determined."""
140164

arangoasync/executor.py

+80-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
1-
from typing import Callable, TypeVar
1+
__all__ = [
2+
"ApiExecutor",
3+
"DefaultApiExecutor",
4+
"NonAsyncExecutor",
5+
"TransactionApiExecutor",
6+
"AsyncApiExecutor",
7+
]
8+
9+
from typing import Callable, Optional, TypeVar
210

311
from arangoasync.connection import Connection
12+
from arangoasync.exceptions import AsyncExecuteError
13+
from arangoasync.job import AsyncJob
414
from arangoasync.request import Request
515
from arangoasync.response import Response
616
from arangoasync.serialization import Deserializer, Serializer
@@ -9,10 +19,10 @@
919
T = TypeVar("T")
1020

1121

12-
class DefaultApiExecutor:
13-
"""Default API executor.
22+
class ExecutorContext:
23+
"""Base class for API executors.
1424
15-
Responsible for executing requests and handling responses.
25+
Not to be exported publicly.
1626
1727
Args:
1828
connection: HTTP connection.
@@ -25,10 +35,6 @@ def __init__(self, connection: Connection) -> None:
2535
def connection(self) -> Connection:
2636
return self._conn
2737

28-
@property
29-
def context(self) -> str:
30-
return "default"
31-
3238
@property
3339
def db_name(self) -> str:
3440
return self._conn.db_name
@@ -47,6 +53,23 @@ def serialize(self, data: Json) -> str:
4753
def deserialize(self, data: bytes) -> Json:
4854
return self.deserializer.loads(data)
4955

56+
57+
class DefaultApiExecutor(ExecutorContext):
58+
"""Default API executor.
59+
60+
Responsible for executing requests and handling responses.
61+
62+
Args:
63+
connection: HTTP connection.
64+
"""
65+
66+
def __init__(self, connection: Connection) -> None:
67+
super().__init__(connection)
68+
69+
@property
70+
def context(self) -> str:
71+
return "default"
72+
5073
async def execute(
5174
self, request: Request, response_handler: Callable[[Response], T]
5275
) -> T:
@@ -60,7 +83,7 @@ async def execute(
6083
return response_handler(response)
6184

6285

63-
class TransactionApiExecutor(DefaultApiExecutor):
86+
class TransactionApiExecutor(ExecutorContext):
6487
"""Executes transaction API requests.
6588
6689
Args:
@@ -95,4 +118,51 @@ async def execute(
95118
return response_handler(response)
96119

97120

98-
ApiExecutor = DefaultApiExecutor | TransactionApiExecutor
121+
class AsyncApiExecutor(ExecutorContext):
122+
"""Executes asynchronous API requests (jobs).
123+
124+
Args:
125+
connection: HTTP connection.
126+
return_result: If set to `True`, API executions return instances of
127+
:class:`arangoasync.job.AsyncJob` and results can be retrieved from server
128+
once available. If set to `False`, API executions return `None` and no
129+
results are stored on server.
130+
"""
131+
132+
def __init__(self, connection: Connection, return_result: bool) -> None:
133+
super().__init__(connection)
134+
self._return_result = return_result
135+
136+
@property
137+
def context(self) -> str:
138+
return "async"
139+
140+
async def execute(
141+
self, request: Request, response_handler: Callable[[Response], T]
142+
) -> Optional[AsyncJob[T]]:
143+
"""Execute an API request asynchronously.
144+
145+
Args:
146+
request: HTTP request.
147+
response_handler: HTTP response handler.
148+
149+
Returns: `AsyncJob` job or `None` if **return_result** parameter was set to
150+
`False` during initialization.
151+
"""
152+
if self._return_result:
153+
request.headers["x-arango-async"] = "store"
154+
else:
155+
request.headers["x-arango-async"] = "true"
156+
157+
response = await self._conn.send_request(request)
158+
if not response.is_success:
159+
raise AsyncExecuteError(response, request)
160+
if not self._return_result:
161+
return None
162+
163+
job_id = response.headers["x-arango-async-id"]
164+
return AsyncJob(self._conn, job_id, response_handler)
165+
166+
167+
ApiExecutor = DefaultApiExecutor | TransactionApiExecutor | AsyncApiExecutor
168+
NonAsyncExecutor = DefaultApiExecutor | TransactionApiExecutor

0 commit comments

Comments
 (0)