Skip to content

Commit 6db022c

Browse files
committed
test_subscription_state
1 parent dfca19c commit 6db022c

File tree

1 file changed

+48
-0
lines changed

1 file changed

+48
-0
lines changed

test/test_subscription_state.py

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
from __future__ import absolute_import
2+
3+
import pytest
4+
5+
from kafka import TopicPartition
6+
from kafka.consumer.subscription_state import SubscriptionState, TopicPartitionState
7+
from kafka.vendor import six
8+
9+
10+
def test_type_error():
11+
s = SubscriptionState()
12+
with pytest.raises(TypeError):
13+
s.subscribe(topics='foo')
14+
15+
s.subscribe(topics=['foo'])
16+
17+
18+
def test_change_subscription():
19+
s = SubscriptionState()
20+
s.subscribe(topics=['foo'])
21+
assert s.subscription == set(['foo'])
22+
s.change_subscription(['bar'])
23+
assert s.subscription == set(['bar'])
24+
25+
26+
def test_group_subscribe():
27+
s = SubscriptionState()
28+
s.subscribe(topics=['foo'])
29+
assert s.subscription == set(['foo'])
30+
s.group_subscribe(['bar'])
31+
assert s.subscription == set(['foo'])
32+
assert s._group_subscription == set(['foo', 'bar'])
33+
34+
s.reset_group_subscription()
35+
assert s.subscription == set(['foo'])
36+
assert s._group_subscription == set(['foo'])
37+
38+
39+
def test_assign_from_subscribed():
40+
s = SubscriptionState()
41+
s.subscribe(topics=['foo'])
42+
with pytest.raises(ValueError):
43+
s.assign_from_subscribed([TopicPartition('bar', 0)])
44+
45+
s.assign_from_subscribed([TopicPartition('foo', 0), TopicPartition('foo', 1)])
46+
assert set(s.assignment.keys()) == set([TopicPartition('foo', 0), TopicPartition('foo', 1)])
47+
assert all([isinstance(s, TopicPartitionState) for s in six.itervalues(s.assignment)])
48+
assert all([not s.has_valid_position for s in six.itervalues(s.assignment)])

0 commit comments

Comments
 (0)