Skip to content

Commit f6ac46c

Browse files
committed
Rewrite of integration tests using pytest instead of unittest.TestCase.
In preparation for the modification of the integration tests to cover GSSAPI authentication I had to make a few changes to the existing test cases, which were written using unittest.TestCase. After some discussions with @dpkp and @jeffwidman I decided to first move those test cases to pytest, which is the preferred test framework for the project.
1 parent 2e0312e commit f6ac46c

18 files changed

+1916
-1869
lines changed

test/conftest.py

Lines changed: 146 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,166 @@
11
from __future__ import absolute_import
22

3-
import os
4-
3+
import inspect
54
import pytest
5+
import time
66

7-
from test.fixtures import KafkaFixture, ZookeeperFixture
7+
from decorator import decorator, decorate
88

9+
from kafka import MultiProcessConsumer, SimpleConsumer
10+
from test.fixtures import KafkaFixture, ZookeeperFixture, get_simple_consumer, get_simple_producer, get_keyed_producer, get_base_producer, get_simple_client
11+
from test.testutil import kafka_version, version_str_to_list, random_string, wait_for_kafka_client_topic_update, wait_for_simple_client_topic_update
912

1013
@pytest.fixture(scope="module")
1114
def version():
12-
if 'KAFKA_VERSION' not in os.environ:
13-
return ()
14-
return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
15-
15+
return kafka_version()
1616

1717
@pytest.fixture(scope="module")
18-
def zookeeper(version, request):
19-
assert version
18+
def zookeeper():
2019
zk = ZookeeperFixture.instance()
2120
yield zk
2221
zk.close()
2322

23+
_params = {}
24+
def _set_params(func, *args, **kw):
25+
ret = func(*args, **kw)
26+
return ret
27+
28+
def set_params(**params):
29+
global _params
30+
31+
def real_set_params(func, *args, **kwargs):
32+
_params[func.__name__] = params
33+
ret = decorate(func, _set_params)
34+
return ret
35+
36+
return real_set_params
37+
38+
# this must be called from the module where the broker is used
39+
def set_broker_params(**params):
40+
global _params
41+
caller = inspect.stack()[1]
42+
if isinstance(caller, tuple):
43+
filename = caller[1]
44+
else:
45+
filename = inspect.stack()[1].filename
46+
_params[filename] = params
47+
48+
@pytest.fixture(scope='module')
49+
def broker_params(request):
50+
global _params
51+
return _params[request.node.fspath] if request.node.fspath in _params else None
52+
53+
@pytest.fixture
54+
def simple_consumer_params(request):
55+
return _get_params(request)
56+
57+
@pytest.fixture
58+
def base_producer_params(request):
59+
return _get_params(request)
60+
61+
@pytest.fixture
62+
def simple_producer_params(request):
63+
return _get_params(request)
64+
65+
@pytest.fixture
66+
def keyed_producer_params(request):
67+
return _get_params(request)
68+
69+
@pytest.fixture
70+
def kafka_consumer_params(request):
71+
return _get_params(request)
72+
73+
@pytest.fixture
74+
def kafka_producer_params(request):
75+
return _get_params(request)
76+
77+
def _get_params(request):
78+
global _params
79+
if request._pyfuncitem.name in _params and request.fixturename.replace('_params','') in _params[request._pyfuncitem.name]:
80+
return _params[request._pyfuncitem.name][request.fixturename.replace('_params','')]
81+
else:
82+
return None
2483

2584
@pytest.fixture(scope="module")
26-
def kafka_broker(version, zookeeper, request):
27-
assert version
28-
k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port,
29-
partitions=4)
30-
yield k
31-
k.close()
85+
def kafka_broker(kafka_brokers):
86+
return kafka_brokers[0]
3287

88+
@pytest.fixture(scope="module")
89+
def kafka_brokers(version, zookeeper, broker_params):
90+
assert version, 'KAFKA_VERSION must be specified to run integration tests'
91+
params = {} if broker_params is None else broker_params.copy()
92+
params.setdefault('partitions', 2)
93+
num_brokers = params.pop('num_brokers', 1)
94+
brokers = tuple(KafkaFixture.instance(x, zookeeper.host, zookeeper.port, **params) for x in range(num_brokers))
95+
yield brokers
96+
for broker in brokers:
97+
broker.close()
98+
99+
@pytest.fixture
100+
def simple_client(kafka_broker, request, topic):
101+
s = time.time()
102+
client = get_simple_client(kafka_broker, client_id='%s_client' % (request.node.name,))
103+
client.ensure_topic_exists(topic)
104+
yield client
105+
client.close()
106+
107+
@pytest.fixture
108+
def simple_consumer(version, simple_client, topic, simple_consumer_params):
109+
params = {} if simple_consumer_params is None else simple_consumer_params.copy()
110+
consumer = get_simple_consumer(simple_client, topic, **params)
111+
yield consumer
112+
consumer.stop()
113+
114+
@pytest.fixture
115+
def base_producer(version, simple_client, topic, base_producer_params):
116+
params = {} if base_producer_params is None else base_producer_params.copy()
117+
producer = get_base_producer(simple_client, **params)
118+
yield producer
119+
producer.stop()
120+
121+
@pytest.fixture
122+
def simple_producer(version, simple_client, topic, simple_producer_params):
123+
params = {} if simple_producer_params is None else simple_producer_params.copy()
124+
producer = get_simple_producer(simple_client, **params)
125+
yield producer
126+
producer.stop()
127+
128+
@pytest.fixture
129+
def keyed_producer(version, simple_client, topic, keyed_producer_params):
130+
params = {} if keyed_producer_params is None else keyed_producer_params.copy()
131+
producer = get_keyed_producer(simple_client, **params)
132+
yield producer
133+
producer.stop()
134+
135+
@pytest.fixture
136+
def kafka_client(kafka_broker, request):
137+
s = time.time()
138+
(client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
139+
yield client
140+
client.close()
141+
142+
@pytest.fixture
143+
def kafka_consumer(kafka_broker, topic, request, kafka_consumer_params):
144+
params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
145+
params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
146+
(consumer,) = kafka_broker.get_consumers(cnt=1, topics=[topic], **params)
147+
yield consumer
148+
consumer.close()
149+
150+
@pytest.fixture
151+
def kafka_producer(kafka_broker, request, kafka_producer_params):
152+
params = {} if kafka_producer_params is None else kafka_producer_params.copy()
153+
params.setdefault('client_id', 'producer_%s' % (request.node.name,))
154+
(producer,) = kafka_broker.get_producers(cnt=1, **params)
155+
yield producer
156+
producer.close()
157+
158+
@pytest.fixture
159+
def topic(version, kafka_broker, request):
160+
s = time.time()
161+
topic_name = '%s_%s' % (request.node.name, random_string(10))
162+
kafka_broker.create_topics([topic_name])
163+
return topic_name
33164

34165
@pytest.fixture
35166
def conn(mocker):

0 commit comments

Comments
 (0)