stormstormmedia是什么意思思

颠覆大数据分析之Storm简介-技术方案-@大数据资讯
你好,游客
颠覆大数据分析之Storm简介
来源:颠覆大数据分析&
之前我们已经极为简单的介绍了Storm。现在我们要对它做一个更详细的了解。Storm是一个复杂 事件处理引擎(CEP),最初由Twitter实现。在实时计算与分析领域,Storm正在得到日益广泛的应用。Storm可以辅助基本的流式处理,例如 聚合数据流,以及基于数据流的机器学习(译者注:原文是ML,根据上下文判断,此处应是指机器学习,下文相同不再缀述)。通常情况,数据分析(译者注:原 文为prestorage analytics,意义应是保存分析结果之前的分析计算)在Storm之上进行,然后把结果保存在NOSQL或关系数据库管理系统(RDBMSs)。以 气象频道为例,使用Storm以并行方式处理集(译者注:原文用到munging,意义应是洗数据)并为离线计算持久化它们。
下面是一些公司使用Storm的有趣方式:
Storm用于持续计算,p并把处理过的数据传输给一个可视化引擎。Data Salt,一个先行者,使用Storm处理大容量数据源。Twitter采用相同的方式,将Storm作为它的发布者分析产品的基础。
Groupon也采用Storm实现了低延迟、高吞吐量的数据处理。
Yahoo采用Storm作为CEP每天处理数以亿计的事件。他们还把Storm整合进了0和Hadoop YARN,以便Storm能够弹性的使用集群资源,以及更易于使用HBase和Hadoop生态系统中的其它组件。
Infochimps采用Storm-Kafka加强他们的数据交付云服务。
Storm还被Cerner公司用于医疗领域,用来处理增量更新,并低延迟的把它们保存在HBase,有效的运用Storm作为流式处理引擎和Hadoop作为批处理引擎。
Impetus将Storm与Kafka结合,运行机器学习算法,探索制造业的故障模式。他们的客户是一家大型的电子一站式服务商。他们运行分类算法,依据日志实时探测故障,识别故障根源。这是一个更一般的用例:日志实时分析。
Impetus还利用Storm在一个分布式系统中构建实时索引。这个系统非常强大,因为它搜索过程几乎是瞬时的。
Storm的一个基本概念是数据流,它可以被定义为无级的无界序列。Storm只提供多种去中心化且容错的数据流转换方式。流的 模式可以指定它的数据类型为以下几种之一:整型、布尔型、字符串、短整型、长整型、字节、字节数组等等。类OutputFieldsDeclarer用来 指定流的模式。还可以使用用户自定义类型,这种情况下,用户可能需要提供自定义序列化程序。一旦声明了一个数据流,它就有一个ID,并有一个默认类型的默 认值。
在Storm内部,数据流的处理由Storm拓扑完成。拓扑包含一个spout,数据源;bolt,负责处理来自spout和其它bolt的数据。目前已经有各种spout,包括从Kafka读取数据的spout(LinkedIn贡献的分布式发布-订阅系统),Twitter API的spout,Kestrel队列的spout,甚至还有从像Oracle这样的关系数据库读取数据的spout。spout可以是可靠的,一旦数据处理失败,它会重新发送数据流。不可靠的spout不跟踪流的状态,不会在失败时重新发送数据。Spout的一个重要方法是nextTuple&&它返回下一条待处理的元组。还有两个分别是ack和fail,分别在流被处理成功或处理不成功时调用。Storm的每个spout必须实现IRichSpout接口。Spout可能会分发多个数据流作为输出。
拓扑中的另一个重要的实体是bolt。bolt执行数据流转换,包括比如计算、过滤、聚合、连接。一个拓扑可以有多个bolt,用来完成复杂的转换和聚合。在声明一个bolt的输入流时,必须订阅其它组件(要么是spout要么是其它bolt)的特定数据流。通过InputDeclarer类和基于数据流组的适当方法完成订阅,这个方法针对数据流组做了简短说明。
execute方法是bolt的一个重要方法,通过调用它处理数据。它从参数接收一个新的数据流,通过OutputCollector分发新的元组。这个方法是线程安全的,这意味着bolt可以是多线程的。bolt必须实现IBasicBolt接口,这个接口提供了ack方法的声明,用来发送确认通知。
一个Storm集群由主节点和从节点构成。主节点通常运行着Nimbus守护进程。Storm已经实现了在Hadoop YARN之上运行&&它可以请求YARN的资源管理器额外启动一个应用主节点的守护进程。Nimbus守护进程负责在集群中传输代码,分派任务,监控集群 健康状态。在YARN之上实现的Storm可以与YARN的资源管理器配合完成监控及分派任务的工作。
每个从节点运行一个叫做supervisor的守护进程。这是一个工人进程,负责执行拓扑的一部分工作。一个典型的拓扑由运行在多个集群节点中的进程组成。supervisor接受主节点分派的任务后启动工人进程处理。
主从节点之间的协调通讯由ZooKeeper集群完成。(ZooKeeper是一个apache的分布式协作项目,被广泛应用于诸如 Storm,Hadoop YARN,以及Kafka等多个项目中。)集群状态由ZooKeeper集群维护,确保集群可恢复性,故障发生时可选举出新的主节点,并继续执行拓扑。
拓扑本身是由spouts、bolts,以及它们连接在一起的方式构成的图结构。它与Map-Reduce任务的主要区别在于,MR任务是短命的,而Storm拓扑一直运行。Storm提供了杀死与重启拓扑的方法。
简单的实时计算例子
一个Kafka&spout就是下面展示的样子:
Kafka Spout的open()方法:
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
& & _collector =
& &_rand = new Random();
Kafka Spout的nextTuple()方法:
public void nextTuple() {
& & KafkaConsumer consumer = new KafkaConsumer(kafkaServerURL, kafkaTopic);
& & ArrayList&String& input_data = consumer.getKafkaStreamData();
& & while(true) {
& & & &for(String inputTuple : input_data){
& & & & & &_collector.emit(new Values(inputTuple));
KafkaConsumer类来自开源项目storm-kafka:https:// /nathanmarz/ storm-contrib/tree/master/storm-kafka。
public void prepare(Map stormConf, TopologyContext context){
& & & & String logFileName = logFileL
& & & & file = new FileWriter(logFileName);
& & & & outputFile = new PrintWriter(file);
& & & & outputFile.println("In the prepare() method of bolt");
& & } catch (IOException e){
& & & & System.out.println("an exception has occured");
& & & & e.printStackTrace();
public void execute(Tuple input, BasicOutputCollector collector){
& & String inputMsg=input.getString(0);
& & inputMsg=inputMsg = "I am a bolt";
& & outputFile.println("接收的消息:" + inputMsg);
& & outputFile.flush();
& & collector.emit(tuple(inputMsg));
前面创建的spout与这个bolt连接,这个bolt向数据流的字符串域添加一条消息:我是一个bolt。前文显示的就是这个bolt的代码。接下来的代码是构建拓扑的最后一步。它显示了spout和bolts连接在一起构成拓扑,并运行在集群中。
public static void main(String[] args){
& & int numberOfWorkers = 2;
& & int numberOfExecutorsSpout = 1;
& & int numberOfExecutorsBolt = 1;
& & String numbersHost = "192.168.0.0";
& & TopologyBuilder builder = new TopologyBuilder();
& & Config conf = new Config();
& & builder.setSpout("spout", new TestSpout(false), numberOfExecutorsSpout);
& & builder.setBolt("bolt",new TestBolt(), numberOfExecutorsBolt).shuffleGrouping("spout");
& & conf.setNumWorkers(numberOfWorkers);
& & conf.put(Config.NIMBUS_HOST,nimbusHost);
& & conf.put(Config.NIMBUS_THRIFT_PORT, 6627L);
& & & & StormSubmitter.submitTopology("testing_topology", conf, builder.createTopology());
& & } catch (AlreadyAliveException e){
& & & & System.out.println("Topology with the Same name is already running on the cluster.");
& & & & e.printStackTrace();
& & } catch (InvalidTopologyException e) {
& & & & System.out.println("Topology seems to be invalid.");
& & & & e.printStackTrace();
spout和bolt都可能并行执行多个任务。必须有一种方法指定哪个数据流路由到哪个spout/bolt。数据流组用来指定一个拓扑内必须遵守的路由过程。下面是Storm内建数据流组:
随机数据流组:随机分发数据流,不过它确保所有任务都可得到相同数量的数据流。
域数据流组:基于元组中域的数据流组。比如,有一个machine_id域,拥有相同machine_id域的元组由相同的任务处理。
全部数据流组:它向所有任务分发元组&&它可能导致处理冲突。
直接数据流组:一种特殊的数据流组,实现动态路由。元组生产者决定哪个消费者应该接收这个元组。可能是基于运行时的任务ID。bolt可以通过TopologyContext类得到消费者的任务ID,或OutputCollector的emit方法也可使用直接直接数据流组。
本地数据流组:如果目标bolt在相同进程中有一个以上的任务,元组将被随机分配(就像随机数据流组),但是只分配相同进程中的那些任务。
全局数据流组:所有元组到达拥有最小ID的bolt。
不分组:目前与随机数据流组一样。
Storm的消息处理担保
从spout生成的元组能够触发进一步的元组分发,基于拓扑和所应用的转换。这意味着可能是整个消息树。Storm担保每个元组 被完整的处理了&&树上的每个节点已被处理过了。这一担保不能没有程序员的支持。每当消息树中创建了一个新的节点或者一个节点被处理了,程序员都必须向 Storm指明。第一点通过锚定实现,也就是将处理完成的元组作为OutputCollector的emit方法的第一个参数。这就保证了消息被锚定到了 合适的元组。消息也可以锚定到多个元组,这样就构成了一个消息的非循环有向图(DAG),而不只是一棵树。即使在消息的循环有向图存在的情况 下,Storm也可以担保消息处理。
在每条消息被处理后,程序员可通过调用ack或fail方法,告诉Storm这条消息已被成功处理或处理失败。Storm会在失败时重新发送数据流 &&这里满足至少处理一次的语义。Storm也会在发送数据流时采用超时机制&&这是一个storm.yaml的配置参数 (config.TOPOLOGY_MESSAGE_TIMEOUT_&SECS)。
在Storm内部,有一组&ackeer&任务持续追踪来自每条元组消息的DAG。这些任务的数量可通过storm.yaml中的 TOPOLOGY_ACKERS参数设定。在处理大量消息时,可能将不得不增大这个数字。每个消息元组得到一个64-bit ID,用于ackers追踪。元组的DAG状态由一个叫做ack val的64-bit值维护,只是简单的把树中每个确认过(译者注:原谅是acked)的ID执行异或运算。当ack val成为0时,acker任务就认为这棵元组树被完全处理了。
在某些情况下,当性能至关重要,而可靠性又不是问题时,可靠性也可以关闭。在这些情况下,程序员可以指定TOPOLOGY_ACKERS为0,并在 分发新元组时,不指定输入元组的非锚定消息(译者注:原文为unanchor messages)。这样就跳过了确认消息,节省了带宽,提高了吞吐量。到目前为止我们已经讨论且只讨论了至少处理一次数据流的语义(译者注:原文为 at-least-once stream semantics)。
仅处理一次数据流的语义可以采用事务性拓扑实现。Storm通过为每条元组提供相关联的事务ID为数据流处理提供事务性语义(仅一次,不完全等同于 关系数据库的ACID语义)。对于重新发送数据流来说,相同的事务ID也会被发送并担保这个元组不会被重复处理。这方面牵涉到对于消息处理的严格顺序,就 像是在处理一个元组。由于这样做的低效率,Storm允许批量处理由一个事务ID关联的元组。不像早先的情况 ,程序不得不将消息锚定到输入元组,事务性拓扑对程序员是透明的。Storm内部将元组的处理分为两阶段&&第一阶段为处理阶段,可以并行处理多个批次, 第二阶段为提交阶段,强制严格按照批次ID提交。
事务性拓扑已经过时了&&它已被整合进了一个叫做Trident的更大的框架。Trident允许对流数据进行查询,包括聚合、连接、分组函数,还有过滤器。Trident构建于事务性拓扑之上并提供了一致的一次性语义。更多关于Trident的细节请参考wiki:/nathanmarz/storm/wiki/ Trident- tutorial。
相关新闻 & & &
& (04月29日)
& (04月25日)
& (04月29日)
& (04月26日)
   同意评论声明
   发表
尊重网上道德,遵守中华人民共和国的各项有关法律法规
承担一切因您的行为而直接或间接导致的民事或刑事法律责任
本站管理人员有权保留或删除其管辖留言中的任意内容
本站有权在网站内转载或引用您的评论
参与本评论即表明您已经阅读并接受上述条款Storm框架入门1 Topology构成&& 和同样是计算框架的Mapreduce相比,Mapreduce集群上运行的是Job,而Storm集群上运行的是Topology。但是Job在运行结束之后会自行结束,Topology却只能被手动的kill掉,否则会一直运行下去。&&& Storm集群中有两种节点,一种是控制节点(Nimbus节点),另一种是工作节点(Supervisor节点)。所有Topology任务的提交必须在Storm客户端节点上进行(需要配置~/.storm/storm.yaml文件),由Nimbus节点分配给其他Supervisor节点进行处理。Nimbus节点首先将提交的Topology进行分片,分成一个个的Task,并将Task和Supervisor相关的信息提交到zookeeper集群上,Supervisor会去zookeeper集群上认领自己的Task,通知自己的Worker进程进行Task的处理。总体的Topology处理流程图为:&&&& 每个Topology都由Spout和Bolt组成,在Spout和Bolt传递信息的基本单位叫做Tuple,由Spout发出的连续不断的Tuple及其在相应Bolt上处理的子Tuple连起来称为一个Steam,每个Stream的命名是在其首个Tuple被Spout发出的时候,此时Storm会利用内部的Ackor机制保证每个Tuple可靠的被处理。&&& 而Tuple可以理解成键值对,其中,键就是在定义在declareStream方法中的Fields字段,而值就是在emit方法中发送的Values字段。2 Configuration&&& 在运行Topology之前,可以通过一些参数的配置来调节运行时的状态,参数的配置是通过Storm框架部署目录下的conf/storm.yaml文件来完成的。在次文件中可以配置运行时的Storm本地目录路径、运行时Worker的数目等。&&& 在代码中,也可以设置Config的一些参数,但是优先级是不同的,不同位置配置Config参数的优先级顺序为:default.yaml&storm.yaml&topology内部的configuration&内部组件的special configuration&外部组件的special configuration&&& 在storm.yaml中常用的几个选项为:配置选项名称配置选项作用topology.max.task.parallelism每个Topology运行时最大的executor数目topology.workers每个Topology运行时的worker的默认数目,若在代码中设置,则此选项值被覆盖storm.zookeeper.serverszookeeper集群的节点列表storm.local.dirStorm用于存储jar包和临时文件的本地存储目录storm.zookeeper.rootStorm在zookeeper集群中的根目录,默认是&/&ui.portStorm集群的UI地址端口号,默认是8080nimbus.host:Nimbus节点的hostsupervisor.slots.portsSupervisor节点的worker占位槽,集群中的所有Topology公用这些槽位数,即使提交时设置了较大数值的槽位数,系统也会按照当前集群中实际剩余的槽位数来进行分配,当所有的槽位数都分配完时,新提交的Topology只能等待,系统会一直监测是否有空余的槽位空出来,如果有,就再次给新提交的Topology分配supervisor.worker.timeout.secsWorker的超时时间,单位为秒,超时后,Storm认为当前worker进程死掉,会重新分配其运行着的task任务drpc.servers在使用drpc服务时,drpc server的服务器列表drpc.port在使用drpc服务时,drpc server的服务端口&3 Spouts&&& Spout是Stream的消息产生源, Spout组件的实现可以通过继承BaseRichSpout类或者其他*Spout类来完成,也可以通过实现IRichSpout接口来实现。&&& 需要根据情况实现Spout类中重要的几个方法有:&3.1 open方法&&& 当一个Task被初始化的时候会调用此open方法。一般都会在此方法中对发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext初始化。示例如下:1 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {2 3
_collector =4 5 }&3.2 declareOutputFields方法&&& 此方法用于声明当前Spout的Tuple发送流。Stream流的定义是通过OutputFieldsDeclare.declareStream方法完成的,其中的参数包括了发送的域Fields。示例如下:1 public void declareOutputFields(OutputFieldsDeclarer declarer) {2 3
declarer.declare(new Fields("word"));4 5
}&3.3 getComponentConfiguration方法&&& 此方法用于声明针对当前组件的特殊的Configuration配置。示例如下: 1 public Map&String, Object& getComponentConfiguration() { 2
if(!_isDistributed) { 4
Map&String, Object& ret = new HashMap&String, Object&(); 6
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 3); 8
return10 11
} else {12 13
return null;14 15
}&&&& 这里便是设置了Topology中当前Component的线程数量上限。3.4 nextTuple方法&&& 这是Spout类中最重要的一个方法。发射一个Tuple到Topology都是通过这个方法来实现的。示例如下: 1 public void nextTuple() { 2
Utils.sleep(100); 4
final String[] words = new String[] {"twitter","facebook","google"}; 6
final Random rand = new Random(); 8
final String word = words[rand.nextInt(words.length)];10 11
_collector.emit(new Values(word));12 13
}&&&& 这里便是从一个数组中随机选取一个单词作为Tuple,然后通过_collector发送到Topology。&&& 另外,除了上述几个方法之外,还有ack、fail和close方法等。Storm在监测到一个Tuple被成功处理之后会调用ack方法,处理失败会调用fail方法,这两个方法在BaseRichSpout类中已经被隐式的实现了。&&4 Bolts&&& Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。Bolt组件的实现可以通过继承BasicRichBolt类或者IRichBolt接口来完成。&&& Bolt类需要实现的主要方法有:&4.1 prepare方法&&& 此方法和Spout中的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。Bolt中Tuple的发送可以在prepare方法中、execute方法中、cleanup等方法中进行,一般都是些在execute中。&&& 示例如下:1 public void prepare(Map conf, TopologyContext context, OutputCollector collector) {2 3
_collector =4 5
}&4.2 declareOutputFields方法&&& 用于声明当前Bolt发送的Tuple中包含的字段,和Spout中类似。&&& 示例如下:1 public void declareOutputFields(OutputFieldsDeclarer declarer) {2 3
declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));4 5 }&&&& 此例说明当前Bolt类发送的Tuple包含了三个字段:"obj", "count", "actualWindowLengthInSeconds"。4.3 getComponentConfiguration方法&&& 和Spout类一样,在Bolt中也可以有getComponentConfiguration方法。&&& 示例如下:1 public Map&String, Object& getComponentConfiguration() {2 3
Map&String, Object& conf = new HashMap&String, Object&();4 5
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);6 7
}&&&& 此例定义了从系统组件&_system&的&_tick&流中发送Tuple到当前Bolt的频率,当系统需要每隔一段时间执行特定的处理时,就可以利用这个系统的组件的特性来完成。4.4 execute方法&&& 这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。具体的发送也是通过emit方法来完成的。此时,有两种情况,一种是emit方法中有两个参数,另一个种是有一个参数。(1)emit有一个参数:此唯一的参数是发送到下游Bolt的Tuple,此时,由上游发来的旧的Tuple在此隔断,新的Tuple和旧的Tuple不再属于同一棵Tuple树。新的Tuple另起一个新的Tuple树。(2)emit有两个参数:第一个参数是旧的Tuple的输入流,第二个参数是发往下游Bolt的新的Tuple流。此时,新的Tuple和旧的Tuple是仍然属于同一棵Tuple树,即,如果下游的Bolt处理Tuple失败,则会向上传递到当前Bolt,当前Bolt根据旧的Tuple流继续往上游传递,申请重发失败的Tuple。保证Tuple处理的可靠性。这两种情况要根据自己的场景来确定。&&& 示例如下: 1 public void execute(Tuple tuple) { 2
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); 4
_collector.ack(tuple); 6
9 public void execute(Tuple tuple) {10 11
_collector.emit(new Values(tuple.getString(0) + "!!!"));12 13
}&&&& 此外还有ack方法、fail方法、cleanup方法等。其中cleanup方法和Spout中的close方法类似,都是在当前Component关闭时调用,但是针对实时计算来说,除非一些特殊的场景要求以外,这两个方法一般都很少用到。5 Stream grouping&&& 上文中介绍了Topology的基本组件Spout和Bolt,在Topology中,数据流Tuple的处理就是不断的通过调用不同的Spout和Bolt来完成的。不同的Bolt和Spout的上下游关系是通过在入口类中定义的。示例如下: 1 builder = new TopologyBuilder(); 2
3 builder.setSpout(spoutId, new TestWordSpout(), 5); 4
5 builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word")); 6
7 builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj")); 8 builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId); 9&&&& 此例中的builder是TopologyBuilder对象,通过它的createTopology方法可以创建一个Topology对象,同时此builder还要定义当前Topology中用到的Spout和Bolt对象,分别通过setSpout方法和setBolt方法来完成。&&& setSpout方法和setBolt方法中的第一个参数是当前的Component组件的Stream流ID号;第二个参数是具体的Component实现类的构造;第三个参数是当前Component的并行执行的线程数目,Storm会根据这个数字的累加和来确定Topology的Task数目。最后的小尾巴*Grouping是指的一个Stream应如何分配数据给Bolt上面的Task。目前Storm的Stream Grouping有如下几种:&&& (1)ShuffleGrouping:随机分组,随机分发Stream中的tuple,保证每个Bolt的Task接收Tuple数量大致一致;&&& (2)FieldsGrouping:按照字段分组,保证相同字段的Tuple分配到同一个Task中;&&& (3)AllGrouping:广播发送,每一个Task都会受到所有的Tuple;&&& (4)GlobalGrouping:全局分组,所有的Tuple都发送到同一个Task中,此时一般将当前Component的并发数目设置为1;&&& (5)NonGrouping:不分组,和ShuffleGrouping类似,当前Task的执行会和它的被订阅者在同一个线程中执行;&&& (6)DirectGrouping:直接分组,直接指定由某个Task来执行Tuple的处理,而且,此时必须有emitDirect方法来发送;& & (7) localOrShuffleGrouping:和ShuffleGrouping类似,若Bolt有多个Task在同一个进程中,Tuple会随机发给这些Task。&&& 不同的的Grouping,需要根据不同的场景来具体设定,不一而论。6 Topology运行6.1 Topology运行方式Topology的运行可以分为本地模式和分布式模式,模式的设置可以在配置文件中设定,也可以在代码中设置。(1)本地运行的提交方式:1 LocalCluster cluster = new LocalCluster();2 3 cluster.submitTopology(topologyName, conf, topology);4 5 cluster.killTopology(topologyName);6 7 cluster.shutdown();&(2)分布式提交方式:StormSubmitter.submitTopology(topologyName, topologyConfig, builder.createTopology());&需要注意的是,在Storm代码编写完成之后,需要打包成jar包放到Nimbus中运行,打包的时候,不需要把依赖的jar都打进去,否则如果把依赖的storm.jar包打进去的话,运行时会出现重复的配置文件错误导致Topology无法运行。因为Topology运行之前,会加载本地的storm.yaml配置文件。在Nimbus运行的命令如下:storm jar StormTopology.jar maincalss args6.2 Topology运行流程  有几点需要说明的地方:(1)Storm提交后,会把代码首先存放到Nimbus节点的inbox目录下,之后,会把当前Storm运行的配置生成一个stormconf.ser文件放到Nimbus节点的stormdist目录中,在此目录中同时还有序列化之后的Topology代码文件;(2)在设定Topology所关联的Spouts和Bolts时,可以同时设置当前Spout和Bolt的executor数目和task数目,默认情况下,一个Topology的task的总和是和executor的总和一致的。之后,系统根据worker的数目,尽量平均的分配这些task的执行。worker在哪个supervisor节点上运行是由storm本身决定的;(3)任务分配好之后,Nimbes节点会将任务的信息提交到zookeeper集群,同时在zookeeper集群中会有workerbeats节点,这里存储了当前Topology的所有worker进程的心跳信息;(4)Supervisor节点会不断的轮询zookeeper集群,在zookeeper的assignments节点中保存了所有Topology的任务分配信息、代码存储目录、任务之间的关联关系等,Supervisor通过轮询此节点的内容,来领取自己的任务,启动worker进程运行;(5)一个Topology运行之后,就会不断的通过Spouts来发送Stream流,通过Bolts来不断的处理接收到的Stream流,Stream流是无界的。最后一步会不间断的执行,除非手动结束Topology。6.3 Topology方法调用流程&&& Topology中的Stream处理时的方法调用过程如下:&有几点需要说明的地方:& &(1)每个组件(Spout或者Bolt)的构造方法和declareOutputFields方法都只被调用一次。& &(2)open方法、prepare方法的调用是多次的。入口函数中设定的setSpout或者setBolt里的并行度参数指的是executor的数目,是负责运行组件中的task的线程 & & & & 的数目,此数目是多少,上述的两个方法就会被调用多少次,在每个executor运行的时候调用一次。相当于一个线程的构造方法。& &(3)nextTuple方法、execute方法是一直被运行的,nextTuple方法不断的发射Tuple,Bolt的execute不断的接收Tuple进行处理。只有这样不断地运行,才会产 & & & & 生无界的Tuple流,体现实时性。相当于线程的run方法。& &(4)在提交了一个topology之后,Storm就会创建spout/bolt实例并进行序列化。之后,将序列化的component发送给所有的任务所在的机器(即Supervisor节 & & & & & 点),在每一个任务上反序列化component。& &(5)Spout和Bolt之间、Bolt和Bolt之间的通信,是通过zeroMQ的消息队列实现的。& &(6)上图没有列出ack方法和fail方法,在一个Tuple被成功处理之后,需要调用ack方法来标记成功,否则调用fail方法标记失败,重新处理这个Tuple。6.4 Topology并行度&&& 在Topology的执行单元里,有几个和并行度相关的概念。(1)worker:每个worker都属于一个特定的Topology,每个Supervisor节点的worker可以有多个,每个worker使用一个单独的端口,它对Topology中的每个component运行一个或者多个executor线程来提供task的运行服务。(2)executor:executor是产生于worker进程内部的线程,会执行同一个component的一个或者多个task。(3)task:实际的数据处理由task完成,在Topology的生命周期中,每个组件的task数目是不会发生变化的,而executor的数目却不一定。executor数目小于等于task的数目,默认情况下,二者是相等的。&&& 在运行一个Topology时,可以根据具体的情况来设置不同数量的worker、task、executor,而设置的位置也可以在多个地方。(1)worker设置:(1.1)可以通过设置yaml中的topology.workers属性(1.2)在代码中通过Config的setNumWorkers方法设定(2)executor设置:&&& 通过在Topology的入口类中setBolt、setSpout方法的最后一个参数指定,不指定的话,默认为1;(3)task设置:&&& (3.1) 默认情况下,和executor数目一致;&&& (3.2)在代码中通过TopologyBuilder的setNumTasks方法设定具体某个组件的task数目;6.5 终止Topology&&& 通过在Nimbus节点利用如下命令来终止一个Topology的运行:storm kill topologyName&&& kill之后,可以通过UI界面查看topology状态,会首先变成KILLED状态,在清理完本地目录和zookeeper集群中的和当前Topology相关的信息之后,此Topology就会彻底消失了。7 Topology跟踪&&& Topology提交后,可以在Nimbus节点的web界面查看,默认的地址是http://NimbusIp:8080。&8 Storm应用&&& 上面给出了如何编写Storm框架任务Topology的方法,那么在哪些场景下能够使用Storm框架呢?下面介绍Storm框架的几个典型的应用场景。(1)利用Storm框架的DRPC进行大量的函数并行调用,即实现分布式的RPC;(2)利用Storm框架的Transaction Topology,可以进行实时性的批量更新或者查询数据库操作或者应用需要同一批内的消息以及批与批之间的消息并行处理这样的场景,此时Topology中只能有一个TrasactionalSpout;(3)利用滑动窗口的逻辑结合Storm框架来计算得出某段时间内的售出量最多的产品、购买者最多的TopN地区等;(4)精确的广告推送,在用户浏览产品的时候,将浏览记录实时性的搜集,发送到Bolt,由Bolt来根据用户的账户信息(如果有的话)完成产品的分类统计,产品的相关性查询等逻辑计算之后,将计算结果推送给用户;(5)实时日志的处理,Storm可以和一个分布式存储结合起来,实时性的从多个数据源发送数据到处理逻辑Bolts,Bolts完成一些逻辑处理之后,交给分布式存储框架进行存储,此时,Spout可以是多个;(6)实时性的监控舆论热点,比如针对某个关键词,在用户查询的时候,产生数据源Spout,结合语义分析等,由Bolt来完成查询关键词的统计分析,汇总当前的舆论热点;(7)数据流的实时聚合操作。9 参考网址/138/twitter-storm%E5%85%A5%E9%97%A8//nathanmarz/storm/wikihttp://nathanmarz.github.io/storm/doc/index-all.html/nathanmarz&有理解不到位的地方,欢迎批评指正,一起交流~
无相关信息

我要回帖

更多关于 storm是什么意思 的文章

 

随机推荐