@@ -296,10 +296,12 @@ def _create_zk_chroot(self):
296
296
env = self .kafka_run_class_env ()
297
297
proc = subprocess .Popen (args , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
298
298
299
- if proc .wait () != 0 or proc .returncode != 0 :
299
+ stdout , stderr = proc .communicate ()
300
+
301
+ if proc .returncode != 0 :
300
302
self .out ("Failed to create Zookeeper chroot node" )
301
- self .out (proc . stdout . read () )
302
- self .out (proc . stderr . read () )
303
+ self .out (stdout )
304
+ self .out (stderr )
303
305
raise RuntimeError ("Failed to create Zookeeper chroot node" )
304
306
self .out ("Kafka chroot created in Zookeeper!" )
305
307
@@ -458,13 +460,12 @@ def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_
458
460
args .append ('--if-not-exists' )
459
461
env = self .kafka_run_class_env ()
460
462
proc = subprocess .Popen (args , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
461
- ret = proc .wait ()
462
- if ret != 0 or proc .returncode != 0 :
463
- output = proc .stdout .read ()
464
- if not 'kafka.common.TopicExistsException' in output :
463
+ stdout , stderr = proc .communicate ()
464
+ if proc .returncode != 0 :
465
+ if not 'kafka.common.TopicExistsException' in stdout :
465
466
self .out ("Failed to create topic %s" % (topic_name ,))
466
- self .out (output )
467
- self .out (proc . stderr . read () )
467
+ self .out (stdout )
468
+ self .out (stderr )
468
469
raise RuntimeError ("Failed to create topic %s" % (topic_name ,))
469
470
470
471
def create_topics (self , topic_names , num_partitions = None , replication_factor = None ):
0 commit comments