pcm-coordinator/api/internal/mqs/zzz/queue.go

72 lines
2.0 KiB
Go

package zzz
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
"strconv"
"strings"
)
type (
MsgQueue interface {
Consume(ctx context.Context, topic string, partition int, h Handler) error
SendMsg(ctx context.Context, msg *Msg) error
ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error
}
defaultMsgQueue struct {
// Redis客户端
client *redis.Client
}
)
// NewMsgQueue returns a MsgQueue that uses client for all Redis operations.
func NewMsgQueue(client *redis.Client) MsgQueue {
	queue := new(defaultMsgQueue)
	queue.client = client
	return queue
}
// SendMsg publishes msg.Body on the Redis channel derived from msg.Topic and
// msg.Partition, and returns the error reported by the publish command, if any.
func (mq *defaultMsgQueue) SendMsg(ctx context.Context, msg *Msg) error {
	target := mq.partitionTopic(msg.Topic, msg.Partition)
	published := mq.client.Publish(ctx, target, msg.Body)
	return published.Err()
}
// Consume subscribes to the Redis channel of the given topic partition and
// calls h once for every message received. It blocks until consumption can no
// longer continue; the returned error describes why it stopped:
//
//   - ctx.Err() when ctx is cancelled or its deadline passes;
//   - a "channel closed" error when the subscription's message channel closes.
//
// The subscription is closed before Consume returns.
func (mq *defaultMsgQueue) Consume(ctx context.Context, topic string, partition int, h Handler) error {
	// Subscribe to the partition's channel.
	sub := mq.client.Subscribe(ctx, mq.partitionTopic(topic, partition))
	// A PubSub holds its own connection; release it on every exit path so the
	// subscription does not leak. The close error carries no useful signal here.
	defer func() { _ = sub.Close() }()

	messages := sub.Channel()
	for {
		select {
		case <-ctx.Done():
			// The message channel is not tied to ctx, so cancellation has to be
			// observed explicitly or this loop would never end.
			return ctx.Err()
		case msg, ok := <-messages:
			if !ok {
				return errors.New("channel closed")
			}
			// Hand the message to the caller's handler.
			h(&Msg{
				Topic:     topic,
				Body:      []byte(msg.Payload),
				Partition: partition,
			})
		}
	}
}
// ConsumeMultiPartitions subscribes to the Redis channels of all the given
// partitions of topic and calls h once for every message received, with
// Msg.Partition set to the partition the message arrived on. It blocks until
// consumption can no longer continue; the returned error describes why it
// stopped:
//
//   - ctx.Err() when ctx is cancelled or its deadline passes;
//   - a wrapped parse error when a message arrives on a channel whose name does
//     not end in a valid partition number;
//   - a "channels closed" error when the subscription's message channel closes.
//
// The subscription is closed before ConsumeMultiPartitions returns.
func (mq *defaultMsgQueue) ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error {
	// Build the channel name of every requested partition.
	names := make([]string, len(partitions))
	for i, partition := range partitions {
		names[i] = mq.partitionTopic(topic, partition)
	}

	sub := mq.client.Subscribe(ctx, names...)
	// A PubSub holds its own connection; release it on every exit path so the
	// subscription does not leak. The close error carries no useful signal here.
	defer func() { _ = sub.Close() }()

	// Channel names are "<topic>:<partition>". Strip the exact topic prefix
	// rather than splitting on the first ':' so that topics which themselves
	// contain ':' still yield the right partition.
	prefix := topic + ":"

	messages := sub.Channel()
	for {
		select {
		case <-ctx.Done():
			// The message channel is not tied to ctx, so cancellation has to be
			// observed explicitly or this loop would never end.
			return ctx.Err()
		case msg, ok := <-messages:
			if !ok {
				return errors.New("channels closed")
			}
			partition, err := strconv.Atoi(strings.TrimPrefix(msg.Channel, prefix))
			if err != nil {
				// Delivering the message under a made-up partition would be
				// worse than stopping, so surface the problem to the caller.
				return errors.Wrapf(err, "parsing partition from channel %q", msg.Channel)
			}
			// Hand the message to the caller's handler.
			h(&Msg{
				Topic:     topic,
				Body:      []byte(msg.Payload),
				Partition: partition,
			})
		}
	}
}
// partitionTopic builds the Redis channel name for one partition of a topic,
// in the form "<topic>:<partition>".
func (mq *defaultMsgQueue) partitionTopic(topic string, partition int) string {
	const layout = "%s:%d"
	name := fmt.Sprintf(layout, topic, partition)
	return name
}