cms监控软件如何使用JMX监控Kafka

今天看啥 热点:
使用JMX监控KafkaKafka可以配置使用JMX进行运行状态的监控,既可以通过JDK自带Jconsole来观察结果,也可以通过Java API的方式来。
关于监控指标的描述,可以参考:http://kafka.apache.org/documentation.html#monitoring
开启JMX端口
修改bin/kafka-server-start.sh,添加JMX_PORT参数,添加后样子如下
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then& & export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"& & export JMX_PORT="9999"fi
通过Jconsole测试时候可以连接
通过JavaAPI来访问
通过以下方法获取目标值
public class KafkaDataProvider{& & protected final Logger LOGGER = LoggerFactory.getLogger(getClass());& & private static final String MESSAGE_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";& & private static final String BYTES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";& & private static final String BYTES_OUT_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec";& & private static final String PRODUCE_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce";& & private static final String CONSUMER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer";& & private static final String FLOWER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower";& & private static final String ACTIVE_CONTROLLER_COUNT = "kafka.controller:type=KafkaController,name=ActiveControllerCount";& & private static final String PART_COUNT = "kafka.server:type=ReplicaManager,name=PartitionCount";& & public String extractMonitorData() {& & & & //TODO 通过调用API获得IP以及参数& & & & KafkaRoleInfo monitorDataPoint = new KafkaRoleInfo();& & & & String jmxURL = "service:jmx:rmi:///jndi/rmi://192.168.40.242:9999/jmxrmi";& & & & try {& & & & & & MBeanServerConnection jmxConnection = MetricDataUtils.getMBeanServerConnection(jmxURL);& & & & & & ObjectName messageCountObj = new ObjectName(MESSAGE_IN_PER_SEC);& & & & & & ObjectName bytesInPerSecObj = new ObjectName(BYTES_IN_PER_SEC);& & & & & & ObjectName bytesOutPerSecObj = new ObjectName(BYTES_OUT_PER_SEC);& & & & & & ObjectName produceRequestsPerSecObj = new ObjectName(PRODUCE_REQUEST_PER_SEC);& & & & & & ObjectName consumerRequestsPerSecObj = new ObjectName(CONSUMER_REQUEST_PER_SEC);& & & & & & ObjectName flowerRequestsPerSecObj = new ObjectName(FLOWER_REQUEST_PER_SEC);& & & & & & ObjectName activeControllerCountObj = new ObjectName(ACTIVE_CONTROLLER_COUNT);& & & & & & ObjectName partCountObj = new ObjectName(PART_COUNT);& & & & & & Long messagesInPerSec = (Long) jmxConnection.getAttribute(messageCountObj, "Count");& & & & & & Long bytesInPerSec = (Long) jmxConnection.getAttribute(bytesInPerSecObj, "Count");& & & & & & Long bytesOutPerSec = (Long) jmxConnection.getAttribute(bytesOutPerSecObj, "Count");& & & & & & Long produceRequestCountPerSec = (Long) jmxConnection.getAttribute(produceRequestsPerSecObj, "Count");& & & & & & Long consumerRequestCountPerSec = (Long) jmxConnection.getAttribute(consumerRequestsPerSecObj, "Count");& & & & & & Long flowerRequestCountPerSec = (Long) jmxConnection.getAttribute(flowerRequestsPerSecObj, "Count");& & & & & & Integer activeControllerCount = (Integer) jmxConnection.getAttribute(activeControllerCountObj, "Value");& & & & & & Integer partCount = (Integer) jmxConnection.getAttribute(partCountObj, "Value");& & & & & & monitorDataPoint.setMessagesInPerSec(messagesInPerSec);& & & & & & monitorDataPoint.setBytesInPerSec(bytesInPerSec);& & & & & & monitorDataPoint.setBytesOutPerSec(bytesOutPerSec);& & & & & & monitorDataPoint.setProduceRequestCountPerSec(produceRequestCountPerSec);& & & & & & monitorDataPoint.setConsumerRequestCountPerSec(consumerRequestCountPerSec);& & & & & & monitorDataPoint.setFlowerRequestCountPerSec(flowerRequestCountPerSec);& & & & & & monitorDataPoint.setActiveControllerCount(activeControllerCount);& & & & & & monitorDataPoint.setPartCount(partCount);& & & & } catch (IOException e) {& & & & & & e.printStackTrace();& & & & } catch (MalformedObjectNameException e) {& & & & & & e.printStackTrace();& & & & } catch (AttributeNotFoundException e) {& & & & & & e.printStackTrace();& & & & } catch (MBeanException e) {& & & & & & e.printStackTrace();& & & & } catch (ReflectionException e) {& & & & & & e.printStackTrace();& & & & } catch (InstanceNotFoundException e) {& & & & & & e.printStackTrace();& & & & }& & & & return monitorDataPoint.toString();& & }& & public static void main(String[] args) {& & & & System.out.println(new KafkaDataProvider().extractMonitorData());& & }& & /**& &
* 获得MBeanServer 的连接& &
* @param jmxUrl& &
* @return& &
* @throws IOException& &
*/& & public MBeanServerConnection getMBeanServerConnection(String jmxUrl) throws IOException {& & & & JMXServiceURL url = new JMXServiceURL(jmxUrl);& & & & JMXConnector jmxc = JMXConnectorFactory.connect(url, null);& & & & MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();& & & && & }}
其他工具除了自己编写定制化的监控程序外
kafka-web-console/claudemamo/kafka-web-console部署sbt:http://www.scala-sbt.org/0.13/tutorial/Manual-Installation.htmlhttp://www.scala-sbt.org/release/tutorial/zh-cn/Installing-sbt-on-Linux.html
KafkaOffsetMonitor/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.0java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk localhost:12181 --port 8080 --refresh 5.minutes --retain 1.day
Mx4jLoader
分布式发布订阅消息系统 Kafka 架构设计
Apache Kafka 代码实例
Apache Kafka 教程笔记
Apache kafka原理与特性(0.8V)&
Kafka部署与代码实例&
Kafka介绍和集群环境搭建&
Kafka 的详细介绍:请点这里Kafka 的下载地址:请点这里
本文永久更新链接地址:
相关搜索:
相关阅读:
相关频道:
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
Linux最近更新kafka-使用规范_百度文库
两大类热门资源免费畅读
续费一年阅读会员,立省24元!
kafka-使用规范
阅读已结束,下载文档到电脑
想免费下载本文?
定制HR最喜欢的简历
你可能喜欢一、KafkaOffsetMonitor简述
KafkaOffsetMonitor是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,我们可以浏览当前的消费者组,并且每个Topic的所有Partition的消费情况都可以一目了然。
二、KafkaOffsetMonitor下载
KafkaOffsetMonitor托管在Github上,可以通过Github下载。
下载地址:/quantifind/KafkaOffsetMonitor/releases
三、KafkaOffsetMonitor启动
将下载下来的KafkaOffsetMonitor jar包上传到linux上,可以新建一个目录KafkaMonitor,用于存放KafkaOffsetMonitor-assembly-0.2.0.jar进入到KafkaMonitor目录下,通过java编译命令来运行这个jar包:
[root@192KafkaMonitor]#java -cp KafkaOffsetMonitor-assembly-0.2.0.jar
com.quantifind.kafka.offsetapp.OffsetGetterWeb
--zk 127.0.0.1:2181
--refresh 5.minutes
--retain 1.day
[root@192KafkaMonitor]#java -cp KafkaOffsetMonitor-assembly-0.2.0.jar&& com.quantifind.kafka.offsetapp.OffsetGetterWeb--zk 127.0.0.1:2181 --refresh 5.minutes --retain 1.day&
按回车后,可以看到控制台输出:
如果没有指定端口,则默认会开启一个随机端口
如果想指定固定的端口,可以用:
[root@192KafkaMonitor]#java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb
--port 8089
--zk 127.0.0.1:2181
--refresh 5.minutes
--retain 1.day
&&&&[root@192KafkaMonitor]#java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb--port 8089--zk 127.0.0.1:2181 --refresh 5.minutes --retain 1.day&
zk :zookeeper主机地址,如果有多个,用逗号隔开
port :应用程序端口
refresh :应用程序在数据库中刷新和存储点的频率
retain :在db中保留多长时间
dbName :保存的数据库文件名,默认为offsetapp
为了更方便的启动KafkaOffsetMonitor,可以写一个启动脚本来直接运行,我这里新建一个名为:kafka-monitor-start.sh的脚本,然后编辑这个脚本:
[root@192 KafkaMonitor]# vim kafka-monitor-start.sh
[root@192 KafkaMonitor]# vim kafka-monitor-start.sh&
输入如下内容:
#!/bin/bash
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m
-cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--port 8089 \
--zk 127.0.0.1:2181 \
--refresh 5.minutes \
--retain 1.day 1&/usr/local/src/KafkaMonitor/logs/stdout.log 2&/usr/local/src/KafkaMonitor/logs/stderr.log &
#!/bin/bashjava -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m&&-cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \--port 8089 \--zk 127.0.0.1:2181 \--refresh 5.minutes \--retain 1.day 1>/usr/local/src/KafkaMonitor/logs/stdout.log 2>/usr/local/src/KafkaMonitor/logs/stderr.log &&
然后退出保存即可,接下来修改一下kafka-monitor-start.sh的权限,否则会报
bash:./kafka-monitor-start.sh:权限不够
[root@192 KafkaMonitor]# chmod
kafka-monitor-start.sh
bash:./kafka-monitor-start.sh:权限不够[root@192 KafkaMonitor]# chmod&&+x&&kafka-monitor-start.sh &
启动KafkaOffsetMonitor:
[root@192 KafkaMonitor]# ./kafka-monitor-start.sh
[root@192 KafkaMonitor]# ./kafka-monitor-start.sh&
四、KafkaOffsetMonitor Web UI
在游览器中输入:http://ip:port即可以查看KafkaOffsetMonitor Web UI,如下图:
这里可以看到所有的消费群组。点击某一个群组进入:
可以看到,这个消费群组订阅了两个主题,这里的参数:
Topic:订阅的主题
Partition:分区编号
Offest:表示该parition已经消费了多少条message
logSize:表示该partition已经写了多少条message
Lag:表示有多少条message没有被消费。
Owner:表示消费者
Created:该partition创建时间
Last Seen:消费状态刷新最新时间。
点击某个主题进去,可以看到当前主题的消费图谱:
除了上面可视化界面之外,监控工具还提供了活动主题和消费者可视化页面:
这里一个主题只有一个订阅群组,如果是多个订阅者:
同时也提供了Kafka集群可视化:
由于这里是单机的,所以看到的只有一个节点,如果是集群,有多个broker,那么这里就是会有多个节点。
五、KafkaOffsetMonitor 总结
KafkaOffsetMonitor:程序一个jar包的形式运行,部署较为方便。只有监控功能,使用起来也较为安全。
除了KafkaOffsetMonitor,Kafka监控工具还有另外两款:
Kafka Web Console:监控功能较为全面,可以预览消息,监控Offset、Lag等信息,但存在bug,不建议在生产环境中使用。
Kafka Manager:偏向Kafka集群管理,若操作不当,容易导致集群出现故障。对Kafka实时生产和消费消息是通过JMX实现的。没有记录Offset、Lag等信息。
后续将介绍如何搭建和使用这两款工具。
转载请注明: &
or分享 (0)【Kafka之kafka-manager监控使用示例】 - 突袭资讯
当前位置&:&&&&【Kafka之kafka-manager监控使用示例】
热门标签:&
【Kafka之kafka-manager监控使用示例】
编辑:张德勇评论:
[root@node1 opt]# ls
collectd ? ? ? ? flume1.6 ? ? ? influxdb ? ? ? ? ? ? ?nagios-plugins-1.4.13 ?Python-2.6.6.tgz ? ? ? ? ? ? ? ?
? yum-3.2.26.tar.gz ?elasticsearch-2.0.0-rc1 ? ? ? ? gnu ? ? ? ? ? ?kafka_2.10-0.9.0.1 ? ? ? ? ? ? ? ? ?
?openssl-1.0.0e ? ? ? ? soft ? ? ? ? zookeepe346 ?elasticsearch-2.1.1 ? ? ? ? ? ? grafana-2.5.0 ?
kafka-web-console-2.1.0-SNAPSHOT ? ? ?ORCLfmap ? ? ? ? ? ? ? storm096 ? ? ? zookeeper.out
elasticsearch-jdbc-2.2.0.0 ? ? ?hadoop ? ? ? ? kafka-web-console-2.1.0-SNAPSHOT.zip ?path ? ? ? ? ? ? ? ? ? stormTest.jar ?elasticsearch-jdbc-2.2.0.0.zip ?hadoop_data ? ?mq ? ? ? php-5.4.10 ? wget-log
es5.0 ? ? ? ? ?httpd-2.2.23 ? nagios ? ? ? Python-2.6.6 ? ? yum-3.2.22-40.el5.centos.noarch.rpm
[root@node1 opt]# cd zookeepe346/bin/
[root@node1 bin]# ./zkServer.sh start
JMX enabled by default
Using config: /opt/zookeepe346/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@node1 bin]# cd ../../kafka_2.10-0.9.0.1/
[root@node1 kafka_2.10-0.9.0.1]# cd bin/
[root@node1 opt]# ls
collectd ? ? ? ?flume1.6 ? ? ? influxdb ? ? ? ? mq ? ? ? ? ?php-5.4.10 ? ? wget-log ??elasticsearch-2.0.0-rc1 ? ? ? ? gnu ? ? ? ? ? ?kafka_2.10-0.9.0.1 ? ? ? ? nagios ? ? Python-2.6.6 ? ? ?yum-3.2.22-40.el5.centos.noarch.rpm
elasticsearch-2.1.1 ? grafana-2.5.0 ?kafka-manager-1.3.0.4.zip ? ? ? ? ? ? nagios-plugins-1.4.13 ?Python-2.6.6.tgz ?yum-3.2.26.tar.gz ?elasticsearch-jdbc-2.2.0.0 ? ? ?hadoop ? ? ? ? kafka-manager-1.3.3.4.zip ? ? ? ? ? ? openssl-1.0.0e ? ? ? ? soft ? ? zookeepe346 ??elasticsearch-jdbc-2.2.0.0.zip ?hadoop_data ? ?
kafka-web-console-2.1.0-SNAPSHOT ? ? ?ORCLfmap ? ? ? ? ? ? ? storm096 ? ? ? ? ?zookeeper.out
es5.0 ? ? ? ? httpd-2.2.23 ? kafka-web-console-2.1.0-SNAPSHOT.zip ?path ? ?stormTest.jar
[root@node1 bin]# ./kafka-server-start.sh ../config/server.properties ?&
[root@node1 opt]# unzip kafka-manager-1.3.3.4.zip
[root@node1 kafka-manager-1.3.3.4]# cd bin/
[root@node1 bin]# ls
kafka-manager ?kafka-manager.bat
[root@node1 bin]# sh kafka-manager
Exception in thread "main" java.lang.UnsupportedClassVersionError: com/typesafe/config/ConfigException : Unsupported major.minor version 52.0 ?--&修改JDK版本
? ? ? ? at java.lang.ClassLoader.defineClass1(Native Method)
? ? ? ? at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
? ? ? ? at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
? ? ? ? at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
? ? ? ? at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
? ? ? ? at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
? ? ? ? at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
? ? ? ? at java.security.AccessController.doPrivileged(Native Method)
? ? ? ? at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
? ? ? ? at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
? ? ? ? at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
? ? ? ? at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
? ? ? ? at play.core.server.ProdServerStart$.readServerConfigSettings(ProdServerStart.scala:81)
? ? ? ? at play.core.server.ProdServerStart$.start(ProdServerStart.scala:42)
? ? ? ? at play.core.server.ProdServerStart$.main(ProdServerStart.scala:27)
? ? ? ? at play.core.server.ProdServerStart.main(ProdServerStart.scala)
[root@node1 bin]# cat kafka-manager
[root@node1 bin]# vi application.conf ?--&配置ZK
[root@node1 bin]# cat ../conf/application.conf?
# Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
# See accompanying LICENSE file.
# This is the main configuration file for the application.
# Secret key
# The secret key is used to secure cryptographics functions.
# If you deploy your application to several instances be sure to use the same key!
play.crypto.secret="^&csmm5Fx4d=r2HEX8pelM3iBkFVv?k[IZE&_Qoq8EkX_/7@Zt6dP05Pzea3U"
play.crypto.secret=${?APPLICATION_SECRET}
# The application languages
play.i18n.langs=["en"]
play.http.requestHandler = "play.http.DefaultHttpRequestHandler"
play.http.context = "/"
play.application.loader=loader.KafkaManagerLoader
kafka-manager.zkhosts="192.168.8.131:2181"
#kafka-manager.zkhosts=${?ZK_HOSTS}
pinned-dispatcher.type="PinnedDispatcher"
pinned-dispatcher.executor="thread-pool-executor"
application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]
? loggers = ["akka.event.slf4j.Slf4jLogger"]
? loglevel = "INFO"
basicAuthentication.enabled=false
basicAuthentication.username="admin"
basicAuthentication.password="password"
basicAuthentication.realm="Kafka-Manager"
kafka-manager.consumer.properties.file=${?CONSUMER_PROPERTIES_FILE}
[root@node1 bin]#?
[root@node1 bin]# sh kafka-manager
查看图片附件

我要回帖

更多关于 有看头监控 使用说明 的文章

 

随机推荐