Skip to content

Async Jobs Support #39

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions arangoasync/aql.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
AQLQueryTrackingSetError,
AQLQueryValidateError,
)
from arangoasync.executor import ApiExecutor
from arangoasync.executor import ApiExecutor, DefaultApiExecutor, NonAsyncExecutor
from arangoasync.request import Method, Request
from arangoasync.response import Response
from arangoasync.result import Result
from arangoasync.serialization import Deserializer, Serializer
from arangoasync.typings import (
Json,
Expand All @@ -34,7 +35,6 @@
QueryExplainOptions,
QueryProperties,
QueryTrackingConfiguration,
Result,
)


Expand Down Expand Up @@ -326,7 +326,14 @@ async def execute(
def response_handler(resp: Response) -> Cursor:
if not resp.is_success:
raise AQLQueryExecuteError(resp, request)
return Cursor(self._executor, self.deserializer.loads(resp.raw_body))
if self._executor.context == "async":
# We cannot have a cursor getting back async jobs
executor: NonAsyncExecutor = DefaultApiExecutor(
self._executor.connection
)
else:
executor = cast(NonAsyncExecutor, self._executor)
return Cursor(executor, self.deserializer.loads(resp.raw_body))

return await self._executor.execute(request, response_handler)

Expand Down
2 changes: 1 addition & 1 deletion arangoasync/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
from arangoasync.executor import ApiExecutor
from arangoasync.request import Method, Request
from arangoasync.response import Response
from arangoasync.result import Result
from arangoasync.serialization import Deserializer, Serializer
from arangoasync.typings import (
CollectionProperties,
IndexProperties,
Json,
Jsons,
Params,
Result,
)

T = TypeVar("T")
Expand Down
4 changes: 2 additions & 2 deletions arangoasync/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
CursorNextError,
CursorStateError,
)
from arangoasync.executor import ApiExecutor
from arangoasync.executor import NonAsyncExecutor
from arangoasync.request import Method, Request
from arangoasync.response import Response
from arangoasync.serialization import Deserializer, Serializer
Expand All @@ -39,7 +39,7 @@ class Cursor:
is created.
"""

def __init__(self, executor: ApiExecutor, data: Json) -> None:
def __init__(self, executor: NonAsyncExecutor, data: Json) -> None:
self._executor = executor
self._cached: Optional[bool] = None
self._count: Optional[int] = None
Expand Down
117 changes: 114 additions & 3 deletions arangoasync/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"Database",
"StandardDatabase",
"TransactionDatabase",
"AsyncDatabase",
]


Expand All @@ -13,6 +14,8 @@
from arangoasync.connection import Connection
from arangoasync.errno import HTTP_FORBIDDEN, HTTP_NOT_FOUND
from arangoasync.exceptions import (
AsyncJobClearError,
AsyncJobListError,
CollectionCreateError,
CollectionDeleteError,
CollectionListError,
Expand Down Expand Up @@ -41,9 +44,15 @@
UserReplaceError,
UserUpdateError,
)
from arangoasync.executor import ApiExecutor, DefaultApiExecutor, TransactionApiExecutor
from arangoasync.executor import (
ApiExecutor,
AsyncApiExecutor,
DefaultApiExecutor,
TransactionApiExecutor,
)
from arangoasync.request import Method, Request
from arangoasync.response import Response
from arangoasync.result import Result
from arangoasync.serialization import Deserializer, Serializer
from arangoasync.typings import (
CollectionInfo,
Expand All @@ -53,7 +62,6 @@
Jsons,
KeyOptions,
Params,
Result,
ServerStatusInformation,
UserInfo,
)
Expand Down Expand Up @@ -1314,7 +1322,7 @@ def response_handler(resp: Response) -> str:
return cast(str, result["id"])

transaction_id = await self._executor.execute(request, response_handler)
return TransactionDatabase(self.connection, transaction_id)
return TransactionDatabase(self.connection, cast(str, transaction_id))

def fetch_transaction(self, transaction_id: str) -> "TransactionDatabase":
"""Fetch an existing transaction.
Expand All @@ -1328,6 +1336,86 @@ def fetch_transaction(self, transaction_id: str) -> "TransactionDatabase":
"""
return TransactionDatabase(self.connection, transaction_id)

def begin_async_execution(self, return_result: bool = True) -> "AsyncDatabase":
"""Begin async execution.

Args:
return_result (bool): If set to `True`, API executions return instances of
`arangoasync.job.AsyncJob`, which you can be used to retrieve
results from server once available. Otherwise, API executions
return `None` and no results are stored on server.

Returns:
AsyncDatabase: Database API wrapper tailored for async execution.
"""
return AsyncDatabase(self.connection, return_result)

async def async_jobs(
self, status: str, count: Optional[int] = None
) -> Result[List[str]]:
"""Return IDs of async jobs with given status.

Args:
status (str): Job status (e.g. "pending", "done").
count (int | None): Max number of job IDs to return.

Returns:
list: List of job IDs.

Raises:
AsyncJobListError: If retrieval fails.

References:
- `list-async-jobs-by-status-or-get-the-status-of-specific-job <https://docs.arangodb.com/stable/develop/http-api/jobs/#list-async-jobs-by-status-or-get-the-status-of-specific-job>`__
""" # noqa: E501
params: Params = {}
if count is not None:
params["count"] = count

request = Request(
method=Method.GET, endpoint=f"/_api/job/{status}", params=params
)

def response_handler(resp: Response) -> List[str]:
if resp.is_success:
return cast(List[str], self.deserializer.loads(resp.raw_body))
raise AsyncJobListError(resp, request)

return await self._executor.execute(request, response_handler)

async def clear_async_jobs(self, threshold: Optional[float] = None) -> None:
"""Clear async job results from the server.

Async jobs that are still queued or running are not stopped.
Clients can use this method to perform an eventual garbage
collection of job results.

Args:
threshold (float | None): If specified, only the job results created
prior to the threshold (a Unix timestamp) are deleted. Otherwise,
all job results are deleted.

Raises:
AsyncJobClearError: If the operation fails.

References:
- `delete-async-job-results <https://docs.arangodb.com/stable/develop/http-api/jobs/#delete-async-job-results>`__
""" # noqa: E501
if threshold is None:
request = Request(method=Method.DELETE, endpoint="/_api/job/all")
else:
request = Request(
method=Method.DELETE,
endpoint="/_api/job/expired",
params={"stamp": threshold},
)

def response_handler(resp: Response) -> None:
if not resp.is_success:
raise AsyncJobClearError(resp, request)

await self._executor.execute(request, response_handler)


class TransactionDatabase(Database):
"""Database API tailored specifically for
Expand Down Expand Up @@ -1420,3 +1508,26 @@ def response_handler(resp: Response) -> None:
raise TransactionAbortError(resp, request)

await self._standard_executor.execute(request, response_handler)


class AsyncDatabase(Database):
"""Database API wrapper tailored specifically for async execution.

See :func:`arangoasync.database.StandardDatabase.begin_async_execution`.

Args:
connection (Connection): HTTP connection.
return_result (bool): If set to `True`, API executions return instances of
:class:`arangoasync.job.AsyncJob`, which you can use to retrieve results
from server once available. If set to `False`, API executions return `None`
and no results are stored on server.

References:
- `jobs <https://docs.arangodb.com/stable/develop/http-api/jobs/>`__
""" # noqa: E501

def __init__(self, connection: Connection, return_result: bool) -> None:
super().__init__(executor=AsyncApiExecutor(connection, return_result))

def __repr__(self) -> str:
return f"<AsyncDatabase {self.name}>"
24 changes: 24 additions & 0 deletions arangoasync/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,30 @@ class AQLQueryValidateError(ArangoServerError):
"""Failed to parse and validate query."""


class AsyncExecuteError(ArangoServerError):
"""Failed to execute async API request."""


class AsyncJobCancelError(ArangoServerError):
"""Failed to cancel async job."""


class AsyncJobClearError(ArangoServerError):
"""Failed to clear async job results."""


class AsyncJobListError(ArangoServerError):
"""Failed to retrieve async jobs."""


class AsyncJobResultError(ArangoServerError):
"""Failed to retrieve async job result."""


class AsyncJobStatusError(ArangoServerError):
"""Failed to retrieve async job status."""


class AuthHeaderError(ArangoClientError):
"""The authentication header could not be determined."""

Expand Down
90 changes: 80 additions & 10 deletions arangoasync/executor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
from typing import Callable, TypeVar
__all__ = [
"ApiExecutor",
"DefaultApiExecutor",
"NonAsyncExecutor",
"TransactionApiExecutor",
"AsyncApiExecutor",
]

from typing import Callable, Optional, TypeVar

from arangoasync.connection import Connection
from arangoasync.exceptions import AsyncExecuteError
from arangoasync.job import AsyncJob
from arangoasync.request import Request
from arangoasync.response import Response
from arangoasync.serialization import Deserializer, Serializer
Expand All @@ -9,10 +19,10 @@
T = TypeVar("T")


class DefaultApiExecutor:
"""Default API executor.
class ExecutorContext:
"""Base class for API executors.

Responsible for executing requests and handling responses.
Not to be exported publicly.

Args:
connection: HTTP connection.
Expand All @@ -25,10 +35,6 @@ def __init__(self, connection: Connection) -> None:
def connection(self) -> Connection:
return self._conn

@property
def context(self) -> str:
return "default"

@property
def db_name(self) -> str:
return self._conn.db_name
Expand All @@ -47,6 +53,23 @@ def serialize(self, data: Json) -> str:
def deserialize(self, data: bytes) -> Json:
return self.deserializer.loads(data)


class DefaultApiExecutor(ExecutorContext):
"""Default API executor.

Responsible for executing requests and handling responses.

Args:
connection: HTTP connection.
"""

def __init__(self, connection: Connection) -> None:
super().__init__(connection)

@property
def context(self) -> str:
return "default"

async def execute(
self, request: Request, response_handler: Callable[[Response], T]
) -> T:
Expand All @@ -60,7 +83,7 @@ async def execute(
return response_handler(response)


class TransactionApiExecutor(DefaultApiExecutor):
class TransactionApiExecutor(ExecutorContext):
"""Executes transaction API requests.

Args:
Expand Down Expand Up @@ -95,4 +118,51 @@ async def execute(
return response_handler(response)


ApiExecutor = DefaultApiExecutor | TransactionApiExecutor
class AsyncApiExecutor(ExecutorContext):
"""Executes asynchronous API requests (jobs).

Args:
connection: HTTP connection.
return_result: If set to `True`, API executions return instances of
:class:`arangoasync.job.AsyncJob` and results can be retrieved from server
once available. If set to `False`, API executions return `None` and no
results are stored on server.
"""

def __init__(self, connection: Connection, return_result: bool) -> None:
super().__init__(connection)
self._return_result = return_result

@property
def context(self) -> str:
return "async"

async def execute(
self, request: Request, response_handler: Callable[[Response], T]
) -> Optional[AsyncJob[T]]:
"""Execute an API request asynchronously.

Args:
request: HTTP request.
response_handler: HTTP response handler.

Returns: `AsyncJob` job or `None` if **return_result** parameter was set to
`False` during initialization.
"""
if self._return_result:
request.headers["x-arango-async"] = "store"
else:
request.headers["x-arango-async"] = "true"

response = await self._conn.send_request(request)
if not response.is_success:
raise AsyncExecuteError(response, request)
if not self._return_result:
return None

job_id = response.headers["x-arango-async-id"]
return AsyncJob(self._conn, job_id, response_handler)


ApiExecutor = DefaultApiExecutor | TransactionApiExecutor | AsyncApiExecutor
NonAsyncExecutor = DefaultApiExecutor | TransactionApiExecutor
Loading
Loading