JCS-pub/common/pkgs/storage/s3/shard_store.go

231 lines
5.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package s3
import (
"context"
"sync"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/math2"
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
)
// ShardStoreOption configures optional behavior of a ShardStore.
type ShardStoreOption struct {
	UseAWSSha256 bool // whether the SHA256 checksum provided by AWS can be used directly; if not, it is computed locally. Defaults to local computation.
}
// ShardStore stores files in an S3 bucket under a dedicated working
// directory, addressing each file by its content hash.
type ShardStore struct {
	Detail     *clitypes.UserSpaceDetail // user space this store belongs to (also used to tag log output)
	Bucket     string                    // target S3 bucket name
	workingDir clitypes.JPath            // key prefix that roots all shard objects in the bucket
	cli        *s3.Client                // S3 client used for all object operations
	opt        ShardStoreOption
	lock       sync.Mutex // serializes Store/Info/ListAll/GC operations
}
// NewShardStore creates a ShardStore for the given user space, rooted at
// the space's working directory plus the shard-store subdirectory.
func NewShardStore(detail *clitypes.UserSpaceDetail, cli *s3.Client, bkt string, opt ShardStoreOption) (*ShardStore, error) {
	root := detail.UserSpace.WorkingDir.Clone()
	root.Push(types.ShardStoreWorkingDir)

	store := &ShardStore{
		Detail:     detail,
		Bucket:     bkt,
		workingDir: root,
		cli:        cli,
		opt:        opt,
	}
	return store, nil
}
// Start logs the store's root directory; no background work is launched.
func (s *ShardStore) Start(ch *types.StorageEventChan) {
	log := s.getLogger()
	log.Infof("start, root: %v", s.workingDir)
}
// Stop logs the shutdown; there is no background state to tear down.
func (s *ShardStore) Stop() {
	log := s.getLogger()
	log.Infof("component stop")
}
// Store registers a file that has already been written to path by copying
// it to its content-addressed location derived from hash, and returns the
// resulting file info.
func (s *ShardStore) Store(path clitypes.JPath, hash clitypes.FileHash, size int64) (types.FileInfo, error) {
	s.lock.Lock()
	defer s.lock.Unlock()

	log := s.getLogger()
	log.Debugf("write file %v finished, size: %v, hash: %v", path, size, hash)

	dstPath := s.GetFilePathFromHash(hash)
	input := &s3.CopyObjectInput{
		Bucket:     aws.String(s.Bucket),
		CopySource: aws.String(JoinKey(s.Bucket, path.String())),
		Key:        aws.String(dstPath.String()),
	}
	if _, err := s.cli.CopyObject(context.Background(), input); err != nil {
		log.Warnf("copy file %v to %v: %v", path, dstPath, err)
		return types.FileInfo{}, err
	}

	return types.FileInfo{
		Hash: hash,
		Size: size,
		Path: dstPath,
	}, nil
}
// Info looks up the stored file for the given hash via a HEAD request and
// returns its info, or the HeadObject error if the object is missing or
// inaccessible.
func (s *ShardStore) Info(hash clitypes.FileHash) (types.FileInfo, error) {
	s.lock.Lock()
	defer s.lock.Unlock()

	filePath := s.GetFilePathFromHash(hash)
	// Use context.Background() for consistency with the other methods of
	// this type (the original used context.TODO() only here).
	info, err := s.cli.HeadObject(context.Background(), &s3.HeadObjectInput{
		Bucket: aws.String(s.Bucket),
		Key:    aws.String(filePath.String()),
	})
	if err != nil {
		s.getLogger().Warnf("get file %v: %v", filePath, err)
		return types.FileInfo{}, err
	}

	return types.FileInfo{
		Hash: hash,
		// aws.ToInt64 dereferences the pointer nil-safely (zero if unset),
		// instead of panicking on a missing Content-Length.
		Size: aws.ToInt64(info.ContentLength),
		Path: filePath,
	}, nil
}
// ListAll enumerates every object under the shard store's working directory
// whose base name parses as a file hash, paging through the bucket listing.
func (s *ShardStore) ListAll() ([]types.FileInfo, error) {
	s.lock.Lock()
	defer s.lock.Unlock()

	var infos []types.FileInfo
	var marker *string
	for {
		resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{
			Bucket: aws.String(s.Bucket),
			Prefix: aws.String(s.workingDir.String()),
			Marker: marker,
		})
		if err != nil {
			s.getLogger().Warnf("list objects: %v", err)
			return nil, err
		}

		for _, obj := range resp.Contents {
			fileHash, err := clitypes.ParseHash(BaseKey(*obj.Key))
			if err != nil {
				// Not a hash-named object (e.g. a directory placeholder); skip it.
				continue
			}
			infos = append(infos, types.FileInfo{
				Hash: fileHash,
				Size: aws.ToInt64(obj.Size), // nil-safe dereference
				Path: clitypes.PathFromJcsPathString(*obj.Key),
			})
		}

		if resp.IsTruncated == nil || !*resp.IsTruncated {
			break
		}
		// BUG FIX: ListObjects (the V1 API) only returns NextMarker when a
		// Delimiter is set in the request. Without one, NextMarker is nil on
		// truncated responses, and passing a nil Marker would restart the
		// listing from the beginning — an infinite loop. Per the S3 API
		// contract, use the last returned key as the next marker instead.
		switch {
		case resp.NextMarker != nil:
			marker = resp.NextMarker
		case len(resp.Contents) > 0:
			marker = resp.Contents[len(resp.Contents)-1].Key
		default:
			// Truncated but empty page; nothing usable to continue from.
			return infos, nil
		}
	}
	return infos, nil
}
// GC deletes every hash-named object under the working directory whose hash
// is not listed in avaiables, issuing batched DeleteObjects requests.
func (s *ShardStore) GC(avaiables []clitypes.FileHash) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	avais := make(map[clitypes.FileHash]bool, len(avaiables))
	for _, hash := range avaiables {
		avais[hash] = true
	}

	var deletes []s3types.ObjectIdentifier
	var marker *string
	for {
		resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{
			Bucket: aws.String(s.Bucket),
			Prefix: aws.String(s.workingDir.String()),
			Marker: marker,
		})
		if err != nil {
			s.getLogger().Warnf("list objects: %v", err)
			return err
		}

		for _, obj := range resp.Contents {
			fileHash, err := clitypes.ParseHash(BaseKey(*obj.Key))
			if err != nil {
				// Not a hash-named object; leave it alone.
				continue
			}
			if !avais[fileHash] {
				deletes = append(deletes, s3types.ObjectIdentifier{
					Key: obj.Key,
				})
			}
		}

		if resp.IsTruncated == nil || !*resp.IsTruncated {
			break
		}
		// BUG FIX: ListObjects (V1) only returns NextMarker when a Delimiter
		// is set; with a nil NextMarker a truncated listing would loop forever
		// from the start. Fall back to the last returned key as the marker.
		switch {
		case resp.NextMarker != nil:
			marker = resp.NextMarker
		case len(resp.Contents) > 0:
			marker = resp.Contents[len(resp.Contents)-1].Key
		default:
			marker = nil
		}
		if marker == nil {
			break
		}
	}

	totalCnt := len(deletes)
	for len(deletes) > 0 {
		// DeleteObjects accepts at most 1000 keys per request; send in chunks.
		cnt := math2.Min(500, len(deletes))
		_, err := s.cli.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{
			Bucket: aws.String(s.Bucket),
			Delete: &s3types.Delete{
				Objects: deletes[:cnt],
			},
		})
		if err != nil {
			s.getLogger().Warnf("delete objects: %v", err)
			return err
		}
		deletes = deletes[cnt:]
	}
	s.getLogger().Infof("purge %d files", totalCnt)
	// TODO: atomicity cannot be guaranteed, so a deletion failure is only
	// logged. NOTE(review): the code above actually returns the error on a
	// failed batch — confirm which behavior is intended.
	return nil
}
// Stats reports the current status of the store.
func (s *ShardStore) Stats() types.Stats {
	// TODO: collect real statistics about the stored data.
	stats := types.Stats{
		Status: types.StatusOK,
	}
	return stats
}
// getLogger returns a logger tagged with this store's backend kind and
// user space, so all log lines are attributable to this instance.
func (s *ShardStore) getLogger() logger.Logger {
	log := logger.WithField("ShardStore", "S3")
	return log.WithField("UserSpace", s.Detail)
}
// GetFileDirFromHash returns the directory that holds the file for hash:
// the working directory plus a 2-character hash-prefix fan-out level.
func (s *ShardStore) GetFileDirFromHash(hash clitypes.FileHash) clitypes.JPath {
	dir := s.workingDir.Clone()
	dir.Push(hash.GetHashPrefix(2))
	return dir
}
// GetFilePathFromHash returns the full object path for hash: the working
// directory, a 2-character hash-prefix level, then the hash itself as the
// object name.
func (s *ShardStore) GetFilePathFromHash(hash clitypes.FileHash) clitypes.JPath {
	filePath := s.workingDir.Clone()
	filePath.Push(hash.GetHashPrefix(2))
	filePath.Push(string(hash))
	return filePath
}