JCC-CSScheduler/common/pkgs/mq/dcontroller/server.go

78 lines
2.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package dcontroller
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
)
const (
ServerQueueName = "Scheduler-DController"
)
type Service interface {
StartStorageLoadPackage(msg *StartStorageLoadPackage) (*StartStorageLoadPackageResp, *mq.CodeMessage)
WaitStorageLoadPackage(msg *WaitStorageLoadPackage) (*WaitStorageLoadPackageResp, *mq.CodeMessage)
StartStorageCreatePackage(msg *StartStorageCreatePackage) (*StartStorageCreatePackageResp, *mq.CodeMessage)
WaitStorageCreatePackage(msg *WaitStorageCreatePackage) (*WaitStorageCreatePackageResp, *mq.CodeMessage)
StartCacheMovePackage(msg *StartCacheMovePackage) (*StartCacheMovePackageResp, *mq.CodeMessage)
WaitCacheMovePackage(msg *WaitCacheMovePackage) (*WaitCacheMovePackageResp, *mq.CodeMessage)
}
type Server struct {
service Service
rabbitSvr mq.RabbitMQServer
OnError func(err error)
}
func NewServer(svc Service, cfg *mymq.Config) (*Server, error) {
srv := &Server{
service: svc,
}
rabbitSvr, err := mq.NewRabbitMQServer(
cfg.MakeConnectingURL(),
ServerQueueName,
func(msg *mq.Message) (*mq.Message, error) {
return msgDispatcher.Handle(srv.service, msg)
},
)
if err != nil {
return nil, err
}
srv.rabbitSvr = *rabbitSvr
return srv, nil
}
func (s *Server) Stop() {
s.rabbitSvr.Close()
}
func (s *Server) Serve() error {
return s.rabbitSvr.Serve()
}
var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher()
// Register 将Service中的一个接口函数作为指定类型消息的处理函数同时会注册请求和响应的消息类型
// TODO 需要约束Service实现了TSvc接口
func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*TResp, *mq.CodeMessage)) any {
mq.AddServiceFn(&msgDispatcher, svcFn)
mq.RegisterMessage[TReq]()
mq.RegisterMessage[TResp]()
return nil
}
// RegisterNoReply 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型
// TODO 需要约束Service实现了TSvc接口
func RegisterNoReply[TSvc any, TReq any](svcFn func(svc TSvc, msg *TReq)) any {
mq.AddNoRespServiceFn(&msgDispatcher, svcFn)
mq.RegisterMessage[TReq]()
return nil
}