SysEvent功能可以关闭

This commit is contained in:
Sydonian 2025-05-13 16:14:33 +08:00
parent aaae2d526e
commit 5cec10bad4
15 changed files with 50 additions and 53 deletions

View File

@ -76,7 +76,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) {
}
// 初始化系统事件发布器
evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &datamap.SourceClient{
evtPub, err := sysevent.NewPublisher(config.Cfg().SysEvent, &datamap.SourceClient{
UserID: config.Cfg().Local.UserID,
})
if err != nil {

View File

@ -89,7 +89,7 @@ func test(configPath string) {
}
// 初始化系统事件发布器
evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &datamap.SourceClient{
evtPub, err := sysevent.NewPublisher(config.Cfg().SysEvent, &datamap.SourceClient{
UserID: config.Cfg().Local.UserID,
})
if err != nil {

View File

@ -69,7 +69,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) {
}
// 初始化系统事件发布器
evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &datamap.SourceClient{
evtPub, err := sysevent.NewPublisher(config.Cfg().SysEvent, &datamap.SourceClient{
UserID: config.Cfg().Local.UserID,
})
if err != nil {

View File

@ -2,7 +2,6 @@ package config
import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/utils/config"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader"
@ -14,6 +13,7 @@ import (
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity"
corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator"
hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent"
)
type Config struct {
@ -22,7 +22,7 @@ type Config struct {
CoordinatorRPC corrpc.PoolConfig `json:"coordinatorRPC"`
Logger logger.Config `json:"logger"`
DB db.Config `json:"db"`
RabbitMQ mq.Config `json:"rabbitMQ"`
SysEvent sysevent.Config `json:"sysEvent"`
Connectivity connectivity.Config `json:"connectivity"`
Downloader downloader.Config `json:"downloader"`
DownloadStrategy strategy.Config `json:"downloadStrategy"`

View File

@ -31,7 +31,7 @@ func init() {
}
func watchSysEvent(outputJSON bool) {
host, err := sysevent.NewWatcherHost(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ))
host, err := sysevent.NewWatcherHost(config.Cfg().SysEvent)
if err != nil {
fmt.Println(err)
return

View File

@ -19,15 +19,14 @@
"password": "123456",
"databaseName": "cloudream"
},
"rabbitMQ": {
"sysEvent": {
"enabled": false,
"address": "127.0.0.1:5672",
"account": "cloudream",
"password": "123456",
"vhost": "/",
"param": {
"retryNum": 5,
"retryInterval": 5000
}
"exchange": "SysEvent",
"queue": "SysEvent"
},
"connectivity": {
"testInterval": 300

View File

@ -11,16 +11,6 @@
"password": "123456",
"databaseName": "cloudream"
},
"rabbitMQ": {
"address": "127.0.0.1:5672",
"account": "cloudream",
"password": "123456",
"vhost": "/",
"param": {
"retryNum": 5,
"retryInterval": 5000
}
},
"tickTock": {
"hubUnavailableTime": "20s"
},

View File

@ -20,15 +20,14 @@
"outputDirectory": "log",
"level": "debug"
},
"rabbitMQ": {
"sysEvent": {
"enabled": false,
"address": "127.0.0.1:5672",
"account": "cloudream",
"password": "123456",
"vhost": "/",
"param": {
"retryNum": 5,
"retryInterval": 5000
}
"exchange": "SysEvent",
"queue": "SysEvent"
},
"tickTock": {
"testHubConnectivitiesInterval": "5m"

View File

@ -1,19 +1,11 @@
package sysevent
import "gitlink.org.cn/cloudream/common/pkgs/mq"
type Config struct {
Enabled bool `json:"enabled"`
Address string `json:"address"`
Account string `json:"account"`
Password string `json:"password"`
VHost string `json:"vhost"`
}
func ConfigFromMQConfig(mqCfg mq.Config) Config {
return Config{
Address: mqCfg.Address,
Account: mqCfg.Account,
Password: mqCfg.Password,
VHost: mqCfg.VHost,
}
Exchange string `json:"exchange"`
Queue string `json:"queue"`
}

View File

@ -32,6 +32,7 @@ type OtherError struct {
}
type Publisher struct {
cfg Config
connection *amqp.Connection
channel *amqp.Channel
eventChan *async.UnboundChannel[SysEvent]
@ -39,6 +40,13 @@ type Publisher struct {
}
func NewPublisher(cfg Config, thisSource Source) (*Publisher, error) {
if !cfg.Enabled {
return &Publisher{
cfg: cfg,
thisSource: thisSource,
}, nil
}
config := amqp.Config{
Vhost: cfg.VHost,
}
@ -55,7 +63,7 @@ func NewPublisher(cfg Config, thisSource Source) (*Publisher, error) {
return nil, fmt.Errorf("openning channel on connection: %w", err)
}
err = channel.ExchangeDeclare(ExchangeName, "fanout", false, true, false, false, nil)
err = channel.ExchangeDeclare(cfg.Exchange, "fanout", false, true, false, false, nil)
if err != nil {
connection.Close()
return nil, fmt.Errorf("declare exchange: %w", err)
@ -74,6 +82,10 @@ func NewPublisher(cfg Config, thisSource Source) (*Publisher, error) {
func (p *Publisher) Start() *PublisherEventChan {
ch := async.NewUnboundChannel[PublisherEvent]()
go func() {
if !p.cfg.Enabled {
return
}
defer ch.Close()
defer p.channel.Close()
defer p.connection.Close()
@ -95,7 +107,7 @@ func (p *Publisher) Start() *PublisherEventChan {
continue
}
err = p.channel.Publish(ExchangeName, "", false, false, amqp.Publishing{
err = p.channel.Publish(p.cfg.Exchange, "", false, false, amqp.Publishing{
ContentType: "text/plain",
Body: eventData,
Expiration: "60000", // 消息超时时间默认1分钟
@ -111,12 +123,20 @@ func (p *Publisher) Start() *PublisherEventChan {
}
func (p *Publisher) Stop() {
if !p.cfg.Enabled {
return
}
p.channel.Close()
p.connection.Close()
}
// Publish 发布事件,会自动补齐必要信息
func (p *Publisher) Publish(eventBody datamap.SysEventBody) {
if !p.cfg.Enabled {
return
}
p.eventChan.Send(datamap.SysEvent{
Timestamp: time.Now(),
Source: p.thisSource,
@ -126,5 +146,9 @@ func (p *Publisher) Publish(eventBody datamap.SysEventBody) {
// PublishRaw 完全原样发布事件,不补齐任何信息
func (p *Publisher) PublishRaw(evt SysEvent) {
if !p.cfg.Enabled {
return
}
p.eventChan.Send(evt)
}

View File

@ -4,11 +4,6 @@ import (
"gitlink.org.cn/cloudream/jcs-pub/common/models/datamap"
)
const (
SysEventQueueName = "SysEventQueue"
ExchangeName = "SysEventExchange"
)
type SysEvent = datamap.SysEvent
type Source = datamap.SysEventSource

View File

@ -45,14 +45,14 @@ func NewWatcherHost(cfg Config) (*WatcherHost, error) {
return nil, fmt.Errorf("openning channel on connection: %w", err)
}
err = channel.ExchangeDeclare(ExchangeName, "fanout", false, true, false, false, nil)
err = channel.ExchangeDeclare(cfg.Exchange, "fanout", false, true, false, false, nil)
if err != nil {
connection.Close()
return nil, fmt.Errorf("declare exchange: %w", err)
}
_, err = channel.QueueDeclare(
SysEventQueueName,
cfg.Queue,
false,
true,
false,
@ -65,14 +65,14 @@ func NewWatcherHost(cfg Config) (*WatcherHost, error) {
return nil, fmt.Errorf("declare queue: %w", err)
}
err = channel.QueueBind(SysEventQueueName, "", ExchangeName, false, nil)
err = channel.QueueBind(cfg.Queue, "", cfg.Exchange, false, nil)
if err != nil {
channel.Close()
connection.Close()
return nil, fmt.Errorf("bind queue: %w", err)
}
recvChan, err := channel.Consume(SysEventQueueName, "", true, false, true, false, nil)
recvChan, err := channel.Consume(cfg.Queue, "", true, false, true, false, nil)
if err != nil {
channel.Close()
connection.Close()

View File

@ -2,7 +2,6 @@ package config
import (
log "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
c "gitlink.org.cn/cloudream/common/utils/config"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc"
"gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db"
@ -12,7 +11,6 @@ import (
type Config struct {
Logger log.Config `json:"logger"`
DB db.Config `json:"db"`
RabbitMQ mq.Config `json:"rabbitMQ"`
TickTock ticktock.Config `json:"tickTock"`
RPC rpc.Config `json:"rpc"`
}

View File

@ -93,7 +93,7 @@ func serve(configPath string, opts serveOptions) {
// go serveAccessStat(acStat)
// 初始化系统事件发布器
evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &datamap.SourceHub{
evtPub, err := sysevent.NewPublisher(config.Cfg().SysEvent, &datamap.SourceHub{
HubID: hubCfg.Hub.HubID,
HubName: hubCfg.Hub.Name,
})

View File

@ -2,11 +2,11 @@ package config
import (
log "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
c "gitlink.org.cn/cloudream/common/utils/config"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc"
corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
"gitlink.org.cn/cloudream/jcs-pub/hub/internal/http"
"gitlink.org.cn/cloudream/jcs-pub/hub/internal/ticktock"
@ -19,7 +19,7 @@ type Config struct {
HTTP *http.Config `json:"http"`
CoordinatorRPC corrpc.PoolConfig `json:"coordinatorRPC"`
Logger log.Config `json:"logger"`
RabbitMQ mq.Config `json:"rabbitMQ"`
SysEvent sysevent.Config `json:"sysEvent"`
TickTock ticktock.Config `json:"tickTock"`
}