10-06 6,699 views
Storm Spout
通过实现Storm中的ISpout接口,重写其中的nextTuple、ack和fail方法,可以实现tuple流的发送、成功确认、失败重发。ISpout接口代码如下所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
public interface ISpout extends Serializable { /** * work进程初始化spout时运行该方法 */ void open(Map conf, TopologyContext context, SpoutOutputCollector collector); /** * 关闭时调用,但不保证一定调用(kill -9) */ void close(); /** * activate topology后调用 */ void activate(); /** * deactivate topology后调用 */ void deactivate(); /** * 在nextTuple中通过调用collector的emit方法发送tuple * nextTuple、ack和fail这三个方法在一个线程中被循环调用 */ void nextTuple(); /** * msgId标识的消息处理成功后调用该方法 */ void ack(Object msgId); /** * msgId标识的消息处理失败后调用该方法 * 可以进行消息的重新发送 */ void fail(Object msgId); } |
storm-kafka
storm-kafka模块目前集成中在storm(https://github.com/apache/storm)中,为storm提供读取kafka消息队列的支持,其中,KafkaSpout类继承自BaseRichSpout类,用于从kafka消息队列中读取消息发送tuple,对tuple进行成功确认和失败重发,保证kafka消息至少被处理一次。
KafkaSpout
KafkaSpout类核心逻辑的序列图如下所示。
从中可以看出,发送tuple,成功确认和失败重发都是由PartitionManager这个类完成。
在storm中,可以设置spout的并行度,也就是说可以有多个KafkaSpout实例从一个kafka队列中读取消息,而kafka作为大数据应用场景下的消息队列,其每个队列可以配置为多个partition,每个partition可以由一个消费者读取消息,这样可以保证消息消费的大吞吐量。当在storm集群中使用KafkaSpout类读取消息时,需要控制每个KafkaSpout实例读取kafka队列的哪几个partition,如果实例数等于partition数,那么每个KafkaSpout实例从一个partiton中读取消息。
在KafkaSpout类中有一个成员变量PartitionCoordinator _coordinator,_coordinator中有一个成员变量Map<Partition, PartitionManager> _managers。通过在zookeeper上保存信息,_coordinator可以在storm集群中协调各个KafkaSpout实例读取哪几个partition,并通过_managers管理当前所在KafkaSpout实例负责的partition的读取,而PartitionManager类则具体负责某个partition的读取。
当KafkaSpout实例执行nextTuple方法时,会从_coordinator中获取到PartitonManager实例,并调用该实例的next方法,而当KafkaSpout实例执行ack方法时,实际调用了PartitonManager的ack方法,而当KafkaSpout实例执行fail方法时,实际调用了PartitonManager的fail方法。PartitionManager的next、ack和fail方法具体流程如下所示。
版权属于: 我爱我家
原文地址: http://magicwt.com/2015/10/06/storm-kafka-kafkaspout%e5%8e%9f%e7%90%86%e5%88%86%e6%9e%90/
转载时必须以链接形式注明原始出处及本声明。