Skip to content

Commit a856dc4

Browse files
Remove all vendoring (dpkp#169)
Now that the codebase has been modernised using pyupgrade, we can also remove all backported vendor modules and all uses of them.
1 parent fcca556 commit a856dc4

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

43 files changed

+90
-2733
lines changed

.covrc

Lines changed: 0 additions & 3 deletions
This file was deleted.

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ test-local: build-integration
2929
cov-local: build-integration
3030
KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) pytest \
3131
--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
32-
--cov-config=.covrc --cov-report html $(FLAGS) kafka test
32+
--cov-report html $(FLAGS) kafka test
3333
@echo "open file://`pwd`/htmlcov/index.html"
3434

3535
# Check the readme for syntax errors, which can lead to invalid formatting on

benchmarks/consumer_performance.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
import threading
1111
import traceback
1212

13-
from kafka.vendor.six.moves import range
14-
1513
from kafka import KafkaConsumer, KafkaProducer
1614
from test.fixtures import KafkaFixture, ZookeeperFixture
1715

benchmarks/producer_performance.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
import threading
1010
import traceback
1111

12-
from kafka.vendor.six.moves import range
13-
1412
from kafka import KafkaProducer
1513
from test.fixtures import KafkaFixture, ZookeeperFixture
1614

benchmarks/varint_speed.py

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
#!/usr/bin/env python
2-
from __future__ import print_function
32
import pyperf
4-
from kafka.vendor import six
53

64

75
test_data = [
@@ -67,6 +65,10 @@
6765
BENCH_VALUES_DEC = list(map(bytearray, BENCH_VALUES_DEC))
6866

6967

68+
def int2byte(i):
69+
    return bytes((i,))
70+
71+
7072
def _assert_valid_enc(enc_func):
7173
for encoded, decoded in test_data:
7274
assert enc_func(decoded) == encoded, decoded
@@ -116,7 +118,7 @@ def encode_varint_1(num):
116118
_assert_valid_enc(encode_varint_1)
117119

118120

119-
def encode_varint_2(value, int2byte=six.int2byte):
121+
def encode_varint_2(value, int2byte=int2byte):
120122
value = (value << 1) ^ (value >> 63)
121123

122124
bits = value & 0x7f
@@ -151,7 +153,7 @@ def encode_varint_3(value, buf):
151153
assert res == encoded
152154

153155

154-
def encode_varint_4(value, int2byte=six.int2byte):
156+
def encode_varint_4(value, int2byte=int2byte):
155157
value = (value << 1) ^ (value >> 63)
156158

157159
if value <= 0x7f: # 1 byte
@@ -301,22 +303,13 @@ def size_of_varint_2(value):
301303
_assert_valid_size(size_of_varint_2)
302304

303305

304-
if six.PY3:
305-
def _read_byte(memview, pos):
306-
""" Read a byte from memoryview as an integer
307-
308-
Raises:
309-
IndexError: if position is out of bounds
310-
"""
311-
return memview[pos]
312-
else:
313-
def _read_byte(memview, pos):
314-
""" Read a byte from memoryview as an integer
306+
def _read_byte(memview, pos):
307+
""" Read a byte from memoryview as an integer
315308
316309
Raises:
317310
IndexError: if position is out of bounds
318311
"""
319-
return ord(memview[pos])
312+
return memview[pos]
320313

321314

322315
def decode_varint_1(buffer, pos=0):

kafka/admin/acl_resource.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
1-
from kafka.errors import IllegalArgumentError
1+
from enum import IntEnum
22

3-
# enum in stdlib as of py3.4
4-
try:
5-
from enum import IntEnum # pylint: disable=import-error
6-
except ImportError:
7-
# vendored backport module
8-
from kafka.vendor.enum34 import IntEnum
3+
from kafka.errors import IllegalArgumentError
94

105

116
class ResourceType(IntEnum):

kafka/admin/client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import socket
55

66
from . import ConfigResourceType
7-
from kafka.vendor import six
87

98
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
109
ACLResourcePatternType

kafka/admin/config_resource.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,4 @@
1-
# enum in stdlib as of py3.4
2-
try:
3-
from enum import IntEnum # pylint: disable=import-error
4-
except ImportError:
5-
# vendored backport module
6-
from kafka.vendor.enum34 import IntEnum
1+
from enum import IntEnum
72

83

94
class ConfigResourceType(IntEnum):

kafka/client_async.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,12 @@
22
import copy
33
import logging
44
import random
5+
import selectors
56
import socket
67
import threading
78
import time
89
import weakref
910

10-
# selectors in stdlib as of py3.4
11-
try:
12-
import selectors # pylint: disable=import-error
13-
except ImportError:
14-
# vendored backport module
15-
from kafka.vendor import selectors34 as selectors
16-
17-
from kafka.vendor import six
18-
1911
from kafka.cluster import ClusterMetadata
2012
from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
2113
from kafka import errors as Errors
@@ -25,9 +17,6 @@
2517
from kafka.metrics.stats.rate import TimeUnit
2618
from kafka.protocol.metadata import MetadataRequest
2719
from kafka.util import Dict, WeakMethod
28-
# Although this looks unused, it actually monkey-patches socket.socketpair()
29-
# and should be left in as long as we're using socket.socketpair() in this file
30-
from kafka.vendor import socketpair
3120
from kafka.version import __version__
3221

3322
log = logging.getLogger('kafka.client')

kafka/cluster.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
import threading
55
import time
66

7-
from kafka.vendor import six
8-
97
from kafka import errors as Errors
108
from kafka.conn import collect_hosts
119
from kafka.future import Future

kafka/codec.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
import platform
44
import struct
55

6-
from kafka.vendor import six
7-
from kafka.vendor.six.moves import range
8-
96
_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
107
_XERIAL_V1_FORMAT = 'bccccccBii'
118
ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024

kafka/conn.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,13 @@
11
import copy
22
import errno
33
import logging
4+
import selectors
45
from random import shuffle, uniform
56

6-
# selectors in stdlib as of py3.4
7-
try:
8-
import selectors # pylint: disable=import-error
9-
except ImportError:
10-
# vendored backport module
11-
from kafka.vendor import selectors34 as selectors
12-
137
import socket
148
import threading
159
import time
1610

17-
from kafka.vendor import six
18-
1911
from kafka import sasl
2012
import kafka.errors as Errors
2113
from kafka.future import Future
@@ -565,8 +557,6 @@ def _send_bytes(self, data):
565557
except (SSLWantReadError, SSLWantWriteError):
566558
break
567559
except (ConnectionError, TimeoutError) as e:
568-
if six.PY2 and e.errno == errno.EWOULDBLOCK:
569-
break
570560
raise
571561
except BlockingIOError:
572562
break
@@ -863,8 +853,6 @@ def _recv(self):
863853
except (SSLWantReadError, SSLWantWriteError):
864854
break
865855
except (ConnectionError, TimeoutError) as e:
866-
if six.PY2 and e.errno == errno.EWOULDBLOCK:
867-
break
868856
log.exception('%s: Error receiving network data'
869857
' closing socket', self)
870858
err = Errors.KafkaConnectionError(e)

kafka/consumer/fetcher.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import sys
66
import time
77

8-
from kafka.vendor import six
9-
108
import kafka.errors as Errors
119
from kafka.future import Future
1210
from kafka.metrics.stats import Avg, Count, Max, Rate

kafka/consumer/group.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
from kafka.errors import KafkaConfigurationError, UnsupportedVersionError
77

8-
from kafka.vendor import six
9-
108
from kafka.client_async import KafkaClient, selectors
119
from kafka.consumer.fetcher import Fetcher
1210
from kafka.consumer.subscription_state import SubscriptionState

kafka/consumer/subscription_state.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
import logging
33
import re
44

5-
from kafka.vendor import six
6-
75
from kafka.errors import IllegalStateError
86
from kafka.protocol.offset import OffsetResetStrategy
97
from kafka.structs import OffsetAndMetadata

kafka/coordinator/assignors/range.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
1-
from __future__ import absolute_import
2-
31
import collections
42
import logging
53

6-
from kafka.vendor import six
7-
84
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
95
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
106

@@ -34,14 +30,14 @@ class RangePartitionAssignor(AbstractPartitionAssignor):
3430
@classmethod
3531
def assign(cls, cluster, member_metadata):
3632
consumers_per_topic = collections.defaultdict(list)
37-
for member, metadata in six.iteritems(member_metadata):
33+
for member, metadata in member_metadata.items():
3834
for topic in metadata.subscription:
3935
consumers_per_topic[topic].append(member)
4036

4137
# construct {member_id: {topic: [partition, ...]}}
4238
assignment = collections.defaultdict(dict)
4339

44-
for topic, consumers_for_topic in six.iteritems(consumers_per_topic):
40+
for topic, consumers_for_topic in consumers_per_topic.items():
4541
partitions = cluster.partitions_for_topic(topic)
4642
if partitions is None:
4743
log.warning('No partition metadata for topic %s', topic)

kafka/coordinator/assignors/roundrobin.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
1-
from __future__ import absolute_import
2-
31
import collections
42
import itertools
53
import logging
64

7-
from kafka.vendor import six
8-
95
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
106
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
117
from kafka.structs import TopicPartition
@@ -51,7 +47,7 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
5147
@classmethod
5248
def assign(cls, cluster, member_metadata):
5349
all_topics = set()
54-
for metadata in six.itervalues(member_metadata):
50+
for metadata in member_metadata.values():
5551
all_topics.update(metadata.subscription)
5652

5753
all_topic_partitions = []

kafka/coordinator/assignors/sticky/partition_movements.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
from collections import defaultdict, namedtuple
33
from copy import deepcopy
44

5-
from kafka.vendor import six
6-
75
log = logging.getLogger(__name__)
86

97

@@ -74,7 +72,7 @@ def get_partition_to_be_moved(self, partition, old_consumer, new_consumer):
7472
return next(iter(self.partition_movements_by_topic[partition.topic][reverse_pair]))
7573

7674
def are_sticky(self):
77-
for topic, movements in six.iteritems(self.partition_movements_by_topic):
75+
for topic, movements in self.partition_movements_by_topic.items():
7876
movement_pairs = set(movements.keys())
7977
if self._has_cycles(movement_pairs):
8078
log.error(

0 commit comments

Comments
 (0)