JCC-CSScheduler/schedulerMiddleware/internal/manager/jobmgr/job/state2/data_upload.go

package state2

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	"gitlink.org.cn/cloudream/common/pkgs/logger"
	"gitlink.org.cn/cloudream/common/sdks/blockchain"
	sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
	"gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
	uploadersdk "gitlink.org.cn/cloudream/common/sdks/uploader"
	schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
	schmod "gitlink.org.cn/cloudream/scheduler/common/models"
	jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
	"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr"
	"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr/job/state"
)
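
// DataUpload is the job state that uploads a dataset, either from objects
// already stored locally or from a remote URL via the uploader service, and
// then records blockchain evidence for every uploaded object.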
type DataUpload struct {
	userID          cdssdk.UserID
	uploadInfo      sch.UploadInfo
	dataType        string
	blockChainToken string
	//storages []cdssdk.StorageID
	lock sync.Mutex
}
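
// NewDataUpload creates the upload state with the owning user, the upload
// source description, the data type label, and the token used for blockchain
// invocations.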
func NewDataUpload(userID cdssdk.UserID, uploadInfo sch.UploadInfo, dataType string, blockChainToken string) *DataUpload {
	return &DataUpload{
		userID:          userID,
		uploadInfo:      uploadInfo,
		dataType:        dataType,
		blockChainToken: blockChainToken,
		//storages: storages,
	}
}
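
// Run executes the upload under the state lock and transitions the job to
// SuccessComplete or FailureComplete depending on the result.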
func (s *DataUpload) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
	s.lock.Lock()
	defer s.lock.Unlock()

	err := s.do(rtx)
	if err != nil {
		logger.Error(err)
		rtx.Mgr.ChangeState(jo, state.FailureComplete(err))
		return
	}

	rtx.Mgr.ChangeState(jo, state.SuccessComplete())
}
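
// do resolves the uploaded object IDs (directly for local uploads, by calling
// the uploader service for URL uploads) and then writes blockchain evidence
// records for them into the database.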
func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error {
	var objectIDs []cdssdk.ObjectID

	switch info := s.uploadInfo.(type) {
	// Upload from local objects: the data is already in storage, only its object IDs are needed.
	case *sch.LocalUploadInfo:
		objectIDs = info.ObjectIDs

	// Upload from a remote URL: delegate the transfer to the uploader service.
	case *sch.RemoteUploadInfo:
		uploaderCli, err := schglb.UploaderPool.Acquire()
		if err != nil {
			return fmt.Errorf("new uploader client: %w", err)
		}
		defer schglb.UploaderPool.Release(uploaderCli)

		req := uploadersdk.UploadReq{
			DataType: s.dataType,
			Source: &uploadersdk.UrlSource{
				Type: sch.StorageTypeURL,
				Url:  info.Url,
			},
			Target: &uploadersdk.UrlTarget{
				Type:      sch.StorageTypeURL,
				ClusterID: uploadersdk.ClusterID(info.Cluster),
				JCSUploadInfo: cdsapi.ObjectUploadInfo{
					UserID:    s.userID,
					PackageID: info.PackageID,
				},
			},
		}
		uploadResp, err := uploaderCli.Upload(req)
		if err != nil {
			return fmt.Errorf("upload data: %w", err)
		}

		if uploadResp.JsonData != "" {
			err = rtx.Mgr.DB.UploadData().UpdatePackage(rtx.Mgr.DB.DefCtx(), uploadResp.PackageID, uploadResp.JsonData, -1)
			if err != nil {
				return fmt.Errorf("update package: %w", err)
			}
		}

		objectIDs = uploadResp.ObjectIDs
	}

	// Record blockchain evidence for the uploaded objects.
	blockChains, err := s.blockChain(objectIDs)
	if err != nil {
		return fmt.Errorf("blockchain: %w", err)
	}

	err = rtx.Mgr.DB.UploadData().InsertBlockchains(rtx.Mgr.DB.DefCtx(), blockChains)
	if err != nil {
		return fmt.Errorf("insert blockchains: %w", err)
	}

	return nil
}
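
// blockChain lists the uploaded objects, invokes the upload contract once per
// object, and returns the evidence records to be persisted.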
func (s *DataUpload) blockChain(objectIDs []cdssdk.ObjectID) ([]*uploadersdk.BlockChain, error) {
	cdsCli, err := schglb.CloudreamStoragePool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new storage client: %w", err)
	}
	defer schglb.CloudreamStoragePool.Release(cdsCli)

	//objects, err := cdsCli.Object().GetPackageObjects(cdsapi.ObjectGetPackageObjects{})
	objects, err := cdsCli.Object().ListByIDs(cdsapi.ObjectListByIDs{
		ObjectIDs: objectIDs,
		UserID:    s.userID,
	})
	if err != nil {
		logger.Error(fmt.Errorf("list objects: %w", err))
		return nil, fmt.Errorf("list objects: %w", err)
	}
	if len(objects.Objects) == 0 {
		return nil, fmt.Errorf("no objects found for the uploaded IDs")
	}

	bcCli, err := schglb.BlockChainPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new blockchain client: %w", err)
	}
	defer schglb.BlockChainPool.Release(bcCli)
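
	// For each object, build an evidence record keyed by "<objectID>_<millisecond timestamp>"
	// and invoke the upload contract with the object's metadata.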
	var blockChains []*uploadersdk.BlockChain
	for _, obj := range objects.Objects {
		if obj == nil {
			logger.Warnf("object is nil")
			continue
		}

		now := time.Now()
		timestamp := now.UnixNano() / int64(time.Millisecond)
		chainID := strconv.FormatInt(int64(obj.ObjectID), 10) + "_" + strconv.FormatInt(timestamp, 10)
		formattedTime := now.Format("2006-01-02 15:04:05")

		paths := strings.Split(obj.Path, "/")
		fileName := paths[len(paths)-1]

		// Strip the first four characters (the hash prefix) from the file hash;
		// guard against hashes shorter than that prefix to avoid a slice panic.
		if len(obj.FileHash) < 4 {
			return nil, fmt.Errorf("unexpected file hash %q: shorter than the 4-character prefix", string(obj.FileHash))
		}
		fileHash := obj.FileHash[4:]

		var args = make(map[string]string)
		args["userID"] = strconv.FormatInt(int64(s.userID), 10)
		args["type"] = s.dataType
		args["fileName"] = fileName
		args["fileHash"] = string(fileHash)
		args["fileSize"] = strconv.FormatInt(obj.Size, 10)
		args["objectID"] = strconv.FormatInt(int64(obj.ObjectID), 10)
		args["createTime"] = formattedTime

		// Serialize the argument map to a JSON string for the contract call.
		argsJson, _ := json.Marshal(args)
		argsArr := []string{chainID, string(argsJson)}

		req := blockchain.InvokeReq{
			ContractAddress: schglb.BlockChainConfig.ContractAddress,
			FunctionName:    schglb.BlockChainConfig.FunctionName,
			//MemberName: schglb.BlockChainConfig.MemberName,
			Type: schmod.BlockChain_Upload,
			Args: argsArr,
		}
		err = bcCli.BlockChainInvoke(req, s.blockChainToken)
		if err != nil {
			return nil, fmt.Errorf("invoke blockchain: %w", err)
		}

		blockChains = append(blockChains, &uploadersdk.BlockChain{
			ObjectID:       obj.ObjectID,
			BlockChainID:   chainID,
			BlockChainType: schmod.Write,
			//FileHash: string(fileHash),
			//FileName: fileName,
			//FileSize: obj.Size,
		})
	}

	if len(blockChains) == 0 {
		return nil, fmt.Errorf("no blockchain records were created")
	}
	return blockChains, nil
}
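
// Dump returns the persisted representation of this state.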
func (s *DataUpload) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
	return &jobmod.NormalJobReadyToExecuteDump{}
}