pcm-coordinator/adaptor/pcm_pod/server/podImpl.go

351 lines
9.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package server
import (
pbpod "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_pod/gen/idl"
"code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_pod/service"
pbtenant "code.gitlink.org.cn/JCCE/PCM.git/tenant/gen/idl"
"context"
"flag"
"fmt"
"sync"
"code.gitlink.org.cn/JCCE/PCM.git/common/tenanter"
"github.com/golang/glog"
"github.com/pkg/errors"
)
// GetPodRegion get the available region for pcm_pod
func GetPodRegion(ctx context.Context, req *pbpod.GetPodRegionReq) (resp *pbpod.GetPodRegionResp, err error) {
var (
regionInit tenanter.Region
regions []*pbpod.Region
)
switch req.GetProvider() {
case pbpod.CloudProvider_ali:
regionInit, _ = tenanter.NewRegion(pbtenant.CloudProvider(req.GetProvider()), 2)
case pbpod.CloudProvider_tencent:
regionInit, _ = tenanter.NewRegion(pbtenant.CloudProvider(req.GetProvider()), 5)
case pbpod.CloudProvider_huawei:
regionInit, _ = tenanter.NewRegion(pbtenant.CloudProvider(req.GetProvider()), 5)
}
tenanters, err := tenanter.GetTenanters(pbtenant.CloudProvider(req.GetProvider()))
if err != nil {
return nil, errors.WithMessage(err, "getTenanters error")
}
for _, tenant := range tenanters {
pod, err := poder.NewPodClient(req.GetProvider(), regionInit, tenant)
if err != nil {
return nil, errors.WithMessage(err, "NewPodClient error")
}
request := &pbpod.GetPodRegionReq{
Provider: req.GetProvider(),
}
resp, err := pod.GetPodRegion(ctx, request)
if err != nil {
return nil, errors.Wrap(err, "GetPodRegion error")
}
for _, region := range resp.GetRegions() {
regions = append(regions, region)
}
}
return &pbpod.GetPodRegionResp{Regions: regions}, nil
}
func CreatePods(ctx context.Context, req *pbpod.CreatePodsReq) (*pbpod.CreatePodsResp, error) {
var (
wg sync.WaitGroup
requestIds = make([]string, 0)
)
wg.Add(len(req.CreatePodReq))
c := make(chan string)
for k := range req.CreatePodReq {
reqPod := req.CreatePodReq[k]
go func() {
defer wg.Done()
resp, err := CreatePod(ctx, reqPod)
if err != nil || resp == nil {
fmt.Println(errors.Wrap(err, "Batch pcm_pod creation error"))
return
}
c <- resp.RequestId
}()
}
go func() {
defer close(c)
wg.Wait()
}()
isFinished := false
if len(requestIds) > 0 {
isFinished = true
}
for v := range c {
requestIds = append(requestIds, v)
}
return &pbpod.CreatePodsResp{
Finished: isFinished,
RequestId: requestIds,
}, nil
}
func CreatePod(ctx context.Context, req *pbpod.CreatePodReq) (*pbpod.CreatePodResp, error) {
var (
pod poder.Poder
)
tenanters, err := tenanter.GetTenanters(pbtenant.CloudProvider(req.Provider))
if err != nil {
return nil, errors.WithMessage(err, "getTenanters error")
}
region, err := tenanter.NewRegion(pbtenant.CloudProvider(req.Provider), req.RegionId)
if err != nil {
return nil, errors.WithMessagef(err, "provider %v regionId %v", req.Provider, req.RegionId)
}
for _, tenant := range tenanters {
if req.AccountName == "" || tenant.AccountName() == req.AccountName {
if pod, err = poder.NewPodClient(req.Provider, region, tenant); err != nil {
return nil, errors.WithMessage(err, "NewPodClient error")
}
break
}
}
return pod.CreatePod(ctx, req)
}
func DeletePod(ctx context.Context, req *pbpod.DeletePodReq) (*pbpod.DeletePodResp, error) {
var (
pod poder.Poder
)
//pcm adk过来的请求需要从用户本地读取配置文件
if len(req.RequestSource) > 0 {
var configFile string
flag.StringVar(&configFile, "conf", "configs/tenanter.yaml", "tenanter.yaml")
flag.Parse()
defer glog.Flush()
if err := tenanter.LoadCloudConfigsFromFile(configFile); err != nil {
if !errors.Is(err, tenanter.ErrLoadTenanterFileEmpty) {
glog.Fatalf("tenanter.LoadCloudConfigsFromFile error %+v", err)
}
glog.Warningf("tenanter.LoadCloudConfigsFromFile empty file path %s", configFile)
}
glog.Infof("load tenant from file finished")
}
tenanters, err := tenanter.GetTenanters(pbtenant.CloudProvider(req.Provider))
if err != nil {
return nil, errors.WithMessage(err, "getTenanters error")
}
region, err := tenanter.NewRegion(pbtenant.CloudProvider(req.Provider), req.RegionId)
if err != nil {
return nil, errors.WithMessagef(err, "provider %v regionId %v", pbtenant.CloudProvider(req.Provider), req.RegionId)
}
for _, tenant := range tenanters {
if req.AccountName == "" || tenant.AccountName() == req.AccountName {
if pod, err = poder.NewPodClient(req.Provider, region, tenant); err != nil {
return nil, errors.WithMessage(err, "NewPodClient error")
}
break
}
}
return pod.DeletePod(ctx, req)
}
func UpdatePod(ctx context.Context, req *pbpod.UpdatePodReq) (*pbpod.UpdatePodResp, error) {
var (
pod poder.Poder
)
//pcm adk过来的请求需要从用户本地读取配置文件
if len(req.RequestSource) > 0 {
var configFile string
flag.StringVar(&configFile, "conf", "configs/tenanter.yaml", "tenanter.yaml")
flag.Parse()
defer glog.Flush()
if err := tenanter.LoadCloudConfigsFromFile(configFile); err != nil {
if !errors.Is(err, tenanter.ErrLoadTenanterFileEmpty) {
glog.Fatalf("tenanter.LoadCloudConfigsFromFile error %+v", err)
}
glog.Warningf("tenanter.LoadCloudConfigsFromFile empty file path %s", configFile)
}
glog.Infof("load tenant from file finished")
}
tenanters, err := tenanter.GetTenanters(pbtenant.CloudProvider(req.Provider))
if err != nil {
return nil, errors.WithMessage(err, "getTenanters error")
}
region, err := tenanter.NewRegion(pbtenant.CloudProvider(req.Provider), req.RegionId)
if err != nil {
return nil, errors.WithMessagef(err, "provider %v regionId %v", req.Provider, req.RegionId)
}
for _, tenant := range tenanters {
if req.AccountName == "" || tenant.AccountName() == req.AccountName {
if pod, err = poder.NewPodClient(req.Provider, region, tenant); err != nil {
return nil, errors.WithMessage(err, "NewPodClient error")
}
break
}
}
return pod.UpdatePod(ctx, req)
}
func ListPodDetail(ctx context.Context, req *pbpod.ListPodDetailReq) (*pbpod.ListPodDetailResp, error) {
var (
pod poder.Poder
)
tenanters, err := tenanter.GetTenanters(pbtenant.CloudProvider(req.Provider))
if err != nil {
return nil, errors.WithMessage(err, "getTenanters error")
}
region, err := tenanter.NewRegion(pbtenant.CloudProvider(req.Provider), req.RegionId)
if err != nil {
return nil, errors.WithMessagef(err, "provider %v regionId %v", req.Provider, req.RegionId)
}
for _, tenant := range tenanters {
if req.AccountName == "" || tenant.AccountName() == req.AccountName {
if pod, err = poder.NewPodClient(req.Provider, region, tenant); err != nil {
return nil, errors.WithMessage(err, "NewPodClient error")
}
break
}
}
return pod.ListPodDetail(ctx, req)
}
func ListPod(ctx context.Context, req *pbpod.ListPodReq) (*pbpod.ListPodResp, error) {
var (
wg sync.WaitGroup
mutex sync.Mutex
pods []*pbpod.PodInstance
tenanters []tenanter.Tenanter
)
//pcm adk过来的请求需要从用户本地读取配置文件
if len(req.RequestSource) > 0 {
var configFile string
flag.StringVar(&configFile, "conf", "configs/tenanter.yaml", "tenanter.yaml")
flag.Parse()
defer glog.Flush()
if err := tenanter.LoadCloudConfigsFromFile(configFile); err != nil {
if !errors.Is(err, tenanter.ErrLoadTenanterFileEmpty) {
glog.Fatalf("tenanter.LoadCloudConfigsFromFile error %+v", err)
}
glog.Warningf("tenanter.LoadCloudConfigsFromFile empty file path %s", configFile)
}
glog.Infof("load tenant from file finished")
}
tenanters, _ = tenanter.GetTenanters(pbtenant.CloudProvider(req.Provider))
//get the available region for product
reqPodRegion := &pbpod.GetPodRegionReq{Provider: req.GetProvider()}
respPodRegion, err := GetPodRegion(ctx, reqPodRegion)
if err != nil {
return nil, errors.WithMessage(err, "getPodRegion error")
}
wg.Add(len(tenanters) * len(respPodRegion.Regions))
for _, t := range tenanters {
for _, region := range respPodRegion.Regions {
go func(tenant tenanter.Tenanter, region tenanter.Region) {
defer wg.Done()
pod, err := poder.NewPodClient(req.Provider, region, tenant)
if err != nil {
glog.Errorf("New Pod Client error %v", err)
return
}
request := &pbpod.ListPodDetailReq{
Provider: req.Provider,
AccountName: tenant.AccountName(),
RegionId: region.GetId(),
Namespace: req.Namespace,
PageNumber: 1,
PageSize: 100,
NextToken: "",
}
for {
resp, err := pod.ListPodDetail(ctx, request)
if err != nil {
glog.Errorf("ListDetail error %v", err)
return
}
mutex.Lock()
pods = append(pods, resp.Pods...)
mutex.Unlock()
if resp.Finished {
break
}
request.PageNumber, request.PageSize, request.NextToken = resp.PageNumber, resp.PageSize, resp.NextToken
}
}(t, region)
}
}
wg.Wait()
return &pbpod.ListPodResp{Pods: pods}, nil
}
func ListPodAll(ctx context.Context) (*pbpod.ListPodResp, error) {
var (
wg sync.WaitGroup
mutex sync.Mutex
pods []*pbpod.PodInstance
)
wg.Add(len(pbpod.CloudProvider_name))
for k := range pbpod.CloudProvider_name {
go func(provider int32) {
defer wg.Done()
//针对私有K8S集群调用listAll时默认只查询ListPodDetailReq namespace下的pod
if provider == 3 {
resp, err := ListPod(ctx, &pbpod.ListPodReq{Provider: pbpod.CloudProvider(provider), Namespace: "pcm"})
if err != nil {
glog.Errorf("List error %v", err)
return
}
mutex.Lock()
pods = append(pods, resp.Pods...)
mutex.Unlock()
} else {
resp, err := ListPod(ctx, &pbpod.ListPodReq{Provider: pbpod.CloudProvider(provider)})
if err != nil {
glog.Errorf("List error %v", err)
return
}
mutex.Lock()
pods = append(pods, resp.Pods...)
mutex.Unlock()
}
}(k)
}
wg.Wait()
return &pbpod.ListPodResp{Pods: pods}, nil
}