Skip to content

Commit 0a8a814

Browse files
committed
Update documentation on thread-safety and add API for pregel
1 parent 9ca1045 commit 0a8a814

13 files changed

+269
-25
lines changed

arango/async.py

+4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ class AsyncExecution(Connection):
2626
instance (which holds the result of the request) is returned each
2727
time an API request is queued, otherwise ``None`` is returned
2828
:type return_result: bool
29+
30+
.. warning::
31+
Asynchronous execution is currently an experimental feature and is not
32+
thread-safe.
2933
"""
3034

3135
def __init__(self, connection, return_result=True):

arango/batch.py

+4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ class BatchExecution(Connection):
2828
so far are committed even if an exception is raised before existing
2929
out of the context (default: ``False``)
3030
:type commit_on_error: bool
31+
32+
.. warning::
33+
Batch execution is currently an experimental feature and is not
34+
thread-safe.
3135
"""
3236

3337
def __init__(self, connection, return_result=True, commit_on_error=False):

arango/client.py

+8-8
Original file line numberDiff line numberDiff line change
@@ -955,13 +955,13 @@ def revoke_user_access(self, username, database):
955955
def async_jobs(self, status, count=None):
956956
"""Return the IDs of asynchronous jobs with the specified status.
957957
958-
:param status: the job status (``"pending"`` or ``"done"``)
958+
:param status: The job status (``"pending"`` or ``"done"``).
959959
:type status: str | unicode
960-
:param count: the maximum number of job IDs to return
960+
:param count: The maximum number of job IDs to return.
961961
:type count: int
962-
:returns: the list of job IDs
962+
:returns: The list of job IDs.
963963
:rtype: [str]
964-
:raises arango.exceptions.AsyncJobListError: if the retrieval fails
964+
:raises arango.exceptions.AsyncJobListError: If the retrieval fails.
965965
966966
.. note::
967967
Only the root user can access this method. For non-root users,
@@ -979,13 +979,13 @@ def async_jobs(self, status, count=None):
979979
def clear_async_jobs(self, threshold=None):
980980
"""Delete asynchronous job results from the server.
981981
982-
:param threshold: if specified, only the job results created prior to
982+
:param threshold: If specified, only the job results created prior to
983983
the threshold (a unix timestamp) are deleted, otherwise *all* job
984-
results are deleted
984+
results are deleted.
985985
:type threshold: int
986-
:returns: whether the deletion of results was successful
986+
:returns: Whether the deletion of results was successful.
987987
:rtype: bool
988-
:raises arango.exceptions.AsyncJobClearError: if the operation fails
988+
:raises arango.exceptions.AsyncJobClearError: If the operation fails.
989989
990990
.. note::
991991
Async jobs currently queued or running are not stopped.

arango/database.py

+76-8
Original file line numberDiff line numberDiff line change
@@ -1150,13 +1150,13 @@ def revoke_user_access(self, username, database=None):
11501150
def async_jobs(self, status, count=None):
11511151
"""Return the IDs of asynchronous jobs with the specified status.
11521152
1153-
:param status: the job status (``"pending"`` or ``"done"``)
1153+
:param status: The job status (``"pending"`` or ``"done"``).
11541154
:type status: str | unicode
1155-
:param count: the maximum number of job IDs to return
1155+
:param count: The maximum number of job IDs to return.
11561156
:type count: int
1157-
:returns: the list of job IDs
1157+
:returns: The list of job IDs.
11581158
:rtype: [str]
1159-
:raises arango.exceptions.AsyncJobListError: if the retrieval fails
1159+
:raises arango.exceptions.AsyncJobListError: If the retrieval fails.
11601160
"""
11611161
res = self._conn.get(
11621162
'/_api/job/{}'.format(status),
@@ -1169,13 +1169,13 @@ def async_jobs(self, status, count=None):
11691169
def clear_async_jobs(self, threshold=None):
11701170
"""Delete asynchronous job results from the server.
11711171
1172-
:param threshold: if specified, only the job results created prior to
1172+
:param threshold: If specified, only the job results created prior to
11731173
the threshold (a unix timestamp) are deleted, otherwise *all* job
1174-
results are deleted
1174+
results are deleted.
11751175
:type threshold: int
1176-
:returns: whether the deletion of results was successful
1176+
:returns: Whether the deletion of results was successful.
11771177
:rtype: bool
1178-
:raises arango.exceptions.AsyncJobClearError: if the operation fails
1178+
:raises arango.exceptions.AsyncJobClearError: If the operation fails.
11791179
11801180
.. note::
11811181
Async jobs currently queued or running are not stopped.
@@ -1190,3 +1190,71 @@ def clear_async_jobs(self, threshold=None):
11901190
if res.status_code in HTTP_OK:
11911191
return True
11921192
raise AsyncJobClearError(res)
1193+
1194+
###############
1195+
# Pregel Jobs #
1196+
###############
1197+
1198+
def create_pregel_job(self, algorithm, graph):
1199+
"""Start/create a Pregel job.
1200+
1201+
:param algorithm: The name of the algorithm (e.g. ``"pagerank"``).
1202+
:type algorithm: str | unicode
1203+
:param graph: The name of the graph.
1204+
:type graph: str | unicode
1205+
:returns: The ID of the Pregel job.
1206+
:rtype: int
1207+
:raises arango.exceptions.PregelJobCreateError: If the operation fails.
1208+
1209+
"""
1210+
res = self._conn.post(
1211+
'/_api/control_pregel',
1212+
data={
1213+
'algorithm': algorithm,
1214+
'graphName': graph,
1215+
}
1216+
)
1217+
if res.status_code in HTTP_OK:
1218+
return res.body
1219+
raise PregelJobCreateError(res)
1220+
1221+
def pregel_job(self, job_id):
1222+
"""Return the details of a Pregel job.
1223+
1224+
:param job_id: The Pregel job ID.
1225+
:type job_id: int
1226+
:returns: The details of the Pregel job.
1227+
:rtype: dict
1228+
:raises arango.exceptions.PregelJobGetError: If the lookup fails.
1229+
"""
1230+
res = self._conn.get(
1231+
'/_api/control_pregel/{}'.format(job_id)
1232+
)
1233+
if res.status_code in HTTP_OK:
1234+
return {
1235+
'aggregators': res.body['aggregators'],
1236+
'edge_count': res.body.get('edgeCount'),
1237+
'gss': res.body['gss'],
1238+
'received_count': res.body['receivedCount'],
1239+
'send_count': res.body['sendCount'],
1240+
'state': res.body['state'],
1241+
'total_runtime': res.body['totalRuntime'],
1242+
'vertex_count': res.body.get('vertexCount')
1243+
}
1244+
raise PregelJobGetError(res)
1245+
1246+
def delete_pregel_job(self, job_id):
1247+
"""Cancel/delete a Pregel job.
1248+
1249+
:param job_id: The Pregel job ID.
1250+
:type job_id: int
1251+
:returns: ``True`` if the Pregel job was successfully cancelled.
1252+
:rtype: bool
1253+
:raises arango.exceptions.PregelJobDeleteError: If the deletion fails.
1254+
"""
1255+
res = self._conn.delete(
1256+
'/_api/control_pregel/{}'.format(job_id)
1257+
)
1258+
if res.status_code in HTTP_OK:
1259+
return True
1260+
raise PregelJobDeleteError(res)

arango/exceptions.py

+15
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,21 @@ class AsyncJobResultError(ArangoError):
448448
class AsyncJobClearError(ArangoError):
449449
"""Failed to delete the asynchronous job result from the server."""
450450

451+
#####################
452+
# Pregel Exceptions #
453+
#####################
454+
455+
class PregelJobCreateError(ArangoError):
456+
"""Failed to start/create a Pregel job."""
457+
458+
459+
class PregelJobGetError(ArangoError):
460+
"""Failed to retrieve a Pregel job."""
461+
462+
463+
class PregelJobDeleteError(ArangoError):
464+
"""Failed to cancel/delete a Pregel job."""
465+
451466

452467
###########################
453468
# Cluster Test Exceptions #

arango/version.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
VERSION = '3.10.1'
1+
VERSION = '3.11.0'

docs/async.rst

+4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ fire-and-forget style. The results of the requests can be retrieved later via
1212
The user should be mindful of the server-side memory while using
1313
asynchronous executions with a large number of requests.
1414

15+
.. warning::
16+
Asynchronous execution is currently an experimental feature and is not
17+
thread-safe.
18+
1519
Here is an example showing how asynchronous executions can be used:
1620

1721
.. code-block:: python

docs/batch.rst

+4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ retrieved via :ref:`BatchJob` objects.
1212
The user should be mindful of the client-side memory while using batch
1313
executions with a large number of requests.
1414

15+
.. warning::
16+
Batch execution is currently an experimental feature and is not
17+
thread-safe.
18+
1519
Here is an example showing how batch executions can be used:
1620

1721
.. code-block:: python

docs/index.rst

+2-7
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,6 @@ You may need to use ``sudo`` depending on your environment.
4747
.. _PyPi: https://pypi.python.org/pypi/python-arango
4848
.. _GitHub: https://github.com/joowani/python-arango
4949

50-
A Note on Thread Safety and Eventlet
51-
========
52-
53-
This driver should be compatible with eventlet for the most part. By default, python-arango makes API calls using the requests library, which eventlet seems to be able to monkey patch.
54-
55-
Assuming that, all python-arango APIs except Batch Execution and Asynchronous Execution should be thread-safe.
56-
5750

5851
Contents
5952
========
@@ -76,6 +69,8 @@ Contents
7669
user
7770
task
7871
wal
72+
pregel
73+
threading
7974
errors
8075
logging
8176
classes

docs/logging.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ The logging output for above would look something like this:
4848
In order to see the full request information, turn on logging for the requests_
4949
library which **python-arango** uses under the hood:
5050

51-
.. _requests: https://github.com/kennethreitz/requests
51+
.. _requests: https://github.com/requests/requests
5252

5353
.. code-block:: python
5454

docs/pregel.rst

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
.. _pregel-page:
2+
3+
Pregel
4+
------
5+
6+
**Python-arango** provides APIs for distributed iterative graph processing
7+
(Pregel). For more information, please refer to the ArangoDB manual
8+
`here <https://docs.arangodb.com/Manual/Graphs/Pregel/>`__.
9+
10+
Here is an example showing how Pregel jobs can be started, fetched or cancelled:
11+
12+
.. code-block:: python
13+
14+
from arango import ArangoClient
15+
16+
client = ArangoClient()
17+
db = client.db('my_database')
18+
db.create_graph('my_graph')
19+
20+
# Create and start a new Pregel job
21+
job_id = db.create_pregel_job(algorithm='pagerank', graph='my_graph')
22+
23+
# Get the details of a Pregel job by its ID
24+
job = db.pregel_job(job_id)
25+
print(job['aggregators'])
26+
print(job['edge_count'])
27+
print(job['gss'])
28+
print(job['received_count'])
29+
print(job['send_count'])
30+
print(job['state'])
31+
print(job['total_runtime'])
32+
print(job['vertex_count'])
33+
34+
# Delete/cancel a Pregel job by its ID
35+
db.delete_pregel_job(job_id)
36+
37+
Refer to class :class:`arango.database.Database` for more details on the methods
38+
for Pregel jobs.

docs/threading.rst

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
.. _multithreading-page:
2+
3+
Multithreading
4+
--------------
5+
6+
7+
Notes on Eventlet
8+
=================
9+
10+
**Python-arango** should be compatible with eventlet_ *for the most part*.
11+
By default, **python-arango** makes API calls to ArangoDB using the requests_
12+
library which can be monkeypatched:
13+
14+
.. code-block:: python
15+
16+
import eventlet
17+
requests = eventlet.import_patched("requests")
18+
19+
.. _requests: https://github.com/requests/requests
20+
.. _eventlet: http://eventlet.net
21+
22+
Assuming the requests library is used and monkeypatched properly, all
23+
python-arango APIs except :ref:`Batch Execution <batch-page>` and
24+
:ref:`Async Execution <async-page>` should be thread-safe.

tests/test_pregel.py

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
from __future__ import absolute_import, unicode_literals
2+
3+
4+
import pytest
5+
6+
from arango import ArangoClient
7+
from arango.exceptions import (
8+
PregelJobCreateError,
9+
PregelJobGetError,
10+
PregelJobDeleteError
11+
)
12+
13+
from .utils import (
14+
generate_db_name,
15+
generate_col_name,
16+
generate_graph_name,
17+
)
18+
19+
arango_client = ArangoClient()
20+
db_name = generate_db_name()
21+
db = arango_client.create_database(db_name)
22+
graph_name = generate_graph_name()
23+
graph = db.create_graph(graph_name)
24+
from_col_name = generate_col_name()
25+
to_col_name = generate_col_name()
26+
edge_col_name = generate_col_name()
27+
graph.create_vertex_collection(from_col_name)
28+
graph.create_vertex_collection(to_col_name)
29+
graph.create_edge_definition(
30+
edge_col_name, [from_col_name], [to_col_name]
31+
)
32+
33+
34+
def teardown_module(*_):
35+
arango_client.delete_database(db_name, ignore_missing=True)
36+
37+
38+
@pytest.mark.order1
39+
def test_start_pregel_job():
40+
# Test start_pregel_job with page rank algorithm (happy path)
41+
job_id = db.create_pregel_job('pagerank', graph_name)
42+
assert isinstance(job_id, int)
43+
44+
# Test start_pregel_job with unsupported algorithm
45+
with pytest.raises(PregelJobCreateError):
46+
db.create_pregel_job('unsupported_algorithm', graph_name)
47+
48+
49+
@pytest.mark.order2
50+
def test_get_pregel_job():
51+
# Create a test Pregel job
52+
job_id = db.create_pregel_job('pagerank', graph_name)
53+
54+
# Test pregel_job with existing job ID (happy path)
55+
job = db.pregel_job(job_id)
56+
assert isinstance(job['aggregators'], dict)
57+
assert isinstance(job['gss'], int)
58+
assert isinstance(job['received_count'], int)
59+
assert isinstance(job['send_count'], int)
60+
assert isinstance(job['total_runtime'], float)
61+
assert job['state'] == 'running'
62+
assert 'edge_count' in job
63+
assert 'vertex_count' in job
64+
65+
# Test pregel_job with an invalid job ID
66+
with pytest.raises(PregelJobGetError):
67+
db.pregel_job(-1)
68+
69+
70+
@pytest.mark.order3
71+
def test_delete_pregel_job():
72+
# Create a test Pregel job
73+
job_id = db.create_pregel_job('pagerank', graph_name)
74+
75+
# Get the newly created job
76+
job = db.pregel_job(job_id)
77+
assert job['state'] == 'running'
78+
79+
# Test delete_pregel_job with existing job ID (happy path)
80+
assert db.delete_pregel_job(job_id) == True
81+
82+
# The fetch for the same job should now fail
83+
with pytest.raises(PregelJobGetError):
84+
db.pregel_job(job_id)
85+
86+
# Test delete_pregel_job with an invalid job ID
87+
with pytest.raises(PregelJobDeleteError):
88+
db.delete_pregel_job(-1)

0 commit comments

Comments
 (0)