Write by lyc at 2020-6-17 参考博文:kafka官网 zookeeper官网 Kafka学习之路 (一)Kafka的简介
一、关于消息系统 1.消息队列
消息 Message
:网络中的两台计算机或者两个通讯设备之间传递的数据,如:文本、音频、视频等。
队列 Queue
:一种特殊的线性表,特殊之处在于它只允许再首部删除元素(队首),在尾部添加元素(队尾)。
消息队列 MQ
:保存消息的队列,是消息在传输过程中的容器。主要提供生产和消费接口供外部调用,进行数据的存储和获取。
2.MQ 分类 主要分为两类:点对点(Peer-to-Peer)、发布/订阅(Publish/Subscribe)
共同点:消息生产者(Producer)生产消息发送到队列中,然后消息消费者(Consumer)从队列中读取并消费消息。
不同点:
点对点:组成:消息队列(Queue)、发送者(Sender)、接收者(Receiver)。一个生产者生产的消息只能有一个消费者,消息一旦被消费,消息就不在消息队列中了。如:打电话。即发送到消息队列的消息且只能被一个接受者接收
发布/订阅:组成:消息队列(Queue)、发布者(Publish)、订阅者(Subscribe)、主题(Topic)。每个消息可以有多个消费者,彼此互不影响,如:我发布一个微博,关注我的人都能看到,即发布到消息队列的消息能被多个接收者(订阅者)接收
3.常见的消息系统
ActiveMQ
:历史悠久,实现了JMS(Java Message Service)规范,支持性好,性能相对不高
RabbitMQ
:可靠性高、安全
Kafka
:分布式、高性能、跨语言
RocketMQ
:阿里开源的消息中间件,纯java实现
二、kafka_2.12-2.5.0 安装部署 kafka是一个分布式的 发布/订阅(Publish/Subscribe) 消息系统。
1.kafka 组成
Broker
kafka集群有多个kafka服务节点,每个和服务节点称为一个Broker
Topic
主题,用来存储不同类别的消息(kafka消息数据是存储在硬盘上的)
Partition
分区,每个Topic
包含一个或多个Partition
,在创建Topic
时指定包含的Partition
数量(目的是为了进行分布式存储)
Replication
副本,每个分区可以有多个副本,分布在不同的Broker
上。
zk
会选出一个Leader
,所有的读写请求都会通过Leader
完成,Follower
只负责备份数据。
所有Follower
会自动的从Leader
复制数据,当Leader
宕机后,会从Follower
中选出一个新的Leader
继续提供服务,实现故障的自动转移。
Message
消息,是通信的基本单位,每个消息都属于一个 Partition
Producer
消息的生产者,向kafka的一个Topic
发布消息
Consumer
消息的消费者,订阅Topic
发布消息
Consumer Group
每个Consumer
属于一个特定的Consumer Group
,多个Consumer
可以属于同一个Consumer Group
中
Zookeeper
协调kafka的正常运行,kafka将元数据信息保存在Zookeeper
中,但发送给Topic
本身的消息数据并不存储在ZK中,而在存储的磁盘文件中
2.推荐使用外部安装的 zookeeper
zookeeper,kafka 安装前先安装jdk1.8
zookeeper-3.6.1 二进制安装 1 2 3 4 5 6 $ cd /usr/local /src/ $ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz $ tar xvf apache-zookeeper-3.6.1-bin.tar.gz -C /usr/local / $ cd /usr/local / $ mv apache-zookeeper-3.6.1-bin zookeeper-3.6.1 $ ln -s /usr/local /zookeeper-3.6.1 /usr/local /zookeeper
zookeeper 配置 1 2 3 4 5 6 7 8 9 10 $ mkdir -p /data/zookeeper/{data,logs} $ /usr/local /zookeeper/conf $ cp zoo_sample.cfg zoo.cfg $ vim zoo.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=/data/zookeeper/data dataLogDir=/data/zookeeper/logs clientPort=2181
zookeeper 启动 1 2 3 4 5 6 7 8 $ /usr/local /zookeeper/bin/zkServer.sh start $ /usr/local /zookeeper/bin/zkServer.sh status $ /usr/local /zookeeper/bin/zkServer.sh stop
3.kafka 二进制安装 kafka 二进制安装 1 2 3 4 $ cd /usr/local /src/ $ wget -c https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz $ tar xvf kafka_2.12-2.5.0.tgz -C /usr/local / $ ln -s /usr/local /kafka_2.12-2.5.0 /usr/local /kafka
kafka 配置 1 2 3 $ mkdir -p /data/kafka/{data,logs} $ cd /usr/local /kafka $ cp config/server.properties config/server.properties.default
配置文件解析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 $ vim config/server.properties broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/kafka/logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true
kafka 启动 1 2 3 4 5 $ /usr/local /kafka/bin/kafka-server-start.sh config/server.properties & $ /usr/local /kafka/bin/kafka-server-stop.sh
jps 查看当前运行的java进程 1 2 3 4 $ jps 3552 Jps 3139 Kafka 3100 QuorumPeerMain
三、kafka 使用 1.Topic 创建 1 2 3 4 5 6 7 $ /usr/local /kafka/bin/kafka-topics.sh \ > --create \ > --zookeeper localhost:2181 \ > --replication-factor 1 \ > --partitions 3 \ > --topic hello Created topic hello.
2.查看Topic列表 1 2 $ /usr/local /kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181 hello
3.查看某一个具体的Topic 1 2 3 4 5 $ /usr/local /kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic hello Topic: hello PartitionCount: 3 ReplicationFactor: 1 Configs: Topic: hello Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: hello Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: hello Partition: 2 Leader: 0 Replicas: 0 Isr: 0
4.修改Topic:增加分区Partitions
分区 Partitions
只能增加,不能减少
副本Replication-Factor
不能修改
1 2 3 $ /usr/local /kafka/bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic hello --partitions 5 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded!
5.删除Topic 1 2 3 4 5 6 $ vim server.properties delete.topic.enable=true $ /usr/local /kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic hello
6.消息收发测试: 生产消息 1 2 3 4 5 6 $ /usr/local /kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello >hello world >my name is lyc >yes >hihihihi >
消费消息 1 2 3 4 5 $ /usr/local /kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning hello world yes my name is lyc hihihihi