@@ -32,13 +32,19 @@ check code (perhaps using zookeeper or consul). For older brokers, you can
achieve something similar by manually assigning different partitions to each
consumer instance with config management tools like chef, ansible, etc. This
approach will work fine, though it does not support rebalancing on failures.
- See <https://kafka-python-ng.readthedocs.io/en/master/compatibility.html>
+
+ See https://kafka-python.readthedocs.io/en/master/compatibility.html
+
for more details.

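For illustration, a minimal sketch of such a manual assignment, assuming each instance is handed its own partition numbers by config management (the ASSIGNED_PARTITIONS environment variable, topic name, and broker address below are placeholders):

.. code-block:: python

    # Hypothetical setup: config management sets ASSIGNED_PARTITIONS per instance, e.g. "0,3,5"
    import os

    from kafka import KafkaConsumer, TopicPartition

    partitions = [int(p) for p in os.environ.get('ASSIGNED_PARTITIONS', '0').split(',')]
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    consumer.assign([TopicPartition('my_favorite_topic', p) for p in partitions])
    for msg in consumer:
        print(msg)
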
Please note that the master branch may contain unreleased features. For release
documentation, please see readthedocs and/or python's inline help.

- >>> pip install kafka-python-ng
+
+ .. code-block:: bash
+
+     $ pip install kafka-python-ng
+


@@ -48,89 +54,123 @@ KafkaConsumer is a high-level message consumer, intended to operate as similarly
as possible to the official java client. Full support for coordinated
consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+.

- See <https://kafka-python-ng.readthedocs.io/en/master/apidoc/KafkaConsumer.html>
+
+ See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
+
for API and configuration details.

The consumer iterator returns ConsumerRecords, which are simple namedtuples
that expose basic message attributes: topic, partition, offset, key, and value:

- >>> from kafka import KafkaConsumer
- >>> consumer = KafkaConsumer('my_favorite_topic')
- >>> for msg in consumer:
- ...     print(msg)
+ .. code-block:: python

- >>> # join a consumer group for dynamic partition assignment and offset commits
- >>> from kafka import KafkaConsumer
- >>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
- >>> for msg in consumer:
- ...     print(msg)
+     from kafka import KafkaConsumer
+     consumer = KafkaConsumer('my_favorite_topic')
+     for msg in consumer:
+         print(msg)

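Because each record is a namedtuple, the attributes listed above can be read directly off the message; a small sketch reusing the consumer from the example above:

.. code-block:: python

    # topic/partition/offset/key/value are plain attributes on each ConsumerRecord
    for msg in consumer:
        print('%s:%d:%d: key=%s value=%s' % (msg.topic, msg.partition,
                                              msg.offset, msg.key, msg.value))
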
- >>> # manually assign the partition list for the consumer
- >>> from kafka import TopicPartition
- >>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
- >>> consumer.assign([TopicPartition('foobar', 2)])
- >>> msg = next(consumer)
+ .. code-block:: python

- >>> # Deserialize msgpack-encoded values
- >>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
- >>> consumer.subscribe(['msgpackfoo'])
- >>> for msg in consumer:
- ...     assert isinstance(msg.value, dict)
+     # join a consumer group for dynamic partition assignment and offset commits
+     from kafka import KafkaConsumer
+     consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
+     for msg in consumer:
+         print(msg)

- >>> # Access record headers. The returned value is a list of tuples
- >>> # with str, bytes for key and value
- >>> for msg in consumer:
- ...     print(msg.headers)
+ .. code-block:: python

- >>> # Get consumer metrics
- >>> metrics = consumer.metrics()
+     # manually assign the partition list for the consumer
+     from kafka import TopicPartition
+     consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
+     consumer.assign([TopicPartition('foobar', 2)])
+     msg = next(consumer)
+
+ .. code-block:: python
+
+     # Deserialize msgpack-encoded values
+     consumer = KafkaConsumer(value_deserializer=msgpack.loads)
+     consumer.subscribe(['msgpackfoo'])
+     for msg in consumer:
+         assert isinstance(msg.value, dict)
+
+ .. code-block:: python
+
+     # Access record headers. The returned value is a list of tuples
+     # with str, bytes for key and value
+     for msg in consumer:
+         print(msg.headers)
+
+ .. code-block:: python
+
+     # Get consumer metrics
+     metrics = consumer.metrics()


KafkaProducer
*************

KafkaProducer is a high-level, asynchronous message producer. The class is
intended to operate as similarly as possible to the official java client.
- See <https://kafka-python-ng.readthedocs.io/en/master/apidoc/KafkaProducer.html>
+
+ See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
+
for more details.

- >>> from kafka import KafkaProducer
- >>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
- >>> for _ in range(100):
- ...     producer.send('foobar', b'some_message_bytes')
+ .. code-block:: python
+
+     from kafka import KafkaProducer
+     producer = KafkaProducer(bootstrap_servers='localhost:1234')
+     for _ in range(100):
+         producer.send('foobar', b'some_message_bytes')
+
+ .. code-block:: python
+
+     # Block until a single message is sent (or timeout)
+     future = producer.send('foobar', b'another_message')
+     result = future.get(timeout=60)
+
+ .. code-block:: python
+
+     # Block until all pending messages are at least put on the network
+     # NOTE: This does not guarantee delivery or success! It is really
+     # only useful if you configure internal batching using linger_ms
+     producer.flush()
+
+ .. code-block:: python

- >>> # Block until a single message is sent (or timeout)
- >>> future = producer.send('foobar', b'another_message')
- >>> result = future.get(timeout=60)
+     # Use a key for hashed-partitioning
+     producer.send('foobar', key=b'foo', value=b'bar')

- >>> # Block until all pending messages are at least put on the network
- >>> # NOTE: This does not guarantee delivery or success! It is really
- >>> # only useful if you configure internal batching using linger_ms
- >>> producer.flush()
+ .. code-block:: python

- >>> # Use a key for hashed-partitioning
- >>> producer.send('foobar', key=b'foo', value=b'bar')
+     # Serialize json messages
+     import json
+     producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+     producer.send('fizzbuzz', {'foo': 'bar'})

- >>> # Serialize json messages
- >>> import json
- >>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
- >>> producer.send('fizzbuzz', {'foo': 'bar'})
+ .. code-block:: python

- >>> # Serialize string keys
- >>> producer = KafkaProducer(key_serializer=str.encode)
- >>> producer.send('flipflap', key='ping', value=b'1234')
+     # Serialize string keys
+     producer = KafkaProducer(key_serializer=str.encode)
+     producer.send('flipflap', key='ping', value=b'1234')

- >>> # Compress messages
- >>> producer = KafkaProducer(compression_type='gzip')
- >>> for i in range(1000):
- ...     producer.send('foobar', b'msg %d' % i)
+ .. code-block:: python

- >>> # Include record headers. The format is list of tuples with string key
- >>> # and bytes value.
- >>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
+     # Compress messages
+     producer = KafkaProducer(compression_type='gzip')
+     for i in range(1000):
+         producer.send('foobar', b'msg %d' % i)

- >>> # Get producer performance metrics
- >>> metrics = producer.metrics()
+ .. code-block:: python
+
+     # Include record headers. The format is list of tuples with string key
+     # and bytes value.
+     producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
+
+ .. code-block:: python
+
+     # Get producer performance metrics
+     metrics = producer.metrics()


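Because sends are asynchronous, the returned future can also be handled without blocking by attaching callbacks; a minimal sketch reusing the producer from the examples above (the callback names are placeholders):

.. code-block:: python

    # Handle delivery results asynchronously instead of blocking on future.get()
    def on_send_success(record_metadata):
        print(record_metadata.topic, record_metadata.partition, record_metadata.offset)

    def on_send_error(excp):
        print('send failed:', excp)

    producer.send('foobar', b'async_message').add_callback(on_send_success).add_errback(on_send_error)
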
Thread safety
@@ -154,16 +194,19 @@ kafka-python-ng supports the following compression formats:
- Zstandard (zstd)

gzip is supported natively, the others require installing additional libraries.
- See <https://kafka-python-ng.readthedocs.io/en/master/install.html> for more information.
+
+ See https://kafka-python.readthedocs.io/en/master/install.html for more information.
+


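As a rough sketch, a codec is selected with the producer's compression_type option (broker address and topic below are placeholders; the non-gzip codecs assume the corresponding library from the install docs is present):

.. code-block:: python

    from kafka import KafkaProducer

    # 'gzip' works out of the box; 'snappy', 'lz4' and 'zstd' each need their
    # optional compression library installed first (see the install docs above)
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             compression_type='zstd')
    producer.send('foobar', b'compressed_message_bytes')
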
Optimized CRC32 Validation
**************************

Kafka uses CRC32 checksums to validate messages. kafka-python-ng includes a pure
python implementation for compatibility. To improve performance for high-throughput
- applications, kafka-python-ng will use `crc32c` for optimized native code if installed.
- See <https://kafka-python-ng.readthedocs.io/en/master/install.html> for installation instructions.
+ applications, kafka-python will use `crc32c` for optimized native code if installed.
+ See https://kafka-python.readthedocs.io/en/master/install.html for installation instructions.
+
See https://pypi.org/project/crc32c/ for details on the underlying crc32c lib.
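A quick, purely illustrative check of whether the optimized path is available in a given environment:

.. code-block:: python

    # kafka-python-ng picks this up automatically when the crc32c package is installed
    try:
        import crc32c
        print('native crc32c extension available')
    except ImportError:
        print('falling back to the pure-python CRC32C implementation')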