|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +""" |
| 3 | +Helper class for InfluxDB |
| 4 | +""" |
| 5 | +from collections import namedtuple, defaultdict |
| 6 | +from warnings import warn |
| 7 | + |
| 8 | +import six |
| 9 | + |
| 10 | + |
| 11 | +class SeriesHelper(object): |
| 12 | + |
| 13 | + """ |
| 14 | + Subclassing this helper eases writing data points in bulk. |
| 15 | + All data points are immutable, insuring they do not get overwritten. |
| 16 | + Each subclass can write to its own database. |
| 17 | + The time series names can also be based on one or more defined fields. |
| 18 | +
|
| 19 | + Annotated example:: |
| 20 | +
|
| 21 | + class MySeriesHelper(SeriesHelper): |
| 22 | + class Meta: |
| 23 | + # Meta class stores time series helper configuration. |
| 24 | + series_name = 'events.stats.{server_name}' |
| 25 | + # Series name must be a string, curly brackets for dynamic use. |
| 26 | + fields = ['time', 'server_name'] |
| 27 | + # Defines all the fields in this time series. |
| 28 | + ### Following attributes are optional. ### |
| 29 | + client = TestSeriesHelper.client |
| 30 | + # Client should be an instance of InfluxDBClient. |
| 31 | + :warning: Only used if autocommit is True. |
| 32 | + bulk_size = 5 |
| 33 | + # Defines the number of data points to write simultaneously. |
| 34 | + # Only applicable if autocommit is True. |
| 35 | + autocommit = True |
| 36 | + # If True and no bulk_size, then will set bulk_size to 1. |
| 37 | +
|
| 38 | + """ |
| 39 | + __initialized__ = False |
| 40 | + |
| 41 | + def __new__(cls, *args, **kwargs): |
| 42 | + """ |
| 43 | + Initializes class attributes for subsequent constructor calls. |
| 44 | +
|
| 45 | + :note: *args and **kwargs are not explicitly used in this function, |
| 46 | + but needed for Python 2 compatibility. |
| 47 | + """ |
| 48 | + if not cls.__initialized__: |
| 49 | + cls.__initialized__ = True |
| 50 | + try: |
| 51 | + _meta = getattr(cls, 'Meta') |
| 52 | + except AttributeError: |
| 53 | + raise AttributeError( |
| 54 | + 'Missing Meta class in {}.'.format( |
| 55 | + cls.__name__)) |
| 56 | + |
| 57 | + for attr in ['series_name', 'fields']: |
| 58 | + try: |
| 59 | + setattr(cls, '_' + attr, getattr(_meta, attr)) |
| 60 | + except AttributeError: |
| 61 | + raise AttributeError( |
| 62 | + 'Missing {} in {} Meta class.'.format( |
| 63 | + attr, |
| 64 | + cls.__name__)) |
| 65 | + |
| 66 | + cls._autocommit = getattr(_meta, 'autocommit', False) |
| 67 | + |
| 68 | + cls._client = getattr(_meta, 'client', None) |
| 69 | + if cls._autocommit and not cls._client: |
| 70 | + raise AttributeError( |
| 71 | + 'In {}, autocommit is set to True, but no client is set.' |
| 72 | + .format(cls.__name__)) |
| 73 | + |
| 74 | + try: |
| 75 | + cls._bulk_size = getattr(_meta, 'bulk_size') |
| 76 | + if cls._bulk_size < 1 and cls._autocommit: |
| 77 | + warn( |
| 78 | + 'Definition of bulk_size in {} forced to 1, ' |
| 79 | + 'was less than 1.'.format(cls.__name__)) |
| 80 | + cls._bulk_size = 1 |
| 81 | + except AttributeError: |
| 82 | + cls._bulk_size = -1 |
| 83 | + else: |
| 84 | + if not cls._autocommit: |
| 85 | + warn( |
| 86 | + 'Definition of bulk_size in {} has no affect because' |
| 87 | + ' autocommit is false.'.format(cls.__name__)) |
| 88 | + |
| 89 | + cls._datapoints = defaultdict(list) |
| 90 | + cls._type = namedtuple(cls.__name__, cls._fields) |
| 91 | + |
| 92 | + return super(SeriesHelper, cls).__new__(cls) |
| 93 | + |
| 94 | + def __init__(self, **kw): |
| 95 | + """ |
| 96 | + Constructor call creates a new data point. All fields must be present. |
| 97 | +
|
| 98 | + :note: Data points written when `bulk_size` is reached per Helper. |
| 99 | + :warning: Data points are *immutable* (`namedtuples`). |
| 100 | + """ |
| 101 | + cls = self.__class__ |
| 102 | + |
| 103 | + if sorted(cls._fields) != sorted(kw.keys()): |
| 104 | + raise NameError( |
| 105 | + 'Expected {0}, got {1}.'.format( |
| 106 | + cls._fields, |
| 107 | + kw.keys())) |
| 108 | + |
| 109 | + cls._datapoints[cls._series_name.format(**kw)].append(cls._type(**kw)) |
| 110 | + |
| 111 | + if cls._autocommit and \ |
| 112 | + sum(len(series) for series in cls._datapoints.values()) \ |
| 113 | + >= cls._bulk_size: |
| 114 | + cls.commit() |
| 115 | + |
| 116 | + @classmethod |
| 117 | + def commit(cls, client=None): |
| 118 | + """ |
| 119 | + Commit everything from datapoints via the client. |
| 120 | +
|
| 121 | + :param client: InfluxDBClient instance for writing points to InfluxDB. |
| 122 | + :attention: any provided client will supersede the class client. |
| 123 | + :return: result of client.write_points. |
| 124 | + """ |
| 125 | + if not client: |
| 126 | + client = cls._client |
| 127 | + rtn = client.write_points(cls._json_body_()) |
| 128 | + cls._reset_() |
| 129 | + return rtn |
| 130 | + |
| 131 | + @classmethod |
| 132 | + def _json_body_(cls): |
| 133 | + """ |
| 134 | + :return: JSON body of these datapoints. |
| 135 | + """ |
| 136 | + json = [] |
| 137 | + for series_name, data in six.iteritems(cls._datapoints): |
| 138 | + json.append({'name': series_name, |
| 139 | + 'columns': cls._fields, |
| 140 | + 'points': [[point.__dict__[k] for k in cls._fields] |
| 141 | + for point in data] |
| 142 | + }) |
| 143 | + return json |
| 144 | + |
| 145 | + @classmethod |
| 146 | + def _reset_(cls): |
| 147 | + """ |
| 148 | + Reset data storage. |
| 149 | + """ |
| 150 | + cls._datapoints = defaultdict(list) |
0 commit comments