Skip to content

Commit ca4c754

Browse files
committed
Introduce new fixtures to prepare for migration to pytest.
This commits adds new pytest fixtures in prepation for the migration of unittest.TestCases to pytest test cases. The handling of temporary dir creation was also changed so that we can use the pytest tmpdir fixture after the migration.
1 parent 0a88ccf commit ca4c754

File tree

5 files changed

+229
-111
lines changed

5 files changed

+229
-111
lines changed

pylint.rc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[TYPECHECK]
22
ignored-classes=SyncManager,_socketobject
3+
generated-members=py.*
34

45
[MESSAGES CONTROL]
56
disable=E1129

test/conftest.py

Lines changed: 90 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,111 @@
11
from __future__ import absolute_import
22

3-
import os
3+
import inspect
44

55
import pytest
6+
from decorator import decorate
67

78
from test.fixtures import KafkaFixture, ZookeeperFixture
8-
9+
from test.testutil import kafka_version, random_string
910

1011
@pytest.fixture(scope="module")
1112
def version():
12-
if 'KAFKA_VERSION' not in os.environ:
13-
return ()
14-
return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
15-
13+
return kafka_version()
1614

1715
@pytest.fixture(scope="module")
18-
def zookeeper(version, request):
19-
assert version
16+
def zookeeper():
2017
zk = ZookeeperFixture.instance()
2118
yield zk
2219
zk.close()
2320

21+
_params = {}
22+
def _set_params(func, *args, **kw):
23+
ret = func(*args, **kw)
24+
return ret
25+
26+
def set_params(**params):
27+
global _params
28+
29+
def real_set_params(func, *args, **kwargs):
30+
_params[func.__name__] = params
31+
ret = decorate(func, _set_params)
32+
return ret
33+
34+
return real_set_params
35+
36+
# this must be called from the module where the broker is used
37+
def set_broker_params(**params):
38+
global _params
39+
caller = inspect.stack()[1]
40+
if isinstance(caller, tuple):
41+
filename = caller[1]
42+
else:
43+
filename = inspect.stack()[1].filename
44+
_params[filename] = params
45+
46+
@pytest.fixture(scope='module')
47+
def broker_params(request):
48+
global _params
49+
return _params[request.node.fspath] if request.node.fspath in _params else None
50+
51+
@pytest.fixture
52+
def kafka_consumer_params(request):
53+
return _get_params(request)
54+
55+
@pytest.fixture
56+
def kafka_producer_params(request):
57+
return _get_params(request)
58+
59+
def _get_params(request):
60+
global _params
61+
if request._pyfuncitem.name in _params and \
62+
request.fixturename.replace('_params', '') in _params[request._pyfuncitem.name]:
63+
return _params[request._pyfuncitem.name][request.fixturename.replace('_params', '')]
64+
return None
65+
66+
@pytest.fixture(scope="module")
67+
def kafka_broker(kafka_brokers):
68+
return kafka_brokers[0]
2469

2570
@pytest.fixture(scope="module")
26-
def kafka_broker(version, zookeeper, request):
27-
assert version
28-
k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port,
29-
partitions=4)
30-
yield k
31-
k.close()
71+
def kafka_brokers(version, zookeeper, broker_params):
72+
assert version, 'KAFKA_VERSION must be specified to run integration tests'
73+
params = {} if broker_params is None else broker_params.copy()
74+
params.setdefault('partitions', 4)
75+
num_brokers = params.pop('num_brokers', 1)
76+
brokers = tuple(KafkaFixture.instance(x, zookeeper.host, zookeeper.port, **params)
77+
for x in range(num_brokers))
78+
yield brokers
79+
for broker in brokers:
80+
broker.close()
81+
82+
@pytest.fixture
83+
def kafka_client(kafka_broker, request):
84+
(client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
85+
yield client
86+
client.close()
3287

88+
@pytest.fixture
89+
def kafka_consumer(kafka_broker, topic, request, kafka_consumer_params):
90+
params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
91+
params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
92+
(consumer,) = kafka_broker.get_consumers(cnt=1, topics=[topic], **params)
93+
yield consumer
94+
consumer.close()
95+
96+
@pytest.fixture
97+
def kafka_producer(kafka_broker, request, kafka_producer_params):
98+
params = {} if kafka_producer_params is None else kafka_producer_params.copy()
99+
params.setdefault('client_id', 'producer_%s' % (request.node.name,))
100+
(producer,) = kafka_broker.get_producers(cnt=1, **params)
101+
yield producer
102+
producer.close()
103+
104+
@pytest.fixture
105+
def topic(kafka_broker, request):
106+
topic_name = '%s_%s' % (request.node.name, random_string(10))
107+
kafka_broker.create_topics([topic_name])
108+
return topic_name
33109

34110
@pytest.fixture
35111
def conn(mocker):

0 commit comments

Comments
 (0)