Skip to content
This repository was archived by the owner on Oct 23, 2023. It is now read-only.

Support threaded transport also for background or server processes #932

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions raven/transport/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
class AsyncWorker(object):
_terminator = object()

def __init__(self, shutdown_timeout=DEFAULT_TIMEOUT):
def __init__(self, shutdown_timeout=DEFAULT_TIMEOUT, interactive=True):
check_threads()
self._queue = Queue(-1)
self._lock = threading.Lock()
self._thread = None
self._thread_for_pid = None
self.options = {
'shutdown_timeout': shutdown_timeout,
'interactive': interactive,
}
self.start()

Expand Down Expand Up @@ -66,18 +67,23 @@ def main_thread_terminated(self):

if not self._timed_queue_join(initial_timeout):
# if that didn't work, wait a bit longer
# NB that size is an approximation, because other threads may
# add or remove items
size = self._queue.qsize()

print("Sentry is attempting to send %i pending error messages"
% size)
print("Waiting up to %s seconds" % timeout)
if self.options['interactive']:
# we can only ask questions on interactive terminals...
# otherwise, simply wait until the timeout is reached

if os.name == 'nt':
print("Press Ctrl-Break to quit")
else:
print("Press Ctrl-C to quit")
# NB that size is an approximation, because other threads
# may add or remove items
size = self._queue.qsize()
print("Sentry is attempting to send "
"%i pending error messages"
% size)
print("Waiting up to %s seconds" % timeout)

if os.name == 'nt':
print("Press Ctrl-Break to quit")
else:
print("Press Ctrl-C to quit")

self._timed_queue_join(timeout - initial_timeout)

Expand Down Expand Up @@ -180,3 +186,11 @@ def send_sync(self, url, data, headers, success_cb, failure_cb):
def async_send(self, url, data, headers, success_cb, failure_cb):
self.get_worker().queue(
self.send_sync, url, data, headers, success_cb, failure_cb)


class ThreadedBackgroundHTTPTransport(ThreadedHTTPTransport):

def get_worker(self):
if not hasattr(self, '_worker') or not self._worker.is_alive():
self._worker = AsyncWorker(interactive=False)
return self._worker
132 changes: 80 additions & 52 deletions tests/transport/threaded/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from raven.base import Client
from raven.transport.threaded import ThreadedHTTPTransport
from raven.transport.threaded import ThreadedBackgroundHTTPTransport
from raven.utils.urlparse import urlparse


Expand All @@ -24,6 +25,20 @@ def send_sync(self, url, data, headers, success_cb, failure_cb):
self.events.append((url, data, headers, success_cb, failure_cb))


class DummyBackgroundThreadedScheme(ThreadedBackgroundHTTPTransport):
def __init__(self, *args, **kwargs):
super(ThreadedBackgroundHTTPTransport, self).__init__(*args, **kwargs)
self.events = []
self.send_delay = 0

def send_sync(self, url, data, headers, success_cb, failure_cb):
# delay sending the message, to allow us to test that the shutdown
# hook waits correctly
time.sleep(self.send_delay)

self.events.append((url, data, headers, success_cb, failure_cb))


class LoggingThreadedScheme(ThreadedHTTPTransport):
def __init__(self, filename, *args, **kwargs):
super(LoggingThreadedScheme, self).__init__(*args, **kwargs)
Expand All @@ -34,6 +49,16 @@ def send_sync(self, url, data, headers, success_cb, failure_cb):
log.write("{0} {1}\n".format(os.getpid(), data['message']))


class LoggingBackgroundThreadedScheme(ThreadedBackgroundHTTPTransport):
def __init__(self, filename, *args, **kwargs):
super(LoggingBackgroundThreadedScheme, self).__init__(*args, **kwargs)
self.filename = filename

def send_sync(self, url, data, headers, success_cb, failure_cb):
with open(self.filename, 'a') as log:
log.write("{0} {1}\n".format(os.getpid(), data['message']))


class ThreadedTransportTest(TestCase):
def setUp(self):
self.url = "threaded+http://some_username:some_password@localhost:8143/1"
Expand All @@ -50,81 +75,84 @@ def test_does_send(self, send):
self.assertEqual(send.call_count, 1)

def test_shutdown_waits_for_send(self):
url = urlparse(self.url)
transport = DummyThreadedScheme()
transport.send_delay = 0.5
for scheme in (DummyThreadedScheme, DummyBackgroundThreadedScheme):
url = urlparse(self.url)
transport = scheme()
transport.send_delay = 0.5

data = self.client.build_msg('raven.events.Message', message='foo')
transport.async_send(url, data, None, None, None)
data = self.client.build_msg('raven.events.Message', message='foo')
transport.async_send(url, data, None, None, None)

time.sleep(0.1)
time.sleep(0.1)

# this should wait for the message to get sent
transport.get_worker().main_thread_terminated()
# this should wait for the message to get sent
transport.get_worker().main_thread_terminated()

self.assertEqual(len(transport.events), 1)
self.assertEqual(len(transport.events), 1)

def test_fork_spawns_anew(self):
url = urlparse(self.url)
transport = DummyThreadedScheme()
transport.send_delay = 0.5
for scheme in (DummyThreadedScheme, DummyBackgroundThreadedScheme):
transport = scheme()
transport.send_delay = 0.5

data = self.client.build_msg('raven.events.Message', message='foo')
data = self.client.build_msg('raven.events.Message', message='foo')

pid = os.fork()
if pid == 0:
time.sleep(0.1)
pid = os.fork()
if pid == 0:
time.sleep(0.1)

transport.async_send(url, data, None, None, None)
transport.async_send(url, data, None, None, None)

# this should wait for the message to get sent
transport.get_worker().main_thread_terminated()
# this should wait for the message to get sent
transport.get_worker().main_thread_terminated()

self.assertEqual(len(transport.events), 1)
# Use os._exit here so that py.test gets not confused about
# what the hell we're doing here.
os._exit(0)
else:
os.waitpid(pid, 0)
self.assertEqual(len(transport.events), 1)
# Use os._exit here so that py.test gets not confused about
# what the hell we're doing here.
os._exit(0)
else:
os.waitpid(pid, 0)

def test_fork_with_active_worker(self):
# Test threaded transport when forking with an active worker.
# Forking a process doesn't clone the worker thread - make sure
# logging from both processes still works.
event1 = self.client.build_msg('raven.events.Message', message='parent')
event2 = self.client.build_msg('raven.events.Message', message='child')
url = urlparse(self.url)
fd, filename = mkstemp()
try:
os.close(fd)
transport = LoggingThreadedScheme(filename)
for scheme in (LoggingThreadedScheme, LoggingBackgroundThreadedScheme):
url = urlparse(self.url)
fd, filename = mkstemp()
try:
os.close(fd)
transport = scheme(filename)

# Log from the parent process - starts the worker thread
transport.async_send(url, event1, None, None, None)
childpid = os.fork()
# Log from the parent process - starts the worker thread
transport.async_send(url, event1, None, None, None)
childpid = os.fork()

if childpid == 0:
# Log from the child process
transport.async_send(url, event2, None, None, None)
if childpid == 0:
# Log from the child process
transport.async_send(url, event2, None, None, None)

# Ensure threaded worker has finished
transport.get_worker().stop()
os._exit(0)
# Ensure threaded worker has finished
transport.get_worker().stop()
os._exit(0)

# Wait for the child process to finish
os.waitpid(childpid, 0)
assert os.path.isfile(filename)
# Wait for the child process to finish
os.waitpid(childpid, 0)
assert os.path.isfile(filename)

# Ensure threaded worker has finished
transport.get_worker().stop()
# Ensure threaded worker has finished
transport.get_worker().stop()

with open(filename, 'r') as logfile:
events = dict(x.strip().split() for x in logfile.readlines())
with open(filename, 'r') as logfile:
events = dict(x.strip().split() for x in logfile.readlines())

# Check parent and child both logged successfully
assert events == {
str(os.getpid()): 'parent',
str(childpid): 'child',
}
finally:
os.remove(filename)
# Check parent and child both logged successfully
assert events == {
str(os.getpid()): 'parent',
str(childpid): 'child',
}
finally:
os.remove(filename)