pcm-participant/client/main.go

190 lines
4.8 KiB
Go

package main
import (
"context"
"errors"
"flag"
"fmt"
"github.com/gin-gonic/gin"
"gitlink.org.cn/JointCloud/pcm-participant-ai/service"
"gitlink.org.cn/JointCloud/pcm-participant-client/api"
"gitlink.org.cn/JointCloud/pcm-participant-client/config"
"gitlink.org.cn/JointCloud/pcm-participant-client/initialize"
"gitlink.org.cn/JointCloud/pcm-participant-client/middleware"
"gitlink.org.cn/JointCloud/pcm-participant-client/router"
"gitlink.org.cn/JointCloud/pcm-participant-hpc/api/job"
hpc "gitlink.org.cn/JointCloud/pcm-participant-hpc/service"
"gitlink.org.cn/JointCloud/pcm-participant-hpc/service/slurm"
"go.uber.org/zap"
"net/http"
"os"
"os/signal"
"syscall"
)
// main loads the configuration, initializes logging and the cloud (general
// purpose) cluster service, registers HPC cluster operators and then runs the
// HTTP server until startServer returns.
//
// SIGINT/SIGTERM are handled inside startServer. When it returns normally,
// the deferred calls below release the cluster pools and close the logger.
func main() {
	var configFile string
	flag.StringVar(&configFile, "config", "config/config.yaml", "config file path")
	flag.Parse()

	// Load configuration.
	cfg := config.LoadConfig(configFile)

	// Initialize logging.
	err := initialize.InitLogger(cfg)
	if err != nil {
		initialize.Panic("Init log failed, error: %s", err)
	}
	defer initialize.Close()

	// HPC cluster connections (currently disabled).
	//if err := initialize.InitHPCCluster(cfg); err != nil {
	//	zap.L().Fatal("集群初始化失败", zap.Error(err))
	//}

	// AI (intelligent-computing) cluster connections (currently disabled).
	//aiSvc, err := initialize.InitAICluster(cfg)
	//if err != nil {
	//	initialize.Panic("Server started failed: %s", err)
	//	return
	//}
	//api.AiApi.RegisterSvc(aiSvc)

	// General-purpose (cloud) cluster connections.
	cloudSvc, err := initialize.InitCloudCluster(cfg)
	if err != nil {
		initialize.Panic("Server started failed: %s", err)
		return
	}
	api.CloudApi.RegisterSvc(cloudSvc)
	defer initialize.CloseAllPools()

	// setupGracefulShutdown is intentionally not installed here. Its handler
	// calls os.Exit on the same signals startServer listens for. That skips
	// the deferred Close above and cuts the HTTP server's graceful shutdown
	// short.

	// Register the HPC cluster operators.
	initializeClusterOperators()

	// Start the web server; blocks until shutdown or a serve error.
	initialize.Info("Server starting...")
	err = startServer(cfg)
	if err != nil {
		initialize.Panic("Server started failed: %s", err)
	}
}
// startServer builds the HTTP server for cfg and serves on ":"+cfg.System.Port
// until SIGINT or SIGTERM is received. It then shuts the server down
// gracefully.
//
// It returns nil once a signal-triggered shutdown has completed. Any other
// error from ListenAndServe (for example, the port already in use) is
// returned as-is.
func startServer(cfg *config.Server) error {
	server := &http.Server{
		Addr:    ":" + cfg.System.Port,
		Handler: getEngine(cfg),
	}

	// ctx is cancelled on the first SIGINT/SIGTERM. stop unregisters the
	// signal handler and cancels ctx when this function returns, so the
	// goroutine below never outlives startServer, even if ListenAndServe
	// fails immediately.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// shutdownDone is closed once Shutdown has returned.
	shutdownDone := make(chan struct{})
	go func() {
		defer close(shutdownDone)
		<-ctx.Done()
		if err := server.Shutdown(context.Background()); err != nil {
			initialize.Error("Failed to shutdown server: %s", err)
		}
	}()

	initialize.Debug("Server started success")
	err := server.ListenAndServe()
	if errors.Is(err, http.ErrServerClosed) {
		// ListenAndServe returns as soon as Shutdown begins. Wait for
		// Shutdown to finish so in-flight requests complete before the
		// caller proceeds to exit.
		<-shutdownDone
		initialize.Debug("Server was shutdown gracefully")
		return nil
	}
	return err
}
// getEngine builds the gin engine. It installs panic recovery, then the
// per-request trace and access-log middleware, then the application routes.
//
// cfg is currently unused.
func getEngine(cfg *config.Server) *gin.Engine {
	engine := gin.New()
	// Recover from handler panics. Log them through the request-scoped logger
	// and answer with HTTP 200 carrying an application-level code 500.
	engine.Use(gin.CustomRecovery(func(c *gin.Context, err interface{}) {
		// %v rather than %s: the recovered value can be of any type
		// (e.g. panic(42)), which %s would render as "%!s(int=42)".
		initialize.WithCtx(c).Error(fmt.Sprintf("server panic: %v", err))
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			"code": 500,
			"msg":  "服务器内部错误,请稍后再试!",
		})
	}))
	// Add the global trace ID and access logging.
	engine.Use(middleware.AddTrace())
	engine.Use(middleware.AccessLog())
	router.RegisterRoutes(engine)
	return engine
}
// setupGracefulShutdown installs a background handler for SIGINT and SIGTERM.
// On the first such signal, the handler logs "Shutting down...", closes every
// cluster connection pool via initialize.CloseAllPools and terminates the
// process with exit status 0.
//
// NOTE(review): because the handler ends in os.Exit, deferred functions in
// other goroutines (including main) are not run, and any other subscriber to
// these signals is not waited for.
func setupGracefulShutdown() {
	stopSignals := make(chan os.Signal, 1)
	go func(sig <-chan os.Signal) {
		<-sig
		zap.L().Info("Shutting down...")
		initialize.CloseAllPools()
		os.Exit(0)
	}(stopSignals)
	signal.Notify(stopSignals, os.Interrupt, syscall.SIGTERM)
}
// initializeAISvc registers svc with api.AiApi, but only when at least one AI
// cluster is known to initialize. Otherwise it logs a warning and registers
// nothing.
func initializeAISvc(svc *service.Service) {
	if len(initialize.GetAllAIClusterInfos()) > 0 {
		api.AiApi.RegisterSvc(svc)
		return
	}
	zap.L().Warn("没有可用的集群信息")
}
// initializeClusterOperators creates an hpc.Operator for every known HPC
// cluster that has a connection pool, and registers it with job.HpcApi
// under the cluster ID.
//
// Clusters are logged and skipped when they have no pool, use the "sugonac"
// driver (its operator is not implemented yet) or use an unknown driver.
func initializeClusterOperators() {
	// Fetch all HPC cluster info.
	allHPCClusterInfos := initialize.GetAllHPCClusterInfos()
	if len(allHPCClusterInfos) == 0 {
		zap.L().Warn("没有可用的集群信息")
		return
	}
	for clusterId, info := range allHPCClusterInfos {
		pool, ok := initialize.GetClusterPool(clusterId)
		if !ok {
			zap.L().Warn("集群连接池不存在",
				zap.String("cluster", clusterId),
				zap.String("address", info.Address))
			continue
		}
		var operator hpc.Operator
		switch info.Driver {
		case "slurm":
			operator = slurm.NewPooledJobManager(pool, info.WorkDir, info.Username)
			zap.L().Info("注册SLURM操作器",
				zap.String("cluster", clusterId),
				zap.String("workDir", info.WorkDir))
		case "sugonac":
			// The Sugon operator is not implemented yet, so nothing can be
			// registered. Warn instead of logging a registration that never
			// happens.
			//operator = sugonac.NewJobManager(pool, info.WorkDir)
			zap.L().Warn("曙光操作器暂未实现,跳过注册",
				zap.String("cluster", clusterId),
				zap.String("workDir", info.WorkDir))
			continue
		default:
			zap.L().Warn("未知集群类型",
				zap.String("cluster", clusterId),
				zap.String("type", info.Driver))
			continue
		}
		// Defensive: only register a non-nil operator.
		if operator != nil {
			job.HpcApi.RegisterOperator(clusterId, operator)
			zap.L().Info("集群操作器已注册",
				zap.String("cluster", clusterId),
				zap.String("type", info.Driver))
		}
	}
}