kafka简介

kafka是按照分布式事务日志架构的大规模发布/订阅消息队列

基本组成

  • Producer:Producer即生产者,消息的产生者,是消息的入口。

  • Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……

  • Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。

  • Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!

前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。

这个partition有三组segment文件,每个log文件的大小是一样的,但是存储的message数量是不一定相等的(每条的message大小不一致)。文件的命名是以该segment最小offset来命名的,如000.index存储offset为0~368795的消息,kafka就是利用分段+索引的方式来解决查找效率的问题。

  • Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。

  • Message:每一条发送的消息主体。上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:

offset:offset是一个占8byte的有序id号,它可以唯一确定每条消息在parition内的位置!

消息大小:消息大小占用4byte,用于描述消息的大小。

消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。

  • Consumer:消费者,即消息的消费方,是消息的出口。

  • Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!

  • Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。

部署kafka

在部署kafka的时候遇到了一万个问题,在mac上使用homebrew安装的时候,zookeeper一直安装失败,使用docker直接部署的时候配置文件又出现问题,最后兜兜转转还是使用了docker-compose来安装

docker部署

首先看一下老版本的kafka,这是需要zookeeper去做状态同步的,于是使用docker-compose去同时部署两个容器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
version: '3'

name: kafka-group

services:
  zookeeper-test:
    image: zookeeper
    ports:
      - "2181:2181"
    volumes:
      - zookeeper_vol:/data
      - zookeeper_vol:/datalog
      - zookeeper_vol:/logs
    container_name: zookeeper-test

  kafka-test:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: "localhost"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper-test:2181"
      KAFKA_LOG_DIRS: "/kafka/logs"
    volumes:
      - kafka_vol:/kafka
    depends_on:
      - zookeeper-test
    container_name: kafka-test

volumes:
  zookeeper_vol: {}
  kafka_vol: {}
1
2
# 启动Kafka容器组
docker compose -f kafa-group.yml up -d

然后看一下新版的kafka使用了kraft去同步集群状态,去掉了对于zookeeper的依赖,于是就可以直接运行了。这里选择了3.7.0的版本,高版本也是一样的。

1
docker run  -p 9092:9092 --name  kafka apache/kafka:3.7.0

消息测试

首先可以创建主题

1
2
3
4
5
6
7
8
# 进入容器
docker exec -it kafka-test /bin/bash

# 进入Kafka bin目录
cd /opt/kafka/bin

# 创建主题
kafka-topics.sh --create --topic kafka-test --bootstrap-server localhost:9092
  1. 启动Kafka Producer

新开一个命令后窗口,然后执行以下命令,启动Kafka Producer,准备往topic:test发送消息

1
2
3
4
5
6
7
8
# 进入容器
docker exec -it kafka-test /bin/bash

# 进入Kafka bin目录
cd /opt/kafka/bin

# 启动Producer
sh kafka-console-producer.sh --broker-list localhost:9092 --topic test
  1. 启动Kafka Consumer

新开一个命令后窗口,然后执行以下命令,启动Kafka Consumer,订阅来自topic:test的消息

1
2
3
4
5
6
7
8
# 进入容器
docker exec -it kafka-test /bin/bash

# 进入Kafka bin目录
cd /opt/kafka/bin

# 启动Consumer
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

在Producer命令行窗口输入内容,然后回车即可发送消息.然后就可以在Consumer命令行窗口看到收到的消息

可能遇到的问题

如果你碰到ZooKeeper、Kafka容器无法正常启动,可以删除数据卷以及容器后进行创建

1
2
3
4
5
6
7
# 停用&删除容器
docker stop zookeeper-test kafka-test
docker rm zookeeper-test kafka-test

# 删除数据卷
docker volume rm zookeeper_vol kafka_vol
docker volume rm kafka-group_kafka_vol kafka-group_zookeeper_vol

在golang中使用kafaka

在github中关于kafka的sdk中star最多的是sarama这个库,我们也通过这个库来对kafka进行简单的操作

我们先启动一个消费者

  • consume.go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
	"fmt"
	"log"
	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalf("Failed to start consumer: %v", err)
	}
	defer consumer.Close()

	partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatalf("Failed to start partition consumer: %v", err)
	}
	defer partitionConsumer.Close()

	for {
		select {
		case msg := <-partitionConsumer.Messages():
			fmt.Printf("Received message: %s\n", string(msg.Value))
		case err := <-partitionConsumer.Errors():
			log.Printf("Error: %v\n", err)
		}
	}
}

关于kafka消息的producer默认有两种选择,就是同步和异步

异步如下所示:

  • async.go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
    "fmt"
    "github.com/IBM/sarama"
)

func main() {
    config := sarama.NewConfig()
    // 1. 初始化生产者配置
    config.Producer.RequiredAcks = sarama.WaitForAll
    // 选择分区
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    // 成功交付的信息
    config.Producer.Return.Successes = true

    // 2. 构造一个消息,结构体类型
    msg := &sarama.ProducerMessage{
        Topic: "test",
        Value: sarama.StringEncoder("hello,async"),
    }

    // 3. 连接kafka
    client, err := sarama.NewAsyncProducer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        fmt.Println(err)
    }
    defer client.Close()
    // 4. 发送消息
	client.Input()<-msg
}

同步如下:

  • sync.go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
    "fmt"
    "github.com/IBM/sarama"
)

func main() {
    config := sarama.NewConfig()
    // 1. 初始化生产者配置
    config.Producer.RequiredAcks = sarama.WaitForAll
    // 选择分区
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    // 成功交付的信息
    config.Producer.Return.Successes = true

    // 2. 构造一个消息,结构体类型
    msg := &sarama.ProducerMessage{
        Topic: "shopping",
        Value: sarama.StringEncoder("20220411happy02"),
    }

    // 3. 连接kafka
    client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        fmt.Println(err)
    }
    defer client.Close()
    // 4. 发送消息
    partition, offset, err := client.SendMessage(msg)
    fmt.Println("partition:")
    fmt.Println(partition)
    fmt.Println("offset:")
    fmt.Println(offset)
    if err != nil {
        fmt.Println(err)
    }
}

这样子我们就简单地实现了一个消费者和一个生产者

当然了,关于kafka的配置是有讲究的,我们这里就不一一列举了,详细的信息可以参考文末的参考文章。

参考文章: 使用Docker部署Kafka单机版

Golang:使用sarama向kafka投递任务

Kafka入门(3):Sarama生产者是如何工作的

Golang中如何正确的使用sarama包操作Kafka?