Storm Druid 集成
Storm Druid Bolt 和 TridentState
该模块提供了将数据写入 Druid 数据存储的核心 Storm 和 Trident bolt(螺栓)的实现。该实现使用 Druid 的 Tranquility 库向 Druid 发送消息。
一些实现细节借用自现有的 Tranquility Storm Bolt。这个新的 Bolt(螺栓)增加了对最新 Storm 版本的支持,并将该 Bolt(螺栓)保留在 Storm 代码仓库中。
Core Bolt
下面的例子描述了使用 org.apache.storm.druid.bolt.DruidBeamBolt
的核心 bolt(螺栓)。默认情况下,该 bolt(螺栓)期望接收在 "event" 字段中携带您的事件类型的元组。可以通过实现 ITupleDruidEventMapper 接口来更改此逻辑。
// Factory that knows how to build a Tranquility Beam for the target Druid cluster.
DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
// Configure the bolt; discarded (un-ingestible) tuples are emitted on the discard stream.
DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
// Maps an incoming tuple to a Druid event; by default it reads the "event" field of the tuple.
ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
// Subscribe to the druid bolt's discard stream to observe tuples Druid rejected.
topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , druidConfig.getDiscardStreamId());
Trident State
// Factory that knows how to build a Tranquility Beam for the target Druid cluster.
DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
// Maps an incoming tuple to a Druid event; by default it reads the "event" field of the tuple.
ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
// peek() is used here only for debug logging; it does not modify the tuples.
stream.peek(new Consumer() {
@Override
public void accept(TridentTuple input) {
LOG.info("########### Received tuple: [{}]", input);
}
// Persist each batch to Druid via the Trident state abstraction.
}).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater());
示例工厂实现
Druid bolt 必须配置一个 BeamFactory。您可以通过 DruidBeams builder 的 "buildBeam()" 方法来实现一个 BeamFactory。详情请参阅配置(Configuration)文档;更多细节请参考 Tranquility 库的文档。
/**
 * Sample {@link DruidBeamFactory} that builds a Tranquility Beam for sending
 * {@code Map<String, Object>} events to a Druid indexing service.
 *
 * NOTE(review): values such as the datasource name, dimensions and aggregators
 * are hard-coded for this example and should be adapted to your own schema.
 */
public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
@Override
/**
 * Builds the Beam used by the Druid bolt to transmit events.
 *
 * @param conf    topology configuration; "druid.tranquility.zk.connect" must
 *                contain the ZooKeeper connect string.
 * @param metrics Storm metrics context (unused in this sample).
 * @return a configured Tranquility Beam for Map events.
 */
public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node.
final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path
final String dataSource = "test"; //The name of the ingested datasource. Datasources can be thought of as tables.
final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
// Aggregators define how Druid rolls up rows at ingestion time; here a simple count named "click".
List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
new CountAggregatorFactory(
"click"
)
);
// Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>()
{
@Override
public DateTime timestamp(Map<String, Object> theMap)
{
// assumes the "timestamp" value is in a format DateTime's constructor accepts — TODO confirm against your event schema
return new DateTime(theMap.get("timestamp"));
}
};
// Tranquility uses ZooKeeper (through Curator) for coordination.
final CuratorFramework curator = CuratorFrameworkFactory
.builder()
.connectString((String)conf.get("druid.tranquility.zk.connect")) //take config from storm conf
.retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
.build();
curator.start();
// The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
// Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
// Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
// done with Jackson. If you want to provide an alternate serializer, you can provide your own via ```.objectWriter(...)```.
// In this case, we won't provide one, so we're just using Jackson.
final Beam<Map<String, Object>> beam = DruidBeams
.builder(timestamper)
.curator(curator)
.discoveryPath(discoveryPath)
.location(DruidLocation.create(indexService, dataSource))
.timestampSpec(timestampSpec)
// Roll up on the listed dimensions, aggregated per minute.
.rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
.tuning(
ClusteredBeamTuning
.builder()
// Each indexing task covers one hour of data; events arriving more than
// the window period (10 minutes) late are dropped by Tranquility.
.segmentGranularity(Granularity.HOUR)
.windowPeriod(new Period("PT10M"))
.partitions(1)
.replicants(1)
.build()
)
.druidBeamConfig(
DruidBeamConfig
.builder()
// Retry failed indexing-service requests for up to 10 minutes.
.indexRetryPeriod(new Period("PT10M"))
.build())
.buildBeam();
return beam;
}
}
Example code is available here.