kafka简介#
kafka是按照分布式事务日志架构的大规模发布/订阅消息队列
基本组成#
Producer:Producer即生产者,消息的产生者,是消息的入口。
Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。
这个partition有三组segment文件,每个log文件的大小是一样的,但是存储的message数量是不一定相等的(每条的message大小不一致)。文件的命名是以该segment最小offset来命名的,如000.index存储offset为0~368795的消息,kafka就是利用分段+索引的方式来解决查找效率的问题。
Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Message:每一条发送的消息主体。上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:
offset:offset是一个占8byte的有序id号,它可以唯一确定每条消息在parition内的位置!
消息大小:消息大小占用4byte,用于描述消息的大小。
消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。
Consumer:消费者,即消息的消费方,是消息的出口。
Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!
Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
部署kafka#
在部署kafka的时候遇到了一万个问题,在mac上使用homebrew安装的时候,zookeeper一直安装失败,使用docker直接部署的时候配置文件又出现问题,最后兜兜转转还是使用了docker-compose来安装
docker部署#
首先看一下老版本的kafka,这是需要zookeeper去做状态同步的,于是使用docker-compose去同时部署两个容器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| version: '3'
name: kafka-group
services:
zookeeper-test:
image: zookeeper
ports:
- "2181:2181"
volumes:
- zookeeper_vol:/data
- zookeeper_vol:/datalog
- zookeeper_vol:/logs
container_name: zookeeper-test
kafka-test:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: "localhost"
KAFKA_ZOOKEEPER_CONNECT: "zookeeper-test:2181"
KAFKA_LOG_DIRS: "/kafka/logs"
volumes:
- kafka_vol:/kafka
depends_on:
- zookeeper-test
container_name: kafka-test
volumes:
zookeeper_vol: {}
kafka_vol: {}
|
1
2
| # 启动Kafka容器组
docker compose -f kafa-group.yml up -d
|
然后看一下新版的kafka使用了kraft去同步集群状态,去掉了对于zookeeper的依赖,于是就可以直接运行了。这里选择了3.7.0的版本,高版本也是一样的。
1
| docker run -p 9092:9092 --name kafka apache/kafka:3.7.0
|
消息测试#
首先可以创建主题
1
2
3
4
5
6
7
8
| # 进入容器
docker exec -it kafka-test /bin/bash
# 进入Kafka bin目录
cd /opt/kafka/bin
# 创建主题
kafka-topics.sh --create --topic kafka-test --bootstrap-server localhost:9092
|
- 启动Kafka Producer
新开一个命令后窗口,然后执行以下命令,启动Kafka Producer,准备往topic:test发送消息
1
2
3
4
5
6
7
8
| # 进入容器
docker exec -it kafka-test /bin/bash
# 进入Kafka bin目录
cd /opt/kafka/bin
# 启动Producer
sh kafka-console-producer.sh --broker-list localhost:9092 --topic test
|
- 启动Kafka Consumer
新开一个命令后窗口,然后执行以下命令,启动Kafka Consumer,订阅来自topic:test的消息
1
2
3
4
5
6
7
8
| # 进入容器
docker exec -it kafka-test /bin/bash
# 进入Kafka bin目录
cd /opt/kafka/bin
# 启动Consumer
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
|
在Producer命令行窗口输入内容,然后回车即可发送消息.然后就可以在Consumer命令行窗口看到收到的消息
可能遇到的问题#
如果你碰到ZooKeeper、Kafka容器无法正常启动,可以删除数据卷以及容器后进行创建
1
2
3
4
5
6
7
| # 停用&删除容器
docker stop zookeeper-test kafka-test
docker rm zookeeper-test kafka-test
# 删除数据卷
docker volume rm zookeeper_vol kafka_vol
docker volume rm kafka-group_kafka_vol kafka-group_zookeeper_vol
|
在golang中使用kafaka#
在github中关于kafka的sdk中star最多的是sarama这个库,我们也通过这个库来对kafka进行简单的操作
我们先启动一个消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| package main
import (
"fmt"
"log"
"github.com/IBM/sarama"
)
func main() {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalf("Failed to start consumer: %v", err)
}
defer consumer.Close()
partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
if err != nil {
log.Fatalf("Failed to start partition consumer: %v", err)
}
defer partitionConsumer.Close()
for {
select {
case msg := <-partitionConsumer.Messages():
fmt.Printf("Received message: %s\n", string(msg.Value))
case err := <-partitionConsumer.Errors():
log.Printf("Error: %v\n", err)
}
}
}
|
关于kafka消息的producer默认有两种选择,就是同步和异步
异步如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| package main
import (
"fmt"
"github.com/IBM/sarama"
)
func main() {
config := sarama.NewConfig()
// 1. 初始化生产者配置
config.Producer.RequiredAcks = sarama.WaitForAll
// 选择分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 成功交付的信息
config.Producer.Return.Successes = true
// 2. 构造一个消息,结构体类型
msg := &sarama.ProducerMessage{
Topic: "test",
Value: sarama.StringEncoder("hello,async"),
}
// 3. 连接kafka
client, err := sarama.NewAsyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Println(err)
}
defer client.Close()
// 4. 发送消息
client.Input()<-msg
}
|
同步如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| package main
import (
"fmt"
"github.com/IBM/sarama"
)
func main() {
config := sarama.NewConfig()
// 1. 初始化生产者配置
config.Producer.RequiredAcks = sarama.WaitForAll
// 选择分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 成功交付的信息
config.Producer.Return.Successes = true
// 2. 构造一个消息,结构体类型
msg := &sarama.ProducerMessage{
Topic: "shopping",
Value: sarama.StringEncoder("20220411happy02"),
}
// 3. 连接kafka
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Println(err)
}
defer client.Close()
// 4. 发送消息
partition, offset, err := client.SendMessage(msg)
fmt.Println("partition:")
fmt.Println(partition)
fmt.Println("offset:")
fmt.Println(offset)
if err != nil {
fmt.Println(err)
}
}
|
这样子我们就简单地实现了一个消费者和一个生产者
当然了,关于kafka的配置是有讲究的,我们这里就不一一列举了,详细的信息可以参考文末的参考文章。
参考文章:
使用Docker部署Kafka单机版
Golang:使用sarama向kafka投递任务
Kafka入门(3):Sarama生产者是如何工作的
Golang中如何正确的使用sarama包操作Kafka?