kafka的部署与实践
kafka简介
kafka是按照分布式事务日志架构的大规模发布/订阅消息队列
基本组成
Producer:Producer即生产者,消息的产生者,是消息的入口。
Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。
这个partition有三组segment文件,每个log文件的大小是一样的,但是存储的message数量是不一定相等的(每条的message大小不一致)。文件的命名是以该segment最小offset来命名的,如000.index存储offset为0~368795的消息,kafka就是利用分段+索引的方式来解决查找效率的问题。
Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Message:每一条发送的消息主体。上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:
offset:offset是一个占8byte的有序id号,它可以唯一确定每条消息在parition内的位置!
消息大小:消息大小占用4byte,用于描述消息的大小。
消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。
Consumer:消费者,即消息的消费方,是消息的出口。
Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!
Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
部署kafka
在部署kafka的时候遇到了一万个问题,在mac上使用homebrew安装的时候,zookeeper一直安装失败,使用docker直接部署的时候配置文件又出现问题,最后兜兜转转还是使用了docker-compose来安装
docker-compose部署
1 | version: '3' |
1 | 启动Kafka容器组 |
消息测试
- 启动Kafka Producer
新开一个命令后窗口,然后执行以下命令,启动Kafka Producer,准备往topic:test发送消息
1 | 进入容器 |
- 启动Kafka Consumer
新开一个命令后窗口,然后执行以下命令,启动Kafka Consumer,订阅来自topic:test的消息
1 | 进入容器 |
在Producer命令行窗口输入内容,然后回车即可发送消息.然后就可以在Consumer命令行窗口看到收到的消息
可能遇到的问题
如果你碰到ZooKeeper、Kafka容器无法正常启动,可以删除数据卷以及容器后进行创建
1 | # 停用&删除容器 |
在golang中使用kafaka
在github中关于kafka的sdk中star最多的是sarama这个库,我们也通过这个库来对kafka进行简单的操作
我们先启动一个消费者
- consume.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33package main
import (
"fmt"
"log"
"github.com/IBM/sarama"
)
func main() {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalf("Failed to start consumer: %v", err)
}
defer consumer.Close()
partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
if err != nil {
log.Fatalf("Failed to start partition consumer: %v", err)
}
defer partitionConsumer.Close()
for {
select {
case msg := <-partitionConsumer.Messages():
fmt.Printf("Received message: %s\n", string(msg.Value))
case err := <-partitionConsumer.Errors():
log.Printf("Error: %v\n", err)
}
}
}
关于kafka消息的producer默认有两种选择,就是同步和异步
异步如下所示:
- async.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31package main
import (
"fmt"
"github.com/IBM/sarama"
)
func main() {
config := sarama.NewConfig()
// 1. 初始化生产者配置
config.Producer.RequiredAcks = sarama.WaitForAll
// 选择分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 成功交付的信息
config.Producer.Return.Successes = true
// 2. 构造一个消息,结构体类型
msg := &sarama.ProducerMessage{
Topic: "test",
Value: sarama.StringEncoder("hello,async"),
}
// 3. 连接kafka
client, err := sarama.NewAsyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Println(err)
}
defer client.Close()
// 4. 发送消息
client.Input()<-msg
}
同步如下:
- sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38package main
import (
"fmt"
"github.com/IBM/sarama"
)
func main() {
config := sarama.NewConfig()
// 1. 初始化生产者配置
config.Producer.RequiredAcks = sarama.WaitForAll
// 选择分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 成功交付的信息
config.Producer.Return.Successes = true
// 2. 构造一个消息,结构体类型
msg := &sarama.ProducerMessage{
Topic: "shopping",
Value: sarama.StringEncoder("20220411happy02"),
}
// 3. 连接kafka
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Println(err)
}
defer client.Close()
// 4. 发送消息
partition, offset, err := client.SendMessage(msg)
fmt.Println("partition:")
fmt.Println(partition)
fmt.Println("offset:")
fmt.Println(offset)
if err != nil {
fmt.Println(err)
}
}
这样子我们就简单地实现了一个消费者和一个生产者
当然了,关于kafka的配置是有讲究的,我们这里就不一一列举了,详细的信息可以参考文末的参考文章。
参考文章:
使用Docker部署Kafka单机版