diff --git a/client/api/cloud.go b/client/api/cloud.go index 82832c4..665dadf 100644 --- a/client/api/cloud.go +++ b/client/api/cloud.go @@ -1,28 +1,94 @@ package api import ( + "fmt" "github.com/gin-gonic/gin" + "github.com/go-playground/validator/v10" + "gitlink.org.cn/JointCloud/pcm-participant-cloud/container" + "gitlink.org.cn/JointCloud/pcm-participant-cloud/service" + "gitlink.org.cn/JointCloud/pcm-participant-k8s/model" + "net/http" + "strings" + "sync" ) type cloudApi struct { *Api + svcLock sync.RWMutex + service *service.Service } var CloudApi = cloudApi{ Api: BaseApi, } -// SubmitHandler 统一作业提交接口 -func (h *cloudApi) SubmitHandler(c *gin.Context) { +func (a *cloudApi) RegisterSvc(svc *service.Service) { + a.svcLock.Lock() + defer a.svcLock.Unlock() + a.service = svc } -// StatusHandler 统一状态查询接口 -func (h *cloudApi) StatusHandler(c *gin.Context) { +func (c *cloudApi) CreateContainerHandler(ctx *gin.Context) { + pfId := ctx.Query("pfId") + var param container.CreateParam + if err := ctx.ShouldBindJSON(¶m); err != nil { + if ve, ok := err.(validator.ValidationErrors); ok { + var errorMsg []string + for _, e := range ve { + errorMsg = append(errorMsg, fmt.Sprintf("字段 %s 验证失败: %s", e.Field(), e.Tag())) + } + model.Response(ctx, http.StatusBadRequest, "请求体格式错误: "+strings.Join(errorMsg, "; "), nil) + } else { + model.Response(ctx, http.StatusBadRequest, "请求体解析失败: "+err.Error(), nil) + } + return + } + fmt.Println(c.service) + createContainer, err := c.service.CreateContainer(ctx.Request.Context(), pfId, ¶m) + if err != nil { + model.Response(ctx, http.StatusInternalServerError, err, nil) + return + } + model.Response(ctx, http.StatusOK, "success", createContainer) } -// DetailHandler 查询作业详情接口 -func (h *cloudApi) DetailHandler(c *gin.Context) { +func (c *cloudApi) DeleteContainerHandler(ctx *gin.Context) { + pfId := ctx.Query("pfId") + var param container.DeleteParam + if err := ctx.ShouldBindJSON(¶m); err != nil { + if ve, ok := err.(validator.ValidationErrors); ok { + var errorMsg []string + for _, e := range ve { + errorMsg = append(errorMsg, fmt.Sprintf("字段 %s 验证失败: %s", e.Field(), e.Tag())) + } + model.Response(ctx, http.StatusBadRequest, "请求体格式错误: "+strings.Join(errorMsg, "; "), nil) + } else { + model.Response(ctx, http.StatusBadRequest, "请求体解析失败: "+err.Error(), nil) + } + return + } + err := c.service.DeleteContainer(ctx.Request.Context(), pfId, ¶m) + if err != nil { + model.Response(ctx, http.StatusInternalServerError, err, nil) + return + } + model.Response(ctx, http.StatusOK, "success", nil) +} + +func (c *cloudApi) GetContainerHandler(ctx *gin.Context) { + pfId := ctx.Query("pfId") + name := ctx.Query("name") + param := container.GetParam{ + Name: name, + } + + getContainer, err := c.service.GetContainer(ctx.Request.Context(), pfId, ¶m) + if err != nil { + model.Response(ctx, http.StatusInternalServerError, err, nil) + return + } + model.Response(ctx, http.StatusOK, "success", getContainer) } diff --git a/client/config/config.yaml b/client/config/config.yaml index fca8ef7..818938f 100644 --- a/client/config/config.yaml +++ b/client/config/config.yaml @@ -8,4 +8,5 @@ pcm-core: coordinator-host: http://127.0.0.1:8999 participant-host: http://localhost:8080 hpc-cluster-list: /pcm/v1/adapter/cluster/list?type=2&pageNum=1&pageSize=10&storageSchedule=1 - ai-cluster-list: /pcm/v1/adapter/cluster/list?adapterId=1777144940456666666&type=1&pageNum=1&pageSize=10&storageSchedule=1 \ No newline at end of file + ai-cluster-list: /pcm/v1/adapter/cluster/list?adapterId=1777144940456666666&type=1&pageNum=1&pageSize=10&storageSchedule=1 + cloud-cluster-list: /pcm/v1/adapter/cluster/list?adapterId=1770658294298316800&type=0&pageNum=1&pageSize=10 \ No newline at end of file diff --git a/client/config/pcm_core.go b/client/config/pcm_core.go index a570a94..f5658f6 100644 --- a/client/config/pcm_core.go +++ b/client/config/pcm_core.go @@ -1,9 +1,10 @@ package config type PcmCore struct { - CoordinatorHost string `mapstructure:"coordinator-host" json:"coordinator-host" yaml:"coordinator-host"` // C端主机地址 - ParticipantHost string `mapstructure:"participant-host" json:"participant-host" yaml:"participant-host"` // 本P端服务主机地址 - HPCClusterList string `mapstructure:"hpc-cluster-list" json:"hpc-cluster-list" yaml:"hpc-cluster-list"` // 集群列表接口 - AIClusterList string `mapstructure:"ai-cluster-list" json:"ai-cluster-list" yaml:"ai-cluster-list"` // 集群列表接口 - Token string `mapstructure:"token" json:"token" yaml:"token"` + CoordinatorHost string `mapstructure:"coordinator-host" json:"coordinator-host" yaml:"coordinator-host"` // C端主机地址 + ParticipantHost string `mapstructure:"participant-host" json:"participant-host" yaml:"participant-host"` // 本P端服务主机地址 + HPCClusterList string `mapstructure:"hpc-cluster-list" json:"hpc-cluster-list" yaml:"hpc-cluster-list"` // 集群列表接口 + AIClusterList string `mapstructure:"ai-cluster-list" json:"ai-cluster-list" yaml:"ai-cluster-list"` // 集群列表接口 + CloudClusterList string `mapstructure:"cloud-cluster-list" json:"cloud-cluster-list" yaml:"cloud-cluster-list"` + Token string `mapstructure:"token" json:"token" yaml:"token"` } diff --git a/client/go.mod b/client/go.mod index 740d90f..5a15fe2 100644 --- a/client/go.mod +++ b/client/go.mod @@ -7,6 +7,7 @@ replace gitlink.org.cn/JointCloud/pcm-participant-common v0.0.0 => ../common require ( github.com/gin-gonic/gin v1.10.0 github.com/hashicorp/yamux v0.1.2 + gitlink.org.cn/JointCloud/pcm-participant-common v0.0.0 ) @@ -39,4 +40,4 @@ require ( golang.org/x/text v0.15.0 // indirect google.golang.org/protobuf v1.34.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect -) +) \ No newline at end of file diff --git a/client/go.sum b/client/go.sum index af93d9c..bbe93c2 100644 --- a/client/go.sum +++ b/client/go.sum @@ -2,10 +2,16 @@ github.com/antonfisher/nested-logrus-formatter v1.3.1 h1:NFJIr+pzwv5QLHTPyKz9UME github.com/antonfisher/nested-logrus-formatter v1.3.1/go.mod h1:6WTfyWFkBc9+zyBaKIqRrg/KwMqBbodBjgbHjDz7zjA= github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= +github.com/bytedance/sonic v1.13.3 h1:MS8gmaH16Gtirygw7jV91pDCN33NyMrPbN7qiYhEsF0= +github.com/bytedance/sonic v1.13.3/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1+KgkJhz4= github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM= github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= +github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= +github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= +github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -13,10 +19,16 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= +github.com/gabriel-vasile/mimetype v1.4.9 h1:5k+WDwEsD9eTLL8Tz3L0VnmVh9QxGjRmjBvAG7U/oYY= +github.com/gabriel-vasile/mimetype v1.4.9/go.mod h1:WnSQhFKJuBlRyLiKohA/2DtIlPFAbguNaG7QCHcyGok= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-contrib/sse v1.1.0 h1:n0w2GMuUpWDVp7qSpvze6fAu9iRxJY4Hmj6AmBOU05w= +github.com/gin-contrib/sse v1.1.0/go.mod h1:hxRZ5gVpWMT7Z0B0gSNYqqsSCNIJMjzvm6fqCz9vjwM= github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU= github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= +github.com/gin-gonic/gin v1.10.1 h1:T0ujvqyCSqRopADpgPgiTT63DUQVSfojyME59Ei63pQ= +github.com/gin-gonic/gin v1.10.1/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= @@ -25,8 +37,12 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8= github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/go-playground/validator/v10 v10.27.0 h1:w8+XrWVMhGkxOaaowyKH35gFydVHOvC0/uWoy2Fzwn4= +github.com/go-playground/validator/v10 v10.27.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -41,6 +57,8 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= +github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= @@ -53,6 +71,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= +github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= +github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= @@ -77,24 +97,38 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +github.com/ugorji/go/codec v1.3.0 h1:Qd2W2sQawAfG8XSvzwhBeoGq71zXOC/Q1E9y/wUcsUA= +github.com/ugorji/go/codec v1.3.0/go.mod h1:pRBVtBSKl77K30Bv8R2P+cLSGaTtex6fsA2Wjqmfxj4= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= +golang.org/x/arch v0.19.0 h1:LmbDQUodHThXE+htjrnmVD73M//D9GTH6wFZjyDkjyU= +golang.org/x/arch v0.19.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk= golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= +golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= +golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= +golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= +golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/client/initialize/cloudCluster.go b/client/initialize/cloudCluster.go new file mode 100644 index 0000000..1af7fde --- /dev/null +++ b/client/initialize/cloudCluster.go @@ -0,0 +1,152 @@ +package initialize + +import ( + "fmt" + "github.com/go-resty/resty/v2" + "gitlink.org.cn/JointCloud/pcm-participant-client/common/types" + "gitlink.org.cn/JointCloud/pcm-participant-client/common/utils" + "gitlink.org.cn/JointCloud/pcm-participant-client/config" + "gitlink.org.cn/JointCloud/pcm-participant-cloud/platform" + "gitlink.org.cn/JointCloud/pcm-participant-cloud/service" + eci "gitlink.org.cn/JointCloud/pcm-participant-eci" + k8s "gitlink.org.cn/JointCloud/pcm-participant-k8s" + "strconv" + + "go.uber.org/zap" +) + +// 获取所有集群信息 +func GetAllCloudClusterInfos() map[string]types.ClusterInfo { + result := make(map[string]types.ClusterInfo) + clusterInfos.Range(func(key, value interface{}) bool { + result[key.(string)] = value.(types.ClusterInfo) + return true + }) + return result +} + +func InitCloudCluster(cfg *config.Server) (*service.Service, error) { + client := utils.InitClient(cfg.PcmCore.CoordinatorHost, "") + return initCloudSvc(client, cfg.PcmCore) +} + +// 初始化智算集群连接池 +func initCloudSvc(client *utils.RestyClient, core config.PcmCore) (*service.Service, error) { + resp := types.ResultResp{} + token := "Bearer " + core.Token + _, err := client.Request(core.CoordinatorHost+core.CloudClusterList, "GET", func(req *resty.Request) { + req.SetHeader("Authorization", token).SetResult(&resp) + }) + if err != nil { + return nil, fmt.Errorf("获取集群列表失败: %w", err) + } + + if resp.Code != 200 { + return nil, fmt.Errorf("API返回错误: %d, 消息: %s", resp.Code, resp.Msg) + } + + var platforms []platform.IPlatform + var svc *service.Service + //var octopus octopus.Octopus + + for _, cluster := range resp.Data.List { + if cluster.Status == "offline" { + + // 修改集群的server地址为本服务的地址,修改状态为在线 + cluster.Server = core.ParticipantHost + cluster.Status = "online" + + updateClusterReq := types.ClusterCreateReq{ + Id: cluster.Id, + AdapterId: strconv.FormatInt(cluster.AdapterId, 10), + Name: cluster.Name, + Nickname: cluster.Nickname, + Description: cluster.Description, + Server: cluster.Server, + MonitorServer: cluster.MonitorServer, + Username: cluster.Username, + Password: cluster.Password, + Token: cluster.Token, + Ak: cluster.Ak, + Sk: cluster.Sk, + RegionName: cluster.Region, + ProjectId: cluster.ProjectId, + Version: cluster.Version, + Label: cluster.Label, + OwnerId: cluster.OwnerId, + AuthType: cluster.AuthType, + ProducerDict: cluster.ProducerDict, + RegionDict: cluster.RegionDict, + Status: cluster.Status, + } + + updateResp := types.ResultResp{} + _, err := client.Request(core.CoordinatorHost+"/pcm/v1/adapter/cluster/update", "PUT", func(req *resty.Request) { + req.SetBody(updateClusterReq).SetResult(&updateResp) // 添加请求体 + }) + + if err != nil { + zap.L().Error("更新集群状态失败", zap.Error(err)) + continue + } + if updateResp.Code != 200 { + zap.L().Error("更新集群状态API返回错误", + zap.Int("code", updateResp.Code), + zap.String("message", updateResp.Msg)) + continue + } + + } else if cluster.Status == "online" { + if cluster.Server != core.ParticipantHost { + zap.L().Warn("集群已被其他服务代理", + zap.String("cluster_id", cluster.Id), + zap.String("当前服务地址", core.ParticipantHost), + zap.String("集群记录地址", cluster.Server)) + continue + } else { + // 更新集群信息的修改时间 + _, err := client.Request(core.CoordinatorHost+"/pcm/v1/adapter/cluster/update", "PUT", func(req *resty.Request) { + req.SetBody(cluster) + }) + if err != nil { + zap.L().Error("刷新集群时间失败", zap.Error(err)) + } + } + } + + if cluster.Id == "" { + zap.L().Warn("跳过无效集群条目: 缺少集群ID") + continue + } + switch cluster.Label { + case "kubernetes": + k8s, err := k8s.New(cluster.Token, cluster.Address, platform.Id(cluster.Id)) + if err != nil { + Error("初始化失败", zap.Error(err)) + continue + } + platforms = append(platforms, k8s) + //更新C端集群状态 + case "eci": + eci, err := eci.New(cluster.Ak, cluster.Sk, cluster.Password, platform.Id(cluster.Id)) + if err != nil { + Error("初始化失败", zap.Error(err)) + continue + } + platforms = append(platforms, eci) + + } + } + if len(platforms) == 0 { + return nil, fmt.Errorf("注册集群列表为空") + } + + for _, p := range platforms { + Info("注册集群列表:", zap.Any("id", p.Id()), zap.Any("type", p.Type()), zap.Any("name", p.Name())) + } + svc, err = service.NewService(platforms...) + if err != nil { + return nil, err + } + return svc, nil +} diff --git a/client/main.go b/client/main.go index 2334f2b..130f55f 100644 --- a/client/main.go +++ b/client/main.go @@ -40,14 +40,20 @@ func main() { //if err := initialize.InitHPCCluster(cfg); err != nil { // zap.L().Fatal("集群初始化失败", zap.Error(err)) //} - // 初始化集群连接 - svc, err := initialize.InitAICluster(cfg) + // 初始化智算集群连接 + aiSvc, err := initialize.InitAICluster(cfg) if err != nil { initialize.Panic("Server started failed: %s", err) return } - api.AiApi.RegisterSvc(svc) - + api.AiApi.RegisterSvc(aiSvc) + // 初始化通算集群连接 + cloudSvc, err := initialize.InitCloudCluster(cfg) + if err != nil { + initialize.Panic("Server started failed: %s", err) + return + } + api.CloudApi.RegisterSvc(cloudSvc) defer initialize.CloseAllPools() // 设置退出处理 setupGracefulShutdown() diff --git a/client/router/cloud.go b/client/router/cloud.go index 9cc1ad6..0cc63ab 100644 --- a/client/router/cloud.go +++ b/client/router/cloud.go @@ -1,6 +1,21 @@ package router -import "github.com/gin-gonic/gin" +import ( + "github.com/gin-gonic/gin" + "gitlink.org.cn/JointCloud/pcm-participant-client/api" +) func CloudRoutes(server *gin.Engine) { + cloud := server.Group("/cloud") + cloudApi := api.CloudApi + { + cloud := cloud.Group("container") + cloud.POST("/create", cloudApi.CreateContainerHandler) + cloud.DELETE("/delete", cloudApi.DeleteContainerHandler) + cloud.GET("/get", cloudApi.GetContainerHandler) + } + { + //deployment := cloud.Group("deployment") + //deployment.POST("/create", cloudApi.CreateDeployment) + } } diff --git a/cloud/cloud.go b/cloud/cloud.go new file mode 100644 index 0000000..0e382de --- /dev/null +++ b/cloud/cloud.go @@ -0,0 +1,77 @@ +package ai + +import ( + "github.com/hashicorp/yamux" + "log" + "net" + "net/http" +) + +func startYamuxClient(serverAddr string) { + // 连接到A服务器 + conn, err := net.Dial("tcp", serverAddr) + if err != nil { + log.Fatal("连接A服务器失败:", err) + } + log.Printf("已成功连接到A服务器: %s", serverAddr) + + // 建立yamux会话 + session, err := yamux.Client(conn, nil) + if err != nil { + log.Fatal(err) + } + + // 保持连接并处理请求 + for { + stream, err := session.Accept() + if err != nil { + log.Println("accept error:", err) + continue + } + go handleRequest(stream) + } + +} + +func handleRequest(stream net.Conn) { + log.Printf("收到新连接: 来自 %s", stream.RemoteAddr()) + + mux := http.NewServeMux() + + mux.HandleFunc("/api/test", func(w http.ResponseWriter, r *http.Request) { + log.Printf("请求头: %v", r.Header) + log.Printf("处理测试接口请求") + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"status":"success","message":"测试接口响应"}`)) + }) + + // 添加默认404处理 + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte("404 - 接口不存在")) + }) + + http.Serve(&singleConnListener{conn: stream}, mux) +} + +type singleConnListener struct { + conn net.Conn +} + +func (l *singleConnListener) Accept() (net.Conn, error) { + return l.conn, nil +} + +func (l *singleConnListener) Close() error { + return nil +} + +func (l *singleConnListener) Addr() net.Addr { + return l.conn.LocalAddr() +} + +func main() { + // 这里可以添加命令行参数解析 + serverAddr := "client_url:1234" // P端Client服务地址 + startYamuxClient(serverAddr) +} diff --git a/cloud/container/container.go b/cloud/container/container.go new file mode 100644 index 0000000..2bda7b9 --- /dev/null +++ b/cloud/container/container.go @@ -0,0 +1,36 @@ +package container + +import ( + "gitlink.org.cn/JointCloud/pcm-participant-cloud/platform" +) + +type Containers []ISpec + +func NewContainer(container IContainer) *Container { + return &Container{container: container} +} + +type Container struct { + Platform *platform.Platform `json:"platform,omitempty"` + Name string `json:"name,omitempty"` + Image string `json:"image,omitempty"` + Cpu string `json:"cpu,omitempty"` + Memory string `json:"memory,omitempty"` + container IContainer `json:"container,omitempty"` +} + +func (containers *Containers) Containers() ([]*Container, error) { + res := make([]*Container, len(*containers)) + for _, container := range *containers { + spec, err := container.Spec() + if err != nil { + return nil, err + } + res = append(res, spec) + } + return res, nil +} + +func (c *Container) ContainerCreateParameter() { + +} diff --git a/cloud/container/feature.go b/cloud/container/feature.go new file mode 100644 index 0000000..52c9369 --- /dev/null +++ b/cloud/container/feature.go @@ -0,0 +1,7 @@ +package container + +type Features struct { +} + +type FeatureSpec struct { +} diff --git a/cloud/container/interface.go b/cloud/container/interface.go new file mode 100644 index 0000000..640c40f --- /dev/null +++ b/cloud/container/interface.go @@ -0,0 +1,30 @@ +package container + +import ( + "context" +) + +type IContainer interface { + Create(ctx context.Context, param *CreateParam) (interface{}, error) + Delete(ctx context.Context, param *DeleteParam) error + Get(ctx context.Context, param *GetParam) (interface{}, error) + Features() *Features +} + +type ISpec interface { + Spec() (*Container, error) // 自定义参数 + Detail() (interface{}, error) //官方的参数 + Features() *FeatureSpec // 不同平台差异化出参 +} + +type CreateParameter interface { + ContainerCreateParameter() +} + +type DeleteParameter interface { + ContainerDeleteParameter() +} + +type GetParameter interface { + ContainerGetParameter() +} diff --git a/cloud/container/option.go b/cloud/container/option.go new file mode 100644 index 0000000..ef430f6 --- /dev/null +++ b/cloud/container/option.go @@ -0,0 +1,68 @@ +package container + +type CreateParam struct { + ContainerGroupName string `json:"containerGroupName"` + Name string `json:"name"` + Image string `json:"image"` + Cpu string `json:"cpu,omitempty"` + Memory string `json:"memory,omitempty"` + Port int32 `json:"port,omitempty"` + NodePort int32 `json:"nodePort,omitempty"` + MountPath string `json:"mountPath,omitempty"` + Args []string `json:"args,omitempty"` + Envs []struct { + Name string `json:"name,omitempty"` + Value string `json:"value,omitempty"` + } `json:"envs,omitempty"` + CreateParameter CreateParameter `json:"createParameter,omitempty"` +} + +type K8sCreateParam struct { +} + +type EciCreateParam struct { +} + +func (k K8sCreateParam) ContainerCreateParameter() { +} + +func (e EciCreateParam) ContainerCreateParameter() { +} + +// 删除容器参数 +type DeleteParam struct { + Name string `json:"name,omitempty"` + DeleteParameter DeleteParameter `json:"deleteParameter,omitempty"` +} + +func (k K8sDeleteParam) ContainerDeleteParameter() { +} +func (e EciDeleteParam) ContainerDeleteParameter() { +} + +type K8sDeleteParam struct { +} + +type EciDeleteParam struct { + RegionId string `json:"regionId,omitempty"` + ContainerGroupId string `json:"containerGroupId,omitempty"` +} + +// 获取容器信息 +type GetParam struct { + Name string `json:"name,omitempty"` + GetParameter GetParameter `json:"getParameter,omitempty"` +} + +func (g K8sGetParam) ContainerGetParameter() { +} +func (g EciGetParam) ContainerGetParameter() { +} + +type K8sGetParam struct { +} + +type EciGetParam struct { + RegionId string `json:"regionId,omitempty"` + ContainerGroupName string `json:"containerGroupName,omitempty"` +} diff --git a/cloud/go.mod b/cloud/go.mod new file mode 100644 index 0000000..c827553 --- /dev/null +++ b/cloud/go.mod @@ -0,0 +1,70 @@ +module gitlink.org.cn/JointCloud/pcm-participant-cloud + +go 1.24.0 + +toolchain go1.24.4 + +require ( + github.com/gin-gonic/gin v1.10.1 + github.com/hashicorp/yamux v0.1.2 + k8s.io/api v0.33.2 + k8s.io/apimachinery v0.33.2 + k8s.io/client-go v0.33.2 +) + +require ( + github.com/bytedance/sonic v1.11.6 // indirect + github.com/bytedance/sonic/loader v0.1.1 // indirect + github.com/cloudwego/base64x v0.1.4 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/emicklei/go-restful/v3 v3.11.0 // indirect + github.com/fxamacker/cbor/v2 v2.7.0 // indirect + github.com/gabriel-vasile/mimetype v1.4.3 // indirect + github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-openapi/jsonpointer v0.21.0 // indirect + github.com/go-openapi/jsonreference v0.20.2 // indirect + github.com/go-openapi/swag v0.23.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.20.0 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/gnostic-models v0.6.9 // indirect + github.com/google/go-cmp v0.7.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/josharian/intern v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/mailru/easyjson v0.7.7 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.2.12 // indirect + github.com/x448/float16 v0.8.4 // indirect + golang.org/x/arch v0.8.0 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/net v0.38.0 // indirect + golang.org/x/oauth2 v0.27.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/term v0.30.0 // indirect + golang.org/x/text v0.23.0 // indirect + golang.org/x/time v0.9.0 // indirect + google.golang.org/protobuf v1.36.5 // indirect + gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/klog/v2 v2.130.1 // indirect + k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect + k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect + sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect + sigs.k8s.io/randfill v1.0.0 // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect + sigs.k8s.io/yaml v1.4.0 // indirect +) diff --git a/cloud/model/resp.go b/cloud/model/resp.go new file mode 100644 index 0000000..ca40e29 --- /dev/null +++ b/cloud/model/resp.go @@ -0,0 +1,15 @@ +package model + +import ( + "github.com/gin-gonic/gin" + "net/http" +) + +func Response(c *gin.Context, code int, msg interface{}, data interface{}) { + c.JSON(http.StatusOK, map[string]interface{}{ + "code": code, + "msg": msg, + "data": data, + }) + return +} diff --git a/cloud/platform/interface.go b/cloud/platform/interface.go new file mode 100644 index 0000000..e9df05a --- /dev/null +++ b/cloud/platform/interface.go @@ -0,0 +1,7 @@ +package platform + +type IPlatform interface { + Type() Type + Name() string + Id() Id +} diff --git a/cloud/platform/platform.go b/cloud/platform/platform.go new file mode 100644 index 0000000..427b0bd --- /dev/null +++ b/cloud/platform/platform.go @@ -0,0 +1,16 @@ +package platform + +const ( + K8s Type = "kubernetes" + Eci Type = "eci" + Serverless Type = "serverless" +) + +type Platform struct { + Name string `json:"name,omitempty"` + Id Id `json:"id,omitempty"` + Type Type `json:"type,omitempty"` +} + +type Type string +type Id string diff --git a/cloud/service/container.go b/cloud/service/container.go new file mode 100644 index 0000000..f2ef323 --- /dev/null +++ b/cloud/service/container.go @@ -0,0 +1,55 @@ +package service + +import ( + "context" + "errors" + "gitlink.org.cn/JointCloud/pcm-participant-cloud/container" +) + +type Container struct { + container container.IContainer +} + +func NewContainer(container container.IContainer) *Container { + return &Container{container: container} +} + +func (c *Container) Create(ctx context.Context, param *container.CreateParam) (interface{}, error) { + if param.Name == "" { + return nil, errors.New("Name is required") + } + if param.Image == "" { + return nil, errors.New("Image is required") + } + if param.ContainerGroupName == "" { + return nil, errors.New("ContainerGroupName is required") + } + if param == nil { + return nil, nil + } + ctn, err := c.container.Create(ctx, param) + if err != nil { + return nil, err + } + return ctn, nil +} +func (c *Container) Delete(ctx context.Context, param *container.DeleteParam) error { + if param == nil { + return nil + } + err := c.container.Delete(ctx, param) + if err != nil { + return err + } + return nil +} +func (c *Container) Get(ctx context.Context, param *container.GetParam) (interface{}, error) { + if param == nil { + return nil, errors.New("param is required") + } + resp, err := c.container.Get(ctx, param) + if err != nil { + return nil, err + } + return resp, err +} diff --git a/cloud/service/param.go b/cloud/service/param.go new file mode 100644 index 0000000..ba62a24 --- /dev/null +++ b/cloud/service/param.go @@ -0,0 +1,5 @@ +package service + +type ContainerParam struct { + Name string `json:"name,omitempty" form:"name"` +} diff --git a/cloud/service/service.go b/cloud/service/service.go new file mode 100644 index 0000000..85378d0 --- /dev/null +++ b/cloud/service/service.go @@ -0,0 +1,73 @@ +package service + +import ( + "context" + "fmt" + "gitlink.org.cn/JointCloud/pcm-participant-cloud/container" + "gitlink.org.cn/JointCloud/pcm-participant-cloud/platform" + eci "gitlink.org.cn/JointCloud/pcm-participant-eci" + k8s "gitlink.org.cn/JointCloud/pcm-participant-k8s" + + "sync" +) + +type Service struct { + containerMap map[platform.Id]*Container + rlock sync.Mutex + dlock sync.Mutex + tlock sync.Mutex +} + +func NewService(platforms ...platform.IPlatform) (*Service, error) { + containerMap := make(map[platform.Id]*Container) + for _, pf := range platforms { + switch pf.Type() { + case platform.K8s: + k8s, ok := pf.(*k8s.Kubernetes) + if !ok { + + } + containerMap[pf.Id()] = NewContainer(k8s.Container) + case platform.Eci: + eci, ok := pf.(*eci.Eci) + if !ok { + + } + containerMap[pf.Id()] = NewContainer(eci.Container) + } + } + + return &Service{containerMap: containerMap}, nil +} + +func (s *Service) CreateContainer(ctx context.Context, pid string, param *container.CreateParam) (interface{}, error) { + fmt.Println(s.containerMap) + for id := range s.containerMap { + fmt.Println(s.containerMap[id]) + } + svc := s.containerMap[platform.Id(pid)] + fmt.Println(svc) + resp, err := svc.Create(ctx, param) + if err != nil { + return nil, err + } + + return resp, nil +} +func (s *Service) DeleteContainer(ctx context.Context, pid string, param *container.DeleteParam) error { + svc := s.containerMap[platform.Id(pid)] + err := svc.Delete(ctx, param) + if err != nil { + return err + } + return nil +} + +func (s *Service) GetContainer(ctx context.Context, pid string, param *container.GetParam) (interface{}, error) { + svc := s.containerMap[platform.Id(pid)] + resp, err := svc.Get(ctx, param) + if err != nil { + return nil, err + } + return resp, nil +} diff --git a/cloud/service/service_test.go b/cloud/service/service_test.go new file mode 100644 index 0000000..25b31d8 --- /dev/null +++ b/cloud/service/service_test.go @@ -0,0 +1,135 @@ +package service + +import ( + "context" + "fmt" + "github.com/smartystreets/goconvey/convey" + "gitlink.org.cn/JointCloud/pcm-participant-cloud/container" + "gitlink.org.cn/JointCloud/pcm-participant-cloud/platform" + eci "gitlink.org.cn/JointCloud/pcm-participant-eci" + k8s "gitlink.org.cn/JointCloud/pcm-participant-k8s" + "testing" + "time" +) + +func TestCreateService(t *testing.T) { + convey.Convey("Test Service", t, func() { + k8s, err := k8s.New("", "", platform.Id(123)) + if err != nil { + t.Fatalf("failed to create k8s client: %v", err) + } + eci, err := eci.New("", "", "cn-hangzhou", platform.Id(456)) + if err != nil { + t.Fatalf("failed to create eci client: %v", err) + } + + svc, err := NewService(k8s, eci) + if err != nil { + t.Fatalf("failed to create service: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + convey.Convey("all train algorithms", func() { + param := container.CreateParam{ + ContainerGroupName: "hello-llama", + Image: "ghcr.io/ggml-org/llama.cpp:server", + //Image: "nginx:latest", + Name: "hello-llama", + Args: []string{"-m", "/models/ERNIE-4.5-0.3B-PT-GGUF/ernie-4.5-0.3b-pt-q4_k_m.gguf", "--port", "8000", "--host", "0.0.0.0", "-n", "512"}, + MountPath: "/models", + Port: 8000, + NodePort: 30000, + Cpu: "100m", + Memory: "256Mi", + } + ctn, err := svc.CreateContainer(ctx, "123", ¶m) + if err != nil { + fmt.Println(err) + } + + convey.So(err, convey.ShouldBeNil) + convey.So(ctn, convey.ShouldNotBeEmpty) + + }) + + }) +} + +func TestDeleteService(t *testing.T) { + convey.Convey("Test Service", t, func() { + k8s, err := k8s.New("eyJhbGciOiJSUzI1NiIsImtpZCI6IkNzNXRMOE5VdWdiVHJ2U2JtU3ZKWk5razRwZlJHWWZmV3M0aVNHLUJJOHMifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJhZG1pbi11c2VyLXRva2VuLTg0bW5sIiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImFkbWluLXVzZXIiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51aWQiOiJjOWU1NjU1OC1lZTRhLTQ1MGUtYTljNy03NGNhNDU4NzEyNGEiLCJzdWIiOiJzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06YWRtaW4tdXNlciJ9.IxXITCqR8Yv-C3mkC3ItwkDLhNueFk_HMF7QhFtjch8miVhUYH3g2Uh70EB5M_3F8vZIF3CoYd3TLG_1acg2JR9Tf7Ipiwzol3npXIqG27QQJ-px3q2i3CMqwjogKjCSEMWTxHS03CDOCJUFLL2qKIa4U-QmEUYnbOFnNsoXDr7zkgRyREi5QUqlEB1ODMlEy8wb6n1g8E9AqNxnYBeHywAAS8ZMkTiKlEdhi-7Jgblkcssmb_P_5xbWelIy6HfBZuumJICzd8b5JRrkX7m7MaIx4TgNETa17kCFi1JnC6MvC1u3UGQQ7MKiXrud06cN9Sphgnu5nIkFjF5TWpSuaA", "https://119.45.100.73:6443", platform.Id(123)) + if err != nil { + t.Fatalf("failed to create k8s client: %v", err) + } + eci, err := eci.New("LTAI5tLdKjnos44aLvN1XWJk", "FIF0zpGpA8Q0dEraw28VAUTKg7sVLR", "cn-hangzhou", platform.Id(456)) + if err != nil { + t.Fatalf("failed to create eci client: %v", err) + } + + svc, err := NewService(k8s, eci) + if err != nil { + t.Fatalf("failed to create service: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + convey.Convey("all train algorithms", func() { + param := container.DeleteParam{ + //DeleteParameter: &container.EciDeleteParam{ + //RegionId: "cn-hangzhou", + //ContainerGroupId: "eci-bp1f6qix5wkkeqhzoc77", + //}, + Name: "hello-llama", + } + err := svc.DeleteContainer(ctx, "123", ¶m) + if err != nil { + fmt.Println(err) + } + + convey.So(err, convey.ShouldBeNil) + convey.So(nil, convey.ShouldNotBeEmpty) + + }) + + }) +} + +func TestGetService(t *testing.T) { + convey.Convey("Test Service", t, func() { + k8s, err := k8s.New("eyJhbGciOiJSUzI1NiIsImtpZCI6IkNzNXRMOE5VdWdiVHJ2U2JtU3ZKWk5razRwZlJHWWZmV3M0aVNHLUJJOHMifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJhZG1pbi11c2VyLXRva2VuLTg0bW5sIiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImFkbWluLXVzZXIiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51aWQiOiJjOWU1NjU1OC1lZTRhLTQ1MGUtYTljNy03NGNhNDU4NzEyNGEiLCJzdWIiOiJzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06YWRtaW4tdXNlciJ9.IxXITCqR8Yv-C3mkC3ItwkDLhNueFk_HMF7QhFtjch8miVhUYH3g2Uh70EB5M_3F8vZIF3CoYd3TLG_1acg2JR9Tf7Ipiwzol3npXIqG27QQJ-px3q2i3CMqwjogKjCSEMWTxHS03CDOCJUFLL2qKIa4U-QmEUYnbOFnNsoXDr7zkgRyREi5QUqlEB1ODMlEy8wb6n1g8E9AqNxnYBeHywAAS8ZMkTiKlEdhi-7Jgblkcssmb_P_5xbWelIy6HfBZuumJICzd8b5JRrkX7m7MaIx4TgNETa17kCFi1JnC6MvC1u3UGQQ7MKiXrud06cN9Sphgnu5nIkFjF5TWpSuaA", "https://119.45.100.73:6443", platform.Id(123)) + if err != nil { + t.Fatalf("failed to create k8s client: %v", err) + } + eci, err := eci.New("LTAI5tLdKjnos44aLvN1XWJk", "FIF0zpGpA8Q0dEraw28VAUTKg7sVLR", "cn-hangzhou", platform.Id(456)) + if err != nil { + t.Fatalf("failed to create eci client: %v", err) + } + + svc, err := NewService(k8s, eci) + if err != nil { + t.Fatalf("failed to create service: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + convey.Convey("all train algorithms", func() { + param := container.GetParam{ + Name: "hello-nginx", + GetParameter: &container.EciGetParam{}, + } + resp, err := svc.GetContainer(ctx, "456", ¶m) + if err != nil { + fmt.Println(err) + } + fmt.Println(resp) + convey.So(err, convey.ShouldBeNil) + convey.So(resp, convey.ShouldNotBeEmpty) + + }) + + }) +} diff --git a/cloud/task/interface.go b/cloud/task/interface.go new file mode 100644 index 0000000..94f30b8 --- /dev/null +++ b/cloud/task/interface.go @@ -0,0 +1,13 @@ +package task + +import "context" + +type Task interface { + Container +} + +type Container interface { + CreateContainer(ctx context.Context, params *TaskParams) (interface{}, error) + GetContainer(ctx context.Context, id string) (interface{}, error) + DeleteContainer(ctx context.Context, id string) error +} diff --git a/cloud/task/option.go b/cloud/task/option.go new file mode 100644 index 0000000..b028dfd --- /dev/null +++ b/cloud/task/option.go @@ -0,0 +1 @@ +package task diff --git a/cloud/task/task.go b/cloud/task/task.go new file mode 100644 index 0000000..918abf8 --- /dev/null +++ b/cloud/task/task.go @@ -0,0 +1,9 @@ +package task + +import ( + "gitlink.org.cn/JointCloud/pcm-participant-cloud/container" +) + +type TaskParams struct { + Container container.CreateParameter +} diff --git a/go.work b/go.work index a8b2d30..f470400 100644 --- a/go.work +++ b/go.work @@ -5,10 +5,15 @@ toolchain go1.24.0 use ( ./ai ./client + ./cloud ./common ./jcs ./participant + ./participant/eci + ./participant/k8s + ./participant/octopus ./participant/openI + ./participant/serverless ./platform hpc participant/octopus @@ -19,4 +24,6 @@ replace gitlink.org.cn/JointCloud/pcm-participant-common v0.0.0 => ./common replace gitlink.org.cn/JointCloud/pcm-participant-ai v0.0.0 => ./ai -replace gitlink.org.cn/JointCloud/pcm-participant-openi v0.0.0 => ./participant/openI \ No newline at end of file +replace gitlink.org.cn/JointCloud/pcm-participant-openi v0.0.0 => ./participant/openI + +replace gitlink.org.cn/JointCloud/pcm-participant-octopus v0.0.0 => ./participant/octopus diff --git a/jcs/go.mod b/jcs/go.mod index 5e74636..239c2da 100644 --- a/jcs/go.mod +++ b/jcs/go.mod @@ -2,8 +2,6 @@ module gitlink.org.cn/JointCloud/jcs go 1.23.0 -toolchain go1.24.1 - require ( github.com/go-git/go-git/v5 v5.13.2 github.com/go-resty/resty/v2 v2.16.5 @@ -11,7 +9,6 @@ require ( github.com/hashicorp/go-multierror v1.1.1 github.com/json-iterator/go v1.1.12 github.com/pkg/errors v0.9.1 - gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20250321070427-b3addd77a29d gopkg.in/yaml.v3 v3.0.1 ) @@ -19,45 +16,26 @@ require ( dario.cat/mergo v1.0.0 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect github.com/ProtonMail/go-crypto v1.1.5 // indirect - github.com/bytedance/sonic v1.13.2 // indirect - github.com/bytedance/sonic/loader v0.2.4 // indirect github.com/cloudflare/circl v1.3.7 // indirect - github.com/cloudwego/base64x v0.1.5 // indirect github.com/cyphar/filepath-securejoin v0.3.6 // indirect github.com/emirpasic/gods v1.18.1 // indirect - github.com/gabriel-vasile/mimetype v1.4.8 // indirect - github.com/gin-contrib/sse v1.0.0 // indirect - github.com/gin-gonic/gin v1.10.0 // indirect github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect github.com/go-git/go-billy/v5 v5.6.2 // indirect - github.com/go-playground/locales v0.14.1 // indirect - github.com/go-playground/universal-translator v0.18.1 // indirect - github.com/go-playground/validator/v10 v10.25.0 // indirect - github.com/goccy/go-json v0.10.5 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect - github.com/klauspost/cpuid/v2 v2.2.10 // indirect - github.com/leodido/go-urn v1.4.0 // indirect - github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/pjbgf/sha1cd v0.3.2 // indirect github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect github.com/skeema/knownhosts v1.3.0 // indirect - github.com/twitchyliquid64/golang-asm v0.15.1 // indirect - github.com/ugorji/go/codec v1.2.12 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect - golang.org/x/arch v0.15.0 // indirect golang.org/x/crypto v0.36.0 // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.37.0 // indirect golang.org/x/sync v0.12.0 // indirect golang.org/x/sys v0.31.0 // indirect - golang.org/x/text v0.23.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect - google.golang.org/protobuf v1.36.6 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect ) diff --git a/participant/eci/apis/container.go b/participant/eci/apis/container.go new file mode 100644 index 0000000..95d47f6 --- /dev/null +++ b/participant/eci/apis/container.go @@ -0,0 +1,30 @@ +package apis + +import ( + "github.com/gin-gonic/gin" + "gitlink.org.cn/JointCloud/pcm-participant-eci/common" + "gitlink.org.cn/JointCloud/pcm-participant-eci/model" + "gitlink.org.cn/JointCloud/pcm-participant-eci/service" + "net/http" +) + +func CreateContainer(ctx *gin.Context) { + var param model.CreateContainerParam + if err := ctx.BindJSON(¶m); err != nil { + model.Response(ctx, http.StatusBadRequest, common.INVALIDPARAMS, err) + return + } + client := common.GetEciClient() + resp, err := service.CreateContainer(client, ¶m) + if err != nil { + return + } + if !resp.IsSuccess() { + + model.Response(ctx, 500, common.INVOKEERROR, resp.String()) + return + + } + + model.Response(ctx, http.StatusOK, common.SUCCESS, resp.ContainerGroupId) +} diff --git a/participant/eci/app.go b/participant/eci/app.go new file mode 100644 index 0000000..110dac6 --- /dev/null +++ b/participant/eci/app.go @@ -0,0 +1,13 @@ +package eci + +import ( + "gitlink.org.cn/JointCloud/pcm-participant-eci/initialize" + "gitlink.org.cn/JointCloud/pcm-participant-eci/router" +) + +func main() { + //初始化公共配置 + initialize.InitConfig() + rt, _ := router.Create() + _ = rt.Run(":2028") +} diff --git a/participant/eci/common/client.go b/participant/eci/common/client.go new file mode 100644 index 0000000..c315031 --- /dev/null +++ b/participant/eci/common/client.go @@ -0,0 +1,80 @@ +package common + +import ( + "crypto/tls" + "errors" + "fmt" + "gitlink.org.cn/JointCloud/pcm-participant-eci/model" + "net/http" + "time" + + "github.com/go-resty/resty/v2" +) + +var ( + NoRedirectClient *resty.Client + RestyClient *resty.Client + HttpClient *http.Client +) +var UserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" +var DefaultTimeout = time.Second * 300 + +func InitClient() { + NoRedirectClient = resty.New().SetRedirectPolicy( + resty.RedirectPolicyFunc(func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }), + ).SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + NoRedirectClient.SetHeader("user-agent", UserAgent) + + RestyClient = NewRestyClient() + HttpClient = NewHttpClient() +} + +func NewRestyClient() *resty.Client { + client := resty.New(). + SetHeader("user-agent", UserAgent). + SetRetryCount(3). + SetRetryResetReaders(true). + SetTimeout(DefaultTimeout). + SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + return client +} + +func NewHttpClient() *http.Client { + return &http.Client{ + Timeout: time.Hour * 48, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + } +} + +func Request(url string, method string, callback ReqCallback) ([]byte, error) { + respErr := &model.RespErr{} + req := RestyClient.R(). + SetHeaders(map[string]string{ + "Content-Type": "application/json", + }). + SetError(&respErr) + + if callback != nil { + callback(req) + } + + res, err := req.Execute(method, OPENIPREFIX+url) + + if err != nil { + return nil, err + } + if respErr.Message != "" { + return nil, errors.New(respErr.Message) + } + + if res.StatusCode() != http.StatusOK && res.StatusCode() != http.StatusCreated { + return nil, errors.New(fmt.Sprintf("msg: %s, status: %d", res.String(), res.StatusCode())) + } + + return res.Body(), nil +} diff --git a/participant/eci/common/const.go b/participant/eci/common/const.go new file mode 100644 index 0000000..34732ac --- /dev/null +++ b/participant/eci/common/const.go @@ -0,0 +1,64 @@ +package common + +const ( + MaxChunkSize int64 = 1024 * 1024 * 64 //64MB + + Ip = "https://api.aliyun.com" + User = "c2net@pcl.ac.cn" + Pwd = "c2net123" + + QUESTION_MARK = "?" + TIMEOUT = 10 + OPENIPREFIX = "https://openi.pcl.ac.cn" + ACCESSTOKEN = "access_token" + IPADDR = "addr" + // user + USERINFO = "/api/v1/user" + Forward_Slash = "/" + GetToken = "openaiserver/v1/authmanage/token" + + CreateContainer = "/api/Eci/2018-08-08/CreateContainerGroup" + // datasets upload + GetChunksUrl = "/api/v1/attachments/get_chunks" //获取当前需要上传文件的chunk信息 + NewMultipartUrl = "/api/v1/attachments/new_multipart" //获取文件上传的需要的信息 + GetMultipartUrl = "/api/v1/attachments/get_multipart_url" //获取文件上传的地址 + CompleteMultipartUrl = "/api/v1/attachments/complete_multipart" //完成上传接口 //上传文件到数据集 + + // task + TASKCREATIONREQUIRED = "/api/v1/{username}/{reponame}/ai_task/creation/required" // 查询创建任务所需资源接口 + TASKCREATIONIMAGEBYSPEC = "/api/v1/{username}/{reponame}/ai_task/creation/image_by_spec" // 根据选择的规格获取镜像(计算资源是NPU时使用) + TASKCREATE = "/api/v1/{username}/{reponame}/ai_task/create" // 创建任务 + TASKLIST = "/api/v1/{username}/{reponame}/ai_task/list" // 任务列表 + TASKDETAIL = "/api/v1/{username}/{reponame}/ai_task" // 查询任务详情 + TASKSTOP = "/api/v1/{username}/{reponame}/ai_task/stop" // 停止任务接口 + TASKOUTPUT = "/api/v1/{username}/{reponame}/ai_task/output" // 查询结果列表接口 + TASKRESULTDOWNLOAD = "/api/v1/{username}/{reponame}/ai_task/output/download/all" // 所有结果下载接口 + TASKLOGDOWNLOAD = "/api/v1/{username}/{reponame}/ai_task/log/download" //日志下载 + SelfEndpointUrl = "/api/v1/{username}/{reponame}/ai_task/self_endpoint_url" //在线推理接口 + + // model + MODELCREATE = "/api/v1/repos/{username}/{reponame}/modelmanage/create_new_model" //模型新增接口 + MODELGETBYID = "/api/v1/repos/{username}/{reponame}/modelmanage/query_model_byId" //根据模型ID查询模型信息接口 + MODELDOWNLOADBYID = "/api/v1/repos/{username}/{reponame}/modelmanage/downloadall" + QUERYMODELBYNAME = "/api/v1/repos/{username}/{reponame}/modelmanage/query_model_byName" //根据模型名称查询模型 + PageModel = "/api/v1/repos/{username}/{reponame}/modelmanage/show_model_api" //分页查询模型 + QueryAllModelFile = "/api/v1/all_model_data" //查询所有模型文件 + + // model local create + MODELLOCALCREATE = "/api/v1/repos/{username}/{reponame}/modelmanage/create_local_model" //创建一条本地模型记录 + MODELLOCALGETUPLOADEDCHUNKS = "/api/v1/attachments/model/get_chunks" //获取该文件已经上传的分片接口 + MODELLOCALNEWMULTIPART = "/api/v1/attachments/model/new_multipart" //开启一个本地模型上传 + MODELLOCALGETMULTIPARTURL = "/api/v1/attachments/model/get_multipart_url" //获取模型分片传输链接,并进行上传 + MODELLOCALCOMPLETEMULTIPART = "/api/v1/attachments/model/complete_multipart" //完成模型文件上传 +) + +const ( + SUCCESS = "success" +) + +// error +const ( + INVOKEERROR = "failed to invoke" + INVALIDPARAMS = "invalid Request params" + NOTFOUND = "not found" +) diff --git a/participant/eci/common/token.go b/participant/eci/common/token.go new file mode 100644 index 0000000..3c1515c --- /dev/null +++ b/participant/eci/common/token.go @@ -0,0 +1,22 @@ +package common + +import ( + "github.com/aliyun/alibaba-cloud-sdk-go/sdk" + "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials" + "github.com/aliyun/alibaba-cloud-sdk-go/services/eci" +) + +const ( + accessKeyId = "" + accessKeySecret = "" +) + +func GetEciClient() *eci.Client { + config := sdk.NewConfig() + credential := credentials.NewAccessKeyCredential(accessKeyId, accessKeySecret) + client, err := eci.NewClientWithOptions("cn-hangzhou", config, credential) + if err != nil { + panic(err) + } + return client +} diff --git a/participant/eci/common/types.go b/participant/eci/common/types.go new file mode 100644 index 0000000..b8dffad --- /dev/null +++ b/participant/eci/common/types.go @@ -0,0 +1,12 @@ +package common + +import "github.com/go-resty/resty/v2" + +type Json map[string]interface{} + +type TokenResp struct { + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` +} + +type ReqCallback func(req *resty.Request) diff --git a/participant/eci/common/util.go b/participant/eci/common/util.go new file mode 100644 index 0000000..c09d8af --- /dev/null +++ b/participant/eci/common/util.go @@ -0,0 +1,80 @@ +package common + +import ( + "crypto/md5" + "encoding/hex" + "fmt" + "io" + "mime/multipart" + "reflect" + "strconv" +) + +func GetFileMd5(file multipart.File) (string, error) { + hash := md5.New() + if _, err := io.Copy(hash, file); err != nil { + return "", err + } + + // 计算MD5并转换为16进制字符串 + md5Bytes := hash.Sum(nil) + md5Str := hex.EncodeToString(md5Bytes) + return md5Str, nil +} + +func Bool2String(b bool) string { + return strconv.FormatBool(b) +} + +// StructToMapWithTag 将结构体转换为 map[string]string,key 使用指定标签的值 +func StructToMapWithTag(obj interface{}, tagName string) (map[string]string, error) { + result := make(map[string]string) + + // 获取值的反射对象 + val := reflect.ValueOf(obj) + if val.Kind() == reflect.Ptr { + val = val.Elem() // 解引用指针 + } + + if val.Kind() != reflect.Struct { + return nil, fmt.Errorf("input is not a struct or pointer to struct") + } + + // 获取类型信息 + typ := val.Type() + + // 遍历结构体字段 + for i := 0; i < val.NumField(); i++ { + field := val.Field(i) + fieldType := typ.Field(i) + + // 获取字段的标签值 + tagValue := fieldType.Tag.Get(tagName) + if tagValue == "" { + continue // 如果标签不存在,跳过该字段 + } + + // 获取字段值并转换为字符串 + var fieldValue string + switch field.Kind() { + case reflect.String: + fieldValue = field.String() + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + fieldValue = strconv.FormatInt(field.Int(), 10) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + fieldValue = strconv.FormatUint(field.Uint(), 10) + case reflect.Float32, reflect.Float64: + fieldValue = strconv.FormatFloat(field.Float(), 'f', -1, 64) + case reflect.Bool: + fieldValue = strconv.FormatBool(field.Bool()) + default: + // 如果字段类型不支持,跳过 + continue + } + + // 将标签值和字段值存入 map + result[tagValue] = fieldValue + } + + return result, nil +} diff --git a/participant/eci/eci.go b/participant/eci/eci.go new file mode 100644 index 0000000..f5e5bda --- /dev/null +++ b/participant/eci/eci.go @@ -0,0 +1,110 @@ +package eci + +import ( + "context" + "errors" + "github.com/aliyun/alibaba-cloud-sdk-go/sdk" + "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials" + "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" + "github.com/aliyun/alibaba-cloud-sdk-go/services/eci" + "gitlink.org.cn/JointCloud/pcm-participant-cloud/container" + "gitlink.org.cn/JointCloud/pcm-participant-cloud/platform" + "gitlink.org.cn/JointCloud/pcm-participant-eci/model" + "gitlink.org.cn/JointCloud/pcm-participant-eci/service" +) + +type Eci struct { + Container *Container + opt *Option +} + +type Container struct { + opt *Option + ft *container.Features +} + +type Option struct { + platform *platform.Platform + client *eci.Client +} + +func New(accessKeyId, accessKeySecret, regionId string, id platform.Id) (*Eci, error) { + config := sdk.NewConfig() + credential := credentials.NewAccessKeyCredential(accessKeyId, accessKeySecret) + client, err := eci.NewClientWithOptions(regionId, config, credential) + if err != nil { + panic(err) + } + opt := &Option{ + client: client, + platform: &platform.Platform{ + Id: id, + Type: platform.Eci, + }, + } + container := &Container{ + opt: opt, + } + return &Eci{opt: opt, Container: container}, nil +} +func (e *Eci) Name() string { + return e.opt.platform.Name +} +func (e *Eci) Type() platform.Type { + return e.opt.platform.Type +} +func (e *Eci) Id() platform.Id { + return e.opt.platform.Id +} +func (c *Container) Create(ctx context.Context, param *container.CreateParam) (interface{}, error) { + if param.Name == "" { + return nil, errors.New("Name is required") + } + if param.Image == "" { + return nil, errors.New("Image is required") + } + if param.ContainerGroupName == "" { + return nil, errors.New("ContainerGroupName is required") + } + cParam := model.CreateContainerParam{ + Containers: &[]eci.CreateContainerGroupContainer{ + { + Image: param.Image, + Name: param.Name, + Cpu: requests.Float(param.Cpu), + Memory: requests.Float(param.Memory), + }, + }, + ContainerGroupName: param.ContainerGroupName, + } + resp, err := service.CreateContainer(c.opt.client, &cParam) + if err != nil { + return nil, err + } + return resp, nil +} +func (c *Container) Delete(ctx context.Context, param *container.DeleteParam) error { + deleteParam := param.DeleteParameter.(*container.EciDeleteParam) + err := service.DeleteContainer(c.opt.client, &model.DeleteContainerParam{ + RegionId: deleteParam.RegionId, + ContainerGroupId: deleteParam.ContainerGroupId, + }) + if err != nil { + return err + } + return nil +} +func (c *Container) Features() *container.Features { + return nil +} +func (c *Container) Get(ctx context.Context, param *container.GetParam) (interface{}, error) { + getParam := param.GetParameter.(*container.EciGetParam) + resp, err := service.GetContainer(c.opt.client, &model.GetContainerParam{ + RegionId: getParam.RegionId, + ContainerGroupName: getParam.ContainerGroupName, + }) + if err != nil { + return nil, err + } + return resp, nil +} diff --git a/participant/eci/go.mod b/participant/eci/go.mod new file mode 100644 index 0000000..dc0ec3e --- /dev/null +++ b/participant/eci/go.mod @@ -0,0 +1,45 @@ +module gitlink.org.cn/JointCloud/pcm-participant-eci + +go 1.23.10 + +require ( + github.com/aliyun/alibaba-cloud-sdk-go v1.63.107 + github.com/gin-gonic/gin v1.10.1 + github.com/go-resty/resty/v2 v2.16.5 + github.com/smartystreets/goconvey v1.8.1 +) + +require ( + github.com/bytedance/sonic v1.11.6 // indirect + github.com/bytedance/sonic/loader v0.1.1 // indirect + github.com/cloudwego/base64x v0.1.4 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect + github.com/gabriel-vasile/mimetype v1.4.3 // indirect + github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.20.0 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/gopherjs/gopherjs v1.17.2 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/jtolds/gls v4.20.0+incompatible // indirect + github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b // indirect + github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/smarty/assertions v1.15.0 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.2.12 // indirect + golang.org/x/arch v0.8.0 // indirect + golang.org/x/crypto v0.31.0 // indirect + golang.org/x/net v0.33.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/participant/eci/initialize/common.go b/participant/eci/initialize/common.go new file mode 100644 index 0000000..fb9f665 --- /dev/null +++ b/participant/eci/initialize/common.go @@ -0,0 +1,9 @@ +package initialize + +import ( + "gitlink.org.cn/JointCloud/pcm-participant-eci/common" +) + +func InitConfig() { + common.InitClient() +} diff --git a/participant/eci/model/container.go b/participant/eci/model/container.go new file mode 100644 index 0000000..da0e9a1 --- /dev/null +++ b/participant/eci/model/container.go @@ -0,0 +1,36 @@ +package model + +import ( + "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" + "github.com/aliyun/alibaba-cloud-sdk-go/services/eci" +) + +type CreateContainerParam struct { + RegionId string `json:"regionId"` + ContainerGroupName string `json:"containerGroupName"` + Containers *[]eci.CreateContainerGroupContainer `json:"containers"` +} +type Container struct { + Name string `json:"name"` + Image string `json:"image"` + Memory requests.Float `json:"memory"` + Cpu requests.Float `json:"cpu"` +} + +type CreateContainerRemote struct { + Success bool `json:"success"` + Payload struct { + RequestId string `json:"requestId"` + ContainerGroupId string `json:"containerGroupId"` + } `json:"payload"` + Error interface{} `json:"error"` +} +type DeleteContainerParam struct { + RegionId string `json:"regionId"` + ContainerGroupId string `json:"containerGroupId"` +} + +type GetContainerParam struct { + RegionId string `json:"regionId"` + ContainerGroupName string `json:"containerGroupName"` +} diff --git a/participant/eci/model/error.go b/participant/eci/model/error.go new file mode 100644 index 0000000..f3d4e70 --- /dev/null +++ b/participant/eci/model/error.go @@ -0,0 +1,29 @@ +package model + +// Error Model +// +// The Error contains error relevant information. +// +// swagger:model Error +type Error struct { + // The general error message + // + // required: true + // example: Unauthorized + Error string `json:"error"` + // The http error code. + // + // required: true + // example: 401 + ErrorCode int `json:"errorCode"` + // The http error code. + // + // required: true + // example: you need to provide a valid access token or user credentials to access this api + ErrorDescription string `json:"errorDescription"` +} + +type RespErr struct { + Code string `json:"code"` + Message string `json:"message"` +} diff --git a/participant/eci/model/response.go b/participant/eci/model/response.go new file mode 100644 index 0000000..ca40e29 --- /dev/null +++ b/participant/eci/model/response.go @@ -0,0 +1,15 @@ +package model + +import ( + "github.com/gin-gonic/gin" + "net/http" +) + +func Response(c *gin.Context, code int, msg interface{}, data interface{}) { + c.JSON(http.StatusOK, map[string]interface{}{ + "code": code, + "msg": msg, + "data": data, + }) + return +} diff --git a/participant/eci/model/token.go b/participant/eci/model/token.go new file mode 100644 index 0000000..6a9037a --- /dev/null +++ b/participant/eci/model/token.go @@ -0,0 +1,15 @@ +package model + +type Token struct { + Success bool `json:"success"` + Payload struct { + Token string `json:"token"` + Expiration int `json:"expiration"` + } `json:"payload"` + Error interface{} `json:"error"` +} + +type TokenParam struct { + User string `json:"user"` + Pwd string `json:"pwd"` +} diff --git a/participant/eci/router/router.go b/participant/eci/router/router.go new file mode 100644 index 0000000..01b25eb --- /dev/null +++ b/participant/eci/router/router.go @@ -0,0 +1,58 @@ +package router + +import ( + "fmt" + "github.com/gin-gonic/gin" + "gitlink.org.cn/JointCloud/pcm-participant-eci/apis" + "regexp" + "time" +) + +func Create() (*gin.Engine, error) { + g := gin.New() + + g.Use(gin.LoggerWithFormatter(logFormatter), gin.Recovery()) + g.Use(gin.Logger()) + g.Use(gin.Recovery()) + + api := g.Group("/api") + v1 := api.Group("/v1") + { + + // container + container := v1.Group("container") + container.POST("/create", apis.CreateContainer) + + } + + return g, nil +} + +var tokenRegexp = regexp.MustCompile("token=[^&]+") + +func logFormatter(param gin.LogFormatterParams) string { + if (param.ClientIP == "127.0.0.1" || param.ClientIP == "::1") && param.Path == "/health" { + return "" + } + + var statusColor, methodColor, resetColor string + if param.IsOutputColor() { + statusColor = param.StatusCodeColor() + methodColor = param.MethodColor() + resetColor = param.ResetColor() + } + + if param.Latency > time.Minute { + param.Latency = param.Latency - param.Latency%time.Second + } + path := tokenRegexp.ReplaceAllString(param.Path, "token=[masked]") + return fmt.Sprintf("%v |%s %3d %s| %13v | %15s |%s %-7s %s %#v\n%s", + param.TimeStamp.Format(time.RFC3339), + statusColor, param.StatusCode, resetColor, + param.Latency, + param.ClientIP, + methodColor, param.Method, resetColor, + path, + param.ErrorMessage, + ) +} diff --git a/participant/eci/service/container.go b/participant/eci/service/container.go new file mode 100644 index 0000000..06510ba --- /dev/null +++ b/participant/eci/service/container.go @@ -0,0 +1,47 @@ +package service + +import ( + "fmt" + "github.com/aliyun/alibaba-cloud-sdk-go/services/eci" + "gitlink.org.cn/JointCloud/pcm-participant-eci/model" +) + +type ContainerService struct { +} + +func CreateContainer(client *eci.Client, param *model.CreateContainerParam) (*eci.CreateContainerGroupResponse, error) { + + request := eci.CreateCreateContainerGroupRequest() + request.RegionId = param.RegionId + request.Container = param.Containers + request.ContainerGroupName = param.ContainerGroupName + // 发送请求 + response, err := client.CreateContainerGroup(request) + if err != nil { + fmt.Printf("Error: %v\n", err) + return nil, err + } + return response, nil +} + +func DeleteContainer(client *eci.Client, param *model.DeleteContainerParam) error { + request := eci.CreateDeleteContainerGroupRequest() + request.RegionId = param.RegionId + request.ContainerGroupId = param.ContainerGroupId + _, err := client.DeleteContainerGroup(request) + if err != nil { + return err + } + return nil +} + +func GetContainer(client *eci.Client, param *model.GetContainerParam) (interface{}, error) { + request := eci.CreateDescribeContainerGroupsRequest() + request.RegionId = param.RegionId + request.ContainerGroupName = param.ContainerGroupName + resp, err := client.DescribeContainerGroups(request) + if err != nil { + return nil, err + } + return resp, nil +} diff --git a/participant/eci/service/service.go b/participant/eci/service/service.go new file mode 100644 index 0000000..f188694 --- /dev/null +++ b/participant/eci/service/service.go @@ -0,0 +1,3 @@ +package service + +import () diff --git a/participant/go.mod b/participant/go.mod index 114a914..36509e9 100644 --- a/participant/go.mod +++ b/participant/go.mod @@ -1,3 +1,3 @@ module gitlink.org.cn/JointCloud/pcm-participant -go 1.22.0 +go 1.23.10 diff --git a/participant/k8s/apis/container.go b/participant/k8s/apis/container.go new file mode 100644 index 0000000..04667f4 --- /dev/null +++ b/participant/k8s/apis/container.go @@ -0,0 +1,25 @@ +package apis + +import ( + "github.com/gin-gonic/gin" + "gitlink.org.cn/JointCloud/pcm-participant-k8s/common" + "gitlink.org.cn/JointCloud/pcm-participant-k8s/model" + "gitlink.org.cn/JointCloud/pcm-participant-k8s/service" + "net/http" +) + +func CreateContainer(ctx *gin.Context) { + var param model.CreateContainerParam + if err := ctx.BindJSON(¶m); err != nil { + model.Response(ctx, http.StatusBadRequest, common.INVALIDPARAMS, err) + return + } + client := common.GetK8sClient() + err := service.CreateContainer(client, ¶m) + if err != nil { + model.Response(ctx, 500, common.INVOKEERROR, err) + return + } + + model.Response(ctx, http.StatusOK, common.SUCCESS, err) +} diff --git a/participant/k8s/app.go b/participant/k8s/app.go new file mode 100644 index 0000000..e126885 --- /dev/null +++ b/participant/k8s/app.go @@ -0,0 +1,13 @@ +package k8s + +import ( + "gitlink.org.cn/JointCloud/pcm-participant-k8s/initialize" + "gitlink.org.cn/JointCloud/pcm-participant-k8s/router" +) + +func main() { + //初始化公共配置 + initialize.InitConfig() + rt, _ := router.Create() + _ = rt.Run(":2028") +} diff --git a/participant/k8s/common/client.go b/participant/k8s/common/client.go new file mode 100644 index 0000000..53e2b02 --- /dev/null +++ b/participant/k8s/common/client.go @@ -0,0 +1,80 @@ +package common + +import ( + "crypto/tls" + "errors" + "fmt" + "gitlink.org.cn/JointCloud/pcm-participant-k8s/model" + "net/http" + "time" + + "github.com/go-resty/resty/v2" +) + +var ( + NoRedirectClient *resty.Client + RestyClient *resty.Client + HttpClient *http.Client +) +var UserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" +var DefaultTimeout = time.Second * 300 + +func InitClient() { + NoRedirectClient = resty.New().SetRedirectPolicy( + resty.RedirectPolicyFunc(func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }), + ).SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + NoRedirectClient.SetHeader("user-agent", UserAgent) + + RestyClient = NewRestyClient() + HttpClient = NewHttpClient() +} + +func NewRestyClient() *resty.Client { + client := resty.New(). + SetHeader("user-agent", UserAgent). + SetRetryCount(3). + SetRetryResetReaders(true). + SetTimeout(DefaultTimeout). + SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + return client +} + +func NewHttpClient() *http.Client { + return &http.Client{ + Timeout: time.Hour * 48, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + } +} + +func Request(url string, method string, callback ReqCallback) ([]byte, error) { + respErr := &model.RespErr{} + req := RestyClient.R(). + SetHeaders(map[string]string{ + "Content-Type": "application/json", + }). + SetError(&respErr) + + if callback != nil { + callback(req) + } + + res, err := req.Execute(method, OPENIPREFIX+url) + + if err != nil { + return nil, err + } + if respErr.Message != "" { + return nil, errors.New(respErr.Message) + } + + if res.StatusCode() != http.StatusOK && res.StatusCode() != http.StatusCreated { + return nil, errors.New(fmt.Sprintf("msg: %s, status: %d", res.String(), res.StatusCode())) + } + + return res.Body(), nil +} diff --git a/participant/k8s/common/const.go b/participant/k8s/common/const.go new file mode 100644 index 0000000..8c49303 --- /dev/null +++ b/participant/k8s/common/const.go @@ -0,0 +1,74 @@ +package common + +const ( + MaxChunkSize int64 = 1024 * 1024 * 64 //64MB + + QUESTION_MARK = "?" + TIMEOUT = 10 + OPENIPREFIX = "https://openi.pcl.ac.cn" + ACCESSTOKEN = "access_token" + // user + USERINFO = "/api/v1/user" + IPADDR = "addr" + // repo + REPO = "/api/v1/user/repos" + RepoFile = "/api/v1/repos/{username}/{reponame}/contents/{filepath}" //上传文件到项目中、修改项目中的文件内容 + + // image + IMAGERECOMMENDED = "/api/v1/images/recommend" + IMAGECUSTOM = "/api/v1/images/custom" + IMAGESTARED = "/api/v1/images/star" + + // datasets + DATASETCURRENT = "/api/v1/datasets/{username}/{reponame}/current_repo" //查询当前项目的数据集接口 + DATASETMINE = "/api/v1/datasets/{username}/{reponame}/my_datasets" //我上传的数据集 + DATASETPUBLIC = "/api/v1/datasets/{username}/{reponame}/public_datasets" //查询公开数据集 + DATASETFAVORITE = "/api/v1/datasets/{username}/{reponame}/my_favorite" //查询我收藏的数据集 + DATASETEXISTEXPORT = "/api/v1/datasets/{username}/{reponame}/model/export_exist_dataset" //将用户选择的文件从训练任务结果中导出到数据集中 POST请求 + DATASETCREATE = "/api/v1/datasets/{username}/{reponame}/create" // + BaseDatasetsUrl = "/api/v1/datasets/{username}/{reponame}" //数据集列表 //数据集基本接口 + + // datasets upload + GetChunksUrl = "/api/v1/attachments/get_chunks" //获取当前需要上传文件的chunk信息 + NewMultipartUrl = "/api/v1/attachments/new_multipart" //获取文件上传的需要的信息 + GetMultipartUrl = "/api/v1/attachments/get_multipart_url" //获取文件上传的地址 + CompleteMultipartUrl = "/api/v1/attachments/complete_multipart" //完成上传接口 //上传文件到数据集 + + // task + TASKCREATIONREQUIRED = "/api/v1/{username}/{reponame}/ai_task/creation/required" // 查询创建任务所需资源接口 + TASKCREATIONIMAGEBYSPEC = "/api/v1/{username}/{reponame}/ai_task/creation/image_by_spec" // 根据选择的规格获取镜像(计算资源是NPU时使用) + TASKCREATE = "/api/v1/{username}/{reponame}/ai_task/create" // 创建任务 + TASKLIST = "/api/v1/{username}/{reponame}/ai_task/list" // 任务列表 + TASKDETAIL = "/api/v1/{username}/{reponame}/ai_task" // 查询任务详情 + TASKSTOP = "/api/v1/{username}/{reponame}/ai_task/stop" // 停止任务接口 + TASKOUTPUT = "/api/v1/{username}/{reponame}/ai_task/output" // 查询结果列表接口 + TASKRESULTDOWNLOAD = "/api/v1/{username}/{reponame}/ai_task/output/download/all" // 所有结果下载接口 + TASKLOGDOWNLOAD = "/api/v1/{username}/{reponame}/ai_task/log/download" //日志下载 + SelfEndpointUrl = "/api/v1/{username}/{reponame}/ai_task/self_endpoint_url" //在线推理接口 + + // model + MODELCREATE = "/api/v1/repos/{username}/{reponame}/modelmanage/create_new_model" //模型新增接口 + MODELGETBYID = "/api/v1/repos/{username}/{reponame}/modelmanage/query_model_byId" //根据模型ID查询模型信息接口 + MODELDOWNLOADBYID = "/api/v1/repos/{username}/{reponame}/modelmanage/downloadall" + QUERYMODELBYNAME = "/api/v1/repos/{username}/{reponame}/modelmanage/query_model_byName" //根据模型名称查询模型 + PageModel = "/api/v1/repos/{username}/{reponame}/modelmanage/show_model_api" //分页查询模型 + QueryAllModelFile = "/api/v1/all_model_data" //查询所有模型文件 + + // model local create + MODELLOCALCREATE = "/api/v1/repos/{username}/{reponame}/modelmanage/create_local_model" //创建一条本地模型记录 + MODELLOCALGETUPLOADEDCHUNKS = "/api/v1/attachments/model/get_chunks" //获取该文件已经上传的分片接口 + MODELLOCALNEWMULTIPART = "/api/v1/attachments/model/new_multipart" //开启一个本地模型上传 + MODELLOCALGETMULTIPARTURL = "/api/v1/attachments/model/get_multipart_url" //获取模型分片传输链接,并进行上传 + MODELLOCALCOMPLETEMULTIPART = "/api/v1/attachments/model/complete_multipart" //完成模型文件上传 +) + +const ( + SUCCESS = "success" +) + +// error +const ( + INVOKEERROR = "failed to invoke" + INVALIDPARAMS = "invalid Request params" + NOTFOUND = "not found" +) diff --git a/participant/k8s/common/token.go b/participant/k8s/common/token.go new file mode 100644 index 0000000..833ef00 --- /dev/null +++ b/participant/k8s/common/token.go @@ -0,0 +1,30 @@ +package common + +import ( + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "time" +) + +const ( + host = "https://119.45.100.73:6443" + bearerToken = "eyJhbGciOiJSUzI1NiIsImtpZCI6IkNzNXRMOE5VdWdiVHJ2U2JtU3ZKWk5razRwZlJHWWZmV3M0aVNHLUJJOHMifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJhZG1pbi11c2VyLXRva2VuLTg0bW5sIiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImFkbWluLXVzZXIiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51aWQiOiJjOWU1NjU1OC1lZTRhLTQ1MGUtYTljNy03NGNhNDU4NzEyNGEiLCJzdWIiOiJzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06YWRtaW4tdXNlciJ9.IxXITCqR8Yv-C3mkC3ItwkDLhNueFk_HMF7QhFtjch8miVhUYH3g2Uh70EB5M_3F8vZIF3CoYd3TLG_1acg2JR9Tf7Ipiwzol3npXIqG27QQJ-px3q2i3CMqwjogKjCSEMWTxHS03CDOCJUFLL2qKIa4U-QmEUYnbOFnNsoXDr7zkgRyREi5QUqlEB1ODMlEy8wb6n1g8E9AqNxnYBeHywAAS8ZMkTiKlEdhi-7Jgblkcssmb_P_5xbWelIy6HfBZuumJICzd8b5JRrkX7m7MaIx4TgNETa17kCFi1JnC6MvC1u3UGQQ7MKiXrud06cN9Sphgnu5nIkFjF5TWpSuaA" +) + +func GetK8sClient() *kubernetes.Clientset { + // 1. 加载 kubeconfig 配置(本地开发用) + restConfig := &rest.Config{ + Timeout: 10 * time.Second, + Host: host, + BearerToken: bearerToken, + TLSClientConfig: rest.TLSClientConfig{ + Insecure: true, + }, + } + // 2. 创建 clientset + client, err := kubernetes.NewForConfig(restConfig) + if err != nil { + panic(err) + } + return client +} diff --git a/participant/k8s/common/types.go b/participant/k8s/common/types.go new file mode 100644 index 0000000..b8dffad --- /dev/null +++ b/participant/k8s/common/types.go @@ -0,0 +1,12 @@ +package common + +import "github.com/go-resty/resty/v2" + +type Json map[string]interface{} + +type TokenResp struct { + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` +} + +type ReqCallback func(req *resty.Request) diff --git a/participant/k8s/common/util.go b/participant/k8s/common/util.go new file mode 100644 index 0000000..ea19722 --- /dev/null +++ b/participant/k8s/common/util.go @@ -0,0 +1,92 @@ +package common + +import ( + "crypto/md5" + "crypto/tls" + "encoding/hex" + "fmt" + "github.com/go-resty/resty/v2" + "io" + "mime/multipart" + "reflect" + "strconv" + "time" +) + +func GetRestyRequest() *resty.Request { + client := resty.New().SetTimeout(time.Duration(5) * time.Second) + client.SetTLSClientConfig(&tls.Config{ + InsecureSkipVerify: true, // Only for development/testing + }) + request := client.R() + return request +} + +func GetFileMd5(file multipart.File) (string, error) { + hash := md5.New() + if _, err := io.Copy(hash, file); err != nil { + return "", err + } + + // 计算MD5并转换为16进制字符串 + md5Bytes := hash.Sum(nil) + md5Str := hex.EncodeToString(md5Bytes) + return md5Str, nil +} + +func Bool2String(b bool) string { + return strconv.FormatBool(b) +} + +// StructToMapWithTag 将结构体转换为 map[string]string,key 使用指定标签的值 +func StructToMapWithTag(obj interface{}, tagName string) (map[string]string, error) { + result := make(map[string]string) + + // 获取值的反射对象 + val := reflect.ValueOf(obj) + if val.Kind() == reflect.Ptr { + val = val.Elem() // 解引用指针 + } + + if val.Kind() != reflect.Struct { + return nil, fmt.Errorf("input is not a struct or pointer to struct") + } + + // 获取类型信息 + typ := val.Type() + + // 遍历结构体字段 + for i := 0; i < val.NumField(); i++ { + field := val.Field(i) + fieldType := typ.Field(i) + + // 获取字段的标签值 + tagValue := fieldType.Tag.Get(tagName) + if tagValue == "" { + continue // 如果标签不存在,跳过该字段 + } + + // 获取字段值并转换为字符串 + var fieldValue string + switch field.Kind() { + case reflect.String: + fieldValue = field.String() + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + fieldValue = strconv.FormatInt(field.Int(), 10) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + fieldValue = strconv.FormatUint(field.Uint(), 10) + case reflect.Float32, reflect.Float64: + fieldValue = strconv.FormatFloat(field.Float(), 'f', -1, 64) + case reflect.Bool: + fieldValue = strconv.FormatBool(field.Bool()) + default: + // 如果字段类型不支持,跳过 + continue + } + + // 将标签值和字段值存入 map + result[tagValue] = fieldValue + } + + return result, nil +} diff --git a/participant/k8s/go.mod b/participant/k8s/go.mod new file mode 100644 index 0000000..d04ea2c --- /dev/null +++ b/participant/k8s/go.mod @@ -0,0 +1,76 @@ +module gitlink.org.cn/JointCloud/pcm-participant-k8s + +go 1.24.0 + +require ( + github.com/aliyun/alibaba-cloud-sdk-go v1.63.107 + github.com/gin-gonic/gin v1.10.1 + github.com/go-resty/resty/v2 v2.16.5 + github.com/smartystreets/goconvey v1.8.1 + k8s.io/api v0.33.2 + k8s.io/apimachinery v0.33.2 + k8s.io/client-go v0.33.2 +) + +require ( + github.com/bytedance/sonic v1.11.6 // indirect + github.com/bytedance/sonic/loader v0.1.1 // indirect + github.com/cloudwego/base64x v0.1.4 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/emicklei/go-restful/v3 v3.11.0 // indirect + github.com/fxamacker/cbor/v2 v2.7.0 // indirect + github.com/gabriel-vasile/mimetype v1.4.3 // indirect + github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-openapi/jsonpointer v0.21.0 // indirect + github.com/go-openapi/jsonreference v0.20.2 // indirect + github.com/go-openapi/swag v0.23.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.20.0 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/gnostic-models v0.6.9 // indirect + github.com/google/go-cmp v0.7.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/gopherjs/gopherjs v1.17.2 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/josharian/intern v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/jtolds/gls v4.20.0+incompatible // indirect + github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/mailru/easyjson v0.7.7 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b // indirect + github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/smarty/assertions v1.15.0 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.2.12 // indirect + github.com/x448/float16 v0.8.4 // indirect + golang.org/x/arch v0.8.0 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/net v0.38.0 // indirect + golang.org/x/oauth2 v0.27.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/term v0.30.0 // indirect + golang.org/x/text v0.23.0 // indirect + golang.org/x/time v0.9.0 // indirect + google.golang.org/protobuf v1.36.5 // indirect + gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/klog/v2 v2.130.1 // indirect + k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect + k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect + sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect + sigs.k8s.io/randfill v1.0.0 // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect + sigs.k8s.io/yaml v1.4.0 // indirect +) diff --git a/participant/k8s/initialize/common.go b/participant/k8s/initialize/common.go new file mode 100644 index 0000000..2c54b8f --- /dev/null +++ b/participant/k8s/initialize/common.go @@ -0,0 +1,9 @@ +package initialize + +import ( + "gitlink.org.cn/JointCloud/pcm-participant-k8s/common" +) + +func InitConfig() { + common.InitClient() +} diff --git a/participant/k8s/k8s.go b/participant/k8s/k8s.go new file mode 100644 index 0000000..25977ab --- /dev/null +++ b/participant/k8s/k8s.go @@ -0,0 +1,158 @@ +package k8s + +import ( + "context" + "errors" + "fmt" + "gitlink.org.cn/JointCloud/pcm-participant-cloud/container" + "gitlink.org.cn/JointCloud/pcm-participant-cloud/platform" + "gitlink.org.cn/JointCloud/pcm-participant-k8s/model" + "gitlink.org.cn/JointCloud/pcm-participant-k8s/service" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "time" +) + +type Container struct { + opt *Option + ft *container.Features +} +type ContainerSpec struct { + opt *Option + fSpec *container.FeatureSpec + container *model.Container +} +type Option struct { + platform *platform.Platform + DynamicClient *dynamic.DynamicClient + ClientSet *kubernetes.Clientset +} +type Kubernetes struct { + Container *Container + opt *Option +} + +func New(bearerToken, url string, id platform.Id) (*Kubernetes, error) { + + restConfig := &rest.Config{ + Timeout: 10 * time.Second, + Host: url, + BearerToken: bearerToken, + TLSClientConfig: rest.TLSClientConfig{ + Insecure: true, + }, + } + dynamicClient, err := dynamic.NewForConfig(restConfig) + if err != nil { + fmt.Println(err.Error()) + } + clientSet, err := kubernetes.NewForConfig(restConfig) + if err != nil { + fmt.Println(err.Error()) + } + opt := &Option{ + DynamicClient: dynamicClient, + ClientSet: clientSet, + platform: &platform.Platform{ + Id: id, + Type: platform.K8s, + }, + } + container := &Container{ + opt: opt, + } + return &Kubernetes{ + opt: opt, + Container: container, + }, nil +} + +func (ic *ContainerSpec) Detail() (interface{}, error) { + return ic.container, nil +} +func (ic *ContainerSpec) Spec() (*container.Container, error) { + return nil, nil +} + +func (ic *ContainerSpec) Features() *container.FeatureSpec { + return ic.fSpec +} + +func (c *Container) Create(ctx context.Context, param *container.CreateParam) (interface{}, error) { + if param.Name == "" { + return nil, errors.New("Name is required") + } + if param.Image == "" { + return nil, errors.New("Image is required") + } + if param.ContainerGroupName == "" { + return nil, errors.New("ContainerGroupName is required") + } + cParam := model.CreateContainerParam{ + Container: model.Container{ + Name: param.Name, + Image: param.Image, + Args: param.Args, + Limits: struct { + Cpu string `json:"cpu,omitempty"` + Memory string `json:"memory,omitempty"` + }{ + Cpu: param.Cpu, + Memory: param.Memory, + }, + ContainerPort: struct { + Port int32 `json:"port,omitempty"` + NodePort int32 `json:"nodePort,omitempty"` + }{ + Port: param.Port, + NodePort: param.NodePort, + }, + Envs: nil, + Command: nil, + }, + ContainerGroupName: param.ContainerGroupName, + MountPath: param.MountPath, + } + err := service.CreateContainer(c.opt.ClientSet, &cParam) + if err != nil { + return nil, err + } + // 返回创建的容器信息 + return cParam.Container, nil +} +func (c *Container) Delete(ctx context.Context, param *container.DeleteParam) error { + if param.Name == "" { + return errors.New("Name is required") + } + err := service.DeleteContainer(c.opt.ClientSet, &model.DeleteContainerParam{ + ContainerName: param.Name, + }) + if err != nil { + return err + } + return nil +} + +func (c *Container) Features() *container.Features { + return nil +} +func (k *Kubernetes) Name() string { + return k.opt.platform.Name +} +func (k *Kubernetes) Type() platform.Type { + return k.opt.platform.Type +} +func (k *Kubernetes) Id() platform.Id { + return k.opt.platform.Id +} + +func (c *Container) Get(ctx context.Context, param *container.GetParam) (interface{}, error) { + pod, err := service.GetContainer(c.opt.ClientSet, &model.GetContainerParam{ + Name: param.Name, + }) + if err != nil { + return nil, err + } + return pod, nil +} diff --git a/participant/k8s/model/container.go b/participant/k8s/model/container.go new file mode 100644 index 0000000..2e3dfa7 --- /dev/null +++ b/participant/k8s/model/container.go @@ -0,0 +1,33 @@ +package model + +type CreateContainerParam struct { + ContainerGroupName string `json:"containerGroupName"` + Container Container `json:"Container"` + MountPath string `json:"mountPath,omitempty"` +} +type Container struct { + Name string `json:"name"` + Image string `json:"image"` + Args []string `json:"args,omitempty"` + Limits struct { + Cpu string `json:"cpu,omitempty"` + Memory string `json:"memory,omitempty"` + } `json:"limits,omitempty"` + ContainerPort struct { + Port int32 `json:"port,omitempty"` + NodePort int32 `json:"nodePort,omitempty"` + } `json:"containerPorts,omitempty"` + Envs []struct { + Name string `json:"name,omitempty"` + Value string `json:"value,omitempty"` + } `json:"envs,omitempty"` + Command []string +} + +type DeleteContainerParam struct { + ContainerName string `json:"containerGroupName"` +} + +type GetContainerParam struct { + Name string `json:"name"` +} diff --git a/participant/k8s/model/error.go b/participant/k8s/model/error.go new file mode 100644 index 0000000..f3d4e70 --- /dev/null +++ b/participant/k8s/model/error.go @@ -0,0 +1,29 @@ +package model + +// Error Model +// +// The Error contains error relevant information. +// +// swagger:model Error +type Error struct { + // The general error message + // + // required: true + // example: Unauthorized + Error string `json:"error"` + // The http error code. + // + // required: true + // example: 401 + ErrorCode int `json:"errorCode"` + // The http error code. + // + // required: true + // example: you need to provide a valid access token or user credentials to access this api + ErrorDescription string `json:"errorDescription"` +} + +type RespErr struct { + Code string `json:"code"` + Message string `json:"message"` +} diff --git a/participant/k8s/model/response.go b/participant/k8s/model/response.go new file mode 100644 index 0000000..ca40e29 --- /dev/null +++ b/participant/k8s/model/response.go @@ -0,0 +1,15 @@ +package model + +import ( + "github.com/gin-gonic/gin" + "net/http" +) + +func Response(c *gin.Context, code int, msg interface{}, data interface{}) { + c.JSON(http.StatusOK, map[string]interface{}{ + "code": code, + "msg": msg, + "data": data, + }) + return +} diff --git a/participant/k8s/router/router.go b/participant/k8s/router/router.go new file mode 100644 index 0000000..9106c49 --- /dev/null +++ b/participant/k8s/router/router.go @@ -0,0 +1,59 @@ +package router + +import ( + "fmt" + "github.com/gin-gonic/gin" + "gitlink.org.cn/JointCloud/pcm-participant-k8s/apis" + "regexp" + "time" +) + +func Create() (*gin.Engine, error) { + g := gin.New() + + g.Use(gin.LoggerWithFormatter(logFormatter), gin.Recovery()) + g.Use(gin.Logger()) + g.Use(gin.Recovery()) + //g.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) + + api := g.Group("/api") + v1 := api.Group("/v1") + { + + // container + container := v1.Group("container") + container.POST("/create", apis.CreateContainer) + + } + + return g, nil +} + +var tokenRegexp = regexp.MustCompile("token=[^&]+") + +func logFormatter(param gin.LogFormatterParams) string { + if (param.ClientIP == "127.0.0.1" || param.ClientIP == "::1") && param.Path == "/health" { + return "" + } + + var statusColor, methodColor, resetColor string + if param.IsOutputColor() { + statusColor = param.StatusCodeColor() + methodColor = param.MethodColor() + resetColor = param.ResetColor() + } + + if param.Latency > time.Minute { + param.Latency = param.Latency - param.Latency%time.Second + } + path := tokenRegexp.ReplaceAllString(param.Path, "token=[masked]") + return fmt.Sprintf("%v |%s %3d %s| %13v | %15s |%s %-7s %s %#v\n%s", + param.TimeStamp.Format(time.RFC3339), + statusColor, param.StatusCode, resetColor, + param.Latency, + param.ClientIP, + methodColor, param.Method, resetColor, + path, + param.ErrorMessage, + ) +} diff --git a/participant/k8s/service/container.go b/participant/k8s/service/container.go new file mode 100644 index 0000000..d7b1202 --- /dev/null +++ b/participant/k8s/service/container.go @@ -0,0 +1,130 @@ +package service + +import ( + "context" + "fmt" + "gitlink.org.cn/JointCloud/pcm-participant-k8s/model" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +func CreateContainer(client *kubernetes.Clientset, param *model.CreateContainerParam) error { + // 查询pod是否存在 + _, err := client.CoreV1().Pods("default").Get(context.TODO(), param.ContainerGroupName, metav1.GetOptions{}) + if err == nil { + return err + } + if !errors.IsNotFound(err) { + return err + } + // 创建svc + if param.Container.ContainerPort.Port != 0 { + _, err := CreateSvc(client, param) + if err != nil { + return err + } + } + // 创建 Pod 对象 + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: param.ContainerGroupName, // Pod 名称(必选) + Labels: map[string]string{ + "app": param.ContainerGroupName, + }, + }, + + Spec: v1.PodSpec{ + HostNetwork: true, + Containers: []v1.Container{ // 至少一个容器(必选) + { + ImagePullPolicy: "Never", + Name: param.Container.Name, + Image: param.Container.Image, + Args: param.Container.Args, + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(param.Container.Limits.Cpu), + v1.ResourceMemory: resource.MustParse(param.Container.Limits.Memory), + }, + }, + Env: make([]v1.EnvVar, len(param.Container.Envs)), + Command: param.Container.Command, + }, + }, + }, + } + // 挂载pvc + if param.MountPath != "" { + pod.Spec.Containers[0].VolumeMounts = append(pod.Spec.Containers[0].VolumeMounts, v1.VolumeMount{ + Name: "11mvgt1jh-pvc", + MountPath: param.MountPath, + }) + pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{ + Name: "11mvgt1jh-pvc", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "11mvgt1jh-pvc", + ReadOnly: false, + }, + }, + }) + } + + // 设置环境变量 + for i := range param.Container.Envs { + pod.Spec.Containers[0].Env[i] = v1.EnvVar{ + Name: param.Container.Envs[i].Name, + Value: param.Container.Envs[i].Value, + } + } + + // 发送请求 + response, err := client.CoreV1().Pods("default").Create( + context.TODO(), + pod, + metav1.CreateOptions{}) + if err != nil { + fmt.Printf(err.Error()) + // 删除pvc + //client.CoreV1().PersistentVolumeClaims("default").Delete(context.TODO(), pod.Spec.Volumes[0].PersistentVolumeClaim.ClaimName, metav1.DeleteOptions{}) + return err + } + + fmt.Printf("Pod created successfully. Name: %s\n", response.Name) + return nil +} + +func DeleteContainer(client *kubernetes.Clientset, param *model.DeleteContainerParam) error { + // 查询pod + pod, err := client.CoreV1().Pods("default").Get(context.TODO(), param.ContainerName, metav1.GetOptions{}) + if err != nil { + return err + } + // 删除pod + err = client.CoreV1().Pods("default").Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) + if err != nil { + return err + } + // 删除svc + if pod.Spec.Containers[0].Ports != nil { + err = client.CoreV1().Services("default").Delete(context.TODO(), pod.Name+"-service", metav1.DeleteOptions{}) + if err != nil { + return err + } + } + + return nil +} + +func GetContainer(client *kubernetes.Clientset, param *model.GetContainerParam) (interface{}, error) { + // 查询pod + pod, err := client.CoreV1().Pods("default").Get(context.TODO(), param.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + return pod, nil +} diff --git a/participant/k8s/service/pvc.go b/participant/k8s/service/pvc.go new file mode 100644 index 0000000..0e45d82 --- /dev/null +++ b/participant/k8s/service/pvc.go @@ -0,0 +1,61 @@ +package service + +import ( + "context" + "crypto/rand" + "encoding/binary" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +const ( + charSet = "abcdefghijklmnopqrstuvwxyz0123456789" // 36个字符 + base = 36 + length = 9 + maxTries = 1000 // 最大尝试次数 +) + +var sc = "csi-s3" + +func CreatePvc(client *kubernetes.Clientset) (string, error) { + randStr := generateRandomString() + pvc := v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: randStr + "-pvc", + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{ + v1.ReadWriteMany, + }, + Resources: v1.VolumeResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: resource.MustParse("1Gi"), + }, + }, + StorageClassName: &sc, + }, + } + _, err := client.CoreV1().PersistentVolumeClaims("default").Create(context.Background(), &pvc, metav1.CreateOptions{}) + if err != nil { + return "", err + } + return pvc.Name, nil + +} + +// 生成随机字符串 +func generateRandomString() string { + var builder [length]byte + randomBuf := make([]byte, 8) + + for i := 0; i < length; i++ { + if _, err := rand.Read(randomBuf); err != nil { + return "" + } + randomValue := binary.BigEndian.Uint64(randomBuf) + builder[i] = charSet[randomValue%base] + } + return string(builder[:]) +} diff --git a/participant/k8s/service/svc.go b/participant/k8s/service/svc.go new file mode 100644 index 0000000..e61dc45 --- /dev/null +++ b/participant/k8s/service/svc.go @@ -0,0 +1,43 @@ +package service + +import ( + "context" + "gitlink.org.cn/JointCloud/pcm-participant-k8s/model" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes" +) + +func CreateSvc(client *kubernetes.Clientset, param *model.CreateContainerParam) (string, error) { + + svc := v1.Service{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: param.ContainerGroupName + "-service", + }, + + Spec: v1.ServiceSpec{ + Type: "NodePort", + Selector: map[string]string{ + "app": param.ContainerGroupName, + }, + Ports: []v1.ServicePort{ + { + Port: param.Container.ContainerPort.Port, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: param.Container.ContainerPort.Port, + }, + NodePort: param.Container.ContainerPort.NodePort, + }, + }, + }, + } + _, err := client.CoreV1().Services("default").Create(context.Background(), &svc, metav1.CreateOptions{}) + if err != nil { + return "", err + } + return svc.Name, nil + +} diff --git a/participant/openI/service/image_test.go b/participant/openI/service/image_test.go index 604d4cd..b72f6bf 100644 --- a/participant/openI/service/image_test.go +++ b/participant/openI/service/image_test.go @@ -9,7 +9,7 @@ import ( ) func TestImage(t *testing.T) { - convey.Convey("Test Image Service", t, func() { + convey.Convey("Test Container Service", t, func() { common.InitClient() token := "8cff1d2db9171462c02901d086d13221389fd082" diff --git a/participant/serverless/apis/container.go b/participant/serverless/apis/container.go new file mode 100644 index 0000000..6d413ce --- /dev/null +++ b/participant/serverless/apis/container.go @@ -0,0 +1,7 @@ +package apis + +import "github.com/gin-gonic/gin" + +func CreateContainer(ctx *gin.Context) { + +} diff --git a/participant/serverless/common/client.go b/participant/serverless/common/client.go new file mode 100644 index 0000000..cb6cade --- /dev/null +++ b/participant/serverless/common/client.go @@ -0,0 +1,80 @@ +package common + +import ( + "crypto/tls" + "errors" + "fmt" + "gitlink.org.cn/JointCloud/pcm-participant-serverless/model" + "net/http" + "time" + + "github.com/go-resty/resty/v2" +) + +var ( + NoRedirectClient *resty.Client + RestyClient *resty.Client + HttpClient *http.Client +) +var UserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" +var DefaultTimeout = time.Second * 300 + +func InitClient() { + NoRedirectClient = resty.New().SetRedirectPolicy( + resty.RedirectPolicyFunc(func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }), + ).SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + NoRedirectClient.SetHeader("user-agent", UserAgent) + + RestyClient = NewRestyClient() + HttpClient = NewHttpClient() +} + +func NewRestyClient() *resty.Client { + client := resty.New(). + SetHeader("user-agent", UserAgent). + SetRetryCount(3). + SetRetryResetReaders(true). + SetTimeout(DefaultTimeout). + SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + return client +} + +func NewHttpClient() *http.Client { + return &http.Client{ + Timeout: time.Hour * 48, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + } +} + +func Request(url string, method string, callback ReqCallback) ([]byte, error) { + respErr := &model.RespErr{} + req := RestyClient.R(). + SetHeaders(map[string]string{ + "Content-Type": "application/json", + }). + SetError(&respErr) + + if callback != nil { + callback(req) + } + + res, err := req.Execute(method, OPENIPREFIX+url) + + if err != nil { + return nil, err + } + if respErr.Message != "" { + return nil, errors.New(respErr.Message) + } + + if res.StatusCode() != http.StatusOK && res.StatusCode() != http.StatusCreated { + return nil, errors.New(fmt.Sprintf("msg: %s, status: %d", res.String(), res.StatusCode())) + } + + return res.Body(), nil +} diff --git a/participant/serverless/common/const.go b/participant/serverless/common/const.go new file mode 100644 index 0000000..b1af028 --- /dev/null +++ b/participant/serverless/common/const.go @@ -0,0 +1,74 @@ +package common + +const ( + MaxChunkSize int64 = 1024 * 1024 * 64 //64MB + + QUESTION_MARK = "?" + TIMEOUT = 10 + OPENIPREFIX = "https://openi.pcl.ac.cn" + ACCESSTOKEN = "access_token" + // user + USERINFO = "/api/v1/user" + + // repo + REPO = "/api/v1/user/repos" + RepoFile = "/api/v1/repos/{username}/{reponame}/contents/{filepath}" //上传文件到项目中、修改项目中的文件内容 + + // image + IMAGERECOMMENDED = "/api/v1/images/recommend" + IMAGECUSTOM = "/api/v1/images/custom" + IMAGESTARED = "/api/v1/images/star" + + // datasets + DATASETCURRENT = "/api/v1/datasets/{username}/{reponame}/current_repo" //查询当前项目的数据集接口 + DATASETMINE = "/api/v1/datasets/{username}/{reponame}/my_datasets" //我上传的数据集 + DATASETPUBLIC = "/api/v1/datasets/{username}/{reponame}/public_datasets" //查询公开数据集 + DATASETFAVORITE = "/api/v1/datasets/{username}/{reponame}/my_favorite" //查询我收藏的数据集 + DATASETEXISTEXPORT = "/api/v1/datasets/{username}/{reponame}/model/export_exist_dataset" //将用户选择的文件从训练任务结果中导出到数据集中 POST请求 + DATASETCREATE = "/api/v1/datasets/{username}/{reponame}/create" // + BaseDatasetsUrl = "/api/v1/datasets/{username}/{reponame}" //数据集列表 //数据集基本接口 + + // datasets upload + GetChunksUrl = "/api/v1/attachments/get_chunks" //获取当前需要上传文件的chunk信息 + NewMultipartUrl = "/api/v1/attachments/new_multipart" //获取文件上传的需要的信息 + GetMultipartUrl = "/api/v1/attachments/get_multipart_url" //获取文件上传的地址 + CompleteMultipartUrl = "/api/v1/attachments/complete_multipart" //完成上传接口 //上传文件到数据集 + + // task + TASKCREATIONREQUIRED = "/api/v1/{username}/{reponame}/ai_task/creation/required" // 查询创建任务所需资源接口 + TASKCREATIONIMAGEBYSPEC = "/api/v1/{username}/{reponame}/ai_task/creation/image_by_spec" // 根据选择的规格获取镜像(计算资源是NPU时使用) + TASKCREATE = "/api/v1/{username}/{reponame}/ai_task/create" // 创建任务 + TASKLIST = "/api/v1/{username}/{reponame}/ai_task/list" // 任务列表 + TASKDETAIL = "/api/v1/{username}/{reponame}/ai_task" // 查询任务详情 + TASKSTOP = "/api/v1/{username}/{reponame}/ai_task/stop" // 停止任务接口 + TASKOUTPUT = "/api/v1/{username}/{reponame}/ai_task/output" // 查询结果列表接口 + TASKRESULTDOWNLOAD = "/api/v1/{username}/{reponame}/ai_task/output/download/all" // 所有结果下载接口 + TASKLOGDOWNLOAD = "/api/v1/{username}/{reponame}/ai_task/log/download" //日志下载 + SelfEndpointUrl = "/api/v1/{username}/{reponame}/ai_task/self_endpoint_url" //在线推理接口 + + // model + MODELCREATE = "/api/v1/repos/{username}/{reponame}/modelmanage/create_new_model" //模型新增接口 + MODELGETBYID = "/api/v1/repos/{username}/{reponame}/modelmanage/query_model_byId" //根据模型ID查询模型信息接口 + MODELDOWNLOADBYID = "/api/v1/repos/{username}/{reponame}/modelmanage/downloadall" + QUERYMODELBYNAME = "/api/v1/repos/{username}/{reponame}/modelmanage/query_model_byName" //根据模型名称查询模型 + PageModel = "/api/v1/repos/{username}/{reponame}/modelmanage/show_model_api" //分页查询模型 + QueryAllModelFile = "/api/v1/all_model_data" //查询所有模型文件 + + // model local create + MODELLOCALCREATE = "/api/v1/repos/{username}/{reponame}/modelmanage/create_local_model" //创建一条本地模型记录 + MODELLOCALGETUPLOADEDCHUNKS = "/api/v1/attachments/model/get_chunks" //获取该文件已经上传的分片接口 + MODELLOCALNEWMULTIPART = "/api/v1/attachments/model/new_multipart" //开启一个本地模型上传 + MODELLOCALGETMULTIPARTURL = "/api/v1/attachments/model/get_multipart_url" //获取模型分片传输链接,并进行上传 + MODELLOCALCOMPLETEMULTIPART = "/api/v1/attachments/model/complete_multipart" //完成模型文件上传 +) + +const ( + SUCCESS = "success" +) + +// error +const ( + INVOKEERROR = "failed to invoke" + INVALIDPARAMS = "invalid Request params" + NOTFOUND = "not found" +) diff --git a/participant/serverless/common/token.go b/participant/serverless/common/token.go new file mode 100644 index 0000000..abd1bbb --- /dev/null +++ b/participant/serverless/common/token.go @@ -0,0 +1,24 @@ +package common + +import ( + "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common" + "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/profile" + tke "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/tke/v20180525" +) + +const ( + secretId = "" + secretKey = "" +) + +func GetTkeClient() *tke.Client { + credential := common.NewCredential(secretId, secretKey) + cpf := profile.NewClientProfile() + cpf.HttpProfile.Endpoint = "tke.tencentcloudapi.com" + // 实例化要请求产品的client对象,clientProfile是可选的 + client, err := tke.NewClient(credential, "", cpf) + if err != nil { + panic(err) + } + return client +} diff --git a/participant/serverless/common/types.go b/participant/serverless/common/types.go new file mode 100644 index 0000000..b8dffad --- /dev/null +++ b/participant/serverless/common/types.go @@ -0,0 +1,12 @@ +package common + +import "github.com/go-resty/resty/v2" + +type Json map[string]interface{} + +type TokenResp struct { + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` +} + +type ReqCallback func(req *resty.Request) diff --git a/participant/serverless/common/util.go b/participant/serverless/common/util.go new file mode 100644 index 0000000..b0eaeed --- /dev/null +++ b/participant/serverless/common/util.go @@ -0,0 +1,88 @@ +package common + +import ( + "crypto/md5" + "encoding/hex" + "fmt" + "github.com/go-resty/resty/v2" + "io" + "mime/multipart" + "reflect" + "strconv" + "time" +) + +func GetRestyRequest(timeoutSeconds int64) *resty.Request { + client := resty.New().SetTimeout(time.Duration(timeoutSeconds) * time.Second) + request := client.R() + return request +} + +func GetFileMd5(file multipart.File) (string, error) { + hash := md5.New() + if _, err := io.Copy(hash, file); err != nil { + return "", err + } + + // 计算MD5并转换为16进制字符串 + md5Bytes := hash.Sum(nil) + md5Str := hex.EncodeToString(md5Bytes) + return md5Str, nil +} + +func Bool2String(b bool) string { + return strconv.FormatBool(b) +} + +// StructToMapWithTag 将结构体转换为 map[string]string,key 使用指定标签的值 +func StructToMapWithTag(obj interface{}, tagName string) (map[string]string, error) { + result := make(map[string]string) + + // 获取值的反射对象 + val := reflect.ValueOf(obj) + if val.Kind() == reflect.Ptr { + val = val.Elem() // 解引用指针 + } + + if val.Kind() != reflect.Struct { + return nil, fmt.Errorf("input is not a struct or pointer to struct") + } + + // 获取类型信息 + typ := val.Type() + + // 遍历结构体字段 + for i := 0; i < val.NumField(); i++ { + field := val.Field(i) + fieldType := typ.Field(i) + + // 获取字段的标签值 + tagValue := fieldType.Tag.Get(tagName) + if tagValue == "" { + continue // 如果标签不存在,跳过该字段 + } + + // 获取字段值并转换为字符串 + var fieldValue string + switch field.Kind() { + case reflect.String: + fieldValue = field.String() + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + fieldValue = strconv.FormatInt(field.Int(), 10) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + fieldValue = strconv.FormatUint(field.Uint(), 10) + case reflect.Float32, reflect.Float64: + fieldValue = strconv.FormatFloat(field.Float(), 'f', -1, 64) + case reflect.Bool: + fieldValue = strconv.FormatBool(field.Bool()) + default: + // 如果字段类型不支持,跳过 + continue + } + + // 将标签值和字段值存入 map + result[tagValue] = fieldValue + } + + return result, nil +} diff --git a/participant/serverless/go.mod b/participant/serverless/go.mod new file mode 100644 index 0000000..eb1bfae --- /dev/null +++ b/participant/serverless/go.mod @@ -0,0 +1,42 @@ +module gitlink.org.cn/JointCloud/pcm-participant-serverless + +go 1.23.10 + +require ( + github.com/gin-gonic/gin v1.10.1 + github.com/go-resty/resty/v2 v2.16.5 + github.com/smartystreets/goconvey v1.8.1 + github.com/tencentcloud/tencentcloud-sdk-go v3.0.233+incompatible +) + +require ( + github.com/bytedance/sonic v1.11.6 // indirect + github.com/bytedance/sonic/loader v0.1.1 // indirect + github.com/cloudwego/base64x v0.1.4 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect + github.com/gabriel-vasile/mimetype v1.4.3 // indirect + github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.20.0 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/gopherjs/gopherjs v1.17.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/jtolds/gls v4.20.0+incompatible // indirect + github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/smarty/assertions v1.15.0 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.2.12 // indirect + golang.org/x/arch v0.8.0 // indirect + golang.org/x/crypto v0.31.0 // indirect + golang.org/x/net v0.33.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/participant/serverless/model/container.go b/participant/serverless/model/container.go new file mode 100644 index 0000000..d548556 --- /dev/null +++ b/participant/serverless/model/container.go @@ -0,0 +1,105 @@ +package model + +import "time" + +// ContainerGroup 表示容器组配置 +type ContainerGroup struct { + Containers []Container `json:"Containers,omitempty"` + EksCiName *string `json:"EksCiName,omitempty"` + SecurityGroupIds []*string `json:"SecurityGroupIds,omitempty"` + SubnetId *string `json:"SubnetId,omitempty"` + VpcId *string `json:"VpcId,omitempty"` + Memory *float64 `json:"Memory,omitempty"` + Cpu *float64 `json:"Cpu,omitempty"` +} + +// Container 表示容器配置 +type Container struct { + Image *string `json:"Image,omitempty"` + Name *string `json:"Name,omitempty"` + Args []*string `json:"Args,omitempty"` + Commands []*string `json:"Commands,omitempty"` + Cpu *float64 `json:"Cpu,omitempty"` + CurrentState *ContainerState `json:"CurrentState,omitempty"` + EnvironmentVars []*EnvironmentVar `json:"EnvironmentVars,omitempty"` + GpuLimit *int `json:"GpuLimit,omitempty"` + LivenessProbe *ProbeConfig `json:"LivenessProbe,omitempty"` + Memory *float64 `json:"Memory,omitempty"` + ReadinessProbe *ProbeConfig `json:"ReadinessProbe,omitempty"` + RestartCount *int `json:"RestartCount,omitempty"` + SecurityContext *SecurityContext `json:"SecurityContext,omitempty"` + VolumeMounts []*VolumeMount `json:"VolumeMounts,omitempty"` + WorkingDir *string `json:"WorkingDir,omitempty"` +} + +// ContainerState 表示容器的当前状态 +type ContainerState struct { + ExitCode *int `json:"ExitCode,omitempty"` + FinishTime *time.Time `json:"FinishTime,omitempty"` + Message *string `json:"Message,omitempty"` + Reason *string `json:"Reason,omitempty"` + RestartCount *int `json:"RestartCount,omitempty"` + StartTime *time.Time `json:"StartTime,omitempty"` + State *string `json:"State,omitempty"` +} + +// EnvironmentVar 表示容器环境变量 +type EnvironmentVar struct { + Name *string `json:"Name,omitempty"` + Value *string `json:"Value,omitempty"` +} + +// ProbeConfig 表示容器健康检查探针配置 +type ProbeConfig struct { + Probe *Probe `json:"Probe,omitempty"` + Exec *ExecProbe `json:"Exec,omitempty"` + HttpGet *HTTPGetProbe `json:"HttpGet,omitempty"` + TcpSocket *TCPSocketProbe `json:"TcpSocket,omitempty"` +} + +// Probe 表示通用探针配置 +type Probe struct { + FailureThreshold *int `json:"FailureThreshold,omitempty"` + InitialDelaySeconds *int `json:"InitialDelaySeconds,omitempty"` + PeriodSeconds *int `json:"PeriodSeconds,omitempty"` + SuccessThreshold *int `json:"SuccessThreshold,omitempty"` + TimeoutSeconds *int `json:"TimeoutSeconds,omitempty"` +} + +// ExecProbe 表示执行命令类型的探针 +type ExecProbe struct { + Commands []*string `json:"Commands,omitempty"` +} + +// HTTPGetProbe 表示HTTP请求类型的探针 +type HTTPGetProbe struct { + Path *string `json:"Path,omitempty"` + Port *int `json:"Port,omitempty"` + Scheme *string `json:"Scheme,omitempty"` +} + +// TCPSocketProbe 表示TCP连接类型的探针 +type TCPSocketProbe struct { + Port *int `json:"Port,omitempty"` +} + +// SecurityContext 表示容器安全上下文 +type SecurityContext struct { + Capabilities *Capabilities `json:"Capabilities,omitempty"` +} + +// Capabilities 表示容器权限配置 +type Capabilities struct { + Add []*string `json:"Add,omitempty"` + Drop []*string `json:"Drop,omitempty"` +} + +// VolumeMount 表示容器卷挂载配置 +type VolumeMount struct { + MountPath *string `json:"MountPath,omitempty"` + Name *string `json:"Name,omitempty"` + MountPropagation *string `json:"MountPropagation,omitempty"` + ReadOnly *bool `json:"ReadOnly,omitempty"` + SubPath *string `json:"SubPath,omitempty"` + SubPathExpr *string `json:"SubPathExpr,omitempty"` +} diff --git a/participant/serverless/model/error.go b/participant/serverless/model/error.go new file mode 100644 index 0000000..f3d4e70 --- /dev/null +++ b/participant/serverless/model/error.go @@ -0,0 +1,29 @@ +package model + +// Error Model +// +// The Error contains error relevant information. +// +// swagger:model Error +type Error struct { + // The general error message + // + // required: true + // example: Unauthorized + Error string `json:"error"` + // The http error code. + // + // required: true + // example: 401 + ErrorCode int `json:"errorCode"` + // The http error code. + // + // required: true + // example: you need to provide a valid access token or user credentials to access this api + ErrorDescription string `json:"errorDescription"` +} + +type RespErr struct { + Code string `json:"code"` + Message string `json:"message"` +} diff --git a/participant/serverless/model/response.go b/participant/serverless/model/response.go new file mode 100644 index 0000000..ca40e29 --- /dev/null +++ b/participant/serverless/model/response.go @@ -0,0 +1,15 @@ +package model + +import ( + "github.com/gin-gonic/gin" + "net/http" +) + +func Response(c *gin.Context, code int, msg interface{}, data interface{}) { + c.JSON(http.StatusOK, map[string]interface{}{ + "code": code, + "msg": msg, + "data": data, + }) + return +} diff --git a/participant/serverless/router/router.go b/participant/serverless/router/router.go new file mode 100644 index 0000000..acc829e --- /dev/null +++ b/participant/serverless/router/router.go @@ -0,0 +1,59 @@ +package router + +import ( + "fmt" + "github.com/gin-gonic/gin" + "gitlink.org.cn/JointCloud/pcm-participant-serverless/apis" + "regexp" + "time" +) + +func Create() (*gin.Engine, error) { + g := gin.New() + + g.Use(gin.LoggerWithFormatter(logFormatter), gin.Recovery()) + g.Use(gin.Logger()) + g.Use(gin.Recovery()) + //g.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) + + api := g.Group("/api") + v1 := api.Group("/v1") + { + + // container + container := v1.Group("container") + container.POST("/create", apis.CreateContainer) + + } + + return g, nil +} + +var tokenRegexp = regexp.MustCompile("token=[^&]+") + +func logFormatter(param gin.LogFormatterParams) string { + if (param.ClientIP == "127.0.0.1" || param.ClientIP == "::1") && param.Path == "/health" { + return "" + } + + var statusColor, methodColor, resetColor string + if param.IsOutputColor() { + statusColor = param.StatusCodeColor() + methodColor = param.MethodColor() + resetColor = param.ResetColor() + } + + if param.Latency > time.Minute { + param.Latency = param.Latency - param.Latency%time.Second + } + path := tokenRegexp.ReplaceAllString(param.Path, "token=[masked]") + return fmt.Sprintf("%v |%s %3d %s| %13v | %15s |%s %-7s %s %#v\n%s", + param.TimeStamp.Format(time.RFC3339), + statusColor, param.StatusCode, resetColor, + param.Latency, + param.ClientIP, + methodColor, param.Method, resetColor, + path, + param.ErrorMessage, + ) +}