作者 青鸟

最近在项目中接触了消息队列,写一篇记录一下Go语言下的使用消息队列,调用的是这个库asynq(”github.com/hibiken/asynq”)这个的文档真的是读过这么多以来最清楚的一个了,很值得去拜读一下。这篇博客是对asynq的Wiki做了一个简单的翻译。

什么是消息队列

消息队列(Message Queue)是一种在分布式系统中用于在不同组件或服务之间传递消息的通信模式。它是一种异步通信方式,用于解耦不同部分的系统,使它们能够以解耦和的方式协同工作,而无需直接相互通信。

在消息队列中,消息的发送者将消息放入队列中,而消息的接收者从队列中获取消息并进行处理。这使得发送者和接收者之间可以彼此独立地操作,无需即时相互通信。消息队列具有以下一些优点和特点:

  1. 解耦性: 消息队列允许系统的不同部分能够独立演化,而不会直接影响其他部分。这使得系统更加灵活和可维护。

  2. 异步通信: 发送者和接收者之间的通信是异步的,即发送者不需要等待接收者处理消息。这有助于提高系统的性能和响应性能。

  3. 缓冲能力: 消息队列可以作为一个缓冲区,帮助平衡发送者和接收者之间的处理速度差异。如果接收者暂时无法处理消息,它们可以留在队列中等待。

  4. 可靠性: 好的消息队列系统通常具有数据持久化和备份机制,确保消息不会因为系统故障而丢失。

  5. 扩展性: 通过增加消息队列的实例,系统可以更容易地进行扩展以处理更大的负载。

  6. 支持多种通信模式: 消息队列通常支持点对点通信和发布-订阅模式,使得不同的通信场景都可以得到满足。

  7. 削峰填谷: 在分布式系统中消息队列最重要的功能之一就是削峰填谷,在高并发情况下,系统可能会面临突然的大量请求,导致系统资源耗尽。消息队列可以用于削峰填谷,即将请求暂时存储在消息队列中,然后按照系统处理能力逐渐消费消息,从而平滑处理高峰期的请求,避免系统崩溃或性能下降。

消息队列在高并发场景中是很常见的使用工具。当我们的系统在高并发情况下面临大量的写请求,我们不能再依赖于写入缓存中(缓存适用的场景是读多写少的场景)。于是我们便可以将大量写请求放入消息队列中,以此来均匀地、异步地处理写请求,从而做到削峰填谷的作用。

一些常见的消息队列系统包括 RabbitMQ、Apache Kafka、Amazon SQS(Simple Queue Service)、ActiveMQ 等。这些系统在不同的场景下有不同的适用性和特点,您可以根据具体需求选择合适的消息队列技术来实现异步通信和解耦。

Asynq工作原理

Asynq 是一个开源的异步任务队列库,用于在 Go 编程语言中处理异步任务。它允许开发者将耗时的、非实时的任务从主应用程序中分离出来,放入队列中异步执行,从而提高应用程序的性能和响应能力。

Asynq由 Redis 提供支持(仅仅支持redis),旨在实现可扩展且易于上手。

Asynq 工作原理的高级概述:

  • 客户端将任务放入队列
  • 服务器从队列中拉取任务并为每个任务启动一个工作协程
  • 任务由多个worker同时处理

任务队列用作跨多台机器分配工作的机制。一个系统可以由多个工作服务器和代理组成,使其具有高可用性和水平扩展特性。

示例用例:

以下是 Asynq 的一些特点和功能:

  1. 任务队列: Asynq 提供了一个任务队列,可以将任务添加到队列中,然后由后台工作程序异步执行。这对于需要执行耗时操作的任务非常有用,例如发送电子邮件、处理图像等。

  2. 多优先级支持: Asynq 支持多个任务优先级,您可以为每个任务指定优先级,以确保重要任务得到及时处理。

  3. 任务超时和重试: 您可以为任务设置超时时间,并配置任务失败后的重试策略。这有助于处理临时性的故障或网络问题。

  4. 任务状态跟踪: Asynq 允许您跟踪任务的执行状态,包括已排队、已完成、已失败等。

  5. 任务调度: 您可以根据需要调度任务的执行时间,例如可以延迟执行或定时执行任务。

  6. 可扩展性: Asynq 支持分布式部署,可以将任务队列和工作程序部署在多台服务器上,以实现高可用性和更大的处理能力。

  7. 轻量级: Asynq 在 Go 编程语言中实现,因此它具有较小的资源消耗和快速的执行速度。

  8. 可定制性: 您可以根据自己的需求扩展和定制 Asynq,以适应特定的业务场景。

Asynq 的设计受到了其他一些流行的消息队列系统的影响,但它专注于为 Go 开发人员提供一种简单而强大的异步任务处理解决方案。如果您正在使用 Go 编程语言,并且需要处理异步任务,Asynq 可能是一个值得考虑的选择。您可以在其官方 GitHub 存储库上找到更多关于 Asynq 的信息、文档和示例。强烈推荐去阅读一下Wiki的文档,为数不多的写的非常好的文档

简单使用Asynq

在这里我们会编写两个文件,一个clientworker

client.go是客服端,他会产生很多的task放入队列里,交给worker协程去异步处理

workers.go 会启动很多协程去处理client产生的task

配置

首先两个都需要去做redis的配置:

1
2
3
4
5
6
7
8
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// Omit if no password is required
Password: "mypassword",
// Use a dedicated db number for asynq.
// By default, Redis offers 16 databases (0..15)
DB: 0,
}

Task

一个Asynq的工作单元被封装在Task这个类型中,Task有两个重要的属性:Type和Payload

1
2
3
4
5
// Type是一个字符串值,表示任务的类型
func (t *Task) Type() string

// Payload是任务执行所需要的数据。
func (t *Task) Payload() []byte

Client Program

为了创建一个task,我们使用NewTask函数,这个函数需要两个参数,type和payload

Enqueue这个方法会将任务加入到优先队列中等待worker的消费,这个方法里可以使用参数ProcessIn或者ProcessAt。

如果没有设置ProcessIn或者ProcessAt,任务将立即进入待办队列,如果空闲,则会立即得到执行。

如果使用 ProcessIn 或 ProcessAt 选项来安排将来要处理的任务。相当于延时任务。

ProcessIn接受一个time.Duration类型参数,ProcessAt接受一个time.Time类型参数
这两个的示例如下:

1
2
3
4
5
6

// 24 小时后处理任务。
info, err = client.Enqueue(reTask, asynq.ProcessIn(24*time.Hour))

// 现在时间+24小时后 处理任务
client.Enqueue(weTask, asynq.ProcessAt(time.Now().Add(24*time.Hour)))

下面给出client的完样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Task payload for any email related tasks.
type EmailTaskPayload struct {
// ID for the email recipient.
UserID int
}

// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

// Create a task with typename and payload.
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)

t2 := asynq.NewTask("email:reminder", payload)

// Process the task immediately.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Successfully enqueued task: %+v", info)

// Process the task 24 hours later.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Successfully enqueued task: %+v", info)
}

Workers Program

创建任务处理的asynq.Server

1
2
3
4
5
6
7
8
9
10
11
12
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},//Concurrency表示最大并发处理任务数。
)

// NOTE: We'll cover what this `handler` is in the section below.
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}

值得注意的是我们这里使用的是Run来启动,这种启动方式的serve是开始任务处理并阻塞,如果使用Start,则是开始任务处理并非阻塞

1
2
3
if err := srv.Start(handler); err != nil {
log.Fatal(err)
}

Run 开始任务处理并阻塞,直到接收到退出程序的操作系统信号。 一旦它收到一个信号,它就会优雅地关闭所有活跃的工作人员和其他 goroutines 来处理任务。

Start 启动工作服务器。 服务器启动后,它会从队列中取出任务并为每个任务启动一个工作协程,然后调用 Handler 来处理它。任务由workers并发处理,最多达到 Config.Concurrency 中指定的并发数。这种方式下,如果你的main函数没有自己增加阻塞退出的方法,任务处理会出现问题。因为main函数处理完后,程序会立即退出。

下面介绍 一下上面用到的handler。要求handler必须要具有ProcessTask方法。

1
2
3
4
5
type Handler interface {
// ProcessTask should return nil if the task was processed successfully.
// If ProcessTask returns a non-nil error or panics, the task will be retried again later.
ProcessTask(context.Context, *Task) error
}

默认情况下,失败的任务将重试,且指数补偿最多可重复25次这个是可以重新配置的

下面给出完整的worker.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Welcome Email to User %d", p.UserID)

case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Reminder Email to User %d", p.UserID)

default:
return fmt.Errorf("unexpected task type: %s", t.Type())
}
return nil
}

func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)

// Use asynq.HandlerFunc adapter for a handler function
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}

我们可以继续向这个处理程序函数添加不同的case,但是在实际的应用程序中,在单独的函数中为每个 case 定义逻辑更方便。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)

mux := asynq.NewServeMux()
// 欢迎邮件任务 具体执行
mux.HandleFunc("email:welcome", sendWelcomeEmail)
// 提醒邮件任务 具体执行
mux.HandleFunc("email:reminder", sendReminderEmail)

// 不需要适配Handler,因为ServeMux实现了Handler接口
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Reminder Email to User %d", p.UserID)
return nil
}

就像“net/http”包中的 ServeMux 一样,可以通过调用 Handle 或 HandleFunc 来注册处理程序。ServeMux 满足 Handler 接口,这样你就可以将它传递给 (*Server).Run或者Start。

完善代码

现在我们已经提取了处理每个任务类型的函数,代码看起来更有条理了。
然而,代码有点太隐式了,我们有这些任务类型和有效负载类型的字符串值,它们应该封装在一个内聚包中。让我们通过编写一个封装任务创建和处理的包来重构代码。我们将简单地创建一个名为.task的包,并且重写一下worker和client中的相关的逻辑

task.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package task

import (
"context"
"fmt"

"github.com/hibiken/asynq"
)

// A list of task types.
const (
TypeWelcomeEmail = "email:welcome"
TypeReminderEmail = "email:reminder"
)

// Task payload for any email related tasks.
type emailTaskPayload struct {
// ID for the email recipient.
UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(emailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(emailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Reminder Email to User %d", p.UserID)
return nil
}

client.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

t1, err := task.NewWelcomeEmailTask(42)
if err != nil {
log.Fatal(err)
}

t2, err := task.NewReminderEmailTask(42)
if err != nil {
log.Fatal(err)
}

// Process the task immediately.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Successfully enqueued task: %+v", info)

// Process the task 24 hours later.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Successfully enqueued task: %+v", info)
}

workers.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)

mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}

我们在运行程序的时候可以使用命令asynq dash来确定队列情况

定时任务

Asynq对Periodic Tasks提供了很好的支持,我们可以与服务器一起运行一个Scheduler(调度器)以定期处理任务。Scheduler定期对任务进行排队,然后由集群中可用的工作服务器执行。

必须确保一次只有一个Scheduler在为一个Serve运行,否则您最终会得到重复的任务。使用集中式方法意味着不必同步调度,并且服务可以在不使用锁的情况下运行。

如果您需要动态添加和删除周期性任务,请使用PeriodicTaskManager而不是直接使用Scheduler。

Scheduler默认使用UTC时区,但可以使用scheduleopts更改使用的时区。

1
2
3
4
5
6
7
8
9
10
11
// Example of using America/Los_Angeles timezone instead of the default UTC timezone.
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
panic(err)
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
Location: loc,
},
)

静态

为了周期执行任务,需要在scheduler注册一个接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
scheduler := asynq.NewScheduler(redisConnOpt, nil)

task := asynq.NewTask("example_task", nil)

// 可以使用 cron 规范字符串来指定计划。
entryID, err := scheduler.Register("* * * * *", task)// 每分钟执行一次
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)

// 也可以使用"@every <duration>"语法指定间隔
entryID, err = scheduler.Register("@every 30s", task)// 每30s执行一次
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)

// 可以在注册任务的同时,指定配置项
entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))// 每24小时执行一次 队列名字“myqueue”。

//我们使用`Run`把这个scheduler跑起来,并阻塞
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)

我们可以为这个scheduler做一个错误处理

1
2
3
4
5
6
7
8
9
10
func handleEnqueueError(task *asynq.Task, opts []asynq.Option, err error) {
// your error handling logic
}

scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
EnqueueErrorHandler: handleEnqueueError,
},
)

动态

我们可以使用PeriodicTaskManager去动态的添加和删除周期性task,这就让我们不用去重启Scheduler这个进程。PeriodicTaskManager使用PeriodicTaskConfigProvider去周期性地获取periodic task的配置,并将Scheduler的条目和现在的配置同步

可以将周期性任务配置存储在数据库或本地文件中,并更新此配置源以动态添加和删除周期性任务。也可以轻松修改示例以使用数据库或其他配置源。

使用YAML文件进行说明。

1
2
3
4
5
6
configs:
- cronspec: "* * * * *"
task_type: foo

- cronspec: "* * * * *"
task_type: bar

我们使用PeriodicTaskConfigProvider来读取这个yaml文件,并返回一个PeriodicTaskConfig的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func main() {
//创建任务管理器
provider := &FileBasedConfigProvider{filename: "./periodic_task_config.yml"}

mgr, err := asynq.NewPeriodicTaskManager(
asynq.PeriodicTaskManagerOpts{
RedisConnOpt: asynq.RedisClientOpt{Addr: "localhost:6379"},
PeriodicTaskConfigProvider: provider, // 配置源的接口
SyncInterval: 10 * time.Second, //指定同步发生的频率(同步配置源)
})
if err != nil {
log.Fatal(err)
}
//// 运行 并阻塞
if err := mgr.Run(); err != nil {
log.Fatal(err)
}
}

// // 必须要实现的方法(接口:PeriodicTaskConfigProvider),读取所有配置项
type FileBasedConfigProvider struct {
filename string
}

// Parses the yaml file and return a list of PeriodicTaskConfigs.
func (p *FileBasedConfigProvider) GetConfigs() ([]*asynq.PeriodicTaskConfig, error) {
data, err := os.ReadFile(p.filename)
if err != nil {
return nil, err
}
var c PeriodicTaskConfigContainer
if err := yaml.Unmarshal(data, &c); err != nil {
return nil, err
}
var configs []*asynq.PeriodicTaskConfig
for _, cfg := range c.Configs {
configs = append(configs, &asynq.PeriodicTaskConfig{Cronspec: cfg.Cronspec, Task: asynq.NewTask(cfg.TaskType, nil)})
}
return configs, nil
}

type PeriodicTaskConfigContainer struct {
Configs []*Config `yaml:"configs"`
}

type Config struct {
Cronspec string `yaml:"cronspec"`
TaskType string `yaml:"task_type"`
}

错误中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
// loggingMiddleware 记录任务日志中间件
func loggingMiddleware(h asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
start := time.Now()
log.Printf("Start processing %q", t.Type())
err := h.ProcessTask(ctx, t)
if err != nil {
return err
}
log.Printf("Finished processing %q: Elapsed Time = %v", t.Type(), time.Since(start))
return nil
})
}

然后再main中注册这个中间件

1
2
mux := asynq.NewServeMux()
mux.Use(loggingMiddleware)

Web UI

Asynqmon是一个基于web的工具,用于监视和管理Asynq队列和任务。有关详细信息,请参阅工具的README

参考资料:

  1. Asynq简单、可靠、高效的分布式任务队列
  2. Asynq的Wiki