pcm-coordinator/api/internal/mqs/queue.go

130 lines
3.0 KiB
Go

/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package mqs
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
"github.com/zeromicro/go-zero/core/queue"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/core/threading"
)
// ConsumeHandle is a function type that takes a message payload string and
// returns an error.
type ConsumeHandle func(v string) error

// ConsumeHandler is implemented by types that can process a message payload.
type ConsumeHandler interface {
	// Consume handles one payload and returns an error if handling failed.
	Consume(value string) error
}

// redisQueues bundles a set of message queues with the service group that
// starts and stops them.
type redisQueues struct {
	queues []queue.MessageQueue
	group  *service.ServiceGroup
}

// redisQueue is a single-topic queue backed by a Redis subscription.
type redisQueue struct {
	// topic is the Redis channel name passed to Subscribe.
	topic string
	// channel hands messages from the producer routine to the consumer routine.
	channel chan redis.Message
	// client is the Redis client used to subscribe to topic.
	client *redis.Client
	// handler receives the payload of every message taken from channel.
	handler ConsumeHandler
	// consumerRoutines tracks the routines that drain channel.
	consumerRoutines *threading.RoutineGroup
	// producerRoutines tracks the routines that feed channel.
	producerRoutines *threading.RoutineGroup
}
// Start launches the consumer and producer routines, then blocks: it waits for
// the producer group to finish, closes the internal channel so the consumer's
// range loop can end, and finally waits for the consumer group.
//
// NOTE(review): the producer loop in startProducers has no exit path visible
// in this file, so the producer Wait below appears to block indefinitely.
func (r *redisQueue) Start() {
r.startConsumers()
r.startProducers()
// The channel is closed only after every producer routine has returned, so
// nothing in this file sends on it after the close.
r.producerRoutines.Wait()
close(r.channel)
r.consumerRoutines.Wait()
}
// Stop is currently a no-op: it does not signal or wait for the routines
// launched by Start.
func (r *redisQueue) Stop() {
}
// Start registers every queue with the service group and then starts the group.
func (r redisQueues) Start() {
	for i := range r.queues {
		r.group.Add(r.queues[i])
	}
	r.group.Start()
}
// Stop stops the underlying service group.
func (r redisQueues) Stop() {
r.group.Stop()
}
// startConsumers launches one routine in the consumer group that reads
// messages from the internal channel until it is closed and passes each
// payload to consumeOne.
//
// A handler error does not stop the loop; it is printed and the routine moves
// on to the next message.
func (r *redisQueue) startConsumers() {
	r.consumerRoutines.Run(func() {
		for message := range r.channel {
			if err := r.consumeOne(message.Payload); err != nil {
				// fmt.Errorf only builds an error value; its result was being
				// discarded, so failures were never reported. Print it instead.
				fmt.Printf("consume: %s, error: %v\n", message.Payload, err)
			}
		}
	})
}
// consumeOne forwards a single payload to the configured handler and returns
// whatever error the handler reports.
func (r *redisQueue) consumeOne(value string) error {
	return r.handler.Consume(value)
}
// startProducers launches one routine in the producer group that subscribes to
// the queue's topic and forwards every received message to the internal
// channel. When the subscription's message channel ends, the routine
// subscribes again.
func (r *redisQueue) startProducers() {
	r.producerRoutines.Run(func() {
		for {
			// Keep the PubSub handle so the subscription can be released
			// before a new one is created on the next iteration.
			pubsub := r.client.Subscribe(context.Background(), r.topic)
			for msg := range pubsub.Channel() {
				fmt.Println("生产者获取的值:", msg.Payload)
				r.channel <- *msg
			}
			// The message channel has ended; the Close error is ignored
			// because the subscription is being abandoned either way.
			_ = pubsub.Close()
		}
	})
}
// newRedisQueue builds a redisQueue for topic that subscribes through
// redisClient and delivers payloads to handler. The internal channel is
// unbuffered, and the producer and consumer routine groups are freshly created.
func newRedisQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue {
	q := &redisQueue{
		topic:   topic,
		client:  redisClient,
		handler: handler,
	}
	q.channel = make(chan redis.Message)
	q.producerRoutines = threading.NewRoutineGroup()
	q.consumerRoutines = threading.NewRoutineGroup()
	return q
}
// MustNewQueue is like NewQueue but panics if the queue cannot be created.
//
// Previously a failure only printed a fixed message and returned a nil queue,
// which hid the cause and left a nil value for the caller to use. The panic
// carries the underlying error.
func MustNewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue {
	q, err := NewQueue(topic, redisClient, handler)
	if err != nil {
		panic(fmt.Errorf("NewQueue报错: %w", err))
	}
	return q
}
// NewQueue creates a message queue that subscribes to topic through
// redisClient and passes each received payload to handler.
//
// It returns an error when topic is empty, or when redisClient or handler is
// nil; a nil value would otherwise only fail later, inside the routines
// launched by Start.
func NewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) (queue.MessageQueue, error) {
	if len(topic) == 0 {
		return nil, errors.New("topic不能为空")
	}
	if redisClient == nil {
		return nil, errors.New("redis client must not be nil")
	}
	if handler == nil {
		return nil, errors.New("handler must not be nil")
	}

	r := redisQueues{
		group: service.NewServiceGroup(),
	}
	r.queues = append(r.queues, newRedisQueue(topic, redisClient, handler))
	return r, nil
}