初步实现随机读取

This commit is contained in:
Sydonian 2024-04-12 08:57:42 +08:00
parent 33b1a4ea2d
commit 2f91aed827
42 changed files with 622 additions and 467 deletions

View File

@ -7,6 +7,7 @@ import (
c "gitlink.org.cn/cloudream/common/utils/config"
stgmodels "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/grpc"
stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq"
)
@ -21,6 +22,7 @@ type Config struct {
IPFS ipfs.Config `json:"ipfs"`
DistLock distlock.Config `json:"distlock"`
Connectivity connectivity.Config `json:"connectivity"`
Downloader downloader.Config `json:"downloader"`
}
var cfg Config

View File

@ -5,7 +5,7 @@ import (
"io"
"gitlink.org.cn/cloudream/common/pkgs/logger"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)
@ -43,7 +43,7 @@ func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error {
return fmt.Errorf("recv message failed, err: %w", err)
}
err = myio.WriteAll(pw, msg.Data)
err = io2.WriteAll(pw, msg.Data)
if err != nil {
// 关闭文件写入不需要返回的hash和error
pw.CloseWithError(io.ErrClosedPipe)

View File

@ -5,7 +5,7 @@ import (
"io"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
@ -53,7 +53,7 @@ func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) erro
return fmt.Errorf("recv message failed, err: %w", err)
}
err = myio.WriteAll(writer, msg.Data)
err = io2.WriteAll(writer, msg.Data)
if err != nil {
// 关闭文件写入不需要返回的hash和error
writer.Abort(io.ErrClosedPipe)

View File

@ -52,11 +52,6 @@ func (t *CacheMovePackage) do(ctx TaskContext) error {
}
defer stgglb.CoordinatorMQPool.Release(coorCli)
getResp, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.packageID))
if err != nil {
return fmt.Errorf("getting package object details: %w", err)
}
ipfsCli, err := stgglb.IPFSPool.Acquire()
if err != nil {
return fmt.Errorf("new ipfs client: %w", err)
@ -64,9 +59,7 @@ func (t *CacheMovePackage) do(ctx TaskContext) error {
defer ipfsCli.Close()
// TODO 可以考虑优化比如rep类型的直接pin就可以
objIter := iterator.NewDownloadObjectIterator(getResp.Objects, &iterator.DownloadContext{
Distlock: ctx.distlock,
})
objIter := ctx.downloader.DownloadPackage(t.packageID)
defer objIter.Close()
for {

View File

@ -13,7 +13,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/pkgs/task"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
myref "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/sort2"
"gitlink.org.cn/cloudream/storage/common/consts"
@ -73,7 +73,7 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e
}
t.FullOutputPath = outputDirPath
getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.packageID))
getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.packageID))
if err != nil {
return fmt.Errorf("getting package object details: %w", err)
}
@ -184,7 +184,7 @@ func (t *StorageLoadPackage) downloadECObject(coorCli *coormq.Client, ipfsCli *i
if bsc < osc {
var fileStrs []io.ReadCloser
rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize)
rs, err := ec.NewStreamRs(ecRed.K, ecRed.N, ecRed.ChunkSize)
if err != nil {
return nil, nil, fmt.Errorf("new rs: %w", err)
}
@ -204,7 +204,7 @@ func (t *StorageLoadPackage) downloadECObject(coorCli *coormq.Client, ipfsCli *i
fileStrs = append(fileStrs, str)
}
fileReaders, filesCloser := myio.ToReaders(fileStrs)
fileReaders, filesCloser := io2.ToReaders(fileStrs)
var indexes []int
var pinnedBlocks []stgmod.ObjectBlock
@ -218,8 +218,8 @@ func (t *StorageLoadPackage) downloadECObject(coorCli *coormq.Client, ipfsCli *i
})
}
outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes))
return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) {
outputs, outputsCloser := io2.ToReaders(rs.ReconstructData(fileReaders, indexes))
return io2.AfterReadClosed(io2.Length(io2.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) {
filesCloser()
outputsCloser()
}), pinnedBlocks, nil

View File

@ -4,6 +4,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)
@ -11,6 +12,7 @@ type TaskContext struct {
distlock *distlock.Service
sw *ioswitch.Switch
connectivity *connectivity.Collector
downloader *downloader.Downloader
}
// 需要在Task结束后主动调用completing函数将在Manager加锁期间被调用
@ -25,10 +27,11 @@ type Task = task.Task[TaskContext]
type CompleteOption = task.CompleteOption
func NewManager(distlock *distlock.Service, sw *ioswitch.Switch, connectivity *connectivity.Collector) Manager {
func NewManager(distlock *distlock.Service, sw *ioswitch.Switch, connectivity *connectivity.Collector, downloader *downloader.Downloader) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
sw: sw,
connectivity: connectivity,
downloader: downloader,
})
}

View File

@ -13,6 +13,7 @@ import (
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
@ -94,11 +95,13 @@ func main() {
sw := ioswitch.NewSwitch()
dlder := downloader.NewDownloader(config.Cfg().Downloader)
//处置协调端、客户端命令(可多建几个)
wg := sync.WaitGroup{}
wg.Add(4)
taskMgr := task.NewManager(distlock, &sw, &conCol)
taskMgr := task.NewManager(distlock, &sw, &conCol, &dlder)
// 启动命令服务器
// TODO 需要设计AgentID持久化机制

View File

@ -7,6 +7,7 @@ import (
"gitlink.org.cn/cloudream/common/utils/config"
stgmodels "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq"
)
@ -19,6 +20,7 @@ type Config struct {
IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon
DistLock distlock.Config `json:"distlock"`
Connectivity connectivity.Config `json:"connectivity"`
Downloader downloader.Config `json:"downloader"`
}
var cfg Config

View File

@ -13,6 +13,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
)
type ObjectService struct {
@ -96,7 +97,17 @@ func (s *ObjectService) Download(ctx *gin.Context) {
return
}
file, err := s.svc.ObjectSvc().Download(req.UserID, req.ObjectID)
off := req.Offset
len := int64(-1)
if req.Length != nil {
len = *req.Length
}
file, err := s.svc.ObjectSvc().Download(req.UserID, downloader.DownloadReqeust{
ObjectID: req.ObjectID,
Offset: off,
Length: len,
})
if err != nil {
log.Warnf("downloading object: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download object failed"))

View File

@ -7,8 +7,8 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
mytask "gitlink.org.cn/cloudream/storage/client/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)
@ -65,28 +65,14 @@ func (svc *ObjectService) Move(userID cdssdk.UserID, movings []cdssdk.MovingObje
return resp.Successes, nil
}
func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (*iterator.IterDownloadingObject, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)
resp, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails([]cdssdk.ObjectID{objectID}))
if err != nil {
return nil, fmt.Errorf("requesting to coordinator")
}
if resp.Objects[0] == nil {
return nil, fmt.Errorf("object not found")
}
iter := iterator.NewDownloadObjectIterator([]stgmod.ObjectDetail{*resp.Objects[0]}, &iterator.DownloadContext{
Distlock: svc.DistLock,
})
defer iter.Close()
func (svc *ObjectService) Download(userID cdssdk.UserID, req downloader.DownloadReqeust) (*downloader.Downloading, error) {
// TODO 检查用户ID
iter := svc.Downloader.DownloadObjects([]downloader.DownloadReqeust{req})
downloading, err := iter.MoveNext()
if downloading.Object == nil {
return nil, fmt.Errorf("object not found")
}
if err != nil {
return nil, err
}

View File

@ -6,7 +6,7 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)
@ -78,23 +78,9 @@ func (svc *PackageService) Create(userID cdssdk.UserID, bucketID cdssdk.BucketID
return resp.Package, nil
}
func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID) (iterator.DownloadingObjectIterator, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)
getObjsResp, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(packageID))
if err != nil {
return nil, fmt.Errorf("getting package object details: %w", err)
}
iter := iterator.NewDownloadObjectIterator(getObjsResp.Objects, &iterator.DownloadContext{
Distlock: svc.DistLock,
})
return iter, nil
func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID) (downloader.DownloadIterator, error) {
// TODO 检查用户ID
return svc.Downloader.DownloadPackage(packageID), nil
}
func (svc *PackageService) DeletePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) error {

View File

@ -3,16 +3,19 @@ package services
import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/client/internal/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
)
type Service struct {
DistLock *distlock.Service
TaskMgr *task.Manager
DistLock *distlock.Service
TaskMgr *task.Manager
Downloader *downloader.Downloader
}
func NewService(distlock *distlock.Service, taskMgr *task.Manager) (*Service, error) {
func NewService(distlock *distlock.Service, taskMgr *task.Manager, downloader *downloader.Downloader) (*Service, error) {
return &Service{
DistLock: distlock,
TaskMgr: taskMgr,
DistLock: distlock,
TaskMgr: taskMgr,
Downloader: downloader,
}, nil
}

View File

@ -14,6 +14,7 @@ import (
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
)
func main() {
@ -51,7 +52,9 @@ func main() {
taskMgr := task.NewManager(distlockSvc, &conCol)
svc, err := services.NewService(distlockSvc, &taskMgr)
dlder := downloader.NewDownloader(config.Cfg().Downloader)
svc, err := services.NewService(distlockSvc, &taskMgr, &dlder)
if err != nil {
logger.Warnf("new services failed, err: %s", err.Error())
os.Exit(1)

View File

@ -35,5 +35,8 @@
},
"connectivity": {
"testInterval": 300
},
"downloader": {
"maxStripCacheCount": 100
}
}

View File

@ -28,5 +28,8 @@
},
"connectivity": {
"testInterval": 300
},
"downloader": {
"maxStripCacheCount": 100
}
}

View File

@ -1,63 +0,0 @@
<setting>
<attribute>
<name>local.addr</name>
<value>101.201.215.165</value>
</attribute>
<attribute>
<name>controller.addr</name>
<value>101.201.215.196</value>
</attribute>
<attribute>
<name>agents.addr</name>
<value>/hw-sh/123.60.146.162</value>
<value>/hw-bj/120.46.183.86</value>
<value>/ali/101.201.215.165</value>
</attribute>
<attribute>
<name>agents.location</name>
<value>ali</value>
<value>hw-sh</value>
<value>hw-bj</value>
</attribute>
<attribute>
<name>oec.controller.thread.num</name>
<value>4</value>
</attribute>
<attribute>
<name>oec.agent.thread.num</name>
<value>2</value>
</attribute>
<attribute>
<name>oec.cmddist.thread.num</name>
<value>2</value>
</attribute>
<attribute>
<name>packet.size</name>
<value>131072</value>
</attribute>
<attribute>
<name>ec.concurrent.num</name>
<value>2</value>
</attribute>
<attribute>
<name>ec.policy</name>
<value><ecid>rs_9_6</ecid><class>RS96</class><n>9</n><k>6</k><w>1</w><opt>-1</opt></value>
<value><ecid>rs_3_2</ecid><class>RS96</class><n>3</n><k>2</k><w>1</w><opt>-1</opt></value>
<value><ecid>edu_9_6</ecid><class>EDU96</class><n>9</n><k>6</k><w>1</w><opt>-1</opt></value>
<value><ecid>edu_3_2</ecid><class>EDU32</class><n>3</n><k>2</k><w>1</w><opt>-1</opt></value>
<value><ecid>dfc_9_4</ecid><class>DFC</class><n>9</n><k>4</k><w>1</w><opt>-1</opt><param>3,2</param></value>
</attribute>
<attribute>
<name>inter.inner.addr</name>
<inner>
<dc><ip>172.23.85.69</ip><ip>172.23.85.71</ip><ip>172.23.85.70</ip></dc>
<dc><ip>192.168.0.69</ip></dc>
<dc><ip>192.168.0.76</ip></dc>
</inner>
<inter>
<dc><ip>101.201.215.196</ip><ip>101.201.215.165</ip><ip>101.201.214.111</ip></dc>
<dc><ip>123.60.146.162</ip></dc>
<dc><ip>120.46.183.86</ip></dc>
</inter>
</attribute>
</setting>

View File

@ -1,94 +0,0 @@
package cmd
import (
"fmt"
"io"
"os"
"path/filepath"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)
type DownloadPackage struct {
userID cdssdk.UserID
packageID cdssdk.PackageID
outputPath string
}
type DownloadPackageContext struct {
Distlock *distlock.Service
}
func NewDownloadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, outputPath string) *DownloadPackage {
return &DownloadPackage{
userID: userID,
packageID: packageID,
outputPath: outputPath,
}
}
func (t *DownloadPackage) Execute(ctx *DownloadPackageContext) error {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)
getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.packageID))
if err != nil {
return fmt.Errorf("getting package object details: %w", err)
}
objIter := iterator.NewDownloadObjectIterator(getObjectDetails.Objects, &iterator.DownloadContext{
Distlock: ctx.Distlock,
})
defer objIter.Close()
return t.writeObjects(objIter)
}
func (t *DownloadPackage) writeObjects(objIter iterator.DownloadingObjectIterator) error {
for {
objInfo, err := objIter.MoveNext()
if err == iterator.ErrNoMoreItem {
break
}
if err != nil {
return err
}
err = func() error {
defer objInfo.File.Close()
fullPath := filepath.Join(t.outputPath, objInfo.Object.Path)
dirPath := filepath.Dir(fullPath)
if err := os.MkdirAll(dirPath, 0755); err != nil {
return fmt.Errorf("creating object dir: %w", err)
}
outputFile, err := os.Create(fullPath)
if err != nil {
return fmt.Errorf("creating object file: %w", err)
}
defer outputFile.Close()
_, err = io.Copy(outputFile, objInfo.File)
if err != nil {
return fmt.Errorf("copy object data to local file failed, err: %w", err)
}
return nil
}()
if err != nil {
return err
}
}
return nil
}

View File

@ -4,7 +4,7 @@ import (
"database/sql"
"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/common/utils/math"
"gitlink.org.cn/cloudream/common/utils/math2"
)
const (
@ -27,7 +27,7 @@ func BatchNamedExec[T any](ctx SQLContext, sql string, argCnt int, arr []T, call
batchSize := maxPlaceholderCount / argCnt
for len(arr) > 0 {
curBatchSize := math.Min(batchSize, len(arr))
curBatchSize := math2.Min(batchSize, len(arr))
ret, err := ctx.NamedExec(sql, arr[:curBatchSize])
if err != nil {
@ -59,7 +59,7 @@ func BatchNamedQuery[T any](ctx SQLContext, sql string, argCnt int, arr []T, cal
batchSize := maxPlaceholderCount / argCnt
for len(arr) > 0 {
curBatchSize := math.Min(batchSize, len(arr))
curBatchSize := math2.Min(batchSize, len(arr))
ret, err := ctx.NamedQuery(sql, arr[:curBatchSize])
if err != nil {

View File

@ -0,0 +1,6 @@
package downloader
type Config struct {
// EC模式的Object的条带缓存数量
MaxStripCacheCount int `json:"maxStripCacheCount"`
}

View File

@ -0,0 +1,123 @@
package downloader
import (
"fmt"
"io"
lru "github.com/hashicorp/golang-lru/v2"
"gitlink.org.cn/cloudream/common/pkgs/iterator"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)
const (
DefaultMaxStripCacheCount = 128
)
type DownloadIterator = iterator.Iterator[*Downloading]
type DownloadReqeust struct {
ObjectID cdssdk.ObjectID
Offset int64
Length int64
}
type downloadReqeust2 struct {
Detail *stgmod.ObjectDetail
Raw DownloadReqeust
}
type Downloading struct {
Object *cdssdk.Object
File io.ReadCloser // 文件流如果文件不存在那么为nil
Request DownloadReqeust
}
type Downloader struct {
strips *StripCache
}
func NewDownloader(cfg Config) Downloader {
if cfg.MaxStripCacheCount == 0 {
cfg.MaxStripCacheCount = DefaultMaxStripCacheCount
}
ch, _ := lru.New[ECStripKey, ObjectECStrip](cfg.MaxStripCacheCount)
return Downloader{
strips: ch,
}
}
func (d *Downloader) DownloadObjects(reqs []DownloadReqeust) DownloadIterator {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return iterator.FuseError[*Downloading](fmt.Errorf("new coordinator client: %w", err))
}
defer stgglb.CoordinatorMQPool.Release(coorCli)
objIDs := make([]cdssdk.ObjectID, len(reqs))
for i, req := range reqs {
objIDs[i] = req.ObjectID
}
if len(objIDs) == 0 {
return iterator.Empty[*Downloading]()
}
objDetails, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails(objIDs))
if err != nil {
return iterator.FuseError[*Downloading](fmt.Errorf("request to coordinator: %w", err))
}
req2s := make([]downloadReqeust2, len(reqs))
for i, req := range reqs {
req2s[i] = downloadReqeust2{
Detail: objDetails.Objects[i],
Raw: req,
}
}
return NewDownloadObjectIterator(d, req2s)
}
func (d *Downloader) DownloadPackage(pkgID cdssdk.PackageID) DownloadIterator {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return iterator.FuseError[*Downloading](fmt.Errorf("new coordinator client: %w", err))
}
defer stgglb.CoordinatorMQPool.Release(coorCli)
pkgDetail, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(pkgID))
if err != nil {
return iterator.FuseError[*Downloading](fmt.Errorf("request to coordinator: %w", err))
}
req2s := make([]downloadReqeust2, len(pkgDetail.Objects))
for i, objDetail := range pkgDetail.Objects {
dt := objDetail
req2s[i] = downloadReqeust2{
Detail: &dt,
Raw: DownloadReqeust{
ObjectID: objDetail.Object.ObjectID,
Offset: 0,
Length: objDetail.Object.Size,
},
}
}
return NewDownloadObjectIterator(d, req2s)
}
type ObjectECStrip struct {
Data []byte
ObjectFileHash string // 添加这条缓存时Object的FileHash
}
type ECStripKey struct {
ObjectID cdssdk.ObjectID
StripPosition int64
}
type StripCache = lru.Cache[ECStripKey, ObjectECStrip]

View File

@ -0,0 +1,148 @@
package downloader
import (
"fmt"
"io"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans"
)
type IPFSReader struct {
node cdssdk.Node
fileHash string
stream io.ReadCloser
offset int64
}
func NewIPFSReader(node cdssdk.Node, fileHash string) *IPFSReader {
return &IPFSReader{
node: node,
fileHash: fileHash,
}
}
func NewIPFSReaderWithRange(node cdssdk.Node, fileHash string, rng ipfs.ReadOption) io.ReadCloser {
str := &IPFSReader{
node: node,
fileHash: fileHash,
}
str.Seek(rng.Offset, io.SeekStart)
if rng.Length > 0 {
return io2.Length(str, rng.Length)
}
return str
}
func (r *IPFSReader) Seek(offset int64, whence int) (int64, error) {
if whence == io.SeekEnd {
return 0, fmt.Errorf("seek end not supported")
}
if whence == io.SeekCurrent {
return 0, fmt.Errorf("seek current not supported")
}
if r.stream == nil {
r.offset = offset
return r.offset, nil
}
// 如果文件流已经打开那么如果seek的位置和当前位置不同那么需要重新打开文件流
if offset != r.offset {
var err error
r.stream.Close()
r.offset = offset
r.stream, err = r.openStream()
if err != nil {
return 0, fmt.Errorf("reopen stream: %w", err)
}
}
return r.offset, nil
}
func (r *IPFSReader) Read(buf []byte) (int, error) {
if r.stream == nil {
var err error
r.stream, err = r.openStream()
if err != nil {
return 0, err
}
}
n, err := r.stream.Read(buf)
r.offset += int64(n)
return n, err
}
func (r *IPFSReader) Close() error {
if r.stream != nil {
return r.stream.Close()
}
return nil
}
func (r *IPFSReader) openStream() (io.ReadCloser, error) {
if stgglb.IPFSPool != nil {
logger.Infof("try to use local IPFS to download file")
reader, err := r.fromLocalIPFS()
if err == nil {
return reader, nil
}
logger.Warnf("download from local IPFS failed, so try to download from node %v, err: %s", r.node.Name, err.Error())
}
return r.fromNode()
}
func (r *IPFSReader) fromNode() (io.ReadCloser, error) {
planBld := plans.NewPlanBuilder()
fileStr := planBld.AtAgent(r.node).IPFSRead(r.fileHash, ipfs.ReadOption{
Offset: r.offset,
Length: -1,
}).ToExecutor()
plan, err := planBld.Build()
if err != nil {
return nil, fmt.Errorf("building plan: %w", err)
}
waiter, err := plans.Execute(*plan)
if err != nil {
return nil, fmt.Errorf("execute plan: %w", err)
}
go func() {
waiter.Wait()
}()
return waiter.ReadStream(fileStr)
}
func (r *IPFSReader) fromLocalIPFS() (io.ReadCloser, error) {
ipfsCli, err := stgglb.IPFSPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new ipfs client: %w", err)
}
reader, err := ipfsCli.OpenRead(r.fileHash, ipfs.ReadOption{
Offset: r.offset,
Length: -1,
})
if err != nil {
return nil, fmt.Errorf("read ipfs file failed, err: %w", err)
}
reader = io2.AfterReadClosed(reader, func(io.ReadCloser) {
ipfsCli.Close()
})
return reader, nil
}

View File

@ -1,4 +1,4 @@
package iterator
package downloader
import (
"fmt"
@ -9,26 +9,23 @@ import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/bitmap"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/common/utils/sort2"
"gitlink.org.cn/cloudream/common/utils/sync2"
"gitlink.org.cn/cloudream/storage/common/consts"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
stgmodels "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)
type DownloadingObjectIterator = Iterator[*IterDownloadingObject]
type IterDownloadingObject struct {
Object cdssdk.Object
File io.ReadCloser
}
var errNoDirectReadBlock = fmt.Errorf("no direct read block")
type DownloadNodeInfo struct {
Node cdssdk.Node
@ -43,23 +40,23 @@ type DownloadContext struct {
type DownloadObjectIterator struct {
OnClosing func()
objectDetails []stgmodels.ObjectDetail
currentIndex int
inited bool
downloader *Downloader
reqs []downloadReqeust2
currentIndex int
inited bool
downloadCtx *DownloadContext
coorCli *coormq.Client
allNodes map[cdssdk.NodeID]cdssdk.Node
coorCli *coormq.Client
allNodes map[cdssdk.NodeID]cdssdk.Node
}
func NewDownloadObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadCtx *DownloadContext) *DownloadObjectIterator {
func NewDownloadObjectIterator(downloader *Downloader, downloadObjs []downloadReqeust2) *DownloadObjectIterator {
return &DownloadObjectIterator{
objectDetails: objectDetails,
downloadCtx: downloadCtx,
downloader: downloader,
reqs: downloadObjs,
}
}
func (i *DownloadObjectIterator) MoveNext() (*IterDownloadingObject, error) {
func (i *DownloadObjectIterator) MoveNext() (*Downloading, error) {
if !i.inited {
if err := i.init(); err != nil {
return nil, err
@ -68,8 +65,8 @@ func (i *DownloadObjectIterator) MoveNext() (*IterDownloadingObject, error) {
i.inited = true
}
if i.currentIndex >= len(i.objectDetails) {
return nil, ErrNoMoreItem
if i.currentIndex >= len(i.reqs) {
return nil, iterator.ErrNoMoreItem
}
item, err := i.doMove()
@ -85,12 +82,12 @@ func (i *DownloadObjectIterator) init() error {
i.coorCli = coorCli
allNodeIDs := make(map[cdssdk.NodeID]bool)
for _, obj := range i.objectDetails {
for _, p := range obj.PinnedAt {
for _, obj := range i.reqs {
for _, p := range obj.Detail.PinnedAt {
allNodeIDs[p] = true
}
for _, b := range obj.Blocks {
for _, b := range obj.Detail.Blocks {
allNodeIDs[b.NodeID] = true
}
}
@ -108,45 +105,55 @@ func (i *DownloadObjectIterator) init() error {
return nil
}
func (iter *DownloadObjectIterator) doMove() (*IterDownloadingObject, error) {
obj := iter.objectDetails[iter.currentIndex]
func (iter *DownloadObjectIterator) doMove() (*Downloading, error) {
req := iter.reqs[iter.currentIndex]
if req.Detail == nil {
return &Downloading{
Object: nil,
File: nil,
Request: req.Raw,
}, nil
}
switch red := obj.Object.Redundancy.(type) {
switch red := req.Detail.Object.Redundancy.(type) {
case *cdssdk.NoneRedundancy:
reader, err := iter.downloadNoneOrRepObject(obj)
reader, err := iter.downloadNoneOrRepObject(req)
if err != nil {
return nil, fmt.Errorf("downloading object: %w", err)
}
return &IterDownloadingObject{
Object: obj.Object,
File: reader,
return &Downloading{
Object: &req.Detail.Object,
File: reader,
Request: req.Raw,
}, nil
case *cdssdk.RepRedundancy:
reader, err := iter.downloadNoneOrRepObject(obj)
reader, err := iter.downloadNoneOrRepObject(req)
if err != nil {
return nil, fmt.Errorf("downloading rep object: %w", err)
}
return &IterDownloadingObject{
Object: obj.Object,
File: reader,
return &Downloading{
Object: &req.Detail.Object,
File: reader,
Request: req.Raw,
}, nil
case *cdssdk.ECRedundancy:
reader, err := iter.downloadECObject(obj, red)
reader, err := iter.downloadECObject(req, red)
if err != nil {
return nil, fmt.Errorf("downloading ec object: %w", err)
}
return &IterDownloadingObject{
Object: obj.Object,
File: reader,
return &Downloading{
Object: &req.Detail.Object,
File: reader,
Request: req.Raw,
}, nil
}
return nil, fmt.Errorf("unsupported redundancy type: %v", reflect.TypeOf(obj.Object.Redundancy))
return nil, fmt.Errorf("unsupported redundancy type: %v", reflect.TypeOf(req.Detail.Object.Redundancy))
}
func (i *DownloadObjectIterator) Close() {
@ -155,15 +162,20 @@ func (i *DownloadObjectIterator) Close() {
}
}
func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj stgmodels.ObjectDetail) (io.ReadCloser, error) {
func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj downloadReqeust2) (io.ReadCloser, error) {
allNodes, err := iter.sortDownloadNodes(obj)
if err != nil {
return nil, err
}
bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1)
osc, node := iter.getMinReadingObjectSolution(allNodes, 1)
if bsc < osc {
return downloadFile(iter.downloadCtx, blocks[0].Node, blocks[0].Block.FileHash)
return NewIPFSReaderWithRange(blocks[0].Node, blocks[0].Block.FileHash, ipfs.ReadOption{
Offset: obj.Raw.Offset,
Length: obj.Raw.Length,
}), nil
}
// bsc >= osc如果osc是MaxFloat64那么bsc也一定是也就意味着没有足够块来恢复文件
@ -171,48 +183,118 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj stgmodels.Object
return nil, fmt.Errorf("no node has this object")
}
return downloadFile(iter.downloadCtx, *node, obj.Object.FileHash)
return NewIPFSReaderWithRange(*node, obj.Detail.Object.FileHash, ipfs.ReadOption{
Offset: obj.Raw.Offset,
Length: obj.Raw.Length,
}), nil
}
func (iter *DownloadObjectIterator) downloadECObject(obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) {
allNodes, err := iter.sortDownloadNodes(obj)
func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) {
allNodes, err := iter.sortDownloadNodes(req)
if err != nil {
return nil, err
}
bsc, blocks := iter.getMinReadingBlockSolution(allNodes, ecRed.K)
osc, node := iter.getMinReadingObjectSolution(allNodes, ecRed.K)
if bsc < osc {
var fileStrs []io.ReadCloser
rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize)
if err != nil {
return nil, fmt.Errorf("new rs: %w", err)
}
for i, b := range blocks {
str, err := downloadFile(iter.downloadCtx, b.Node, b.Block.FileHash)
if err != nil {
for i -= 1; i >= 0; i-- {
fileStrs[i].Close()
}
return nil, fmt.Errorf("donwloading file: %w", err)
}
var fileStrs []*IPFSReader
for _, b := range blocks {
str := NewIPFSReader(b.Node, b.Block.FileHash)
fileStrs = append(fileStrs, str)
}
fileReaders, filesCloser := myio.ToReaders(fileStrs)
var indexes []int
for _, b := range blocks {
indexes = append(indexes, b.Block.Index)
rs, err := ec.NewRs(ecRed.K, ecRed.N)
if err != nil {
return nil, fmt.Errorf("new rs: %w", err)
}
outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes))
return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) {
filesCloser()
outputsCloser()
}), nil
pr, pw := io.Pipe()
go func() {
defer func() {
for _, str := range fileStrs {
str.Close()
}
}()
readPos := req.Raw.Offset
totalReadLen := req.Detail.Object.Size - req.Raw.Offset
if req.Raw.Length >= 0 {
totalReadLen = req.Raw.Length
}
for totalReadLen > 0 {
curStripPos := readPos / int64(ecRed.K) / int64(ecRed.ChunkSize)
curStripPosInBytes := curStripPos * int64(ecRed.K) * int64(ecRed.ChunkSize)
nextStripPosInBytes := (curStripPos + 1) * int64(ecRed.K) * int64(ecRed.ChunkSize)
curReadLen := math2.Min(totalReadLen, nextStripPosInBytes-readPos)
readRelativePos := readPos - curStripPosInBytes
cacheKey := ECStripKey{
ObjectID: req.Detail.Object.ObjectID,
StripPosition: curStripPos,
}
cache, ok := iter.downloader.strips.Get(cacheKey)
if ok {
if cache.ObjectFileHash == req.Detail.Object.FileHash {
err := io2.WriteAll(pw, cache.Data[readRelativePos:readRelativePos+curReadLen])
if err != nil {
pw.CloseWithError(err)
break
}
totalReadLen -= curReadLen
readPos += curReadLen
continue
}
// 如果Object的Hash和Cache的Hash不一致说明Cache是无效的需要重新下载
iter.downloader.strips.Remove(cacheKey)
}
for _, str := range fileStrs {
_, err := str.Seek(curStripPosInBytes, io.SeekStart)
if err != nil {
pw.CloseWithError(err)
break
}
}
dataBuf := make([]byte, int64(ecRed.K*ecRed.ChunkSize))
blockArrs := make([][]byte, ecRed.N)
for _, b := range blocks {
if b.Block.Index < ecRed.K {
blockArrs[b.Block.Index] = dataBuf[b.Block.Index*ecRed.ChunkSize : (b.Block.Index+1)*ecRed.ChunkSize]
} else {
blockArrs[b.Block.Index] = make([]byte, ecRed.ChunkSize)
}
}
err := sync2.ParallelDo(blocks, func(b downloadBlock, idx int) error {
_, err := io.ReadFull(fileStrs[idx], blockArrs[b.Block.Index])
return err
})
if err != nil {
pw.CloseWithError(err)
break
}
err = rs.ReconstructData(blockArrs)
if err != nil {
pw.CloseWithError(err)
break
}
iter.downloader.strips.Add(cacheKey, ObjectECStrip{
Data: dataBuf,
ObjectFileHash: req.Detail.Object.FileHash,
})
// 下次循环就能从Cache中读取数据
}
}()
return pr, nil
}
// bsc >= osc如果osc是MaxFloat64那么bsc也一定是也就意味着没有足够块来恢复文件
@ -220,24 +302,27 @@ func (iter *DownloadObjectIterator) downloadECObject(obj stgmodels.ObjectDetail,
return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks))
}
return downloadFile(iter.downloadCtx, *node, obj.Object.FileHash)
return NewIPFSReaderWithRange(*node, req.Detail.Object.FileHash, ipfs.ReadOption{
Offset: req.Raw.Offset,
Length: req.Raw.Length,
}), nil
}
func (iter *DownloadObjectIterator) sortDownloadNodes(obj stgmodels.ObjectDetail) ([]*DownloadNodeInfo, error) {
func (iter *DownloadObjectIterator) sortDownloadNodes(req downloadReqeust2) ([]*DownloadNodeInfo, error) {
var nodeIDs []cdssdk.NodeID
for _, id := range obj.PinnedAt {
for _, id := range req.Detail.PinnedAt {
if !lo.Contains(nodeIDs, id) {
nodeIDs = append(nodeIDs, id)
}
}
for _, b := range obj.Blocks {
for _, b := range req.Detail.Blocks {
if !lo.Contains(nodeIDs, b.NodeID) {
nodeIDs = append(nodeIDs, b.NodeID)
}
}
downloadNodeMap := make(map[cdssdk.NodeID]*DownloadNodeInfo)
for _, id := range obj.PinnedAt {
for _, id := range req.Detail.PinnedAt {
node, ok := downloadNodeMap[id]
if !ok {
mod := iter.allNodes[id]
@ -252,7 +337,7 @@ func (iter *DownloadObjectIterator) sortDownloadNodes(obj stgmodels.ObjectDetail
node.ObjectPinned = true
}
for _, b := range obj.Blocks {
for _, b := range req.Detail.Blocks {
node, ok := downloadNodeMap[b.NodeID]
if !ok {
mod := iter.allNodes[b.NodeID]
@ -326,62 +411,3 @@ func (iter *DownloadObjectIterator) getNodeDistance(node cdssdk.Node) float64 {
return consts.NodeDistanceOther
}
func downloadFile(ctx *DownloadContext, node cdssdk.Node, fileHash string) (io.ReadCloser, error) {
// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := node.ExternalIP
grpcPort := node.ExternalGRPCPort
if node.LocationID == stgglb.Local.LocationID {
nodeIP = node.LocalIP
grpcPort = node.LocalGRPCPort
logger.Infof("client and node %d are at the same location, use local ip", node.NodeID)
}
if stgglb.IPFSPool != nil {
logger.Infof("try to use local IPFS to download file")
reader, err := downloadFromLocalIPFS(ctx, fileHash)
if err == nil {
return reader, nil
}
logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error())
}
return downloadFromNode(ctx, node.NodeID, nodeIP, grpcPort, fileHash)
}
func downloadFromNode(ctx *DownloadContext, nodeID cdssdk.NodeID, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) {
agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort)
if err != nil {
return nil, fmt.Errorf("new agent grpc client: %w", err)
}
reader, err := agtCli.GetIPFSFile(fileHash)
if err != nil {
return nil, fmt.Errorf("getting ipfs file: %w", err)
}
reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
agtCli.Close()
})
return reader, nil
}
func downloadFromLocalIPFS(ctx *DownloadContext, fileHash string) (io.ReadCloser, error) {
ipfsCli, err := stgglb.IPFSPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new ipfs client: %w", err)
}
reader, err := ipfsCli.OpenRead(fileHash)
if err != nil {
return nil, fmt.Errorf("read ipfs file failed, err: %w", err)
}
reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
ipfsCli.Close()
})
return reader, nil
}

View File

@ -1,38 +1,44 @@
package ec
import (
"fmt"
"os"
"github.com/baohan10/reedsolomon"
"github.com/klauspost/reedsolomon"
)
type rs struct {
r *(reedsolomon.ReedSolomon)
ecN int
ecK int
ecP int
type Rs struct {
encoder reedsolomon.Encoder
ecN int
ecK int
ecP int
}
func NewRsEnc(ecK int, ecN int) *rs {
enc := rs{
ecN: ecN,
ecK: ecK,
ecP: ecN - ecK,
func NewRs(k int, n int) (*Rs, error) {
enc := Rs{
ecN: n,
ecK: k,
ecP: n - k,
}
enc.r = reedsolomon.GetReedSolomonIns(ecK, ecN)
return &enc
}
func (r *rs) Encode(all [][]byte) {
r.r.Encode(all)
encoder, err := reedsolomon.New(k, n-k)
enc.encoder = encoder
return &enc, err
}
func (r *rs) Repair(all [][]byte) error {
return r.r.Reconstruct(all)
}
func checkErr(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %s", err.Error())
// 任意k个块恢复出所有原始的数据块。
// blocks的长度必须为N且至少有K个元素不为nil
func (r *Rs) ReconstructData(blocks [][]byte) error {
outIndexes := make([]int, r.ecK)
for i := 0; i < r.ecK; i++ {
outIndexes[i] = i
}
return r.ReconstructAny(blocks, outIndexes)
}
// 重建指定的任意块,可以是数据块或校验块。
// 在input上原地重建因此input的长度必须为N。
func (r *Rs) ReconstructAny(blocks [][]byte, outBlockIdxes []int) error {
required := make([]bool, len(blocks))
for _, idx := range outBlockIdxes {
required[idx] = true
}
return r.encoder.ReconstructAny(blocks, required)
}

View File

@ -11,7 +11,7 @@ import (
func Test_EncodeReconstruct(t *testing.T) {
Convey("编码后使用校验块重建数据", t, func() {
rs, err := NewRs(2, 3, 5)
rs, err := NewStreamRs(2, 3, 5)
So(err, ShouldBeNil)
outputs := rs.EncodeAll([]io.Reader{

View File

@ -4,10 +4,10 @@ import (
"io"
"github.com/klauspost/reedsolomon"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
)
type Rs struct {
type StreamRs struct {
encoder reedsolomon.Encoder
ecN int
ecK int
@ -15,8 +15,8 @@ type Rs struct {
chunkSize int
}
func NewRs(k int, n int, chunkSize int) (*Rs, error) {
enc := Rs{
func NewStreamRs(k int, n int, chunkSize int) (*StreamRs, error) {
enc := StreamRs{
ecN: n,
ecK: k,
ecP: n - k,
@ -28,7 +28,7 @@ func NewRs(k int, n int, chunkSize int) (*Rs, error) {
}
// 编码。仅输出校验块
func (r *Rs) Encode(input []io.Reader) []io.ReadCloser {
func (r *StreamRs) Encode(input []io.Reader) []io.ReadCloser {
outReaders := make([]io.ReadCloser, r.ecP)
outWriters := make([]*io.PipeWriter, r.ecP)
for i := 0; i < r.ecP; i++ {
@ -60,7 +60,7 @@ func (r *Rs) Encode(input []io.Reader) []io.ReadCloser {
//输出到outWriter
for i := range outWriters {
err := myio.WriteAll(outWriters[i], chunks[i+r.ecK])
err := io2.WriteAll(outWriters[i], chunks[i+r.ecK])
if err != nil {
closeErr = err
break loop
@ -77,7 +77,7 @@ func (r *Rs) Encode(input []io.Reader) []io.ReadCloser {
}
// 编码。输出包含所有的数据块和校验块
func (r *Rs) EncodeAll(input []io.Reader) []io.ReadCloser {
func (r *StreamRs) EncodeAll(input []io.Reader) []io.ReadCloser {
outReaders := make([]io.ReadCloser, r.ecN)
outWriters := make([]*io.PipeWriter, r.ecN)
for i := 0; i < r.ecN; i++ {
@ -109,7 +109,7 @@ func (r *Rs) EncodeAll(input []io.Reader) []io.ReadCloser {
//输出到outWriter
for i := range outWriters {
err := myio.WriteAll(outWriters[i], chunks[i])
err := io2.WriteAll(outWriters[i], chunks[i])
if err != nil {
closeErr = err
break loop
@ -126,7 +126,7 @@ func (r *Rs) EncodeAll(input []io.Reader) []io.ReadCloser {
}
// 降级读任意k个块恢复出所有原始的数据块。
func (r *Rs) ReconstructData(input []io.Reader, inBlockIdx []int) []io.ReadCloser {
func (r *StreamRs) ReconstructData(input []io.Reader, inBlockIdx []int) []io.ReadCloser {
outIndexes := make([]int, r.ecK)
for i := 0; i < r.ecK; i++ {
outIndexes[i] = i
@ -137,7 +137,7 @@ func (r *Rs) ReconstructData(input []io.Reader, inBlockIdx []int) []io.ReadClose
// 修复任意k个块恢复指定的数据块。
// 调用者应该保证input的每一个流长度相同且均为chunkSize的整数倍
func (r *Rs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx []int) []io.ReadCloser {
func (r *StreamRs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx []int) []io.ReadCloser {
outReaders := make([]io.ReadCloser, len(outBlockIdx))
outWriters := make([]*io.PipeWriter, len(outBlockIdx))
for i := 0; i < len(outBlockIdx); i++ {
@ -181,7 +181,7 @@ func (r *Rs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx []
//输出到outWriter
for i := range outBlockIdx {
err := myio.WriteAll(outWriters[i], chunks[outBlockIdx[i]])
err := io2.WriteAll(outWriters[i], chunks[outBlockIdx[i]])
if err != nil {
closeErr = err
break loop
@ -204,7 +204,7 @@ func (r *Rs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx []
// 重建任意块,包括数据块和校验块。
// 当前的实现会把不需要的块都重建出来,所以应该避免使用这个函数。
func (r *Rs) ReconstructAny(input []io.Reader, inBlockIdxes []int, outBlockIdxes []int) []io.ReadCloser {
func (r *StreamRs) ReconstructAny(input []io.Reader, inBlockIdxes []int, outBlockIdxes []int) []io.ReadCloser {
outReaders := make([]io.ReadCloser, len(outBlockIdxes))
outWriters := make([]*io.PipeWriter, len(outBlockIdxes))
for i := 0; i < len(outBlockIdxes); i++ {
@ -250,7 +250,7 @@ func (r *Rs) ReconstructAny(input []io.Reader, inBlockIdxes []int, outBlockIdxes
for i := range outBlockIdxes {
outIndex := outBlockIdxes[i]
err := myio.WriteAll(outWriters[i], chunks[outIndex])
err := io2.WriteAll(outWriters[i], chunks[outIndex])
if err != nil {
closeErr = err
break loop

View File

@ -5,7 +5,7 @@ import (
"io"
"gitlink.org.cn/cloudream/common/pkgs/future"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)
@ -34,7 +34,7 @@ func (o *ChunkedJoin) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error
fut := future.NewSetVoid()
sw.StreamReady(planID,
ioswitch.NewStream(o.OutputID,
myio.AfterReadClosedOnce(myio.ChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) {
io2.AfterReadClosedOnce(io2.ChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) {
fut.SetVoid()
}),
),

View File

@ -4,7 +4,7 @@ import (
"io"
"sync"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)
@ -24,7 +24,7 @@ func (o *ChunkedSplit) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) erro
defer str[0].Stream.Close()
wg := sync.WaitGroup{}
outputs := myio.ChunkedSplit(str[0].Stream, o.ChunkSize, o.StreamCount, myio.ChunkedSplitOption{
outputs := io2.ChunkedSplit(str[0].Stream, o.ChunkSize, o.StreamCount, io2.ChunkedSplitOption{
PaddingZeros: o.PaddingZeros,
})
@ -33,7 +33,7 @@ func (o *ChunkedSplit) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) erro
sw.StreamReady(planID, ioswitch.NewStream(
o.OutputIDs[i],
myio.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) {
io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) {
wg.Done()
}),
))

View File

@ -4,7 +4,7 @@ import (
"io"
"sync"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)
@ -21,13 +21,13 @@ func (o *Clone) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
defer strs[0].Stream.Close()
wg := sync.WaitGroup{}
cloned := myio.Clone(strs[0].Stream, len(o.OutputIDs))
cloned := io2.Clone(strs[0].Stream, len(o.OutputIDs))
for i, s := range cloned {
wg.Add(1)
sw.StreamReady(planID,
ioswitch.NewStream(o.OutputIDs[i],
myio.AfterReadClosedOnce(s, func(closer io.ReadCloser) {
io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) {
wg.Done()
}),
),

View File

@ -6,7 +6,7 @@ import (
"sync"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)
@ -20,7 +20,7 @@ type ECReconstructAny struct {
}
func (o *ECReconstructAny) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
rs, err := ec.NewRs(o.EC.K, o.EC.N, o.EC.ChunkSize)
rs, err := ec.NewStreamRs(o.EC.K, o.EC.N, o.EC.ChunkSize)
if err != nil {
return fmt.Errorf("new ec: %w", err)
}
@ -45,7 +45,7 @@ func (o *ECReconstructAny) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID)
wg := sync.WaitGroup{}
for i, id := range o.OutputIDs {
wg.Add(1)
sw.StreamReady(planID, ioswitch.NewStream(id, myio.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) {
sw.StreamReady(planID, ioswitch.NewStream(id, io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) {
wg.Done()
})))
}
@ -62,7 +62,7 @@ type ECReconstruct struct {
}
func (o *ECReconstruct) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
rs, err := ec.NewRs(o.EC.K, o.EC.N, o.EC.ChunkSize)
rs, err := ec.NewStreamRs(o.EC.K, o.EC.N, o.EC.ChunkSize)
if err != nil {
return fmt.Errorf("new ec: %w", err)
}
@ -87,7 +87,7 @@ func (o *ECReconstruct) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) err
wg := sync.WaitGroup{}
for i, id := range o.OutputIDs {
wg.Add(1)
sw.StreamReady(planID, ioswitch.NewStream(id, myio.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) {
sw.StreamReady(planID, ioswitch.NewStream(id, io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) {
wg.Done()
})))
}

View File

@ -8,7 +8,7 @@ import (
"path"
"gitlink.org.cn/cloudream/common/pkgs/future"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)
@ -56,7 +56,7 @@ func (o *FileRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
}
fut := future.NewSetVoid()
sw.StreamReady(planID, ioswitch.NewStream(o.OutputID, myio.AfterReadClosed(file, func(closer io.ReadCloser) {
sw.StreamReady(planID, ioswitch.NewStream(o.OutputID, io2.AfterReadClosed(file, func(closer io.ReadCloser) {
fut.SetVoid()
})))

View File

@ -8,7 +8,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)
@ -66,7 +66,7 @@ func (o *GRPCFetch) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
}
fut := future.NewSetVoid()
str = myio.AfterReadClosedOnce(str, func(closer io.ReadCloser) {
str = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) {
fut.SetVoid()
})

View File

@ -6,8 +6,9 @@ import (
"io"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/pkgs/logger"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)
@ -15,6 +16,7 @@ import (
type IPFSRead struct {
Output ioswitch.StreamID `json:"output"`
FileHash string `json:"fileHash"`
Option ipfs.ReadOption `json:"option"`
}
func (o *IPFSRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
@ -30,13 +32,13 @@ func (o *IPFSRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
}
defer stgglb.IPFSPool.Release(ipfsCli)
file, err := ipfsCli.OpenRead(o.FileHash)
file, err := ipfsCli.OpenRead(o.FileHash, o.Option)
if err != nil {
return fmt.Errorf("reading ipfs: %w", err)
}
fut := future.NewSetVoid()
file = myio.AfterReadClosedOnce(file, func(closer io.ReadCloser) {
file = io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) {
fut.SetVoid()
})

View File

@ -5,7 +5,7 @@ import (
"io"
"gitlink.org.cn/cloudream/common/pkgs/future"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)
@ -34,7 +34,7 @@ func (o *Join) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
fut := future.NewSetVoid()
sw.StreamReady(planID,
ioswitch.NewStream(o.OutputID,
myio.AfterReadClosedOnce(myio.Length(myio.Join(strReaders), o.Length), func(closer io.ReadCloser) {
io2.AfterReadClosedOnce(io2.Length(io2.Join(strReaders), o.Length), func(closer io.ReadCloser) {
fut.SetVoid()
}),
),

View File

@ -5,7 +5,7 @@ import (
"io"
"gitlink.org.cn/cloudream/common/pkgs/future"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)
@ -25,7 +25,7 @@ func (o *Length) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
fut := future.NewSetVoid()
sw.StreamReady(planID,
ioswitch.NewStream(o.OutputID,
myio.AfterReadClosedOnce(myio.Length(strs[0].Stream, o.Length), func(closer io.ReadCloser) {
io2.AfterReadClosedOnce(io2.Length(strs[0].Stream, o.Length), func(closer io.ReadCloser) {
fut.SetVoid()
}),
),

View File

@ -1,6 +1,7 @@
package plans
import (
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops"
@ -59,7 +60,15 @@ func (s *AgentStream) GRPCSend(node cdssdk.Node) *AgentStream {
return agtStr
}
func (b *AgentPlanBuilder) IPFSRead(fileHash string) *AgentStream {
func (b *AgentPlanBuilder) IPFSRead(fileHash string, opts ...ipfs.ReadOption) *AgentStream {
opt := ipfs.ReadOption{
Offset: 0,
Length: -1,
}
if len(opts) > 0 {
opt = opts[0]
}
agtStr := &AgentStream{
owner: b,
info: b.owner.newStream(),
@ -68,6 +77,7 @@ func (b *AgentPlanBuilder) IPFSRead(fileHash string) *AgentStream {
b.ops = append(b.ops, &ops.IPFSRead{
Output: agtStr.info.ID,
FileHash: fileHash,
Option: opt,
})
return agtStr

View File

@ -9,7 +9,7 @@ import (
"sync/atomic"
"gitlink.org.cn/cloudream/common/pkgs/future"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
@ -76,8 +76,15 @@ func Execute(plan ComposedPlan) (*Executor, error) {
}
func (e *Executor) SendStream(info *FromExecutorStream, stream io.Reader) error {
// TODO 根据地域选择IP
agtCli, err := stgglb.AgentRPCPool.Acquire(info.toNode.ExternalIP, info.toNode.ExternalGRPCPort)
// TODO 考虑不使用stgglb的Local
nodeIP := info.toNode.ExternalIP
grpcPort := info.toNode.ExternalGRPCPort
if info.toNode.LocationID == stgglb.Local.LocationID {
nodeIP = info.toNode.LocalIP
grpcPort = info.toNode.LocalGRPCPort
}
agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort)
if err != nil {
return fmt.Errorf("new agent rpc client: %w", err)
}
@ -87,8 +94,15 @@ func (e *Executor) SendStream(info *FromExecutorStream, stream io.Reader) error
}
func (e *Executor) ReadStream(info *ToExecutorStream) (io.ReadCloser, error) {
// TODO 根据地域选择IP
agtCli, err := stgglb.AgentRPCPool.Acquire(info.fromNode.ExternalIP, info.fromNode.ExternalGRPCPort)
// TODO 考虑不使用stgglb的Local
nodeIP := info.fromNode.ExternalIP
grpcPort := info.fromNode.ExternalGRPCPort
if info.fromNode.LocationID == stgglb.Local.LocationID {
nodeIP = info.fromNode.LocalIP
grpcPort = info.fromNode.LocalGRPCPort
}
agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort)
if err != nil {
return nil, fmt.Errorf("new agent rpc client: %w", err)
}
@ -98,7 +112,7 @@ func (e *Executor) ReadStream(info *ToExecutorStream) (io.ReadCloser, error) {
return nil, err
}
return myio.AfterReadClosed(str, func(closer io.ReadCloser) {
return io2.AfterReadClosed(str, func(closer io.ReadCloser) {
stgglb.AgentRPCPool.Release(agtCli)
}), nil
}

View File

@ -64,12 +64,12 @@ type GetPackageObjectDetailsResp struct {
Objects []stgmod.ObjectDetail `json:"objects"`
}
func NewGetPackageObjectDetails(packageID cdssdk.PackageID) *GetPackageObjectDetails {
func ReqGetPackageObjectDetails(packageID cdssdk.PackageID) *GetPackageObjectDetails {
return &GetPackageObjectDetails{
PackageID: packageID,
}
}
func NewGetPackageObjectDetailsResp(objects []stgmod.ObjectDetail) *GetPackageObjectDetailsResp {
func RespPackageObjectDetails(objects []stgmod.ObjectDetail) *GetPackageObjectDetailsResp {
return &GetPackageObjectDetailsResp{
Objects: objects,
}

View File

@ -63,7 +63,7 @@ func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails)
return nil, mq.Failed(errorcode.OperationFailed, "get package object block details failed")
}
return mq.ReplyOK(coormq.NewGetPackageObjectDetailsResp(details))
return mq.ReplyOK(coormq.RespPackageObjectDetails(details))
}
func (svc *Service) GetObjectDetails(msg *coormq.GetObjectDetails) (*coormq.GetObjectDetailsResp, *mq.CodeMessage) {

12
go.mod
View File

@ -5,14 +5,13 @@ go 1.20
replace gitlink.org.cn/cloudream/common v0.0.0 => ../common
require (
github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7
github.com/beevik/etree v1.2.0
github.com/gin-gonic/gin v1.9.1
github.com/go-ping/ping v1.1.0
github.com/go-sql-driver/mysql v1.7.1
github.com/ipfs/go-ipfs-api v0.7.0
github.com/google/uuid v1.3.1
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/jedib0t/go-pretty/v6 v6.4.7
github.com/jmoiron/sqlx v1.3.5
github.com/klauspost/reedsolomon v1.11.8
github.com/magefile/mage v1.15.0
github.com/samber/lo v1.38.1
github.com/smartystreets/goconvey v1.8.1
@ -40,17 +39,16 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/imdario/mergo v0.3.15 // indirect
github.com/ipfs/boxo v0.12.0 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/ipfs/go-ipfs-api v0.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/klauspost/reedsolomon v1.11.8 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
@ -71,7 +69,6 @@ require (
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-multistream v0.4.1 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/otiai10/copy v1.12.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/sirupsen/logrus v1.9.2 // indirect
@ -91,7 +88,6 @@ require (
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/exp v0.0.0-20230519143937-03e91628a987 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 // indirect

21
go.sum
View File

@ -1,9 +1,5 @@
github.com/antonfisher/nested-logrus-formatter v1.3.1 h1:NFJIr+pzwv5QLHTPyKz9UMEoHck02Q9L0FP13b/xSbQ=
github.com/antonfisher/nested-logrus-formatter v1.3.1/go.mod h1:6WTfyWFkBc9+zyBaKIqRrg/KwMqBbodBjgbHjDz7zjA=
github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7 h1:wcvD6enR///dFvb9cRodx5SGbPH4G4jPjw+aVIWkAKE=
github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7/go.mod h1:rAxMF6pVaFK/s6T4gGczvloccNbtwzuYaP2Y7W6flE8=
github.com/beevik/etree v1.2.0 h1:l7WETslUG/T+xOPs47dtd6jov2Ii/8/OjCldk5fYfQw=
github.com/beevik/etree v1.2.0/go.mod h1:aiPf89g/1k3AShMVAzriilpcE4R/Vuor90y83zVZWFc=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
@ -33,8 +29,6 @@ github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg=
github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU=
github.com/go-ping/ping v1.1.0 h1:3MCGhVX4fyEUuhsfwPrsEdQw6xspHkv5zHsiSoDFZYw=
github.com/go-ping/ping v1.1.0/go.mod h1:xIFjORFzTxqIV/tDVGO4eDy/bLuSyawEeojSm3GfRGk=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
@ -57,17 +51,17 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g=
github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/ipfs/boxo v0.12.0 h1:AXHg/1ONZdRQHQLgG5JHsSC3XoE4DjCAMgK+asZvUcQ=
@ -110,7 +104,6 @@ github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=
github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
@ -140,9 +133,6 @@ github.com/multiformats/go-multistream v0.4.1 h1:rFy0Iiyn3YT0asivDUIR05leAdwZq3d
github.com/multiformats/go-multistream v0.4.1/go.mod h1:Mz5eykRVAjJWckE2U78c6xqdtyNUEhKSM0Lwar2p77Q=
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
github.com/otiai10/copy v1.12.0 h1:cLMgSQnXBs1eehF0Wy/FAGsgDTDmAqFR7rQylBb1nDY=
github.com/otiai10/copy v1.12.0/go.mod h1:rSaLseMUsZFFbsFGc7wCJnnkTAvdc5L6VWxPE4308Ww=
github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@ -212,27 +202,20 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=

View File

@ -64,7 +64,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
}
defer stgglb.CoordinatorMQPool.Release(coorCli)
getObjs, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.PackageID))
getObjs, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.PackageID))
if err != nil {
log.Warnf("getting package objects: %s", err.Error())
return

View File

@ -12,7 +12,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/lo2"
mymath "gitlink.org.cn/cloudream/common/utils/math"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/common/utils/sort2"
"gitlink.org.cn/cloudream/storage/common/consts"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
@ -58,7 +58,7 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) {
}
defer stgglb.CoordinatorMQPool.Release(coorCli)
getObjs, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.PackageID))
getObjs, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.PackageID))
if err != nil {
log.Warnf("getting package objects: %s", err.Error())
return
@ -674,7 +674,7 @@ func (t *CleanPinned) calcMinAccessCost(state *annealingState) float64 {
// 下面的if会在拿到k个块之后跳出循环所以or多了块也没关系
gotBlocks.Or(tarNodeMp)
// 但是算读取块的消耗时不能多算最多算读了k个块的消耗
willGetBlocks := mymath.Min(gotBlocks.Weight()-curWeigth, state.object.minBlockCnt-curWeigth)
willGetBlocks := math2.Min(gotBlocks.Weight()-curWeigth, state.object.minBlockCnt-curWeigth)
thisCost += float64(willGetBlocks) * float64(tar.Distance)
if gotBlocks.Weight() >= state.object.minBlockCnt {