Skip to content

Commit e1bc630

Browse files
committed
Adding support for async jobs
1 parent 6e89b40 commit e1bc630

File tree

10 files changed

+511
-18
lines changed

10 files changed

+511
-18
lines changed

arangoasync/aql.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from arangoasync.executor import ApiExecutor
2727
from arangoasync.request import Method, Request
2828
from arangoasync.response import Response
29+
from arangoasync.result import Result
2930
from arangoasync.serialization import Deserializer, Serializer
3031
from arangoasync.typings import (
3132
Json,
@@ -34,7 +35,6 @@
3435
QueryExplainOptions,
3536
QueryProperties,
3637
QueryTrackingConfiguration,
37-
Result,
3838
)
3939

4040

arangoasync/collection.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@
2323
from arangoasync.executor import ApiExecutor
2424
from arangoasync.request import Method, Request
2525
from arangoasync.response import Response
26+
from arangoasync.result import Result
2627
from arangoasync.serialization import Deserializer, Serializer
2728
from arangoasync.typings import (
2829
CollectionProperties,
2930
IndexProperties,
3031
Json,
3132
Jsons,
3233
Params,
33-
Result,
3434
)
3535

3636
T = TypeVar("T")

arangoasync/database.py

+113-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
from arangoasync.connection import Connection
1414
from arangoasync.errno import HTTP_FORBIDDEN, HTTP_NOT_FOUND
1515
from arangoasync.exceptions import (
16+
AsyncJobClearError,
17+
AsyncJobListError,
1618
CollectionCreateError,
1719
CollectionDeleteError,
1820
CollectionListError,
@@ -41,9 +43,15 @@
4143
UserReplaceError,
4244
UserUpdateError,
4345
)
44-
from arangoasync.executor import ApiExecutor, DefaultApiExecutor, TransactionApiExecutor
46+
from arangoasync.executor import (
47+
ApiExecutor,
48+
AsyncApiExecutor,
49+
DefaultApiExecutor,
50+
TransactionApiExecutor,
51+
)
4552
from arangoasync.request import Method, Request
4653
from arangoasync.response import Response
54+
from arangoasync.result import Result
4755
from arangoasync.serialization import Deserializer, Serializer
4856
from arangoasync.typings import (
4957
CollectionInfo,
@@ -53,7 +61,6 @@
5361
Jsons,
5462
KeyOptions,
5563
Params,
56-
Result,
5764
ServerStatusInformation,
5865
UserInfo,
5966
)
@@ -1314,7 +1321,7 @@ def response_handler(resp: Response) -> str:
13141321
return cast(str, result["id"])
13151322

13161323
transaction_id = await self._executor.execute(request, response_handler)
1317-
return TransactionDatabase(self.connection, transaction_id)
1324+
return TransactionDatabase(self.connection, cast(str, transaction_id))
13181325

13191326
def fetch_transaction(self, transaction_id: str) -> "TransactionDatabase":
13201327
"""Fetch an existing transaction.
@@ -1328,6 +1335,86 @@ def fetch_transaction(self, transaction_id: str) -> "TransactionDatabase":
13281335
"""
13291336
return TransactionDatabase(self.connection, transaction_id)
13301337

1338+
def begin_async_execution(self, return_result: bool = True) -> "AsyncDatabase":
1339+
"""Begin async execution.
1340+
1341+
Args:
1342+
return_result (bool): If set to `True`, API executions return instances of
1343+
`arangoasync.job.AsyncJob`, which you can be used to retrieve
1344+
results from server once available. Otherwise, API executions
1345+
return `None` and no results are stored on server.
1346+
1347+
Returns:
1348+
AsyncDatabase: Database API wrapper tailored for async execution.
1349+
"""
1350+
return AsyncDatabase(self.connection, return_result)
1351+
1352+
async def async_jobs(
1353+
self, status: str, count: Optional[int] = None
1354+
) -> Result[List[str]]:
1355+
"""Return IDs of async jobs with given status.
1356+
1357+
Args:
1358+
status (str): Job status (e.g. "pending", "done").
1359+
count (int | None): Max number of job IDs to return.
1360+
1361+
Returns:
1362+
list: List of job IDs.
1363+
1364+
Raises:
1365+
AsyncJobListError: If retrieval fails.
1366+
1367+
References:
1368+
- `list-async-jobs-by-status-or-get-the-status-of-specific-job <https://docs.arangodb.com/stable/develop/http-api/jobs/#list-async-jobs-by-status-or-get-the-status-of-specific-job>`__
1369+
""" # noqa: E501
1370+
params: Params = {}
1371+
if count is not None:
1372+
params["count"] = count
1373+
1374+
request = Request(
1375+
method=Method.GET, endpoint=f"/_api/job/{status}", params=params
1376+
)
1377+
1378+
def response_handler(resp: Response) -> List[str]:
1379+
if resp.is_success:
1380+
return cast(List[str], self.deserializer.loads(resp.raw_body))
1381+
raise AsyncJobListError(resp, request)
1382+
1383+
return await self._executor.execute(request, response_handler)
1384+
1385+
async def clear_async_jobs(self, threshold: Optional[float] = None) -> None:
1386+
"""Clear async job results from the server.
1387+
1388+
Async jobs that are still queued or running are not stopped.
1389+
Clients can use this method to perform an eventual garbage
1390+
collection of job results.
1391+
1392+
Args:
1393+
threshold (float | None): If specified, only the job results created
1394+
prior to the threshold (a Unix timestamp) are deleted. Otherwise,
1395+
all job results are deleted.
1396+
1397+
Raises:
1398+
AsyncJobClearError: If the operation fails.
1399+
1400+
References:
1401+
- `delete-async-job-results <https://docs.arangodb.com/stable/develop/http-api/jobs/#delete-async-job-results>`__
1402+
""" # noqa: E501
1403+
if threshold is None:
1404+
request = Request(method=Method.DELETE, endpoint="/_api/job/all")
1405+
else:
1406+
request = Request(
1407+
method=Method.DELETE,
1408+
endpoint="/_api/job/expired",
1409+
params={"stamp": threshold},
1410+
)
1411+
1412+
def response_handler(resp: Response) -> None:
1413+
if not resp.is_success:
1414+
raise AsyncJobClearError(resp, request)
1415+
1416+
await self._executor.execute(request, response_handler)
1417+
13311418

13321419
class TransactionDatabase(Database):
13331420
"""Database API tailored specifically for
@@ -1420,3 +1507,26 @@ def response_handler(resp: Response) -> None:
14201507
raise TransactionAbortError(resp, request)
14211508

14221509
await self._standard_executor.execute(request, response_handler)
1510+
1511+
1512+
class AsyncDatabase(Database):
1513+
"""Database API wrapper tailored specifically for async execution.
1514+
1515+
See :func:`arangoasync.database.StandardDatabase.begin_async_execution`.
1516+
1517+
Args:
1518+
connection (Connection): HTTP connection.
1519+
return_result (bool): If set to `True`, API executions return instances of
1520+
:class:`arangoasync.job.AsyncJob`, which you can use to retrieve results
1521+
from server once available. If set to `False`, API executions return `None`
1522+
and no results are stored on server.
1523+
1524+
References:
1525+
- `jobs <https://docs.arangodb.com/stable/develop/http-api/jobs/>`__
1526+
""" # noqa: E501
1527+
1528+
def __init__(self, connection: Connection, return_result: bool) -> None:
1529+
super().__init__(executor=AsyncApiExecutor(connection, return_result))
1530+
1531+
def __repr__(self) -> str:
1532+
return f"<AsyncDatabase {self.name}>"

arangoasync/exceptions.py

+24
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,30 @@ class AQLQueryValidateError(ArangoServerError):
135135
"""Failed to parse and validate query."""
136136

137137

138+
class AsyncExecuteError(ArangoServerError):
139+
"""Failed to execute async API request."""
140+
141+
142+
class AsyncJobCancelError(ArangoServerError):
143+
"""Failed to cancel async job."""
144+
145+
146+
class AsyncJobClearError(ArangoServerError):
147+
"""Failed to clear async job results."""
148+
149+
150+
class AsyncJobListError(ArangoServerError):
151+
"""Failed to retrieve async jobs."""
152+
153+
154+
class AsyncJobResultError(ArangoServerError):
155+
"""Failed to retrieve async job result."""
156+
157+
158+
class AsyncJobStatusError(ArangoServerError):
159+
"""Failed to retrieve async job status."""
160+
161+
138162
class AuthHeaderError(ArangoClientError):
139163
"""The authentication header could not be determined."""
140164

arangoasync/executor.py

+50-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
from typing import Callable, TypeVar
1+
from typing import Callable, Optional, TypeVar
22

33
from arangoasync.connection import Connection
4+
from arangoasync.exceptions import AsyncExecuteError
5+
from arangoasync.job import AsyncJob
46
from arangoasync.request import Request
57
from arangoasync.response import Response
68
from arangoasync.serialization import Deserializer, Serializer
@@ -95,4 +97,50 @@ async def execute(
9597
return response_handler(response)
9698

9799

98-
ApiExecutor = DefaultApiExecutor | TransactionApiExecutor
100+
class AsyncApiExecutor(DefaultApiExecutor):
101+
"""Executes asynchronous API requests (jobs).
102+
103+
Args:
104+
connection: HTTP connection.
105+
return_result: If set to `True`, API executions return instances of
106+
:class:`arangoasync.job.AsyncJob` and results can be retrieved from server
107+
once available. If set to `False`, API executions return `None` and no
108+
results are stored on server.
109+
"""
110+
111+
def __init__(self, connection: Connection, return_result: bool) -> None:
112+
super().__init__(connection)
113+
self._return_result = return_result
114+
115+
@property
116+
def context(self) -> str:
117+
return "async"
118+
119+
async def execute(
120+
self, request: Request, response_handler: Callable[[Response], T]
121+
) -> Optional[AsyncJob[T]]:
122+
"""Execute an API request asynchronously.
123+
124+
Args:
125+
request: HTTP request.
126+
response_handler: HTTP response handler.
127+
128+
Returns: `AsyncJob` job or `None` if **return_result** parameter was set to
129+
`False` during initialization.
130+
"""
131+
if self._return_result:
132+
request.headers["x-arango-async"] = "store"
133+
else:
134+
request.headers["x-arango-async"] = "true"
135+
136+
response = await self._conn.send_request(request)
137+
if not response.is_success:
138+
raise AsyncExecuteError(response, request)
139+
if not self._return_result:
140+
return None
141+
142+
job_id = response.headers["x-arango-async-id"]
143+
return AsyncJob(self._conn, job_id, response_handler)
144+
145+
146+
ApiExecutor = DefaultApiExecutor | TransactionApiExecutor | AsyncApiExecutor

0 commit comments

Comments
 (0)