Skip to content

Commit 369478a

Browse files
committed
Remove old/unused errors; reorder; KafkaTimeout -> retriable
1 parent e2c3b80 commit 369478a

File tree

1 file changed

+52
-72
lines changed

1 file changed

+52
-72
lines changed

kafka/errors.py

+52-72
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,44 @@ def __str__(self):
1616
super(KafkaError, self).__str__())
1717

1818

19+
class Cancelled(KafkaError):
20+
retriable = True
21+
22+
23+
class CommitFailedError(KafkaError):
24+
def __init__(self, *args, **kwargs):
25+
super(CommitFailedError, self).__init__(
26+
"""Commit cannot be completed since the group has already
27+
rebalanced and assigned the partitions to another member.
28+
This means that the time between subsequent calls to poll()
29+
was longer than the configured max_poll_interval_ms, which
30+
typically implies that the poll loop is spending too much
31+
time message processing. You can address this either by
32+
increasing the rebalance timeout with max_poll_interval_ms,
33+
or by reducing the maximum size of batches returned in poll()
34+
with max_poll_records.
35+
""", *args, **kwargs)
36+
37+
38+
class IllegalArgumentError(KafkaError):
39+
pass
40+
41+
1942
class IllegalStateError(KafkaError):
2043
pass
2144

2245

23-
class IllegalArgumentError(KafkaError):
46+
class IncompatibleBrokerVersion(KafkaError):
2447
pass
2548

2649

27-
class NoBrokersAvailable(KafkaError):
28-
retriable = True
29-
invalid_metadata = True
50+
class KafkaConfigurationError(KafkaError):
51+
pass
3052

3153

32-
class NodeNotReadyError(KafkaError):
54+
class KafkaConnectionError(KafkaError):
3355
retriable = True
56+
invalid_metadata = True
3457

3558

3659
class KafkaProtocolError(KafkaError):
@@ -41,47 +64,41 @@ class CorrelationIdError(KafkaProtocolError):
4164
retriable = True
4265

4366

44-
class Cancelled(KafkaError):
67+
class KafkaTimeoutError(KafkaError):
4568
retriable = True
4669

4770

48-
class TooManyInFlightRequests(KafkaError):
71+
class MetadataEmptyBrokerList(KafkaError):
4972
retriable = True
5073

5174

52-
class StaleMetadata(KafkaError):
75+
class NoBrokersAvailable(KafkaError):
5376
retriable = True
5477
invalid_metadata = True
5578

5679

57-
class MetadataEmptyBrokerList(KafkaError):
80+
class NodeNotReadyError(KafkaError):
5881
retriable = True
5982

6083

61-
class UnrecognizedBrokerVersion(KafkaError):
84+
class QuotaViolationError(KafkaError):
6285
pass
6386

6487

65-
class IncompatibleBrokerVersion(KafkaError):
66-
pass
88+
class StaleMetadata(KafkaError):
89+
retriable = True
90+
invalid_metadata = True
6791

6892

69-
class CommitFailedError(KafkaError):
70-
def __init__(self, *args, **kwargs):
71-
super(CommitFailedError, self).__init__(
72-
"""Commit cannot be completed since the group has already
73-
rebalanced and assigned the partitions to another member.
74-
This means that the time between subsequent calls to poll()
75-
was longer than the configured max_poll_interval_ms, which
76-
typically implies that the poll loop is spending too much
77-
time message processing. You can address this either by
78-
increasing the rebalance timeout with max_poll_interval_ms,
79-
or by reducing the maximum size of batches returned in poll()
80-
with max_poll_records.
81-
""", *args, **kwargs)
93+
class TooManyInFlightRequests(KafkaError):
94+
retriable = True
95+
8296

97+
class UnrecognizedBrokerVersion(KafkaError):
98+
pass
8399

84-
class AuthenticationMethodNotSupported(KafkaError):
100+
101+
class UnsupportedCodecError(KafkaError):
85102
pass
86103

87104

@@ -97,6 +114,10 @@ def __str__(self):
97114
super(BrokerResponseError, self).__str__())
98115

99116

117+
class AuthorizationError(BrokerResponseError):
118+
pass
119+
120+
100121
class NoError(BrokerResponseError):
101122
errno = 0
102123
message = 'NO_ERROR'
@@ -332,21 +353,21 @@ class InvalidCommitOffsetSizeError(BrokerResponseError):
332353
' because of oversize metadata.')
333354

334355

335-
class TopicAuthorizationFailedError(BrokerResponseError):
356+
class TopicAuthorizationFailedError(AuthorizationError):
336357
errno = 29
337358
message = 'TOPIC_AUTHORIZATION_FAILED'
338359
description = ('Returned by the broker when the client is not authorized to'
339360
' access the requested topic.')
340361

341362

342-
class GroupAuthorizationFailedError(BrokerResponseError):
363+
class GroupAuthorizationFailedError(AuthorizationError):
343364
errno = 30
344365
message = 'GROUP_AUTHORIZATION_FAILED'
345366
description = ('Returned by the broker when the client is not authorized to'
346367
' access a particular groupId.')
347368

348369

349-
class ClusterAuthorizationFailedError(BrokerResponseError):
370+
class ClusterAuthorizationFailedError(AuthorizationError):
350371
errno = 31
351372
message = 'CLUSTER_AUTHORIZATION_FAILED'
352373
description = ('Returned by the broker when the client is not authorized to'
@@ -493,7 +514,7 @@ class TransactionCoordinatorFencedError(BrokerResponseError):
493514
retriable = False
494515

495516

496-
class TransactionalIdAuthorizationFailedError(BrokerResponseError):
517+
class TransactionalIdAuthorizationFailedError(AuthorizationError):
497518
errno = 53
498519
message = 'TRANSACTIONAL_ID_AUTHORIZATION_FAILED'
499520
description = 'Transactional Id authorization failed.'
@@ -578,7 +599,7 @@ class DelegationTokenRequestNotAllowedError(BrokerResponseError):
578599
retriable = False
579600

580601

581-
class DelegationTokenAuthorizationFailedError(BrokerResponseError):
602+
class DelegationTokenAuthorizationFailedError(AuthorizationError):
582603
errno = 65
583604
message = 'DELEGATION_TOKEN_AUTHORIZATION_FAILED'
584605
description = 'Delegation Token authorization failed.'
@@ -1027,47 +1048,6 @@ class VoterNotFoundError(BrokerResponseError):
10271048
retriable = False
10281049

10291050

1030-
class KafkaUnavailableError(KafkaError):
1031-
pass
1032-
1033-
1034-
class KafkaTimeoutError(KafkaError):
1035-
pass
1036-
1037-
1038-
class FailedPayloadsError(KafkaError):
1039-
def __init__(self, payload, *args):
1040-
super(FailedPayloadsError, self).__init__(*args)
1041-
self.payload = payload
1042-
1043-
1044-
class KafkaConnectionError(KafkaError):
1045-
retriable = True
1046-
invalid_metadata = True
1047-
1048-
1049-
class ProtocolError(KafkaError):
1050-
pass
1051-
1052-
1053-
class UnsupportedCodecError(KafkaError):
1054-
pass
1055-
1056-
1057-
class KafkaConfigurationError(KafkaError):
1058-
pass
1059-
1060-
1061-
class QuotaViolationError(KafkaError):
1062-
pass
1063-
1064-
1065-
class AsyncProducerQueueFull(KafkaError):
1066-
def __init__(self, failed_msgs, *args):
1067-
super(AsyncProducerQueueFull, self).__init__(*args)
1068-
self.failed_msgs = failed_msgs
1069-
1070-
10711051
def _iter_broker_errors():
10721052
for name, obj in inspect.getmembers(sys.modules[__name__]):
10731053
if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:

0 commit comments

Comments
 (0)