Write by lyc at 2020-6-17 参考博文:kafka官网 zookeeper官网 Kafka学习之路 (一)Kafka的简介
一、关于消息系统 1.消息队列
消息 Message:网络中的两台计算机或者两个通讯设备之间传递的数据,如:文本、音频、视频等。
队列 Queue:一种特殊的线性表,特殊之处在于它只允许再首部删除元素(队首),在尾部添加元素(队尾)。
消息队列 MQ:保存消息的队列,是消息在传输过程中的容器。主要提供生产和消费接口供外部调用,进行数据的存储和获取。
2.MQ 分类 主要分为两类:点对点(Peer-to-Peer)、发布/订阅(Publish/Subscribe)
共同点:消息生产者(Producer)生产消息发送到队列中,然后消息消费者(Consumer)从队列中读取并消费消息。
不同点:
点对点:组成:消息队列(Queue)、发送者(Sender)、接收者(Receiver)。一个生产者生产的消息只能有一个消费者,消息一旦被消费,消息就不在消息队列中了。如:打电话。即发送到消息队列的消息且只能被一个接受者接收
发布/订阅:组成:消息队列(Queue)、发布者(Publish)、订阅者(Subscribe)、主题(Topic)。每个消息可以有多个消费者,彼此互不影响,如:我发布一个微博,关注我的人都能看到,即发布到消息队列的消息能被多个接收者(订阅者)接收
3.常见的消息系统
ActiveMQ:历史悠久,实现了JMS(Java Message Service)规范,支持性好,性能相对不高
RabbitMQ:可靠性高、安全
Kafka:分布式、高性能、跨语言
RocketMQ:阿里开源的消息中间件,纯java实现
二、kafka_2.12-2.5.0 安装部署 kafka是一个分布式的 发布/订阅(Publish/Subscribe) 消息系统。
1.kafka 组成
Broker kafka集群有多个kafka服务节点,每个和服务节点称为一个Broker
Topic 主题,用来存储不同类别的消息(kafka消息数据是存储在硬盘上的)
Partition 分区,每个Topic包含一个或多个Partition,在创建Topic时指定包含的Partition数量(目的是为了进行分布式存储)
Replication 副本,每个分区可以有多个副本,分布在不同的Broker上。
zk会选出一个Leader,所有的读写请求都会通过Leader完成,Follower只负责备份数据。
所有Follower会自动的从Leader复制数据,当Leader宕机后,会从Follower中选出一个新的Leader继续提供服务,实现故障的自动转移。
Message 消息,是通信的基本单位,每个消息都属于一个 Partition
Producer 消息的生产者,向kafka的一个Topic发布消息
Consumer 消息的消费者,订阅Topic发布消息
Consumer Group 每个Consumer属于一个特定的Consumer Group,多个Consumer可以属于同一个Consumer Group中
Zookeeper 协调kafka的正常运行,kafka将元数据信息保存在Zookeeper中,但发送给Topic本身的消息数据并不存储在ZK中,而在存储的磁盘文件中
2.推荐使用外部安装的 zookeeper
zookeeper,kafka 安装前先安装jdk1.8
zookeeper-3.6.1 二进制安装 1 2 3 4 5 6 $ cd /usr/local /src/ $ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz $ tar xvf apache-zookeeper-3.6.1-bin.tar.gz -C /usr/local / $ cd /usr/local / $ mv apache-zookeeper-3.6.1-bin zookeeper-3.6.1 $ ln -s /usr/local /zookeeper-3.6.1 /usr/local /zookeeper
zookeeper 配置 1 2 3 4 5 6 7 8 9 10 $ mkdir -p /data/zookeeper/{data,logs} $ /usr/local /zookeeper/conf $ cp zoo_sample.cfg zoo.cfg $ vim zoo.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=/data/zookeeper/data dataLogDir=/data/zookeeper/logs clientPort=2181
zookeeper 启动 1 2 3 4 5 6 7 8 $ /usr/local /zookeeper/bin/zkServer.sh start $ /usr/local /zookeeper/bin/zkServer.sh status $ /usr/local /zookeeper/bin/zkServer.sh stop
3.kafka 二进制安装 kafka 二进制安装 1 2 3 4 $ cd /usr/local /src/ $ wget -c https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz $ tar xvf kafka_2.12-2.5.0.tgz -C /usr/local / $ ln -s /usr/local /kafka_2.12-2.5.0 /usr/local /kafka
kafka 配置 1 2 3 $ mkdir -p /data/kafka/{data,logs} $ cd /usr/local /kafka $ cp config/server.properties config/server.properties.default
配置文件解析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 $ vim config/server.properties broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/kafka/logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true
kafka 启动 1 2 3 4 5 $ /usr/local /kafka/bin/kafka-server-start.sh config/server.properties & $ /usr/local /kafka/bin/kafka-server-stop.sh
jps 查看当前运行的java进程 1 2 3 4 $ jps 3552 Jps 3139 Kafka 3100 QuorumPeerMain
三、kafka 使用 1.Topic 创建 1 2 3 4 5 6 7 $ /usr/local /kafka/bin/kafka-topics.sh \ > --create \ > --zookeeper localhost:2181 \ > --replication-factor 1 \ > --partitions 3 \ > --topic hello Created topic hello.
2.查看Topic列表 1 2 $ /usr/local /kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181 hello
3.查看某一个具体的Topic 1 2 3 4 5 $ /usr/local /kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic hello Topic: hello PartitionCount: 3 ReplicationFactor: 1 Configs: Topic: hello Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: hello Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: hello Partition: 2 Leader: 0 Replicas: 0 Isr: 0
4.修改Topic:增加分区Partitions
分区 Partitions 只能增加,不能减少
副本Replication-Factor 不能修改
1 2 3 $ /usr/local /kafka/bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic hello --partitions 5 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded!
5.删除Topic 1 2 3 4 5 6 $ vim server.properties delete.topic.enable=true $ /usr/local /kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic hello
6.消息收发测试: 生产消息 1 2 3 4 5 6 $ /usr/local /kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello >hello world >my name is lyc >yes >hihihihi >
消费消息 1 2 3 4 5 $ /usr/local /kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning hello world yes my name is lyc hihihihi