Skip to content

JS Transactions #32

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 123 additions & 26 deletions arangoasync/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
]


from typing import List, Optional, Sequence, TypeVar, cast
from typing import Any, List, Optional, Sequence, TypeVar, cast
from warnings import warn

from arangoasync.collection import StandardCollection
from arangoasync.connection import Connection
Expand All @@ -27,6 +28,7 @@
ServerStatusError,
TransactionAbortError,
TransactionCommitError,
TransactionExecuteError,
TransactionInitError,
TransactionListError,
TransactionStatusError,
Expand Down Expand Up @@ -1079,6 +1081,105 @@ def response_handler(resp: Response) -> Json:

return await self._executor.execute(request, response_handler)

async def list_transactions(self) -> Result[Jsons]:
"""List all currently running stream transactions.

Returns:
list: List of transactions, with each transaction containing
an "id" and a "state" field.

Raises:
TransactionListError: If the operation fails on the server side.
"""
request = Request(method=Method.GET, endpoint="/_api/transaction")

def response_handler(resp: Response) -> Jsons:
if not resp.is_success:
raise TransactionListError(resp, request)
result: Json = self.deserializer.loads(resp.raw_body)
return cast(Jsons, result["transactions"])

return await self._executor.execute(request, response_handler)

async def execute_transaction(
self,
command: str,
params: Optional[Json] = None,
read: Optional[str | Sequence[str]] = None,
write: Optional[str | Sequence[str]] = None,
exclusive: Optional[str | Sequence[str]] = None,
allow_implicit: Optional[bool] = None,
wait_for_sync: Optional[bool] = None,
lock_timeout: Optional[int] = None,
max_transaction_size: Optional[int] = None,
) -> Result[Any]:
"""Execute a JavaScript Transaction.

Warning:
JavaScript Transactions are deprecated from ArangoDB v3.12.0 onward and
will be removed in a future version.

Args:
command (str): The actual transaction operations to be executed, in the
form of stringified JavaScript code.
params (dict): Optional parameters passed into the JavaScript command.
read (str | list | None): Name(s) of collections read during transaction.
write (str | list | None): Name(s) of collections written to during
transaction with shared access.
exclusive (str | list | None): Name(s) of collections written to during
transaction with exclusive access.
allow_implicit (bool | None): Allow reading from undeclared collections.
wait_for_sync (bool | None): If `True`, will force the transaction to write
all data to disk before returning.
lock_timeout (int | None): Timeout for waiting on collection locks. Setting
it to 0 will prevent ArangoDB from timing out while waiting for a lock.
max_transaction_size (int | None): Transaction size limit in bytes.

Returns:
Any: Result of the transaction.

Raises:
TransactionExecuteError: If the operation fails on the server side.

References:
- `execute-a-javascript-transaction <https://docs.arangodb.com/stable/develop/http-api/transactions/javascript-transactions/#execute-a-javascript-transaction>`__
""" # noqa: 501
m = "JavaScript Transactions are deprecated from ArangoDB v3.12.0 onward and will be removed in a future version." # noqa: E501
warn(m, DeprecationWarning, stacklevel=2)

collections = dict()
if read is not None:
collections["read"] = read
if write is not None:
collections["write"] = write
if exclusive is not None:
collections["exclusive"] = exclusive

data: Json = dict(collections=collections, action=command)
if params is not None:
data["params"] = params
if wait_for_sync is not None:
data["waitForSync"] = wait_for_sync
if allow_implicit is not None:
data["allowImplicit"] = allow_implicit
if lock_timeout is not None:
data["lockTimeout"] = lock_timeout
if max_transaction_size is not None:
data["maxTransactionSize"] = max_transaction_size

request = Request(
method=Method.POST,
endpoint="/_api/transaction",
data=self.serializer.dumps(data),
)

def response_handler(resp: Response) -> Any:
if not resp.is_success:
raise TransactionExecuteError(resp, request)
return self.deserializer.loads(resp.raw_body)["result"]

return await self._executor.execute(request, response_handler)


class StandardDatabase(Database):
"""Standard database API wrapper.
Expand Down Expand Up @@ -1119,7 +1220,7 @@ async def begin_transaction(
all data to disk before returning
allow_implicit (bool | None): Allow reading from undeclared collections.
lock_timeout (int | None): Timeout for waiting on collection locks. Setting
it to 0 will make ArangoDB not time out waiting for a lock.
it to 0 will prevent ArangoDB from timing out while waiting for a lock.
max_transaction_size (int | None): Transaction size limit in bytes.
allow_dirty_read (bool | None): If `True`, allows the Coordinator to ask any
shard replica for the data, not only the shard leader. This may result
Expand All @@ -1135,7 +1236,10 @@ async def begin_transaction(

Raises:
TransactionInitError: If the operation fails on the server side.
"""

References:
- `begin-a-stream-transaction <https://docs.arangodb.com/stable/develop/http-api/transactions/stream-transactions/#begin-a-stream-transaction>`__
""" # noqa: E501
collections = dict()
if read is not None:
collections["read"] = read
Expand Down Expand Up @@ -1188,26 +1292,6 @@ def fetch_transaction(self, transaction_id: str) -> "TransactionDatabase":
"""
return TransactionDatabase(self.connection, transaction_id)

async def list_transactions(self) -> Result[Jsons]:
"""List all currently running stream transactions.

Returns:
list: List of transactions, with each transaction containing
an "id" and a "state" field.

Raises:
TransactionListError: If the operation fails on the server side.
"""
request = Request(method=Method.GET, endpoint="/_api/transaction")

def response_handler(resp: Response) -> Jsons:
if not resp.is_success:
raise TransactionListError(resp, request)
result: Json = self.deserializer.loads(resp.raw_body)
return cast(Jsons, result["transactions"])

return await self._executor.execute(request, response_handler)


class TransactionDatabase(Database):
"""Database API tailored specifically for
Expand Down Expand Up @@ -1244,7 +1328,10 @@ async def transaction_status(self) -> str:

Raises:
TransactionStatusError: If the transaction is not found.
"""

References:
- `get-the-status-of-a-stream-transaction <https://docs.arangodb.com/stable/develop/http-api/transactions/stream-transactions/#get-the-status-of-a-stream-transaction>`__
""" # noqa: E501
request = Request(
method=Method.GET,
endpoint=f"/_api/transaction/{self.transaction_id}",
Expand All @@ -1263,7 +1350,10 @@ async def commit_transaction(self) -> None:

Raises:
TransactionCommitError: If the operation fails on the server side.
"""

References:
- `commit-a-stream-transaction <https://docs.arangodb.com/stable/develop/http-api/transactions/stream-transactions/#commit-a-stream-transaction>`__
""" # noqa: E501
request = Request(
method=Method.PUT,
endpoint=f"/_api/transaction/{self.transaction_id}",
Expand All @@ -1276,7 +1366,14 @@ def response_handler(resp: Response) -> None:
await self._executor.execute(request, response_handler)

async def abort_transaction(self) -> None:
"""Abort the transaction."""
"""Abort the transaction.

Raises:
TransactionAbortError: If the operation fails on the server side.

References:
- `abort-a-stream-transaction <https://docs.arangodb.com/stable/develop/http-api/transactions/stream-transactions/#abort-a-stream-transaction>`__
""" # noqa: E501
request = Request(
method=Method.DELETE,
endpoint=f"/_api/transaction/{self.transaction_id}",
Expand Down
4 changes: 4 additions & 0 deletions arangoasync/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ class TransactionCommitError(ArangoServerError):
"""Failed to commit transaction."""


class TransactionExecuteError(ArangoServerError):
"""Failed to execute JavaScript transaction."""


class TransactionInitError(ArangoServerError):
"""Failed to initialize transaction."""

Expand Down
34 changes: 34 additions & 0 deletions tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,45 @@
from arangoasync.exceptions import (
TransactionAbortError,
TransactionCommitError,
TransactionExecuteError,
TransactionInitError,
TransactionStatusError,
)


@pytest.mark.asyncio
async def test_transaction_execute_raw(db, doc_col, docs):
# Test a valid JS transaction
doc = docs[0]
key = doc["_key"]
command = f"""
function (params) {{
var db = require('internal').db;
db.{doc_col.name}.save({{'_key': params.key, 'val': 1}});
return true;
}}
""" # noqa: E702 E231 E272 E202
result = await db.execute_transaction(
command=command,
params={"key": key},
write=[doc_col.name],
read=[doc_col.name],
exclusive=[doc_col.name],
wait_for_sync=False,
lock_timeout=1000,
max_transaction_size=100000,
allow_implicit=True,
)
assert result is True
doc = await doc_col.get(key)
assert doc is not None and doc["val"] == 1

# Test an invalid transaction
with pytest.raises(TransactionExecuteError) as err:
await db.execute_transaction(command="INVALID COMMAND")
assert err.value.error_code == BAD_PARAMETER


@pytest.mark.asyncio
async def test_transaction_document_insert(db, bad_db, doc_col, docs):
# Start a basic transaction
Expand Down
Loading