增加rabbitmq断连重试机制

This commit is contained in:
JeshuaRen 2024-10-30 17:18:43 +08:00
parent 59b3105007
commit f5cb48eede
8 changed files with 94 additions and 23 deletions

View File

@ -61,12 +61,32 @@ func main() {
func serveMQServer(server *advmq.Server) {
logger.Info("start serving mq server")
err := server.Serve()
if err != nil {
logger.Errorf("mq server stopped with error: %s", err.Error())
ch := server.Start()
if ch == nil {
logger.Errorf("RabbitMQ logEvent is nil")
os.Exit(1)
}
logger.Info("mq server stopped")
for {
val, err := ch.Receive()
if err != nil {
logger.Errorf("command server stopped with error: %s", err.Error())
break
}
switch val := val.(type) {
case error:
logger.Errorf("rabbitmq connect with error: %v", val)
case int:
if val == 1 {
break
}
}
}
logger.Info("command server stopped")
// TODO 仅简单结束了程序
os.Exit(1)
}
func serveReporter(rpt *reporter.Reporter) {

View File

@ -46,10 +46,30 @@ func main() {
func serveColServer(server *colmq.Server) {
logger.Info("start serving command server")
err := server.Serve()
if err != nil {
logger.Errorf("command server stopped with error: %s", err.Error())
ch := server.Start()
if ch == nil {
logger.Errorf("RabbitMQ logEvent is nil")
os.Exit(1)
}
for {
val, err := ch.Receive()
if err != nil {
logger.Errorf("command server stopped with error: %s", err.Error())
break
}
switch val := val.(type) {
case error:
logger.Errorf("rabbitmq connect with error: %v", val)
case int:
if val == 1 {
break
}
}
}
logger.Info("command server stopped")
// TODO 仅简单结束了程序
os.Exit(1)
}

View File

@ -2,6 +2,7 @@ package advisor
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/utils/sync2"
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
)
@ -29,6 +30,7 @@ func NewServer(svc Service, cfg *mymq.Config) (*Server, error) {
func(msg *mq.Message) (*mq.Message, error) {
return msgDispatcher.Handle(srv.service, msg)
},
cfg.RabbitMQParam,
)
if err != nil {
return nil, err
@ -42,8 +44,8 @@ func (s *Server) Stop() {
s.rabbitSvr.Close()
}
func (s *Server) Serve() error {
return s.rabbitSvr.Serve()
func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] {
return s.rabbitSvr.Start()
}
func (s *Server) OnError(callback func(error)) {

View File

@ -2,6 +2,7 @@ package collector
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/utils/sync2"
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
)
@ -35,6 +36,7 @@ func NewServer(svc Service, cfg *mymq.Config) (*Server, error) {
func(msg *mq.Message) (*mq.Message, error) {
return msgDispatcher.Handle(srv.service, msg)
},
cfg.RabbitMQParam,
)
if err != nil {
return nil, err
@ -48,8 +50,8 @@ func (s *Server) Stop() {
s.rabbitSvr.Close()
}
func (s *Server) Serve() error {
return s.rabbitSvr.Serve()
func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] {
return s.rabbitSvr.Start()
}
func (s *Server) OnError(callback func(error)) {

View File

@ -1,12 +1,16 @@
package mq
import "fmt"
import (
"fmt"
"gitlink.org.cn/cloudream/common/pkgs/mq"
)
type Config struct {
Address string `json:"address"`
Account string `json:"account"`
Password string `json:"password"`
VHost string `json:"vhost"`
Address string `json:"address"`
Account string `json:"account"`
Password string `json:"password"`
VHost string `json:"vhost"`
RabbitMQParam mq.RabbitMQParam `json:"param"`
}
func (cfg *Config) MakeConnectingURL() string {

View File

@ -32,6 +32,7 @@ func NewServer(svc Service, cfg *mymq.Config) (*Server, error) {
func(msg *mq.Message) (*mq.Message, error) {
return msgDispatcher.Handle(srv.service, msg)
},
cfg.RabbitMQParam,
)
if err != nil {
return nil, err
@ -46,7 +47,7 @@ func (s *Server) Stop() {
}
func (s *Server) Serve() error {
return s.rabbitSvr.Serve()
return s.rabbitSvr.Start()
}
func (s *Server) OnError(callback func(error)) {

View File

@ -2,6 +2,7 @@ package manager
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/utils/sync2"
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
)
@ -39,6 +40,7 @@ func NewServer(svc Service, cfg *mymq.Config) (*Server, error) {
func(msg *mq.Message) (*mq.Message, error) {
return msgDispatcher.Handle(srv.service, msg)
},
cfg.RabbitMQParam,
)
if err != nil {
return nil, err
@ -52,8 +54,8 @@ func (s *Server) Stop() {
s.rabbitSvr.Close()
}
func (s *Server) Serve() error {
return s.rabbitSvr.Serve()
func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] {
return s.rabbitSvr.Start()
}
func (s *Server) OnError(callback func(error)) {

View File

@ -103,12 +103,32 @@ func serveJobManager(mgr *jobmgr.Manager) {
func serveMQServer(server *mgrmq.Server) {
logger.Info("start serving mq server")
err := server.Serve()
if err != nil {
logger.Errorf("mq server stopped with error: %s", err.Error())
ch := server.Start()
if ch == nil {
logger.Errorf("RabbitMQ logEvent is nil")
os.Exit(1)
}
logger.Info("mq server stopped")
for {
val, err := ch.Receive()
if err != nil {
logger.Errorf("command server stopped with error: %s", err.Error())
break
}
switch val := val.(type) {
case error:
logger.Errorf("rabbitmq connect with error: %v", val)
case int:
if val == 1 {
break
}
}
}
logger.Info("command server stopped")
// TODO 仅简单结束了程序
os.Exit(1)
}
func serveAdvisorManager(mgr *advisormgr.Manager) {