Storm可以确保由Spout发送的每条消息都会被所有的Bolts完全处理,但是这需要用户来决定是否需要可靠性机制。如果是简单的统计分析,可靠性要求不是很高,则可以选择使用不可靠的Bolts。
Storm编程中,有各种Bolt,命名格式XXRichBolt或XXBasicBolt。其中,带有Rich的Bolt,是没有可靠性机制的,需要自己手动调用方法collect.ack()或者collect.fail();而带有Basic的Bolt是封装可靠性机制,在emit一个Tuple时,会在内部自动加入该tuple,进行锚定,所以用户只需要编写处理逻辑,系统会默认调用ack或fail方法。那么,storm是如何来进行ack呢?
注:若要storm系统对一个Tuple进行可靠性跟踪,必须指定一个唯一ID告知系统。因为在Spout中的ack()和fail()方法都有一个参数是messageID,系统是根据该ID来确定成功或失败的。如collector.emit(tuple, tupleID);
先明确一些概念:
1,什么叫做一个消息被所有的Bolts完全处理?
一个Tuple会经过不同的bolts,形成一个或多个分支,这些分支叫做Tuple树,当Tuple树被完全处理时,这个tuple才被完全处理;当中间某个节点处理失败或超时(超时时间,可以再定义Topology的Config时,进行修改属性conf.setMessageTimeoutSecs),tuple都是处理失败的。
2,锚定(Anchor技术):即collect.emit(inputTuple,new Values(……))。BasicBolt会在内部自动锚定到输入tuple;RichBolt默认是没有锚定的。
------------------一个tuple的ACK流程如下---------------------
1,Spout初始化,产生一个唯一标识taskID;
2,Spout创建一个Tuple,有个tupleID(64位随机数);
3,Spout发射Tuple(一定要指定一个唯一ID,来开启跟踪),并同时发送消息到Acker,要求其对Tuple进行跟踪;
4,Tuple经过一系列bolts,产生一个anchor tuple列表;
5,Bolt调用OutputCollector.ack(),进行一系列异或操作,若结果是0,则表示ok,否则fail;
6,根据taskID,回调原始Spout的ack()或fail()方法;
-----------------------------------------------------------------------
注意:方法ack、fail和nextTuple是在同一个线程中完成的,因此应当尽量避免在spout去调用sleep()方法。
如果必须要控制消息的发送速率,可以再开启一个异步线程来读取数据到队列,再由spout去取队列中的数据。
如果不关心数据是否丢失,或者想测试是否有某个bolt拖慢了spout的速度,可以进行如下配置:
1,在定义Topology时,conf.setNumAckers(0);
2,在spout中,不指定唯一的messageId,即不开启跟踪;
3,在bolt中,使用richBolt,即不进行锚定,发射新的tuple;
参考网址:找不到了。。。
如有错误,请指出
相关推荐
storm利用ack保证数据的可靠性,发送失败时进行重发,保证数据不丢失。
tcp协议ACK机制在opnet中实现代码
AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全性,还要提供额外的手段来确保消息的分发是可靠的。Producer客户端使用来发送消息的,Consumer客户端...
总结kafka数据可靠性与一致性实现方式,对需要深入学习kakfa的同学有极大帮助,包含架构解析、leader选举、存储格式、HW、ACK等有详细的描述
有噪信道中基于IEEE802.11e群ACK机制的模型建立及其性能分析,马树皓,张春业,摘 要:为了减少额外开销带来的低效性,一种群确认机制(Block ACK,BTA)被建议用在802.11e标准和802.11n提案中。本文将基于优先级...
使用Storm编程,可以通过调用ack和fail方法来确保一条消息的处理成功或失败。不过当元组被重发时,会发生什么呢?你又该如何砍不会重复计算? Storm0.7.0实现了一个新特性——事务性拓扑,这一特性使消息在语义上...
为了确定信息传输过程,减少系统空闲时间,提高系统可控性,提出了一种具有ACK机制的双时钟三维概率随机多信道接入协议。 它的基本原理是:信道是空闲时的连续时钟方式,信道是忙时的时隙方式。 使用三维概率来控制...
试编写一段递归子程序计算ackermann函数ACK(m,n)。对于m≥0和n≥0的ACK(m,n)函数定义如下: ACK(0,n)=n+1 ACK(m,0)=ACK(m-1,1) ACK(m,n)=ACK(m-1,ACK(m,n-1)) 程序要求: ⑴ m、n在主程序从键盘输入,输入错误显示...
82C55数据传送时序ACK82C55数据传送时序ACK82C55数据传送时序ACK82C55数据传送时序ACK82C55数据传送时序ACK
ACK系列产品升级指南,告诉你如何升级ACK系列产品的版本。
ack-etcd备份资源
对RabbitMQ消息确认(ACK)原理的理解及实践
阿里云, 微服务, K8S,ACK
ZigBee实验\6.5.应答ACK帧实验
监控与评估: 方案通常包括监控和评估的机制,以确保实施的有效性。通过定期的评估,可以及时调整方案,以适应变化的环境或新的挑战。 总体而言,方案的作用在于提供一种有序、有计划的方法,以解决问题、实现目标...
nRF24L01,ACK,自动重发,自动应答,测试程序,每秒更新一次成功接收或者发送的数据包个数,注释清晰,代码简洁,具有预编译选项,单个程序包含发射和接收的测试程序,方便调试。
springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。...里面有消息重发机制,手动确认ACK模式。 配合 producer 生产者demo使用。