Storm Kinesis 集成
Storm Kinesis Spout
提供的核心storm spout(喷口),用户从Amzon Kinesis Streams 中的流中消费数据。它存储可以在zookeeper中提交的序列号,并在重新启动后默认启动消息记录。下面是创建使用spout的示例拓扑的代码示例。下面说明配置spout(喷口)时使用的每个对象。理想情况下,spout(喷口)任务的数量应等于运动时间碎片的数量。但是,每个任务都可以从多个分片中读取。
public class KinesisSpoutTopology {
public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
String topologyName = args[0];
RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
1000);
ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON,
recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L);
KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig);
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout", kinesisSpout, 3);
topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
Config topologyConfig = new Config();
topologyConfig.setDebug(true);
topologyConfig.setNumWorkers(3);
StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology());
}
}
就像你可以看到的,在它的构造函数中,引出了一个KinesisConfig对象。KinesisConfig的构造函数需要8个对象,如下所描述。
String
streamName
用于消费数据的kinesis时间流的名称
ShardIteratorType
shardIteratorType
支持3种类型 - TRIM_HORIZON(beginning of shard), LATEST and AT_TIMESTAMP. 默认情况下,如果分片状态为this,则忽略此参数在zookeeper中寻找。所以它们将首次应用从拓扑开始。如果您想在拓扑的后续运行中使用任何这些拓扑,将需要清除用于存储序列号的zookeeper节点的状态。
RecordToTupleMapper
recordToTupleMapper
一个 RecordToTupleMapper
接口的实现,该端口将会将kinesis记录转换为storm元组。它有两种方法。getOutputFields 告诉spout 要从getTuple方法发出的元组中存在的数据。如果getTuple返回null,记录将被确认。 java Fields getOutputFields (); List<Object> getTuple (Record record);
Date
timestamp
与 AT_TIMESTAMP sharedIteratorType参数结合使用。这将使得spout(喷口)从那时开始提取记录。该kinesis时间使用的时间是与kinesis时间记录相关的服务器端时间。
FailedMessageRetryHadnler
failedMessageRetryHandler
FailedMessageRetryHandler
接口的实现。 默认情况下,该模块提供支持指数退避重试的实现失败消息的机制。该实现有两个构造函数。默认值没有args 构造函数将配置在100毫秒的第一次重试随后在Math.pow(2,i-1)中退出,其中i是范围2中的重试次数到LONG.MAX_LONG。2表示指数函数的基数(秒)。另外一个构造函数以毫秒为单位进行重试间隔,作为第一个参数进行首次重试,以秒为单位的指数函数为第二个参数,重试作为第三个参数。这种接口的方法及其spout(喷口)的工作方式如下描述 java boolean failed (KinesisMessageId messageId); KinesisMessageId getNextFailedMessageToRetry (); void failedMessageEmitted (KinesisMessageId messageId); void acked (KinesisMessageId messageId);
将在每个发生故障的元组上调用失败的方法。如果计划重新尝试失败的消息,则返回true,否则返回false。
getNextFailedMessageToRetry 方法将被称为第一件事,每次一个spout(喷口)想要发生一个元组。它应该返回一个应该重试的消息,如果有,否则为空。请注意,在该时间段内没有任何重试信息的情况下,它可以返回null。但是,当该消息被调用失败方法时,它最终将返回它返回true的每个消息
如果spout(喷口)成功的设法从kinesis时获取记录并发送,failedMessageEmitted将被调用。否则,当getNextFailedMessageToRetry方法被再次调用的时候,该实现应该会返回相同的消息
一但失败的消息被重新发送并被spout(喷口)成功地确认,则会被呼叫。如果失败,spout(喷口)失败就会被再次呼叫。
ZkInfo
zkInfo
封装的对象信息用来zookeeper的交互。这个构造函数使用zkUrl作为第一个参数,它是一个逗号分隔的字符串zk host和端口,zkNode作为第二个参数将用作存储提交序列号的根节点,会话超时以毫秒为单位,连接超时作为第四毫秒,提交间隔为提交序列号到zookeeper的毫秒数的五分之一,重试尝试作为zk客户端的第六个连接重试尝试,重试间隔以毫秒为单位的等待时间,然后再重试连接。
KinesisConnectionInfo
kinesisConnectionInfo
使用kinesis客户端捕获连接到kinesis的参数的对象。它有一个构造函数来实现 AWSCredentialsProvider
作为第一个参数。该模块提供了一个称为 CredentialsProviderChain
的实现,它允许spout(喷口)使用以下之一进行kinesis检测,这5个机制顺序按照以下顺序 - EnvironmentVariableCredentialsProvider
, SystemPropertiesCredentialsProvider
, ClasspathPropertiesFileCredentialsProvider
, InstanceProfileCredentialsProvider
, ProfileCredentialsProvider
。它需要一个ClientConfiguration
对象作为配置。它需要一个 ClientConfiguration
对象作为配置运动的第二个参数客户端,Regions
作为第三个参数,设置客户端上要连接的区域,recordsLimit作为第四个参数,表示最大数量的记录Kinesis客户端将每个GetReocrds请求检索。这个限制应该根据记录的大小,运动时间来仔细选择吞吐量限制和风暴中每个元组延迟的拓扑。另外如果一个任务将从多个分片读取,那么这将影响选择权限认证。
Long
maxUncommittedRecords
这表示每个任务允许的最大未提交序列号数。一旦达到这个数字,spout就不会从kinesis中获取任何新的记录。未提交的序列号被定义为尚未提交给zookeeper的任务的所有消息的总和。这与拓扑级别最大待处理消息不同。例如,如果此值设置为10,并且spout将序列号从1发送到10。序号1正在等待,2到10次被告知。在这种情况下,未提交的序列的数量为10,因为1到10范围内的序列号可以被提交到zk。但是,storm仍然可以在端口上调用下一个元组,因为只有一个等待消息。
Maven dependencies
Aws sdk version that this was tested with is 1.10.77
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>${aws-java-sdk.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
</dependency>
</dependencies>
将来的工作
处理kinesis中的碎片的合并或分裂,Trident喷口实施和指标