Write by lyc at 2020-6-17
参考博文:
kafka官网
zookeeper官网
Kafka学习之路 (一)Kafka的简介

一、关于消息系统

1.消息队列

  • 消息 Message:网络中的两台计算机或者两个通讯设备之间传递的数据,如:文本、音频、视频等。
  • 队列 Queue:一种特殊的线性表,特殊之处在于它只允许再首部删除元素(队首),在尾部添加元素(队尾)。
  • 消息队列 MQ:保存消息的队列,是消息在传输过程中的容器。主要提供生产和消费接口供外部调用,进行数据的存储和获取。

2.MQ 分类

主要分为两类:点对点(Peer-to-Peer)、发布/订阅(Publish/Subscribe)

共同点:消息生产者(Producer)生产消息发送到队列中,然后消息消费者(Consumer)从队列中读取并消费消息。

不同点:

  • 点对点:组成:消息队列(Queue)、发送者(Sender)、接收者(Receiver)。一个生产者生产的消息只能有一个消费者,消息一旦被消费,消息就不在消息队列中了。如:打电话。即发送到消息队列的消息且只能被一个接受者接收
  • 发布/订阅:组成:消息队列(Queue)、发布者(Publish)、订阅者(Subscribe)、主题(Topic)。每个消息可以有多个消费者,彼此互不影响,如:我发布一个微博,关注我的人都能看到,即发布到消息队列的消息能被多个接收者(订阅者)接收

3.常见的消息系统

  • ActiveMQ:历史悠久,实现了JMS(Java Message Service)规范,支持性好,性能相对不高
  • RabbitMQ:可靠性高、安全
  • Kafka:分布式、高性能、跨语言
  • RocketMQ:阿里开源的消息中间件,纯java实现

二、kafka_2.12-2.5.0 安装部署

kafka是一个分布式的 发布/订阅(Publish/Subscribe) 消息系统。

1.kafka 组成

  • Broker kafka集群有多个kafka服务节点,每个和服务节点称为一个Broker
  • Topic 主题,用来存储不同类别的消息(kafka消息数据是存储在硬盘上的)
  • Partition 分区,每个Topic包含一个或多个Partition,在创建Topic时指定包含的Partition数量(目的是为了进行分布式存储)
  • Replication 副本,每个分区可以有多个副本,分布在不同的Broker上。
    • zk会选出一个Leader,所有的读写请求都会通过Leader完成,Follower只负责备份数据。
    • 所有Follower会自动的从Leader复制数据,当Leader宕机后,会从Follower中选出一个新的Leader继续提供服务,实现故障的自动转移。
  • Message 消息,是通信的基本单位,每个消息都属于一个 Partition
  • Producer 消息的生产者,向kafka的一个Topic发布消息
  • Consumer 消息的消费者,订阅Topic发布消息
  • Consumer Group 每个Consumer属于一个特定的Consumer Group,多个Consumer可以属于同一个Consumer Group
  • Zookeeper 协调kafka的正常运行,kafka将元数据信息保存在Zookeeper中,但发送给Topic本身的消息数据并不存储在ZK中,而在存储的磁盘文件中

2.推荐使用外部安装的 zookeeper

zookeeper,kafka 安装前先安装jdk1.8

zookeeper-3.6.1 二进制安装

1
2
3
4
5
6
$ cd /usr/local/src/
$ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
$ tar xvf apache-zookeeper-3.6.1-bin.tar.gz -C /usr/local/
$ cd /usr/local/
$ mv apache-zookeeper-3.6.1-bin zookeeper-3.6.1
$ ln -s /usr/local/zookeeper-3.6.1 /usr/local/zookeeper

zookeeper 配置

1
2
3
4
5
6
7
8
9
10
$ mkdir -p /data/zookeeper/{data,logs}
$ /usr/local/zookeeper/conf
$ cp zoo_sample.cfg zoo.cfg
$ vim zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181

zookeeper 启动

1
2
3
4
5
6
7
8
# 启动
$ /usr/local/zookeeper/bin/zkServer.sh start

# 查看状态
$ /usr/local/zookeeper/bin/zkServer.sh status

# 停止
$ /usr/local/zookeeper/bin/zkServer.sh stop

3.kafka 二进制安装

kafka 二进制安装

1
2
3
4
$ cd /usr/local/src/
$ wget -c https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
$ tar xvf kafka_2.12-2.5.0.tgz -C /usr/local/
$ ln -s /usr/local/kafka_2.12-2.5.0 /usr/local/kafka

kafka 配置

1
2
3
$ mkdir -p /data/kafka/{data,logs}
$ cd /usr/local/kafka
$ cp config/server.properties config/server.properties.default

配置文件解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
$ vim config/server.properties
############################# Server Basics #############################
broker.id=0 # broker的id,值为整数,且必须唯一,在一个集群中不能重复

############################# Socket Server Settings #############################
#listeners=PLAINTEXT://:9092 # kafka默认监听的端口为9092
num.network.threads=3 # 处理网络请求的线程数量,默认为3个
num.io.threads=8 # 执行磁盘IO操作的线程数量,默认为8个
socket.send.buffer.bytes=102400 # socket服务发送数据的缓冲区大小,默认100KB
socket.receive.buffer.bytes=102400 # socket服务接收数据的缓冲区大小,默认100KB
socket.request.max.bytes=104857600 # socket服务所能接收的一个请求的最大大小,默认100M

############################# Log Basics #############################
log.dirs=/data/kafka/logs # kafka存储消息数据的目录
num.partitions=1 # 每个topic默认的分区partition数量
num.recovery.threads.per.data.dir=1 # 在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量

############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
#log.flush.interval.messages=10000 # 消息刷新到磁盘中的消息数阈值
#log.flush.interval.ms=1000 # 消息刷新到磁盘中的最大时间间隔

############################# Log Retention Policy ############################# 日志删除策略
log.retention.hours=168 # 日志保留小时数,超时会自动删除,默认为7天
#log.retention.bytes=1073741824 # 日志保留大小,超出大小会自动删除,默认为1G
log.segment.bytes=1073741824 # 日志分片策略,单个日志文件的大小最大为1G,超过会创建一个新的日志文件
log.retention.check.interval.ms=300000 # 每隔多长时间检测数据是否达到删除条件

############################# Zookeeper #############################
zookeeper.connect=localhost:2181 # zookeeper连接信息,如果是zookeeper集群,则以逗号隔开
zookeeper.connection.timeout.ms=18000 # 连接zookeeper的超时时间

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0


## Custom configuration # 用户自定义配置
delete.topic.enable=true # 是否可以删除topic,默认为false

kafka 启动

1
2
3
4
5
# 启动
$ /usr/local/kafka/bin/kafka-server-start.sh config/server.properties &

# 停止
$ /usr/local/kafka/bin/kafka-server-stop.sh

jps 查看当前运行的java进程

1
2
3
4
$ jps
3552 Jps
3139 Kafka # kafka
3100 QuorumPeerMain # zookeeper

三、kafka 使用

1.Topic 创建

1
2
3
4
5
6
7
$ /usr/local/kafka/bin/kafka-topics.sh \
> --create \
> --zookeeper localhost:2181 \
> --replication-factor 1 \
> --partitions 3 \
> --topic hello
Created topic hello.

2.查看Topic列表

1
2
$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
hello

3.查看某一个具体的Topic

1
2
3
4
5
$ /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic hello
Topic: hello PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: hello Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: hello Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: hello Partition: 2 Leader: 0 Replicas: 0 Isr: 0

4.修改Topic:增加分区Partitions

  • 分区 Partitions 只能增加,不能减少
  • 副本Replication-Factor 不能修改
1
2
3
$ /usr/local/kafka/bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic hello --partitions 5
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

5.删除Topic

1
2
3
4
5
6

$ vim server.properties # 需要另外启用Topic删除功能
delete.topic.enable=true # 是否可以删除Topic,默认为false

# Topci 删除
$ /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic hello

6.消息收发测试:

生产消息

1
2
3
4
5
6
$ /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello
>hello world
>my name is lyc
>yes
>hihihihi
>

消费消息

1
2
3
4
5
$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning
hello world
yes
my name is lyc
hihihihi