Skip to content

Add Request/Response structs for kafka broker 1.0.0 #1368

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ def _infer_broker_version_from_api_versions(self, api_versions):
# in reverse order. As soon as we find one that works, return it
test_cases = [
# format (<broker version>, <needed struct>)
((1, 0, 0), MetadataRequest[5]),
((0, 11, 0), MetadataRequest[4]),
((0, 10, 2), OffsetFetchRequest[2]),
((0, 10, 1), MetadataRequest[2]),
Expand Down
41 changes: 39 additions & 2 deletions kafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ class SaslHandShakeResponse_v0(Response):
)


class SaslHandShakeResponse_v1(Response):
API_KEY = 17
API_VERSION = 1
SCHEMA = SaslHandShakeResponse_v0.SCHEMA


class SaslHandShakeRequest_v0(Request):
API_KEY = 17
API_VERSION = 0
Expand All @@ -294,5 +300,36 @@ class SaslHandShakeRequest_v0(Request):
('mechanism', String('utf-8'))
)

SaslHandShakeRequest = [SaslHandShakeRequest_v0]
SaslHandShakeResponse = [SaslHandShakeResponse_v0]

class SaslHandShakeRequest_v1(Request):
API_KEY = 17
API_VERSION = 1
RESPONSE_TYPE = SaslHandShakeResponse_v1
SCHEMA = SaslHandShakeRequest_v0.SCHEMA


SaslHandShakeRequest = [SaslHandShakeRequest_v0, SaslHandShakeRequest_v1]
SaslHandShakeResponse = [SaslHandShakeResponse_v0, SaslHandShakeResponse_v1]


class SaslAuthenticateResponse_v0(Request):
API_KEY = 36
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('error_message', String('utf-8')),
('sasl_auth_bytes', Bytes)
)


class SaslAuthenticateRequest_v0(Request):
API_KEY = 36
API_VERSION = 0
RESPONSE_TYPE = SaslAuthenticateResponse_v0
SCHEMA = Schema(
('sasl_auth_bytes', Bytes)
)


SaslAuthenticateRequest = [SaslAuthenticateRequest_v0]
SaslAuthenticateResponse = [SaslAuthenticateResponse_v0]
28 changes: 26 additions & 2 deletions kafka/protocol/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ class FetchResponse_v5(Response):
)


class FetchResponse_v6(Response):
"""
Same as FetchResponse_v5. The version number is bumped up to indicate that the client supports KafkaStorageException.
The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
"""
API_KEY = 1
API_VERSION = 6
SCHEMA = FetchResponse_v5.SCHEMA


class FetchRequest_v0(Request):
API_KEY = 1
API_VERSION = 0
Expand Down Expand Up @@ -174,11 +184,25 @@ class FetchRequest_v5(Request):
)


class FetchRequest_v6(Request):
"""
The body of FETCH_REQUEST_V6 is the same as FETCH_REQUEST_V5.
The version number is bumped up to indicate that the client supports KafkaStorageException.
The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
"""
API_KEY = 1
API_VERSION = 6
RESPONSE_TYPE = FetchResponse_v6
SCHEMA = FetchRequest_v5.SCHEMA


FetchRequest = [
FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
FetchRequest_v3, FetchRequest_v4, FetchRequest_v5
FetchRequest_v3, FetchRequest_v4, FetchRequest_v5,
FetchRequest_v6
]
FetchResponse = [
FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
FetchResponse_v3, FetchResponse_v4, FetchResponse_v5
FetchResponse_v3, FetchResponse_v4, FetchResponse_v5,
FetchResponse_v6
]
2 changes: 1 addition & 1 deletion kafka/protocol/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class JoinGroupRequest_v2(Request):
JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2
]
JoinGroupResponse = [
JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v1
Copy link
Contributor

@jeffwidman jeffwidman Feb 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch 👍

JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2
]


Expand Down
43 changes: 41 additions & 2 deletions kafka/protocol/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,32 @@ class MetadataResponse_v4(Response):
SCHEMA = MetadataResponse_v3.SCHEMA


class MetadataResponse_v5(Response):
API_KEY = 3
API_VERSION = 5
SCHEMA = Schema(
('throttle_time_ms', Int32),
('brokers', Array(
('node_id', Int32),
('host', String('utf-8')),
('port', Int32),
('rack', String('utf-8')))),
('cluster_id', String('utf-8')),
('controller_id', Int32),
('topics', Array(
('error_code', Int16),
('topic', String('utf-8')),
('is_internal', Boolean),
('partitions', Array(
('error_code', Int16),
('partition', Int32),
('leader', Int32),
('replicas', Array(Int32)),
('isr', Array(Int32)),
('offline_replicas', Array(Int32))))))
)


class MetadataRequest_v0(Request):
API_KEY = 3
API_VERSION = 0
Expand Down Expand Up @@ -151,11 +177,24 @@ class MetadataRequest_v4(Request):
NO_TOPICS = None # Empty array (len 0) for topics returns no topics


class MetadataRequest_v5(Request):
"""
The v5 metadata request is the same as v4.
An additional field for offline_replicas has been added to the v5 metadata response
"""
API_KEY = 3
API_VERSION = 5
RESPONSE_TYPE = MetadataResponse_v5
SCHEMA = MetadataRequest_v4.SCHEMA
ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics
NO_TOPICS = None # Empty array (len 0) for topics returns no topics


MetadataRequest = [
MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2,
MetadataRequest_v3, MetadataRequest_v4
MetadataRequest_v3, MetadataRequest_v4, MetadataRequest_v5
]
MetadataResponse = [
MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2,
MetadataResponse_v3, MetadataResponse_v4
MetadataResponse_v3, MetadataResponse_v4, MetadataResponse_v5
]
88 changes: 59 additions & 29 deletions kafka/protocol/produce.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,52 +52,67 @@ class ProduceResponse_v3(Response):
SCHEMA = ProduceResponse_v2.SCHEMA


class ProduceRequest_v0(Request):
class ProduceResponse_v4(Response):
"""
The version number is bumped up to indicate that the client supports KafkaStorageException.
The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
"""
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = ProduceResponse_v0
API_VERSION = 4
SCHEMA = ProduceResponse_v3.SCHEMA


class ProduceResponse_v5(Response):
API_KEY = 0
API_VERSION = 5
SCHEMA = Schema(
('required_acks', Int16),
('timeout', Int32),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('messages', Bytes)))))
('error_code', Int16),
('offset', Int64),
('timestamp', Int64),
('log_start_offset', Int64))))),
('throttle_time_ms', Int32)
)


class ProduceRequest(Request):
API_KEY = 0

def expect_response(self):
if self.required_acks == 0: # pylint: disable=no-member
return False
return True


class ProduceRequest_v1(Request):
API_KEY = 0
class ProduceRequest_v0(ProduceRequest):
API_VERSION = 0
RESPONSE_TYPE = ProduceResponse_v0
SCHEMA = Schema(
('required_acks', Int16),
('timeout', Int32),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('messages', Bytes)))))
)


class ProduceRequest_v1(ProduceRequest):
API_VERSION = 1
RESPONSE_TYPE = ProduceResponse_v1
SCHEMA = ProduceRequest_v0.SCHEMA

def expect_response(self):
if self.required_acks == 0: # pylint: disable=no-member
return False
return True


class ProduceRequest_v2(Request):
API_KEY = 0
class ProduceRequest_v2(ProduceRequest):
API_VERSION = 2
RESPONSE_TYPE = ProduceResponse_v2
SCHEMA = ProduceRequest_v1.SCHEMA

def expect_response(self):
if self.required_acks == 0: # pylint: disable=no-member
return False
return True


class ProduceRequest_v3(Request):
API_KEY = 0
class ProduceRequest_v3(ProduceRequest):
API_VERSION = 3
RESPONSE_TYPE = ProduceResponse_v3
SCHEMA = Schema(
Expand All @@ -111,17 +126,32 @@ class ProduceRequest_v3(Request):
('messages', Bytes)))))
)

def expect_response(self):
if self.required_acks == 0: # pylint: disable=no-member
return False
return True

class ProduceRequest_v4(ProduceRequest):
"""
The version number is bumped up to indicate that the client supports KafkaStorageException.
The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
"""
API_VERSION = 4
RESPONSE_TYPE = ProduceResponse_v4
SCHEMA = ProduceRequest_v3.SCHEMA


class ProduceRequest_v5(ProduceRequest):
"""
Same as v4. The version number is bumped since the v5 response includes an additional
partition level field: the log_start_offset.
"""
API_VERSION = 5
RESPONSE_TYPE = ProduceResponse_v5
SCHEMA = ProduceRequest_v4.SCHEMA


ProduceRequest = [
ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2,
ProduceRequest_v3
ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5
]
ProduceResponse = [
ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2,
ProduceResponse_v2
ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5
]