Storm OpenTSDB 集成
Storm OpenTSDB Bolt 和 TridentState
OpenTSDB 为时间序列数据提供了可扩展且高可用性的存储. 它由 Time Series Daemon (TSD) servers 以及命令行工具组成. 每个 TSD 连接到配置的 HBase 集群以 push/query(推送/查询) 数据.
时间序列数据点包括: - a metric name. - a UNIX timestamp (seconds or milliseconds since Epoch). - a value (64 bit integer or single-precision floating point value). - a set of tags (key-value pairs) that describe the time series the point belongs to.
Storm bolt 和 trident state 从一个基于给定的 TupleMetricPointMapper
的 tuple 中创建了上面的时间序列数据.
该模块提供了 core Storm 和 Trident bolt 实现, 用户将数据写入 OpenTSDB.
时间序列数据点被写入时有 at-least-once(至少一次)的语义保证, 并且重复的数据点应该像 OpenTSDB 的 这里 提及的一样来处理.
示例
Core Bolt
下面的示例描述了 org.apache.storm.opentsdb.bolt.OpenTsdbBolt
cord bolt 的使用方法
OpenTsdbClient.Builder builder = OpenTsdbClient.newBuilder(openTsdbUrl).sync(30_000).returnDetails();
final OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(builder, Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
openTsdbBolt.withBatchSize(10).withFlushInterval(2000);
topologyBuilder.setBolt("opentsdb", openTsdbBolt).shuffleGrouping("metric-gen");
Trident State
final OpenTsdbStateFactory openTsdbStateFactory =
new OpenTsdbStateFactory(OpenTsdbClient.newBuilder(tsdbUrl),
Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
TridentTopology tridentTopology = new TridentTopology();
final Stream stream = tridentTopology.newStream("metric-tsdb-stream", new MetricGenSpout());
stream.peek(new Consumer() {
@Override
public void accept(TridentTuple input) {
LOG.info("########### Received tuple: [{}]", input);
}
}).partitionPersist(openTsdbStateFactory, MetricGenSpout.DEFAULT_METRIC_FIELDS, new OpenTsdbStateUpdater());