Skip to content

Storm Kinesis 集成

Storm Kinesis Spout

提供的核心storm spout(喷口),用户从Amzon Kinesis Streams 中的流中消费数据。它存储可以在zookeeper中提交的序列号,并在重新启动后默认启动消息记录。下面是创建使用spout的示例拓扑的代码示例。下面说明配置spout(喷口)时使用的每个对象。理想情况下,spout(喷口)任务的数量应等于运动时间碎片的数量。但是,每个任务都可以从多个分片中读取。

public class KinesisSpoutTopology {
    public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        String topologyName = args[0];
        RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
        KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
                1000);
        ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
        KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON,
                recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L);
        KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig);
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", kinesisSpout, 3);
        topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
        Config topologyConfig = new Config();
        topologyConfig.setDebug(true);
        topologyConfig.setNumWorkers(3);
        StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology());
    }
} 

就像你可以看到的,在它的构造函数中,引出了一个KinesisConfig对象。KinesisConfig的构造函数需要8个对象,如下所描述。

String streamName

用于消费数据的kinesis时间流的名称

ShardIteratorType shardIteratorType

支持3种类型 - TRIM_HORIZON(beginning of shard), LATEST and AT_TIMESTAMP. 默认情况下,如果分片状态为this,则忽略此参数在zookeeper中寻找。所以它们将首次应用从拓扑开始。如果您想在拓扑的后续运行中使用任何这些拓扑,将需要清除用于存储序列号的zookeeper节点的状态。

RecordToTupleMapper recordToTupleMapper

一个 RecordToTupleMapper 接口的实现,该端口将会将kinesis记录转换为storm元组。它有两种方法。getOutputFields 告诉spout 要从getTuple方法发出的元组中存在的数据。如果getTuple返回null,记录将被确认。 java Fields getOutputFields (); List<Object> getTuple (Record record);

Date timestamp

与 AT_TIMESTAMP sharedIteratorType参数结合使用。这将使得spout(喷口)从那时开始提取记录。该kinesis时间使用的时间是与kinesis时间记录相关的服务器端时间。

FailedMessageRetryHadnler failedMessageRetryHandler

FailedMessageRetryHandler 接口的实现。 默认情况下,该模块提供支持指数退避重试的实现失败消息的机制。该实现有两个构造函数。默认值没有args 构造函数将配置在100毫秒的第一次重试随后在Math.pow(2,i-1)中退出,其中i是范围2中的重试次数到LONG.MAX_LONG。2表示指数函数的基数(秒)。另外一个构造函数以毫秒为单位进行重试间隔,作为第一个参数进行首次重试,以秒为单位的指数函数为第二个参数,重试作为第三个参数。这种接口的方法及其spout(喷口)的工作方式如下描述 java boolean failed (KinesisMessageId messageId); KinesisMessageId getNextFailedMessageToRetry (); void failedMessageEmitted (KinesisMessageId messageId); void acked (KinesisMessageId messageId); 将在每个发生故障的元组上调用失败的方法。如果计划重新尝试失败的消息,则返回true,否则返回false。

getNextFailedMessageToRetry 方法将被称为第一件事,每次一个spout(喷口)想要发生一个元组。它应该返回一个应该重试的消息,如果有,否则为空。请注意,在该时间段内没有任何重试信息的情况下,它可以返回null。但是,当该消息被调用失败方法时,它最终将返回它返回true的每个消息

如果spout(喷口)成功的设法从kinesis时获取记录并发送,failedMessageEmitted将被调用。否则,当getNextFailedMessageToRetry方法被再次调用的时候,该实现应该会返回相同的消息

一但失败的消息被重新发送并被spout(喷口)成功地确认,则会被呼叫。如果失败,spout(喷口)失败就会被再次呼叫。

ZkInfo zkInfo

封装的对象信息用来zookeeper的交互。这个构造函数使用zkUrl作为第一个参数,它是一个逗号分隔的字符串zk host和端口,zkNode作为第二个参数将用作存储提交序列号的根节点,会话超时以毫秒为单位,连接超时作为第四毫秒,提交间隔为提交序列号到zookeeper的毫秒数的五分之一,重试尝试作为zk客户端的第六个连接重试尝试,重试间隔以毫秒为单位的等待时间,然后再重试连接。

KinesisConnectionInfo kinesisConnectionInfo

使用kinesis客户端捕获连接到kinesis的参数的对象。它有一个构造函数来实现 AWSCredentialsProvider作为第一个参数。该模块提供了一个称为 CredentialsProviderChain 的实现,它允许spout(喷口)使用以下之一进行kinesis检测,这5个机制顺序按照以下顺序 - EnvironmentVariableCredentialsProvider, SystemPropertiesCredentialsProvider, ClasspathPropertiesFileCredentialsProvider, InstanceProfileCredentialsProvider, ProfileCredentialsProvider。它需要一个ClientConfiguration 对象作为配置。它需要一个 ClientConfiguration 对象作为配置运动的第二个参数客户端,Regions 作为第三个参数,设置客户端上要连接的区域,recordsLimit作为第四个参数,表示最大数量的记录Kinesis客户端将每个GetReocrds请求检索。这个限制应该根据记录的大小,运动时间来仔细选择吞吐量限制和风暴中每个元组延迟的拓扑。另外如果一个任务将从多个分片读取,那么这将影响选择权限认证。

Long maxUncommittedRecords

这表示每个任务允许的最大未提交序列号数。一旦达到这个数字,spout就不会从kinesis中获取任何新的记录。未提交的序列号被定义为尚未提交给zookeeper的任务的所有消息的总和。这与拓扑级别最大待处理消息不同。例如,如果此值设置为10,并且spout将序列号从1发送到10。序号1正在等待,2到10次被告知。在这种情况下,未提交的序列的数量为10,因为1到10范围内的序列号可以被提交到zk。但是,storm仍然可以在端口上调用下一个元组,因为只有一个等待消息。

Maven dependencies

Aws sdk version that this was tested with is 1.10.77

 <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>${aws-java-sdk.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${project.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>${curator.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
        </dependency>
 </dependencies> 

将来的工作

处理kinesis中的碎片的合并或分裂,Trident喷口实施和指标



回到顶部