-import collectd
-import random
-import json
+import ConfigParser as conf_p
+import os, collectd, random, json
 from kafka.client import KafkaClient
 from kafka.producer import SimpleProducer
 
-brokers = "darkstar.lqm.io:9092"
-topic = "collectd"
+COLLECTD_PYTHON_CONF = '/etc/collectd/collectd2kafka.py.conf'
+
+def parse_types_file(path):
+    f = open(path, 'r')
+
+    types = {}
+    for line in f:
+        fields = line.split()
+        if len(fields) < 2:
+            continue
+
+        type_name = fields[0]
+
+        if type_name[0] == '#':
+            continue
+
+        v = []
+        for ds in fields[1:]:
+            ds = ds.rstrip(',')
+            ds_fields = ds.split(':')
+
+            if len(ds_fields) != 4:
+                collectd.warning('collectd2python: cannot parse data source %s on type %s' % (ds, type_name))
+                continue
+
+            v.append(ds_fields)
+
+        types[type_name] = v
+
+    f.close()
+    return types
+
+def get_config():
+    config = conf_p.ConfigParser()
+    config.read(COLLECTD_PYTHON_CONF)
+    global KAFKA_BROKERS
+    global TOPIC
+    global TYPES
+    KAFKA_BROKERS = []
+    TYPES = {}
+    # Brokers is read as a comma-separated list of host:port entries
+    for x in config.get('collectd2kafka', 'Brokers').split(','):
+        KAFKA_BROKERS.append(x.strip())
+    TOPIC = config.get('collectd2kafka', 'Topic')
+    TYPES.update(parse_types_file(config.get('collectd2kafka', 'TypesDB')))
 
 def config_callback(conf):
-    global brokers, topic
+    global KAFKA_BROKERS
+    global TOPIC
+    global TYPES
+    KAFKA_BROKERS = []
+    TYPES = {}
+
+
     for node in conf.children:
         if node.key == 'Brokers':
-            brokers = node.values[0]
+            for x in node.values:
+                print x
+                KAFKA_BROKERS.append(x)
         elif node.key == 'Topic':
-            topic = node.values[0]
+            TOPIC = node.values[0]
+        elif node.key == 'TypesDB':
+            for x in node.values:
+                TYPES.update(parse_types_file(x))
 
-collectd.register_config(config_callback);
 
 def write_callback(v, data=None):
+    if v.type not in TYPES:
+        collectd.warning('collectd2kafka: cannot handle type %s. check types.db file?' % v.type)
+        return
+
+    v_type = TYPES[v.type]
+
+    if len(v_type) != len(v.values):
+        collectd.warning('collectd2kafka: value count does not match type %s' % v.type)
+        return
+
+
     metric = {}
     metric['host'] = v.host
     metric['plugin'] = v.plugin
@@ -28,11 +90,22 @@ def write_callback(v, data=None):
     metric['interval'] = v.interval
     metric['values'] = []
 
+
+    i = 0
     for value in v.values:
+        s_name = v_type[i][0]
+        ds_type = v_type[i][1]
+        i += 1
+
         metric['values'].append(value)
 
-    producer.send_messages("collectd", json.dumps(metric))
+    producer.send_messages(TOPIC, json.dumps(metric))
+    print "!!!!!!!!!!!!!!!!!!!"
+    print str(KAFKA_BROKERS)
+    print "!!!!!!!!!!!!!!!!!!!"
 
-kafka = KafkaClient(brokers)
-producer = SimpleProducer(kafka)
+get_config();
+#collectd.register_config(config_callback);
 collectd.register_write(write_callback);
+kafka = KafkaClient(KAFKA_BROKERS)
+producer = SimpleProducer(kafka)
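
`get_config()` pulls its settings from `/etc/collectd/collectd2kafka.py.conf` via `ConfigParser`, reading the `Brokers`, `Topic`, and `TypesDB` options from a `collectd2kafka` section. A minimal sketch of that file, assuming `Brokers` is a comma-separated list; the broker addresses, topic, and path below are placeholders, not values from this commit:

```ini
[collectd2kafka]
; placeholder values -- adjust for your deployment
Brokers = kafka1.example.com:9092, kafka2.example.com:9092
Topic = collectd
TypesDB = /usr/share/collectd/types.db
```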
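The now commented-out `config_callback` path would take the same three settings from collectd's own configuration instead. A sketch of the corresponding `<Module>` block in collectd.conf, assuming the module file is importable as `collectd2kafka`; the paths and broker addresses are illustrative:

```
<Plugin python>
    # illustrative module path
    ModulePath "/usr/lib/collectd/python"
    Import "collectd2kafka"
    <Module collectd2kafka>
        Brokers "kafka1.example.com:9092" "kafka2.example.com:9092"
        Topic "collectd"
        TypesDB "/usr/share/collectd/types.db"
    </Module>
</Plugin>
```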
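`parse_types_file()` expects standard collectd types.db syntax: a type name followed by comma-separated data source specs of the form `name:type:min:max`, which is why it rejects entries that do not split into exactly four fields. A stock types.db contains lines along these lines (exact entries vary by collectd version):

```
if_octets    rx:DERIVE:0:U, tx:DERIVE:0:U
load         shortterm:GAUGE:0:5000, midterm:GAUGE:0:5000, longterm:GAUGE:0:5000
```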