pcm-coordinator/rpc/internal/logic/participantservice/registerparticipantlogic.go

125 lines
4.0 KiB
Go

/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package participantservicelogic
import (
"context"
"github.com/pkg/errors"
models2 "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gorm.io/gorm"
"time"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/pcmCore"
"github.com/zeromicro/go-zero/core/logx"
)
// RegisterParticipantLogic carries the per-request state used by the
// RegisterParticipant RPC handler.
type RegisterParticipantLogic struct {
// ctx is the request context supplied to NewRegisterParticipantLogic.
ctx context.Context
// svcCtx is the service context; RegisterParticipant obtains its DbEngin
// database handle from it.
svcCtx *svc.ServiceContext
// Logger is the embedded logger; the constructor builds it with
// logx.WithContext(ctx).
logx.Logger
}
// NewRegisterParticipantLogic builds the logic object for a single
// RegisterParticipant call. It stores ctx and svcCtx as given and attaches a
// logger derived from ctx via logx.WithContext.
func NewRegisterParticipantLogic(ctx context.Context, svcCtx *svc.ServiceContext) *RegisterParticipantLogic {
	logic := new(RegisterParticipantLogic)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	logic.Logger = logx.WithContext(ctx)
	return logic
}
// RegisterParticipant registers a participant, or refreshes an already
// registered one, together with its node, queue and label records.
//
// When in.ParticipantId is zero a new snowflake ID is generated for the
// participant; otherwise the supplied ID is used. Node, queue and label rows
// are matched against existing rows of the same participant (by node name,
// queue name, and label key/value respectively): a match reuses the stored
// primary key so that Save updates that row with the request data, and a miss
// gets a freshly generated ID.
//
// All writes run inside one transaction. Any lookup or save failure, a failed
// commit, or a panic rolls the transaction back and is returned as a non-nil
// error with a nil response. On success the response carries Code 200, Msg
// "ok" and the participant ID.
func (l *RegisterParticipantLogic) RegisterParticipant(in *pcmCore.ParticipantPhyReq) (resp *pcmCore.ParticipantPhyResp, err error) {
	tx := l.svcCtx.DbEngin.Begin()
	if tx.Error != nil {
		return nil, errors.Wrap(tx.Error, "begin transaction")
	}

	// Commit or roll back based on the function's outcome. GORM reports each
	// operation's error on the *gorm.DB returned by that call, not on tx
	// itself, so the decision is driven by the named result err rather than
	// by tx.Error.
	defer func() {
		if p := recover(); p != nil {
			tx.Rollback()
			l.Error(p)
			// Surface the panic to the caller instead of returning (nil, nil).
			resp, err = nil, errors.Errorf("register participant: panic: %v", p)
			return
		}
		if err != nil {
			l.Errorf("orm err: %v, rollback", err)
			tx.Rollback()
			return
		}
		if commitErr := tx.Commit().Error; commitErr != nil {
			l.Errorf("commit failed: %v", commitErr)
			resp, err = nil, errors.Wrap(commitErr, "commit transaction")
			return
		}
		l.Info("commit success")
	}()

	// Save the participant's static information.
	participantInfo := &models2.ScParticipantPhyInfo{}
	utils.Convert(in, participantInfo)
	participantInfo.Id = in.ParticipantId
	if participantInfo.Id == 0 {
		participantInfo.Id = utils.GenSnowflakeID()
	}
	if err = tx.Save(participantInfo).Error; err != nil {
		return nil, errors.Wrap(err, "save participant")
	}

	// Save node information.
	nodeList := make([]*models2.ScNodePhyInfo, 0, len(in.NodeInfo))
	for _, info := range in.NodeInfo {
		nodeInfo := &models2.ScNodePhyInfo{}
		utils.Convert(info, nodeInfo)
		nodeInfo.CreatedTime = time.Now()
		nodeInfo.ParticipantId = participantInfo.Id

		// Look the row up into a separate struct: scanning into nodeInfo
		// would overwrite the values just taken from the request.
		existing := &models2.ScNodePhyInfo{}
		lookupErr := tx.Where(&models2.ScNodePhyInfo{NodeName: nodeInfo.NodeName, ParticipantId: participantInfo.Id}).Take(existing).Error
		switch {
		case lookupErr == nil:
			nodeInfo.Id = existing.Id
			nodeInfo.CreatedTime = existing.CreatedTime // keep the original creation time
		case errors.Is(lookupErr, gorm.ErrRecordNotFound):
			nodeInfo.Id = utils.GenSnowflakeID()
		default:
			return nil, errors.Wrap(lookupErr, "query node")
		}
		nodeList = append(nodeList, nodeInfo)
	}
	// Saving an empty slice makes GORM report gorm.ErrEmptySlice, so skip it.
	if len(nodeList) > 0 {
		if err = tx.Save(&nodeList).Error; err != nil {
			return nil, errors.Wrap(err, "save nodes")
		}
	}

	// Save queue information.
	queueList := make([]*models2.ScQueuePhyInfo, 0, len(in.QueueInfo))
	for _, info := range in.QueueInfo {
		queueInfo := &models2.ScQueuePhyInfo{}
		utils.Convert(info, queueInfo)
		queueInfo.ParticipantId = participantInfo.Id

		existing := &models2.ScQueuePhyInfo{}
		lookupErr := tx.Where(&models2.ScQueuePhyInfo{QueueName: queueInfo.QueueName, ParticipantId: participantInfo.Id}).Take(existing).Error
		switch {
		case lookupErr == nil:
			queueInfo.Id = existing.Id
		case errors.Is(lookupErr, gorm.ErrRecordNotFound):
			queueInfo.Id = utils.GenSnowflakeID()
		default:
			return nil, errors.Wrap(lookupErr, "query queue")
		}
		queueList = append(queueList, queueInfo)
	}
	if len(queueList) > 0 {
		if err = tx.Save(&queueList).Error; err != nil {
			return nil, errors.Wrap(err, "save queues")
		}
	}

	// Save label information.
	labelList := make([]*models2.ScParticipantLabelInfo, 0, len(in.LabelInfo))
	for _, label := range in.LabelInfo {
		labelInfo := &models2.ScParticipantLabelInfo{}
		utils.Convert(label, labelInfo)
		labelInfo.CreatedTime = time.Now()
		labelInfo.ParticipantId = participantInfo.Id

		existing := &models2.ScParticipantLabelInfo{}
		lookupErr := tx.Where(&models2.ScParticipantLabelInfo{Key: labelInfo.Key, Value: labelInfo.Value, ParticipantId: participantInfo.Id}).Take(existing).Error
		switch {
		case lookupErr == nil:
			labelInfo.Id = existing.Id
			labelInfo.CreatedTime = existing.CreatedTime // keep the original creation time
		case errors.Is(lookupErr, gorm.ErrRecordNotFound):
			labelInfo.Id = utils.GenSnowflakeID()
		default:
			return nil, errors.Wrap(lookupErr, "query label")
		}
		labelList = append(labelList, labelInfo)
	}
	if len(labelList) > 0 {
		if err = tx.Save(&labelList).Error; err != nil {
			return nil, errors.Wrap(err, "save labels")
		}
	}

	return &pcmCore.ParticipantPhyResp{
		Code:          200,
		Msg:           "ok",
		ParticipantId: participantInfo.Id,
	}, nil
}