// Entry point for the pcm-participant-client service.
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"github.com/gin-gonic/gin"
|
|
"gitlink.org.cn/JointCloud/pcm-participant-ai/service"
|
|
"gitlink.org.cn/JointCloud/pcm-participant-client/api"
|
|
"gitlink.org.cn/JointCloud/pcm-participant-client/config"
|
|
"gitlink.org.cn/JointCloud/pcm-participant-client/initialize"
|
|
"gitlink.org.cn/JointCloud/pcm-participant-client/middleware"
|
|
"gitlink.org.cn/JointCloud/pcm-participant-client/router"
|
|
"gitlink.org.cn/JointCloud/pcm-participant-hpc/api/job"
|
|
hpc "gitlink.org.cn/JointCloud/pcm-participant-hpc/service"
|
|
"gitlink.org.cn/JointCloud/pcm-participant-hpc/service/slurm"
|
|
"go.uber.org/zap"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
)
|
|
|
|
func main() {
|
|
var configFile string
|
|
flag.StringVar(&configFile, "config", "config/config.yaml", "config file path")
|
|
flag.Parse()
|
|
|
|
// 加载配置
|
|
cfg := config.LoadConfig(configFile)
|
|
|
|
// 初始化日志
|
|
err := initialize.InitLogger(cfg)
|
|
if err != nil {
|
|
initialize.Panic("Init log failed, error: %s", err)
|
|
}
|
|
defer initialize.Close()
|
|
// 初始化集群连接
|
|
//if err := initialize.InitHPCCluster(cfg); err != nil {
|
|
// zap.L().Fatal("集群初始化失败", zap.Error(err))
|
|
//}
|
|
// 初始化智算集群连接
|
|
aiSvc, err := initialize.InitAICluster(cfg)
|
|
if err != nil {
|
|
initialize.Panic("Server started failed: %s", err)
|
|
return
|
|
}
|
|
api.AiApi.RegisterSvc(aiSvc)
|
|
// 初始化通算集群连接
|
|
cloudSvc, err := initialize.InitCloudCluster(cfg)
|
|
if err != nil {
|
|
initialize.Panic("Server started failed: %s", err)
|
|
return
|
|
}
|
|
api.CloudApi.RegisterSvc(cloudSvc)
|
|
defer initialize.CloseAllPools()
|
|
// 设置退出处理
|
|
setupGracefulShutdown()
|
|
// 注册集群操作器
|
|
initializeClusterOperators()
|
|
// 启动Web服务
|
|
initialize.Info("Server starting...")
|
|
err = startServer(cfg)
|
|
if err != nil {
|
|
initialize.Panic("Server started failed: %s", err)
|
|
}
|
|
}
|
|
|
|
func startServer(cfg *config.Server) error {
|
|
server := &http.Server{
|
|
Addr: ":" + cfg.System.Port,
|
|
Handler: getEngine(cfg),
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
go func(ctxFunc context.CancelFunc) {
|
|
signalChan := make(chan os.Signal, 1)
|
|
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
|
|
for {
|
|
select {
|
|
case <-signalChan:
|
|
ctxFunc()
|
|
return
|
|
}
|
|
}
|
|
}(cancel)
|
|
go func() {
|
|
<-ctx.Done()
|
|
if err := server.Shutdown(context.Background()); err != nil {
|
|
initialize.Error("Failed to shutdown server: %s", err)
|
|
}
|
|
}()
|
|
initialize.Debug("Server started success")
|
|
err := server.ListenAndServe()
|
|
if errors.Is(err, http.ErrServerClosed) {
|
|
initialize.Debug("Server was shutdown gracefully")
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
func getEngine(cfg *config.Server) *gin.Engine {
|
|
engine := gin.New()
|
|
engine.Use(gin.CustomRecovery(func(c *gin.Context, err interface{}) {
|
|
initialize.WithCtx(c).Error(fmt.Sprintf("server panic: %s", err))
|
|
c.AbortWithStatusJSON(http.StatusOK, gin.H{
|
|
"code": 500,
|
|
"msg": "服务器内部错误,请稍后再试!",
|
|
})
|
|
}))
|
|
//添加全局的traceId和访问日志
|
|
engine.Use(middleware.AddTrace())
|
|
engine.Use(middleware.AccessLog())
|
|
router.RegisterRoutes(engine)
|
|
return engine
|
|
}
|
|
|
|
func setupGracefulShutdown() {
|
|
c := make(chan os.Signal, 1)
|
|
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
|
|
|
|
go func() {
|
|
<-c
|
|
zap.L().Info("Shutting down...")
|
|
initialize.CloseAllPools()
|
|
os.Exit(0)
|
|
}()
|
|
}
|
|
|
|
func initializeAISvc(svc *service.Service) {
|
|
// 获取所有集群信息
|
|
allAIClusterInfos := initialize.GetAllAIClusterInfos()
|
|
|
|
if len(allAIClusterInfos) == 0 {
|
|
zap.L().Warn("没有可用的集群信息")
|
|
return
|
|
}
|
|
api.AiApi.RegisterSvc(svc)
|
|
|
|
}
|
|
|
|
func initializeClusterOperators() {
|
|
// 获取所有集群信息
|
|
allHPCClusterInfos := initialize.GetAllHPCClusterInfos()
|
|
|
|
if len(allHPCClusterInfos) == 0 {
|
|
zap.L().Warn("没有可用的集群信息")
|
|
return
|
|
}
|
|
|
|
for clusterId, info := range allHPCClusterInfos {
|
|
pool, ok := initialize.GetClusterPool(clusterId)
|
|
if !ok {
|
|
zap.L().Warn("集群连接池不存在",
|
|
zap.String("cluster", clusterId),
|
|
zap.String("address", info.Address))
|
|
continue
|
|
}
|
|
|
|
var operator hpc.Operator
|
|
|
|
switch info.Driver {
|
|
case "slurm":
|
|
operator = slurm.NewPooledJobManager(pool, info.WorkDir, info.Username)
|
|
zap.L().Info("注册SLURM操作器",
|
|
zap.String("cluster", clusterId),
|
|
zap.String("workDir", info.WorkDir))
|
|
|
|
case "sugonac":
|
|
//operator = sugonac.NewJobManager(pool, info.WorkDir)
|
|
zap.L().Info("注册曙光操作器",
|
|
zap.String("cluster", clusterId),
|
|
zap.String("workDir", info.WorkDir))
|
|
|
|
default:
|
|
zap.L().Warn("未知集群类型",
|
|
zap.String("cluster", clusterId),
|
|
zap.String("type", info.Driver))
|
|
continue
|
|
}
|
|
|
|
if operator != nil {
|
|
job.HpcApi.RegisterOperator(clusterId, operator)
|
|
zap.L().Info("集群操作器已注册",
|
|
zap.String("cluster", clusterId),
|
|
zap.String("type", info.Driver))
|
|
}
|
|
}
|
|
}
|