added statusReportToJcs

This commit is contained in:
tzwang 2025-03-07 17:48:24 +08:00
parent 3eb2ab199b
commit aa7d50a9e8
7 changed files with 84 additions and 12 deletions

View File

@ -81,4 +81,7 @@ BlockChain:
ContractAddress: 0x22ac23bf2d2cf1b4d8fec9cb4d279c7da6718e35
FunctionName: "storeEvidence"
MemberName: "pcm"
Type: "2"
Type: "2"
JcsMiddleware:
Url: 101.201.215.196:7891

8
go.sum
View File

@ -528,14 +528,6 @@ gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20250107025835-8fc888b1d170 h1:/n3pl6WuH
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20250107025835-8fc888b1d170/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY=
gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4 h1:WIs/189lRLNMXF2ui/Wm1+Y55eJ53BVGx+4+gdn9cls=
gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4/go.mod h1:YbuoRgF9sEVvNJPQtGRjdocX7Du6NBOTLn+GVwqRVjo=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250108072048-9adf0597b07c h1:9LphS29VNfoWT73eqhgwKV1nG8PcoDUNu7dRev845wA=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250108072048-9adf0597b07c/go.mod h1:V19vFg8dWRAbaskASoSj70dgpacswOqZu/SaI02dxac=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250304023304-d556ce8161c7 h1:pv1WX3+ttqsHs7nr7+lfYNkvzUp1KIJQ0XzWbVetj6w=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250304023304-d556ce8161c7/go.mod h1:MxtnJJcU8S4zfGKZVcg2MOXGtwucKy7MMDwA0IemBd0=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250304035519-da6ab53b969d h1:EfAxN4oaCVIRsnM3pnC7NskifFRjM/THBUiMGtQQzfg=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250304035519-da6ab53b969d/go.mod h1:MxtnJJcU8S4zfGKZVcg2MOXGtwucKy7MMDwA0IemBd0=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250306022112-4ed1f08d3170 h1:NsHFtWPpcL8nF0s4v0DHuHuPaPFgMO9xITQCMM7Du1E=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250306022112-4ed1f08d3170/go.mod h1:MxtnJJcU8S4zfGKZVcg2MOXGtwucKy7MMDwA0IemBd0=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250306073530-56ecf1273207 h1:korhOkFl0x1wuQBKoKTsQHeFboDwLFRWwR2G9IPPfNg=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250306073530-56ecf1273207/go.mod h1:MxtnJJcU8S4zfGKZVcg2MOXGtwucKy7MMDwA0IemBd0=
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 h1:GaXwr5sgDh0raHjUf9IewTvnRvajYea7zbLsaerYyXo=

View File

@ -51,6 +51,8 @@ type Config struct {
SnowflakeConf SnowflakeConf
Monitoring Monitoring
JcsMiddleware JcsMiddleware
}
type Monitoring struct {
PromUrl string
@ -61,3 +63,7 @@ type Monitoring struct {
type SnowflakeConf struct {
MachineId int64 `json:"machineId"`
}
type JcsMiddleware struct {
Url string
}

View File

@ -29,7 +29,6 @@ import (
"gorm.io/gorm"
"sigs.k8s.io/yaml"
"strings"
"sync"
)
type Scheduler struct {
@ -40,7 +39,6 @@ type Scheduler struct {
result []string //pID:子任务yamlstring 键值对
AiStorages *database.AiStorage
AiService *service.AiService
mu sync.RWMutex
}
type SubSchedule interface {

View File

@ -26,6 +26,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy/param"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
@ -256,6 +257,13 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass
return err
}
//report msg
report := &jcs.JobStatusReportReq{
JobSetID: "",
LocalJobID: "",
Messages: make([]*jcs.ReportMessage, 0),
}
var errmsg string
for _, err := range errs {
e := (err).(struct {
@ -271,6 +279,16 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass
if err != nil {
return errors.New("database add failed: " + err.Error())
}
//add report msg
jobMsg := &jcs.ReportMessage{
TaskName: as.option.TaskName,
TaskID: strconv.FormatInt(as.option.TaskId, 10),
Status: false,
Message: msg,
ClusterID: e.clusterId,
}
report.Messages = append(report.Messages, jobMsg)
}
for _, s := range results {
as.option.ComputeCard = s.Card //execute card
@ -291,7 +309,20 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass
return errors.New("database add failed: " + err.Error())
}
}
//add report msg
jobMsg := &jcs.ReportMessage{
TaskName: as.option.TaskName,
TaskID: strconv.FormatInt(as.option.TaskId, 10),
Status: false,
Message: s.Msg,
ClusterID: s.ClusterId,
}
report.Messages = append(report.Messages, jobMsg)
}
//report status
_ = jcs.StatusReport(as.AiService.Conf.JcsMiddleware.Url, report)
logx.Errorf(errors.New(errmsg).Error())
return errors.New(errmsg)
}

View File

@ -29,6 +29,7 @@ type AiService struct {
InferenceAdapterMap map[string]map[string]inference.ICluster
Storage *database.AiStorage
LocalCache map[string]interface{}
Conf *config.Config
}
func NewAiService(conf *config.Config, storages *database.AiStorage, localCache map[string]interface{}) (*AiService, error) {
@ -43,6 +44,7 @@ func NewAiService(conf *config.Config, storages *database.AiStorage, localCache
InferenceAdapterMap: make(map[string]map[string]inference.ICluster),
Storage: storages,
LocalCache: localCache,
Conf: conf,
}
for _, id := range adapterIds {
clusters, err := storages.GetClustersByAdapterId(id)
@ -91,7 +93,7 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st
inferenceMap[c.Id] = sgai
case OPENI:
id, _ := strconv.ParseInt(c.Id, 10, 64)
openi := storeLink.NewOpenI(c.Server, id, c.Username, c.Token)
openi := storeLink.NewOpenI("http://localhost:2024", id, c.Username, c.Token)
collectorMap[c.Id] = openi
executorMap[c.Id] = openi
inferenceMap[c.Id] = openi

View File

@ -0,0 +1,40 @@
package jcs
import (
"gitlink.org.cn/JointCloud/pcm-openi/common"
)
type JobStatusReportReq struct {
JobSetID string `json:"jobSetID"`
LocalJobID string `json:"localJobID"`
Messages []*ReportMessage `json:"messages"`
}
type ReportMessage struct {
TaskName string `json:"taskName"`
TaskID string `json:"taskID"`
Status bool `json:"status"`
Message string `json:"message"`
ClusterID string `json:"clusterID"`
Output string `json:"output"`
}
func StatusReport(url string, report *JobStatusReportReq) error {
resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data interface{} `json:"data"`
}{}
req := common.GetRestyRequest(common.TIMEOUT)
_, err := req.
SetHeader("Content-Type", "application/json").
SetBody(&report).
SetResult(&resp).
Post(url)
if err != nil {
return err
}
return nil
}