Skip to content

Commit e247776

Browse files
authored
Add lock synchronization to Future success/failure (#2549)
1 parent cebfed2 commit e247776

File tree

1 file changed

+24
-13
lines changed

1 file changed

+24
-13
lines changed

kafka/future.py

+24-13
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import functools
44
import logging
5+
import threading
56

67
log = logging.getLogger(__name__)
78

@@ -15,6 +16,7 @@ def __init__(self):
1516
self.exception = None
1617
self._callbacks = []
1718
self._errbacks = []
19+
self._lock = threading.Lock()
1820

1921
def succeeded(self):
2022
return self.is_done and not bool(self.exception)
@@ -30,37 +32,46 @@ def retriable(self):
3032

3133
def success(self, value):
3234
assert not self.is_done, 'Future is already complete'
33-
self.value = value
34-
self.is_done = True
35+
with self._lock:
36+
self.value = value
37+
self.is_done = True
3538
if self._callbacks:
3639
self._call_backs('callback', self._callbacks, self.value)
3740
return self
3841

3942
def failure(self, e):
4043
assert not self.is_done, 'Future is already complete'
41-
self.exception = e if type(e) is not type else e()
42-
assert isinstance(self.exception, BaseException), (
44+
exception = e if type(e) is not type else e()
45+
assert isinstance(exception, BaseException), (
4346
'future failed without an exception')
44-
self.is_done = True
47+
with self._lock:
48+
self.exception = exception
49+
self.is_done = True
4550
self._call_backs('errback', self._errbacks, self.exception)
4651
return self
4752

4853
def add_callback(self, f, *args, **kwargs):
4954
if args or kwargs:
5055
f = functools.partial(f, *args, **kwargs)
51-
if self.is_done and not self.exception:
52-
self._call_backs('callback', [f], self.value)
53-
else:
54-
self._callbacks.append(f)
56+
with self._lock:
57+
if not self.is_done:
58+
self._callbacks.append(f)
59+
elif self.succeeded():
60+
self._lock.release()
61+
self._call_backs('callback', [f], self.value)
62+
self._lock.acquire()
5563
return self
5664

5765
def add_errback(self, f, *args, **kwargs):
5866
if args or kwargs:
5967
f = functools.partial(f, *args, **kwargs)
60-
if self.is_done and self.exception:
61-
self._call_backs('errback', [f], self.exception)
62-
else:
63-
self._errbacks.append(f)
68+
with self._lock:
69+
if not self.is_done:
70+
self._errbacks.append(f)
71+
elif self.failed():
72+
self._lock.release()
73+
self._call_backs('errback', [f], self.exception)
74+
self._lock.acquire()
6475
return self
6576

6677
def add_both(self, f, *args, **kwargs):

0 commit comments

Comments
 (0)