- 1.下载 kafka和zookeeper
- 2.安装zookeeper
- 3.启动kafka
- 4.kafka常用命令
- 1.创建一个主题
- 2.使用kafka-console-producer发送数据
- 3.使用kafka-console-consumer消费数据
- 4.查看指定的topic的offset信息
- 5.删除一个topic
- 6.查看某个group的信息
- 7.查看consumer group的列表
- 8.在zk中删除一个consumer group
- 9.查看topic的offset的最小值
- 10.查看topic的offset的最大值
- 11.重置topic的某个消费者的offset为0
- 12.重置topic的某个消费者的offset为指定值
- 13.指定offset和partition进行消费
- 14.查看kafka topic的consumer的某个时间的offset
- 15.将某个topic的某个消费者的offset置为lastest
- 16.迁移topic副本
- 17.生成迁移方案
- 18.查看同步延迟(under replicated)的partition
- 19.增加kafka topic的副本数量
1.下载 kafka和zookeeper#
这里下载的是 kafka_2.10-0.10.0.0.tgz 和 zookeeper-3.4.10.tar.gz
可以在清华镜像站下载
1 | https://mirrors.tuna.tsinghua.edu.cn/apache/ |
或者apache官网
1 | https://kafka.apache.org/downloads |
然后分别解压到/usr/local目录下
2.安装zookeeper#
进入zookeeper目录,在conf目录下将zoo_sample.cfg文件拷贝,并更名为zoo.cfg
参考 https://my.oschina.net/phoebus789/blog/730787
zoo.cfg文件的内容
1 | # The number of ticks that the initial |
新建下面这两个目录
1 | /home/common/zookeeper/zookeeperdir/zookeeper-data |
在zookeeper-data目录下新建一个myid文件,内容为1,代表这个服务器的编号是1,具体参考上面网址中的内容
最后在/etc/profile中添加环境变量,并source
1 | export ZOOKEEPER_HOME=/usr/local/zookeeper |
现在zookeeper就安装好了,现在启动zookeeper
1 | bin/zkServer.sh start |
查看状态
1 | bin/zkServer.sh status |
启动客户端脚本
1 | bin/zkCli.sh -server localhost:2181 |
停止zookeeper
1 | bin/zkServer.sh stop |
3.启动kafka#
1.现在安装kafka,同样是解压之后就安装好了
参考 http://www.jianshu.com/p/efc8b9dbd3bd
2.进入kafka目录下
kafka需要使用Zookeeper,首先需要启动Zookeeper服务,上面的操作就已经启动了Zookeeper服务
如果没有的话,可以使用kafka自带的脚本启动一个简单的单一节点Zookeeper实例
1 | bin/zookeeper-server-start.sh config/zookeeper.properties |
启动 Kafka服务
1 | bin/kafka-server-start.sh config/server.properties |
停止 Kafka服务
1 | bin/kafka-server-stop.sh config/server.properties |
4.kafka常用命令#
1.创建一个主题#
首先创建一个名为test
的topic,只使用单个分区和一个复本
1 | bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test |
现在可以运行list topic命令看到我们的主题
1 | bin/kafka-topics.sh --list --zookeeper localhost:2181 |
2.使用kafka-console-producer发送数据#
1 | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test |
如果要批量导入文件数据到kafka,参考:2.1 本地环境下kafka批量导入数据
1 | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic < file_pat |
如果要模拟实时数据到打入kafka的情况,可以写一个shell脚本
1 | #!/usr/bin/env bash |
3.使用kafka-console-consumer消费数据#
旧版消费者
1 | bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 2>/dev/null |
新版消费者
1 | bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic input --from-beginning 2>/dev/null |
消费带权限kafka topic
1 | /opt/cloudera/parcels/KAFKA/bin/kafka-console-consumer --new-consumer --bootstrap-server xxxx:9092 --topic my_topic --consumer.config ./client.jaas > ./test.log |
其他参数
1 | --max-messages XXX 指定在退出前最多读取的消息数 |
同时打印kafka message中的key和value
1 | kafka-console-consumer.sh --bootstrap-server xxx:9092 --topic my_topic_name --partition 1 --offset 1589306 --max-messages 100000 --property print.key=true --property key.separator="-" > /home/lintong/tmp.log |
4.查看指定的topic的offset信息#
对于结尾是ZK的消费者,其消费者的信息是存储在Zookeeper中的
对于结尾是KF的消费者,其消费者的信息是存在在Kafka的broker中的
都可以使用下面的命令进行查看
1 | bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group xxx --topic xxx |
结果
1 | bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group test-consumer-group --topic xxx |
或者
1 | ./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group xxxx --topic xxxx |
结果
1 | bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test-consumer-group |
lag是负数的原因是 topic中的消息数量过期(超过kafka默认的7天后被删除了),变成了0,所以Lag=logSize减去Offset,所以就变成了负数
5.删除一个topic#
需要在 conf/server.properties 文件中设置
1 | # For delete topic |
否则在执行了以下删除命令后,再 list 查看所有的topic,还是会看到该topic
1 | bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topicB |
再到 配置文件 中的kafka数据存储地址去删除物理数据了,我的地址为
1 | /tmp/kafka-logs |
最后需要到zk里删除kafka的元数据
1 | ./bin/zkCli.sh #进入zk shell |
6.查看某个group的信息#
新版
1 | bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group xxx |
结果
1 | bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group group_id |
如果这时候消费者进程关闭了之后,使用上面的命令和下面的-list命令将不会查出这个group_id,但是当消费者进程重新开启后,这个group_id又能重新查到,且消费的offset不会丢失
旧版
1 | bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group xxx --describe |
7.查看consumer group的列表#
ZK的消费者可以使用下面命令查看,比如上面的例子中的 test-consumer-group
1 | bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list |
KF的消费者可以使用下面命令查看,比如上面的例子中的 console-consumer-xxx ,但是只会查看到类似于 KMOffsetCache-master 的结果,这是由于这种消费者的信息是存放在 __consumer_offsets 中
对于如何查看存储于 __consumer_offsets 中的新版消费者的信息,可以参考huxihx的博文: Kafka 如何读取offset topic内容 (__consumer_offsets)
1 | bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list |
8.在zk中删除一个consumer group#
1 | rmr /consumers/test-consumer-group |
9.查看topic的offset的最小值#
1 | bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --time -2 |
10.查看topic的offset的最大值#
1 | bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --time -1 |
11.重置topic的某个消费者的offset为0#
需要高版本的kafka才有该命令,在高版本的kafka client对低版本的kafka集群执行该命令是会生效的
而且需要该group是inactive的,即该消费组没有消费者,不然会报 Error: Assignments can only be reset if the group ‘xxxxxx’ is inactive, but the current state is Stable.
1 | kafka-consumer-groups --bootstrap-server localhost:9092 --group xxx --topic xxx --reset-offsets --to-earliest --execute |
如果是要调整某个topic的某个partition,只需要在topic名字后面加上(:partition_id),如
1 | kafka-consumer-groups --bootstrap-server localhost:9092 --group xxx --topic xxx:0 --reset-offsets --to-earliest --execute |
12.重置topic的某个消费者的offset为指定值#
1 | kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupName --reset-offsets --to-offset 1000 --topic topicName --execute |
13.指定offset和partition进行消费#
指定offset的时候必须指定partition
1 | /opt/cloudera/parcels/KAFKA/bin/kafka-console-consumer --topic xxxx --partition 2 --offset 820000 --bootstrap-server xxx:9092 > ./test2.log |
14.查看kafka topic的consumer的某个时间的offset#
注意这个–to-datetime是utc时间,需要减去8个小时
1 | /opt/cloudera/parcels/KAFKA/bin/kafka-consumer-groups --bootstrap-server xxxx:9092 --group xxxx --topic xxxx --command-config ./client.jaas --reset-offsets --to-datetime 2020-01-01T00:00:00.000 |
client.jaas
1 | properties { |
15.将某个topic的某个消费者的offset置为lastest#
1 | /opt/cloudera/parcels/KAFKA/bin/kafka-consumer-groups --bootstrap-server xxxx:9092 --group xxxx --topic xxxx --reset-offsets --to-latest --execute |
16.迁移topic副本#
比如topic1有3个partition,这个3个partition分布在broker150、broker151、broker152上,此时想要将broker152换成broker164,比如说在kafka集群添加了新节点或者遇到磁盘分布不均匀的情况的时候
在kafka manager上面点击
修改
再点击
如果数据量大的话会比较耗时
17.生成迁移方案#
18.查看同步延迟(under replicated)的partition#
1 | /opt/cloudera/parcels/KAFKA/lib/kafka/bin/kafka-topics.sh --zookeeper xxxx:2181 --describe --under-replicated-partitions |
19.增加kafka topic的副本数量#