key为null时kafka 查看topic 分区会将消息发送给哪个分区

您当前位置: >
> kafka入门:简介、使用场景、设计原理、主要配置及集群搭建
kafka入门:简介、使用场景、设计原理、主要配置及集群搭建
来源:程序员人生&& 发布时间: 14:49:03 阅读次数:166次
问题导读:
1.zookeeper在kafka的作用是甚么?
2.kafka中几近不允许对消息进行“随机读写”的缘由是甚么?
3.kafka集群consumer和producer状态信息是如何保存的?
4.partitions设计的目的的根本缘由是甚么?
& & 1、简介
& & Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,另外它其实不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,另外kafka集群有多个kafka实例组成,每一个实例(server)成为broker。不管是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存1些meta信息。
& &2、Topics/logs
& & 1个Topic可以认为是1类消息,每一个topic将被分成多个partition(区),每一个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为1个long型数字,它是唯1标记1条消息。它唯1的标记1条消息。kafka并没有提供其他额外的索引机制来存储offset,由于在kafka中几近不允许对消息进行“随机读写”。
& & kafka和JMS(Java Message Service)实现(activeMQ)不同的是:即便消息被消费,消息依然不会被立即删除.日志文件将会根据broker中的配置要求,保存1定的时间以后删除;比如log文件保存2天,那末两天后,文件会被清除,不管其中的消息是不是被消费.kafka通过这类简单的手段,来释放磁盘空间,和减少消息消费以后对文件内容改动的磁盘IO开支.
& & 对consumer而言,它需要保存消费消息的offset,对offset的保存和使用,有consumer来控制;当consumer正常消费消息时,offset将会&线性&的向先驱动,即消息将顺次顺序被消费.事实上consumer可使用任意顺序消费消息,它只需要将offset重置为任意值..(offset将会保存在zookeeper中,参见下文)
& & kafka集群几近不需要保护任何consumer和producer状态信息,这些信息有zookeeper保存;因此producer和consumer的客户端实现非常轻量级,它们可以随便离开,而不会对集群造成额外的影响.
& & partitions的设计目的有多个.最根本缘由是kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸到达单机磁盘的上限,每一个partiton都会被当前server(kafka实例)保存;可以将1个topic切分多任意多个partitions,来消息保存/消费的效力.另外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力.(具体原理参见下文).
& & 3、Distribution
& & 1个Topic的多个partitions,被散布在kafka集群中的多个server上;每一个server(kafka实例)负责partitions中消息的读写操作;另外kafka还可以配置partitions需要备份的个数(replicas),每一个partition将会被备份到多台机器上,以提高可用性.
& & 基于replicated方案,那末就意味着需要对多个备份进行调度;每一个partition都有1个server为&leader&;leader负责所有的读写操作,如果leader失效,那末将会有其他follower来接收(成为新的leader);follower只是单调的和leader跟进,同步消息便可..因而可知作为leader的server承载了全部的要求压力,因此从集群的整体斟酌,有多少个partitions就意味着有多少个&leader&,kafka会将&leader&均衡的分散在每一个实例上,来确保整体的性能稳定.
& &&Producers
& & Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪一个比如基于&round-robin&方式或通过其他的1些算法等.
& &&Consumers
& & 本质上kafka只支持Topic.每一个consumer属于1个反过来讲,每一个group中可以有多个consumer.发送到Topic的消息,只会被定阅此Topic的每一个group中的1个consumer消费.
& & 如果所有的consumer都具有相同的group,这类情况和queue模式很像;消息将会在consumers之间负载均衡.
& & 如果所有的consumer都具有不同的group,那这就是&发布-定阅&;消息将会广播给所有的消费者.
& & 在kafka中,1个partition中的消息只会被group中的1个consumer消费;每一个group中consumer消息消费相互独立;我们可以认为1个group是1个&定阅&者,1个Topic中的每一个partions,只会被1个&定阅者&中的1个consumer消费,不过1个consumer可以消费多个partitions中的消息.kafka只能保证1个partition中的消息被某个consumer消费时,消息是顺序的.事实上,从Topic角度来讲,消息仍不是有序的.
& & kafka的设计原理决定,对1个topic,同1个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将没法得到消息.
& &&Guarantees
& & 1) 发送到partitions中的消息将会依照它接收的顺序追加到日志中
& & 2) 对消费者而言,它们消费消息的顺序和日志中消息顺序1致.
& & 3) 如果Topic的&replicationfactor&为N,那末允许N⑴个kafka实例失效.
2、使用处景
& & 1、Messaging& &
& & 对1些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可使kafka具有良好的扩大性和性能优势.不过到目前为止,我们应当很清楚认识到,kafka并没有提供JMS中的&事务性&&消息传输担保(消息确认机制)&&消息分组&等企业级特性;kafka只能使用作为&常规&的消息系统,在1定程度上,还没有确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)
& & 2、Websit activity tracking
& & kafka可以作为&网站活性跟踪&的最好工具;可以将网页/用户操作等信息发送到kafka中.并实时监控,或离线统计分析等
& & 3、Log Aggregation
& & kafka的特性决定它非常合适作为&日志搜集中心&;application可以将操作日志&批量&&异步&的发送到kafka集群中,而不是保存在本地或DB中;kafka可以批量提交消息/紧缩消息等,这对producer端而言,几近感觉不到性能的开支.此时consumer端可使hadoop等其他系统化的存储和分析系统.
3、设计原理
& & kafka的设计初衷是希望作为1个统1的信息搜集平台,能够实时的搜集反馈信息,并需要能够支持较大的数据量,且具有良好的容错能力.
& & 1、持久性
& & kafka使用文件存储消息,这就直接决定kafka在性能上严重依赖文件系统的本身特性.且不管任何OS下,对文件系统本身的优化几近没有可能.文件缓存/直接内存映照等是经常使用的手段.由于kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)到达1定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数.
& & 需要斟酌的影响性能点很多,除磁盘IO以外,我们还需要斟酌网络IO,这直接关系到kafka的吞吐量问题.kafka并没有提供太多高超的技能;对producer端,可以将消息buffer起来,当消息的条数到达1定阀值时,批量发送给对consumer端也是1样,批量fetch多条消息.不过消息量的大小可以通过配置文件来指定.对kafka broker端,似乎有个sendfile系统调用可以潜伏的提升网络IO的性能:将文件的数据映照到系统内存中,socket直接读取相应的内存区域便可,而无需进程再次copy和交换.
其实对producer/consumer/broker3者而言,CPU的开支应当都不大,因此启用消息紧缩机制是1个良好的策略;紧缩需要消耗少许的CPU资源,不过对kafka而言,网络IO更应当需要斟酌.可以将任何在网络上传输的消息都经过紧缩.kafka支持gzip/snappy等多种紧缩方式.
& & 3、生产者
& & 负载均衡: producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何&路由层&.事实上,消息被路由到哪一个partition上,有producer客户端决定.比如可以采取&random&&key-hash&&轮询&等,如果1个topic中有多个partitions,那末在producer端实现&消息均衡分发&是必要的.
& & 其中partition leader的位置(host:port)注册在zookeeper中,producer作为zookeeper client,已注册了watch用来监听partition leader的变更事件.
& & 异步发送:将多条消息暂且在客户端buffer起来,并将他们批量的发送到broker,小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效力。不过这也有1定的隐患,比如说当producer失效时,那些还没有发送的消息将会丢失。
& & 4、消费者
& & consumer端向broker发送&fetch&要求,并告知其获得消息的尔后consumer将会取得1定条数的消息;consumer端也能够重置offset来重新消费消息.
& & 在JMS实现中,Topic模型基于push方式,即broker将消息推送给consumer端.不过在kafka中,采取了pull方式,即consumer在和broker建立连接以后,主动去pull(或说fetch)消息;这中模式有些优点,首先consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);另外,消费者可以良好的控制消息消费的数量,batch fetch.
& & 其他JMS实现,消息消费的位置是有prodiver保存,以便避免重复发送消息或将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker需要太多额外的工作.在kafka中,partition中的消息只有1个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的.当消息被consumer接收以后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.因而可知,consumer客户端也很轻量级.
& & 5、消息传送机制
& & 对JMS实现,消息传输担保非常直接:有且只有1次(exactly once).在kafka中稍有不同:
& & 1) at most once: 最多1次,这个和JMS中&非持久化&消息类似.发送1次,不管成败,将不会重发.
& & 2) at least once: 消息最少发送1次,如果消息未能接受成功,可能会重发,直到接收获功.
& & 3) exactly once: 消息只会发送1次.
& & at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset以后,但是在消息处理进程中出现了异常,致使部份消息未能继续处理.那末尔后&未处理&的消息将不能被fetch到,这就是&at most once&.
& & at least once: 消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功以后,但是在保存offset阶段zookeeper异常致使保存操作未能履行成功,这就致使接下来再次fetch时可能取得上次已处理过的消息,这就是&at least once&,缘由offset没有及时的提交给zookeeper,zookeeper恢复正常还是之前offset状态.
& & exactly once: kafka中并没有严格的去实现(基于2阶段提交,事务),我们认为这类策略在kafka中是没有必要的.
& & 通常情况下&at-least-once&是我们搜选.(相比at most once而言,重复接收数据总比丢失数据要好).
& & 6、复制备份
& & kafka将每一个partition数据复制到多个server上,任何1个partition有1个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定.leader处理所有的read-write要求,follower需要和leader保持同步.Follower和consumer1样,消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower&落后&太多或失效,leader将会把它从replicas同步列表中删除.当所有的follower都将1条消息保存成功,此消息才被认为是&committed&,那末此时consumer才能消费它.即便只有1个replicas实例存活,依然可以保证消息的正常发送和接收,只要zookeeper集群存活便可.(不同于其他散布式存储,比如hbase需要&多数派&存活才行)
& & 当leader失效时,需在followers当选取出新的leader,可能此时follower落后于leader,因此需要选择1个&up-to-date&的follower.选择follower时需要统筹1个问题,就是新leaderserver上所已承载的partition leader的个数,如果1个server上有过量的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要斟酌到&负载均衡&.
& & 7.日志
& & 如果1个topic的名称为&my_topic&,它有2个partitions,那末日志将会保存在my_topic_0和my_topic_1两个目录中;日志文件中保存了1序列&log entries&(日志条目),每一个log entry格式为&4个字节的数字N表示消息的长度& + &N个字节的消息内容&;每一个日志都有1个offset来唯1的标记1条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每一个partition在物理存储层面,有多个log file组成(称为segment).segmentfile的命名为&最小offset&.kafka.例如&.kafka&;其中&最小offset&表示此segment中起始消息的offset.
& & 其中每一个partiton中所持有的segments列表信息会存储在zookeeper中.
& & 当segment文件尺寸到达1定阀值时(可以通过配置文件设定,默许1G),将会创建1个新的文件;当buffer中消息的条数到达阀值时将会触发日志信息flush到日志文件中,同时如果&距离最近1次flush的时间差&到达阀值时,也会触发flush到日志文件.如果broker失效,极有可能会丢失那些还没有flush到文件的消息.由于server意外实现,依然会致使log文件格式的破坏(文件尾部),那末就要求当server启东是需要检测最后1个segment的文件结构是不是合法并进行必要的修复.
& & 获得消息时,需要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获得消息的总长度(间接的表示消息的条数).根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出便可.
& & 日志文件的删除策略非常简单:启动1个后台线程定期扫描log file列表,把保存时间超过阀值的文件直接删除(根据文件的创建时间).为了不删除文件时依然有read操作(consumer消费),采取copy-on-write方式.
& & 8、分配
& & kafka使用zookeeper来存储1些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)
& & 1) Broker node registry: 当1个kafkabroker启动后,首先会向zookeeper注册自己的节点信息(临时znode),同时当broker和zookeeper断开连接时,此znode也会被删除.
& & 格式: /broker/ids/[0...N]& &--&host:其中[0..N]表示broker id,每一个broker的配置文件中都需要指定1个数字类型的id(全局不可重复),znode的值为此broker的host:port信息.
& & 2) Broker Topic Registry: 当1个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,依然是1个临时znode.
& & 格式: /broker/topics/[topic]/[0...N]&&其中[0..N]表示partition索引号.
& & 3) Consumer and Consumer group: 每一个consumer客户端被创建时,会向zookeeper注册自己的信息;此作用主要是为了&负载均衡&.
& & 1个group中的多个consumer可以交错的消费1个topic的所有简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能斟酌,让partition相对均衡的分散到每一个consumer上.
& & 4) Consumer id Registry: 每一个consumer都有1个唯1的ID(host:uuid,可以通过配置文件指定,也能够由系统生成),此id用来标记消费者信息.
& & 格式:/consumers/[group_id]/ids/[consumer_id]
& & 依然是1个临时的znode,此节点的值为{&topic_name&:#streams...},即表示此consumer目前所消费的topic + partitions列表.
& & 5) Consumer offset Tracking: 用来跟踪每一个consumer目前所消费的partition中最大的offset.
& & 格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]--&offset_value
& & 此znode为持久节点,可以看出offset跟group_id有关,以表明当group中1个消费者失效,其他consumer可以继续消费.
& & 6) Partition Owner registry: 用来标记partition被哪一个consumer消费.临时znode
& & 格式:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]--&consumer_node_id当consumer启动时,所触发的操作:
& & A) 首先进行&Consumer id Registry&;
& & B) 然后在&Consumer id Registry&节点下注册1个watch用来监听当前group中其他consumer的&leave&和&join&;只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如1个consumer失效,那末其他consumer接收partitions).
& & C) 在&Broker id registry&节点下,注册1个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.
& & 1) Producer端使用zookeeper用来&发现&broker列表,和和Topic下每一个partition leader建立socket连接并发送消息.
& & 2) Broker端使用zookeeper用来注册broker信息,已监测partitionleader存活性.
& & 3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获得消息.
4、主要配置
& & 1、Broker配置
& & 2.Consumer主要配置
3.Producer主要配置
以上是关于kafka1些基础说明,在其中我们知道如果要kafka正常运行,必须配置zookeeper,否则不管是kafka集群还是客户真个生存者和消费者都没法正常的工作的,以下是对zookeeper进行1些简单的介绍:
5、zookeeper集群
& & zookeeper是1个为散布式利用提供1致性服务的软件,它是开源的Hadoop项目的1个子项目,并根据google发表的1篇论文来实现的。zookeeper为散布式系统提供了高笑且易于使用的协同服务,它可以为散布式利用提供相当多的服务,诸如统1命名服务,配置管理,状态同步和组服务等。zookeeper接口简单,我们没必要过量地纠结在散布式系统编程难于处理的同步和1致性问题上,你可使用zookeeper提供的现成(off-the-shelf)服务来实现来实现散布式系统额配置管理,组管理,Leader选举等功能。
& & zookeeper集群的安装,准备3台server1:192.168.0.1,server2:192.168.0.2,
& & server3:192.168.0.3.
& & 1)下载zookeeper
& & 到http://zookeeper.apache.org/releases.html去下载最新版本Zookeeper⑶.4.5的安装包zookeeper⑶.4.5.tar.gz.将文件保存server1的~目录下
& & 2)安装zookeeper
& & 先在server分别履行a-c步骤
& & a)解压&&
& & tar -zxvf zookeeper⑶.4.5.tar.gz
& & 解压完成后在目录~下会发现多出1个目录zookeeper⑶.4.5,重新命令为zookeeper
& & b)配置
& & 将conf/zoo_sample.cfg拷贝1份命名为zoo.cfg,也放在conf目录下。然后依照以下值修改其中的配置:
& & # The number of milliseconds of each tick
& & tickTime=2000
& & # The number of ticks that the initial
& & # synchronization phase can take
& & initLimit=10
& & # The number of ticks that can pass between
& & # sending a request and getting an acknowledgement
& & syncLimit=5
& & # the directory where the snapshot is stored.
& & # do not use /tmp for storage, /tmp here is just
& & # example sakes.
& & dataDir=/home/wwb/zookeeper /data
& & dataLogDir=/home/wwb/zookeeper/logs
& & # the port at which the clients will connect
& & clientPort=2181
& & # Be sure to read the maintenance section of the
& & # administrator guide before turning on autopurge.
& & #http://zookeeper.apache.org/doc/ ... html#sc_maintenance
& & # The number of snapshots to retain in dataDir
& & #autopurge.snapRetainCount=3
& & # Purge task interval in hours
& & # Set to &0& to disable auto purge feature
& & #autopurge.purgeInterval=1
& & server.1=192.168.0.1:
& & server.2=192.168.0.2:
& & server.3=192.168.0.3:
& & tickTime:这个时间是作为 Zookeeper 之间或客户端与之间保持心跳的时间间隔,也就是每一个 tickTime 时间就会发送1个心跳。
& & dataDir:顾名思义就是 Zookeeper 保存数据的目录,默许情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
& & clientPort:这个端口就是客户端连接 Zookeeper 的端口,Zookeeper 会监听这个端口,接受客户真个访问要求。
& & initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 的客户端,而是 Zookeeper 集群中连接到 Leader 的 Follower )初始化连接时最长能忍耐多少个心跳时间间隔数。当已超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 还没有收到客户真个返回信息,那末表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
& & syncLimit:这个配置项标识 Leader 与Follower 之间发送消息,要求和应对时间长度,最长不能超过量少个 tickTime 的时间长度,总的时间长度就是2*2000=4 秒
& & server.A=B:C:D:其中 A 是1个数字,表示这个是第几号;B 是这个的 ip 地址;C 表示的是这个与集群中的 Leader 交换信息的端口;D 表示的是万1集群中的 Leader 挂了,需要1个端口来重新进行选举,选出1个新的 Leader,而这个端口就是用来履行选举时相互通讯的端口。如果是伪集群的配置方式,由于 B 都是1样,所以不同的 Zookeeper 实例通讯端口号不能1样,所以要给它们分配不同的端口号
注意:dataDir,dataLogDir中的wwb是当前登录用户名,data,logs目录开始是不存在,需要使用mkdir命令创建相应的目录。并且在该目录下创建文件myid,serve1,server2,server3该文件内容分别为1,2,3。
针对server2,server3可以将server1复制到相应的目录,不过需要注意dataDir,dataLogDir目录,并且文件myid内容分别为2,3
& & 3)顺次启动server1,server2,server3的zookeeper.
& & /home/wwb/zookeeper/bin/zkServer.sh start,出现类似以下内容
& & JMX enabled by default
& & Using config: /home/wwb/zookeeper/bin/../conf/zoo.cfg
& & Starting zookeeper ... STARTED
& &4) 测试zookeeper是不是正常工作,在server1上履行以下命令
& & /home/wwb/zookeeper/bin/zkCli.sh -server192.168.0.2:2181,出现类似以下内容
& & JLine support is enabled
19:59:40,560 - INFO& && &[main-SendThread(localhost.localdomain:2181):ClientCnxn$SendThread@736]- Session& &establishmentcomplete on server localhost.localdomain/127.0.0.1:2181, sessionid =& & 0x1429cdb, negotiatedtimeout = 30000
& & WATCHER::
& & WatchedEvent state:SyncConnected type:None path:null
& & [zk: 127.0.0.1:2181(CONNECTED) 0] [root@localhostzookeeper2]#&&
& & 即代表集群构建成功了,如果出现毛病那应当是第3部时没有启动好集群,
运行,先利用
& & ps aux | grep zookeeper查看是不是有相应的进程的,没有话,说明集群启动出现问题,可以在每一个上使用
& & ./home/wwb/zookeeper/bin/zkServer.sh stop。再顺次使用./home/wwb/zookeeper/binzkServer.sh start,这时候在履行41般是没有问题,如果还是有问题,那末先stop再到bin的上级目录履行./bin/zkServer.shstart试试。
注意:zookeeper集群时,zookeeper要求半数以上的机器可用,zookeeper才能提供服务。
<div align="left" style="word-wrap:break
生活不易,码农辛苦
如果您觉得本网站对您的学习有所帮助,可以手机扫描二维码进行捐赠
------分隔线----------------------------
------分隔线----------------------------
积分:4212q5725827 的BLOG
用户名:q5725827
访问量:3285
注册日期:
阅读量:5863
阅读量:12276
阅读量:310111
阅读量:1025844
51CTO推荐博文
APIProducer API此处只简介一个procedure的例子生产类是用来创建新消息的主题和可选的分区。如果使用Java你需要包括几个包和支持类:import kafka.javaapi.producer.Pimport kafka.producer.KeyedMimport kafka.producer.ProducerC&第一步首先定义producer如何找到集群,如何序列化消息和为消息选择适合的分区。下面吧这些定义在一个标准的JAVA& Properties类中Properties&props&=&new&Properties();
props.put("metadata.broker.list","broker1:9092,broker2:9092");
props.put("serializer.class","kafka.serializer.StringEncoder");
props.put("partitioner.class","example.producer.SimplePartitioner");
props.put("request.required.acks","1");
ProducerConfig&config&=&new&ProducerConfig(props);1.metadata.broker.list 定义了生产者如何找到一个或多个Broker去确定每个topic的Leader。这不需要你设置集群中全套的brokers但至少应包括两个,第一个经纪人不可用可以替换。不需要担心需要指出broker为主题的领袖(分区),生产者知道如何连接到代理请求元数据并连接到正确的broker。2.第二个属性“序列化类定义“。定义使用什么序列化程序传递消息。在我们的例子中,我们使用一个卡夫卡提供的简单的字符串编码器。请注意,encoder必须和下一步的keyedmessage使用相同的类型可以适当的定义"key.serializer.class"根据key改变序列化类。默认的是与"serializer.class"相同3.第三个属性partitioner.class 定义了决定topic中的分区发送规则。这个属性是可选的,但是对于你的特殊的分区实现是重要的。如果存在key将使用kafka默认的分组规则,如果key为null 则使用随机的分区发送策略。4.最后一个属性“request.required.acks”将设置kafka知否需要broker的回应。如果不设置可能将导致数据丢失。1.1此处可以设置为0 生产者不等待broker的回应。会有最低能的延迟和最差的保证性(在服务器失败后会导致信息丢失)1.2此处可以设置为1生产者会收到leader的回应在leader写入之后。(在当前leader服务器为复制前失败可能会导致信息丢失)1.3此处可以设置为-1生产者会收到leader的回应在全部拷贝完成之后。之后可以定义生产者Producer&String, String& producer =new Producer&String, String&(config);此处泛型的第一个type是分区的key的类型。第二个是消息的类型。与上面Properties中定义的对应。现在定义messgaeRandom&rnd&=&new&Random();
long&runtime&=&new&Date().getTime();
String&ip&=&“192.168.2.”&+rnd.nextInt(255);
String&msg&=&runtime&+&“,,”+&此处模拟一个website的访问记录。之后想broker中写入信息.KeyedMessage&String, String& data =new KeyedMessage&String, String&("page_visits",ip, msg);producer.send(data);此处的“page_visits”是要写入的Topic。此处我们将IP设置为分区的key值。注意如果你没有设置键值,即使你定义了一个分区类,kafka也将使用随机发送.Full Code:import&java.util.*;
import&kafka.javaapi.producer.P
import&kafka.producer.KeyedM
import&kafka.producer.ProducerC
public&class&TestProducer&{
&&&public&static&void&main(String[]&args)&{
&&&&&&&long&events&=&Long.parseLong(args[0]);
&&&&&&&Random&rnd&=&new&Random();
&&&&&&&Properties&props&=&new&Properties();
&&&&&&&props.put("metadata.broker.list","broker1:9092,broker2:9092&");
&&&&&&&props.put("serializer.class","kafka.serializer.StringEncoder");
&&&&&&&props.put("partitioner.class","example.producer.SimplePartitioner");
&&&&&&&props.put("request.required.acks",&"1");
&&&&&&&ProducerConfig&config&=&new&ProducerConfig(props);
&&&&&&&Producer&String,&String&&producer&=&new&Producer&String,String&(config);
&&&&&&&for&(long&nEvents&=&0;&nEvents&&&&nEvents++)&{
&&&&&&&&&&&&&&&long&runtime&=&newDate().getTime();&
&&&&&&&&&&&&&&&String&ip&=&“192.168.2.”&+rnd.nextInt(255);
&&&&&&&&&&&&&&&String&msg&=&runtime&+“,,”&+&
&&&&&&&&&&&&&&&KeyedMessage&String,String&&data&=&new&KeyedMessage&String,&String&("page_visits",ip(key),&msg);
&&&&&&&&&&&&&&&producer.send(data);
&&&&&&&producer.close();
Partitioning&Code:&(分区函数)
import&kafka.producer.P
import&kafka.utils.VerifiableP
public&class&SimplePartitioner&implementsPartitioner&String&&{
&&&public&SimplePartitioner&(VerifiableProperties&props)&{
&&&public&int&partition(String&key,&int&a_numPartitions)&{
&&&&&&&int&partition&=&0;
&&&&&&&int&offset&=&key.lastIndexOf('.');
&&&&&&&if&(offset&&&0)&{
&&&&&&&&&&partition&=&Integer.parseInt(&key.substring(offset+1))&%a_numP
&&&&&&return&
}上面分区的作用是相同的IP将发送至相同的分区。但此时你的消费者需要知道如何去处理这样的规则消息。使用前需要建立topicbin/kafka-create-topic.sh&--topicpage_visits&--replica&3&--zookeeper&localhost:2181&--partition&5可以使用下面的工具验证你发送的消息bin/kafka-console-consumer.sh&--zookeeperlocalhost:2181&--topic&page_visits&--from-beginningHigh Level Consumer API顶层接口:class&Consumer&{
&&*&&创建java的消费者与kafka的connect
&&*&&@param&config&&至少需要提供consumer的groupId和zookeeper.connect.
public&statickafka.javaapi.consumer.ConsumerConnector&createJavaConsumerConnector(config:ConsumerConfig);
ConsumerConnector:
public&interfacekafka.javaapi.consumer.ConsumerConnector&{
&&*&&为每一个主题创建一个泛型的消息流
&&*&&@param&topicCountMap&&提供topic和Stream的一一对应
&&*&&@param&decoder&解析器&
&&*&&@return&Map&&&&topic&,List&#streams&&
&&*&&&&&&&&&&&&&&&&&&&此处的KafkaStream提供对内容的Iterable读取
&public&&K,V&&Map&String,&List&KafkaStream&K,V&&&
&&&&createMessageStreams(Map&String,Integer&&topicCountMap,&Decoder&K&&keyDecoder,&Decoder&V&valueDecoder);
&&*&&同上.
&public&Map&String,&List&KafkaStream&byte[],&byte[]&&&createMessageStreams(Map&String,&Integer&&topicCountMap);
&&&*&&&&&&&&&&建一个匹配的通配符主题的消息流的List
&&*&&@param&topicFilter一个topicfilter指定Consumer订阅的话题(
&&*&&包含了一个白名单和黑名单).
&&*&&@param&numStreams&messagestreams的数量
&&*&&@param&keyDecoder&message&key解析器
&&*&&@param&valueDecoder&a&message解析器
&&*&&@return&同上
&public&&K,V&&List&KafkaStream&K,V&&
&&&createMessageStreamsByFilter(TopicFilter&topicFilter,&int&numStreams,Decoder&K&&keyDecoder,&Decoder&V&&valueDecoder);
&………………………….(其余接口类似,是上述方法的重载方法)
&&*&&提交本连接器所连接的所有分区和主题
&public&void&commitOffsets();
&&*&&停止当前Consumer
&public&void&shutdown();
}&e.g &example1. 为什使用高级消费者(High Level Consumer)&&&&&&&& 有时消费者从卡夫卡读取消息不在乎处理消息的偏移量逻辑,只是消费消息内部的信息。高级消费者提供了消费信息的方法而屏蔽了大量的底层细节。&&&&&&&& 首先要知道的是,高级消费者从zookeeper的特殊分区存储最新偏离。这个偏移当kafka启动时准备完毕。这一般是指消费者群体(Consumer group)[此处的意思,kafka中的消息是发送到Consumer group中的任一个consumer上的,kafka保存的是整体的偏移。此处不知是否理解正确请大虾指点。]&&&&&&&& 请小心,对于kafka集群消费群体的名字是全局的,任何的“老”逻辑的消费者应该被关闭,然后运行新的代码。当一个新的进程拥有相同的消费者群的名字,卡夫卡将会增加进程的线程消费topic并且引发的“重新平衡(reblannce)”。在这个重新平衡中,卡夫卡将分配现有分区到所有可用线程,可能移动一个分区到另一个进程的消费分区。如果此时同时拥有旧的的新的代码逻辑,将会有一部分逻辑进入旧得Consumer而另一部分进入新的Consumer中的情况.2. Designing a High Level Consumer了解使用高层次消费者的第一件事是,它可以(而且应该!)是一个多线程的应用。线程围绕在你的主题分区的数量,有一些非常具体的规则:1.&如果你提供比在主题分区多的线程数量,一些线程将不会看到消息2.&如果你提供的分区比你拥有的线程多,线程将从多个分区接收数据3.&如果你每个线程上有多个分区,对于你以何种顺序收到消息是没有保证的。举个栗子,你可能从分区10上获取5条消息和分区11上的6条消息,然后你可能一直从10上获取消息,即使11上也拥有数据。4.添加更多的进程/线程将使卡夫卡重新平衡,可能改变一个分区到线程的分配。这里是一个简单的消费者栗子:package&com.test.
import&kafka.consumer.ConsumerI
import&kafka.consumer.KafkaS
public&class&ConsumerTest&implements&Runnable&{
&&&&privateKafkaStream&m_
&&&&private&intm_threadN
&&&&publicConsumerTest(KafkaStream&a_stream,&int&a_threadNumber)&{
&&&&&&&m_threadNumber&=&a_threadN
&&&&&&&&m_stream&=a_
&&&&public&void&run()&{
&&&&&&&ConsumerIterator&byte[],&byte[]&&it&=&m_stream.iterator();
&&&&&&&&while(it.hasNext())
System.out.println("Thread&"&+&m_threadNumber+&":&"&+&new&String(it.next().message()));
&&&&&&&System.out.println("Shutting&down&Thread:&"&+&m_threadNumber);
}在这里有趣的是,(it.hasnext())。这个代码将从卡夫卡读取直到你停下来。3. Config不像simpleconsumer高层消费者为你很多的提供需要bookkeeping(?)和错误处理。但是你要告诉卡夫卡这些信息。下面的方法定义了创建高级消费者基础配置:private&static&ConsumerConfigcreateConsumerConfig(String&a_zookeeper,&String&a_groupId)&{
&&&&&&&&Propertiesprops&=&new&Properties();
&&&&&&&props.put("zookeeper.connect",&a_zookeeper);
&&&&&&&&props.put("group.id",&a_groupId);
&&&&&&&props.put("zookeeper.session.timeout.ms",&"400");
&&&&&&&props.put("zookeeper.sync.time.ms",&"200");
&&&&&&&props.put("mit.interval.ms",&"1000");
&&&&&&&&return&newConsumerConfig(props);
&&&&}zookeeper.connect& 指定zookeeper集群中的一个实例,kafka利用zookeeper储存topic的分区偏移值。Groupid 消费者所属的Consumer Group(消费者群体)zookeeper.session.timeout.ms&zookeeper的超时处理<mit.interval.ms&& 属性自动提交的间隔。这将替代消息被消费后提交。如果发生错误,你将从新获得未更新的消息。4.使用线程池处理消息public&void&run(int&a_numThreads)&{
&&&Map&String,&Integer&&topicCountMap&=&new&HashMap&String,Integer&();
&&&topicCountMap.put(topic,&new&Integer(a_numThreads));
&&&Map&String,&List&KafkaStream&byte[],&byte[]&&&consumerMap&=&consumer.createMessageStreams(topicCountMap);
&&&List&KafkaStream&byte[],&byte[]&&&streams&=consumerMap.get(topic);
&&&//&now&launch&all&the&threads
&&&executor&=&Executors.newFixedThreadPool(a_numThreads);
&&&//&now&create&an&object&to&consume&the&messages
&&&int&threadNumber&=&0;
&&&for&(final&KafkaStream&stream&:&streams)&{
&&&&&&&executor.submit(new&ConsumerTest(stream,&threadNumber));
&&&&&&&threadNumber++;
}首先我们创建一个map,告诉kafka提供给哪个topic多少线程。consumer.createmessagestreams是我们如何把这个信息传递给卡夫卡。返回的是一个包含kafkastream 的以topic 为键list的map结合。(注意,这里我们只向卡夫卡注册一个话题,但我们可以为map中多添加一个元素的)最后,我们创建的线程池和通过一项新的consumertest对象,每个线程运转我们的业务逻辑。5.清理和异常处理Kafka在每次处理后不会立即更新zookeeper上的偏移值,她会休息上一段时间后提交。在这段时间内,你的消费者可能已经消费了一些消息,但并没有提交到zookeeper上。这样你可能会重复消费数据。同时一些时候,broker失败从新选取leader是也可能会导致重复消费消息。为了避免这种情况应该清理完成后再关闭,而不是直接使用kill命令。e.gtry&{
&&&Thread.sleep(10000);
}&catch&(InterruptedException&ie)&{
example.shutdown();full codepackage&com.test.
import&kafka.consumer.ConsumerC
import&kafka.consumer.KafkaS
importkafka.javaapi.consumer.ConsumerC
import&java.util.HashM
import&java.util.L
import&java.util.M
import&java.util.P
importjava.util.concurrent.ExecutorS
import&java.util.concurrent.E
public&class&ConsumerGroupExample&{
&&&private&final&ConsumerConnector&
&&&private&final&String&
&&&private&&ExecutorService&
&&&public&ConsumerGroupExample(String&a_zookeeper,&String&a_groupId,&Stringa_topic)&{
&&&&&&&consumer&=&kafka.consumer.Consumer.createJavaConsumerConnector(
&&&&&&&&&&&&&&&createConsumerConfig(a_zookeeper,&a_groupId));
&&&&&&&this.topic&=&a_
&&&public&void&shutdown()&{
&&&&&&&if&(consumer&!=&null)&consumer.shutdown();
&&&&&&&if&(executor&!=&null)&executor.shutdown();
&&&public&void&run(int&a_numThreads)&{
&&&&&&&Map&String,&Integer&&topicCountMap&=&new&HashMap&String,Integer&();
&&&&&&&topicCountMap.put(topic,&new&Integer(a_numThreads));
&&&&&&&Map&String,&List&KafkaStream&byte[],&byte[]&&&consumerMap&=&consumer.createMessageStreams(topicCountMap);
&&&&&&&List&KafkaStream&byte[],&byte[]&&&streams&=consumerMap.get(topic);
&&&&&&&//&now&launch&all&the&threads
&&&&&&&executor&=&Executors.newFixedThreadPool(a_numThreads);
&&&&&&&//&now&create&an&object&to&consume&the&messages
&&&&&&&int&threadNumber&=&0;
&&&&&&&for&(final&KafkaStream&stream&:&streams)&{
&&&&&&&&&&&executor.submit(new&ConsumerTest(stream,&threadNumber));
&&&&&&&&&&&threadNumber++;
&&&private&static&ConsumerConfig&createConsumerConfig(String&a_zookeeper,String&a_groupId)&{
&&&&&&&Properties&props&=&new&Properties();
&&&&&&&props.put("zookeeper.connect",&a_zookeeper);
&&&&&&&props.put("group.id",&a_groupId);
&&&&&&&props.put("zookeeper.session.timeout.ms",&"400");
&&&&&&&props.put("zookeeper.sync.time.ms",&"200");
&&&&&&&props.put("mit.interval.ms",&"1000");
&&&&&&&return&new&ConsumerConfig(props);
&&&public&static&void&main(String[]&args)&{
&&&&&&&String&zooKeeper&=&args[0];
&&&&&&&String&groupId&=&args[1];
&&&&&&&String&topic&=&args[2];
&&&&&&&int&threads&=&Integer.parseInt(args[3]);
&&&&&&&ConsumerGroupExample&example&=&new&ConsumerGroupExample(zooKeeper,groupId,&topic);
&&&&&&&example.run(threads);
&&&&&&&try&{
&&&&&&&&&&&Thread.sleep(10000);
&&&&&&&}&catch&(InterruptedException&ie)&{
&&&&&&&example.shutdown();
}此处的启动命令需提供1:2181&group3 &&myTopic& 41.1:2181 zookeeper 的端口和地址2.group3&& Consumer Group Name3.myTopic& consumer消费消息的message4.消费topic的线程数
了这篇文章
类别:┆阅读(0)┆评论(0)

我要回帖

更多关于 kafka 分区策略 的文章

 

随机推荐