设计缓存等级机制;解决上传到远端后无法设置修改时间的问题

This commit is contained in:
Sydonian 2025-04-16 16:35:29 +08:00
parent 8a587c1095
commit 5da0edff16
18 changed files with 571 additions and 277 deletions

View File

@ -359,7 +359,7 @@ type AddObjectEntry struct {
Path string `json:"path"`
Size int64 `json:"size,string"`
FileHash types.FileHash `json:"fileHash"`
UploadTime time.Time `json:"uploadTime"` // 开始上传文件的时间
CreateTime time.Time `json:"createTime"` // 开始上传文件的时间
UserSpaceIDs []types.UserSpaceID `json:"userSpaceIDs"`
}
@ -394,8 +394,8 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID types.PackageID, adds []A
Size: adds[i].Size,
FileHash: adds[i].FileHash,
Redundancy: types.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式
CreateTime: adds[i].UploadTime,
UpdateTime: adds[i].UploadTime,
CreateTime: adds[i].CreateTime,
UpdateTime: adds[i].CreateTime,
}
e, ok := existsObjsMap[adds[i].Path]
@ -566,6 +566,29 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, updates []UpdatingObje
return nil
}
// BatchUpdateUpdateTimeByPath sets the UpdateTime column of the objects in
// the given package identified by pathes, updating pathes[i]'s object to
// updateTimes[i]. The whole batch is executed as a single UPDATE statement
// with a CASE expression, avoiding one round trip per object.
//
// The two slices must have the same length; an empty batch is a no-op.
func (*ObjectDB) BatchUpdateUpdateTimeByPath(ctx SQLContext, packageID types.PackageID, pathes []string, updateTimes []time.Time) error {
	if len(pathes) != len(updateTimes) {
		return fmt.Errorf("pathes and updateTimes must have the same length")
	}
	if len(pathes) == 0 {
		return nil
	}

	sb := strings.Builder{}
	// One (path, time) pair per entry, plus packageID and the pathes slice
	// for the IN clause. (The previous capacity of len*2+1 was one short and
	// forced a reallocation on the final append.)
	args := make([]any, 0, len(pathes)*2+2)
	sb.WriteString("UPDATE Object SET UpdateTime = CASE Path \n")
	for i := range pathes {
		sb.WriteString("WHEN ? THEN ? \n")
		args = append(args, pathes[i], updateTimes[i])
	}
	// The WHERE clause restricts the statement to the listed paths, so the
	// CASE expression always matches and never writes NULL.
	sb.WriteString("END WHERE PackageID = ? AND Path IN (?)")
	args = append(args, packageID, pathes)
	return ctx.Exec(sb.String(), args...).Error
}
func (db *ObjectDB) MoveByPrefix(ctx SQLContext, oldPkgID types.PackageID, oldPrefix string, newPkgID types.PackageID, newPrefix string) error {
return ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", oldPkgID, escapeLike("", "%", oldPrefix)).
Updates(map[string]any{

View File

@ -134,7 +134,7 @@ func (s *PackageService) CreateLoad(ctx *gin.Context) {
}
path = filepath.ToSlash(path)
err = up.Upload(path, file.Size, f)
err = up.Upload(path, f)
if err != nil {
log.Warnf("uploading file: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("uploading file %v: %v", file.Filename, err)))

View File

@ -97,7 +97,11 @@ func (fi *fileInfo) Size() int64 {
}
func (fi *fileInfo) Mode() os.FileMode {
return os.FileMode(fi.entry.Mode())
if fi.entry.IsDir() {
return os.ModeDir | fi.entry.Perm()
}
return fi.entry.Perm()
}
// ModTime returns the modification time of the file

View File

@ -52,7 +52,7 @@ var _ fusefs.FileWriter = (*fileHandle)(nil)
// of a descriptor that was duplicated using dup(2), it may be called
// more than once for the same fileHandle.
func (f *fileHandle) Flush(ctx context.Context) syscall.Errno {
return translateError(f.hd.Close())
return translateError(f.hd.Flush())
}
var _ fusefs.FileFlusher = (*fileHandle)(nil)
@ -62,7 +62,7 @@ var _ fusefs.FileFlusher = (*fileHandle)(nil)
// so any cleanup that requires specific synchronization or
// could fail with I/O errors should happen in Flush instead.
func (f *fileHandle) Release(ctx context.Context) syscall.Errno {
return translateError(f.hd.Release())
return translateError(f.hd.Close())
}
var _ fusefs.FileReleaser = (*fileHandle)(nil)

View File

@ -57,13 +57,13 @@ func translateError(err error) syscall.Errno {
// get the Mode from a fs Node
func (v *Fuse) getMode(node FsEntry) uint32 {
Mode := node.Mode().Perm()
mode := node.Perm()
if node.IsDir() {
Mode |= fuse.S_IFDIR
mode |= fuse.S_IFDIR
} else {
Mode |= fuse.S_IFREG
mode |= fuse.S_IFREG
}
return uint32(Mode)
return uint32(mode)
}
// fill in attr from node

View File

@ -29,7 +29,7 @@ type FsStats struct {
type FsEntry interface {
Name() string
Size() int64
Mode() os.FileMode
Perm() os.FileMode
ModTime() time.Time
IsDir() bool
}
@ -66,7 +66,11 @@ type FileHandle interface {
Entry() FsFile
ReadAt(buf []byte, off int64) (int, error)
WriteAt(buf []byte, off int64) (int, error)
// 按GPT解释Flush会在文件描述符被Close时调用目的是将用户缓冲区的数据送到内核缓冲区。
// 要注意的是可以通过dup函数创建多个文件描述符而它们都引用这个文件句柄所以要小心Flush是可能会被多次调用的。
Flush() error
// 而这个函数是将内核缓冲区的数据写入到磁盘
Sync() error
// 这个函数只有在句柄的所有引用都被释放时才会被调用,目的是释放内核资源
Close() error
Release() error
}

View File

@ -32,9 +32,21 @@ type CacheEntry interface {
type CacheEntryInfo struct {
PathComps []string
Size int64
Mode os.FileMode
Perm os.FileMode
ModTime time.Time
IsDir bool
// 元数据版本号
MetaRevision int
// 文件数据版本号
DataRevision int
// 引用计数
RefCount int
// 上次引用计数归零的时间
FreeTime time.Time
// 缓存等级
Level CacheLevel
// 缓存等级改变时间
ChangeLevelTime time.Time
}
type Cache struct {
@ -92,10 +104,12 @@ func (c *Cache) Dump() CacheStatus {
return trie.VisitContinue
}
info := node.Value.Info()
activeFiles = append(activeFiles, CacheFileStatus{
Path: filepath.Join(path...),
RefCount: node.Value.state.refCount,
IsLoaded: node.Value.state.isLoaded,
RefCount: info.RefCount,
Level: info.Level.String(),
IsUploading: node.Value.state.uploading != nil,
})
return trie.VisitContinue
@ -162,7 +176,7 @@ func (c *Cache) CreateFile(pathComps []string) *CacheFile {
return nil
}
ch.state.refCount++
ch.IncRef()
c.activeCache.CreateWords(pathComps).Value = ch
logger.Debugf("create new cache file %v", pathComps)
@ -178,21 +192,14 @@ func (c *Cache) LoadFile(pathComps []string, obj *clitypes.Object) *CacheFile {
node, ok := c.activeCache.WalkEnd(pathComps)
if ok && node.Value != nil {
if !node.Value.state.isLoaded {
err := node.Value.Load()
if err != nil {
logger.Warnf("load cache %v: %v", pathComps, err)
return nil
}
}
node.Value.IncRef()
return node.Value
}
ch, err := loadCacheFile(c, pathComps)
if err == nil {
ch.remoteObj = obj
ch.state.refCount++
ch.IncRef()
c.activeCache.CreateWords(pathComps).Value = ch
logger.Debugf("load cache %v", pathComps)
@ -215,7 +222,7 @@ func (c *Cache) LoadFile(pathComps []string, obj *clitypes.Object) *CacheFile {
return nil
}
ch.state.refCount++
ch.IncRef()
c.activeCache.CreateWords(pathComps).Value = ch
logger.Debugf("create cache %v from object %v", pathComps, obj.ObjectID)
@ -439,7 +446,7 @@ func (c *Cache) Move(pathComps []string, newPathComps []string) error {
return nil
}
type uploadingPackage struct {
type syncPackage struct {
bktName string
pkgName string
pkg clitypes.Package
@ -449,11 +456,20 @@ type uploadingPackage struct {
type uploadingObject struct {
pathComps []string
cache *CacheFile
reader *CacheFileHandle
reader *CacheFileHandle
modTime time.Time
metaRevision int
isDeleted bool
isSuccess bool
}
// packageFullName uniquely identifies a package by its bucket name and
// package name; used as a map key when grouping cache entries per package.
type packageFullName struct {
	bktName string
	pkgName string
}
func (c *Cache) scanningCache() {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
@ -472,12 +488,7 @@ func (c *Cache) scanningCache() {
c.lock.Lock()
type packageFullName struct {
bktName string
pkgName string
}
uploadingPkgs := make(map[packageFullName]*uploadingPackage)
uploadingPkgs := make(map[packageFullName]*syncPackage)
visitCnt := 0
visitBreak := false
@ -489,58 +500,16 @@ func (c *Cache) scanningCache() {
return trie.VisitContinue
}
if ch.state.refCount > 0 {
logger.Debugf("skip cache %v, refCount: %v", path, ch.state.refCount)
info := ch.Info()
if info.RefCount > 0 {
logger.Debugf("skip cache %v, refCount: %v", path, info.RefCount)
return trie.VisitContinue
}
visitCnt++
shouldUpload := true
// 不存放在Package里的文件不需要上传
if len(ch.pathComps) <= 2 {
shouldUpload = false
}
if ch.Revision() > 0 && shouldUpload {
// 1. 本地缓存被修改了,如果一段时间内没有被使用,则进行上传
if time.Since(ch.state.freeTime) > c.cfg.UploadPendingTime && ch.state.uploading == nil {
fullName := packageFullName{ch.pathComps[0], ch.pathComps[1]}
pkg, ok := uploadingPkgs[fullName]
if !ok {
pkg = &uploadingPackage{
bktName: ch.pathComps[0],
pkgName: ch.pathComps[1],
}
uploadingPkgs[fullName] = pkg
}
obj := &uploadingObject{
pathComps: lo2.ArrayClone(ch.pathComps),
cache: ch,
reader: ch.OpenReadWhenScanning(),
}
pkg.upObjs = append(pkg.upObjs, obj)
ch.state.uploading = obj
}
} else if ch.state.isLoaded {
// 2. 本地缓存没有被修改,如果一段时间内没有被使用,则进行卸载
if time.Since(ch.state.freeTime) > c.cfg.CacheActiveTime {
ch.Unload()
ch.state.isLoaded = false
ch.state.unloadTime = time.Now()
}
} else {
// 3. 卸载后的缓存,如果一段时间内没有被使用,则进行删除。
if time.Since(ch.state.unloadTime) > c.cfg.CacheExpireTime {
// 如果文件已经同步到远端,则可以直接删除本地缓存
if ch.Revision() == 0 {
ch.Delete()
}
node.RemoveSelf(true)
}
}
c.visitNode(path, node, ch, info, uploadingPkgs)
// 每次最多遍历500个节点防止占用锁太久
if visitCnt > 500 {
@ -557,11 +526,80 @@ func (c *Cache) scanningCache() {
c.lock.Unlock()
if len(uploadingPkgs) > 0 {
go c.doUploading(lo.Values(uploadingPkgs))
go c.doSync(lo.Values(uploadingPkgs))
}
}
}
// visitNode inspects one cache entry during a scan pass and decides its fate:
// schedule it for sync, lower its cache level, or remove it from the trie.
// NOTE(review): called from scanningCache while c.lock is held — confirm
// before calling from elsewhere.
func (c *Cache) visitNode(path []string, node *trie.Node[*CacheFile], ch *CacheFile, info CacheEntryInfo, uploadingPkgs map[packageFullName]*syncPackage) {
	shouldUpload := true
	// Files not stored inside a Package (path shorter than bucket/package)
	// never need to be uploaded.
	if len(ch.pathComps) <= 2 {
		shouldUpload = false
	}

	// 1. The local cache has been modified: upload it once it has been
	// unused for longer than UploadPendingTime.
	if shouldUpload && (info.DataRevision > 0 || info.MetaRevision > 0) {
		if time.Since(info.FreeTime) < c.cfg.UploadPendingTime {
			return
		}
		if ch.state.uploading != nil {
			return
		}

		fullName := packageFullName{ch.pathComps[0], ch.pathComps[1]}
		pkg, ok := uploadingPkgs[fullName]
		if !ok {
			pkg = &syncPackage{
				bktName: ch.pathComps[0],
				pkgName: ch.pathComps[1],
			}
			uploadingPkgs[fullName] = pkg
		}

		obj := &uploadingObject{
			pathComps: lo2.ArrayClone(ch.pathComps),
			cache:     ch,
		}
		pkg.upObjs = append(pkg.upObjs, obj)
		ch.state.uploading = obj

		// Only open a data reader when the file content itself changed;
		// metadata-only changes are synced without transferring data.
		if info.DataRevision > 0 {
			obj.reader = ch.OpenReadWhenScanning()
		}
		if info.MetaRevision > 0 {
			obj.modTime = info.ModTime
			obj.metaRevision = info.MetaRevision
		}
		return
	}

	// 2. The local cache is unmodified: demote it to read-only after it has
	// been unused for longer than CacheActiveTime.
	if info.Level > LevelReadOnly {
		if time.Since(info.FreeTime) > c.cfg.CacheActiveTime {
			ch.LevelDown(LevelReadOnly)
		}
		return
	}

	// 3. An already-demoted cache entry is deleted after expiry.
	if info.Level <= LevelReadOnly {
		// Both the time since last use and the time since the last level
		// change must exceed CacheExpireTime before deletion.
		if time.Since(info.FreeTime) > c.cfg.CacheExpireTime && time.Since(info.ChangeLevelTime) > c.cfg.CacheExpireTime {
			// Only remove the on-disk cache when the file is fully synced
			// to the remote (no pending data or meta revisions).
			if info.MetaRevision == 0 && info.DataRevision == 0 {
				ch.Delete()
			}
			node.RemoveSelf(true)
		}
		return
	}
}
func (c *Cache) scanningData() {
ticker := time.NewTicker(c.cfg.ScanDataDirInterval)
defer ticker.Stop()
@ -655,10 +693,115 @@ func (c *Cache) scanningData() {
}
}
func (c *Cache) doUploading(pkgs []*uploadingPackage) {
// doSync splits the pending objects of each package into two groups — those
// with modified file data (a reader was opened) and those with metadata-only
// changes — then uploads the former before applying the latter.
func (c *Cache) doSync(pkgs []*syncPackage) {
	var uploadPkgs []*syncPackage
	var updateOnlyPkgs []*syncPackage

	for _, src := range pkgs {
		var withData []*uploadingObject
		var metaOnly []*uploadingObject

		for _, o := range src.upObjs {
			if o.reader != nil {
				withData = append(withData, o)
			} else {
				metaOnly = append(metaOnly, o)
			}
		}

		if len(withData) > 0 {
			uploadPkgs = append(uploadPkgs, &syncPackage{
				bktName: src.bktName,
				pkgName: src.pkgName,
				upObjs:  withData,
			})
		}
		if len(metaOnly) > 0 {
			updateOnlyPkgs = append(updateOnlyPkgs, &syncPackage{
				bktName: src.bktName,
				pkgName: src.pkgName,
				upObjs:  metaOnly,
			})
		}
	}

	// Upload file data first: uploading creates the Package if needed, so the
	// metadata-only pass can then look it up by name.
	if len(uploadPkgs) > 0 {
		c.doUploading(uploadPkgs)
	}
	if len(updateOnlyPkgs) > 0 {
		c.doUpdatingOnly(updateOnlyPkgs)
	}
}
// doUpdatingOnly syncs metadata-only changes (the modification time) for
// objects whose file content is unchanged. It runs outside the cache lock,
// re-acquiring it only briefly to maintain per-object upload state.
func (c *Cache) doUpdatingOnly(pkgs []*syncPackage) {
	/// 1. A metadata-only sync never creates a Package, so just look it up.
	var sucPkgs []*syncPackage
	var failedPkgs []*syncPackage
	for _, pkg := range pkgs {
		p, err := c.db.Package().GetByFullName(c.db.DefCtx(), pkg.bktName, pkg.pkgName)
		if err != nil {
			logger.Warnf("get package %v/%v: %v", pkg.bktName, pkg.pkgName, err)
			failedPkgs = append(failedPkgs, pkg)
			continue
		}
		pkg.pkg = p
		sucPkgs = append(sucPkgs, pkg)
	}

	/// 2. For packages that could not be resolved, clear the uploading flag
	/// under the cache lock so a later scan can retry these objects.
	c.lock.Lock()
	for _, pkg := range failedPkgs {
		for _, obj := range pkg.upObjs {
			obj.cache.state.uploading = nil
		}
	}
	c.lock.Unlock()

	/// 3. Apply the batched metadata update for each resolved package.
	for _, p := range sucPkgs {
		pathes := make([]string, 0, len(p.upObjs))
		modTimes := make([]time.Time, 0, len(p.upObjs))
		for _, obj := range p.upObjs {
			// pathComps[0:2] are bucket/package; the object path starts at [2].
			pathes = append(pathes, clitypes.JoinObjectPath(obj.pathComps[2:]...))
			modTimes = append(modTimes, obj.modTime)
		}

		err := c.db.Object().BatchUpdateUpdateTimeByPath(c.db.DefCtx(), p.pkg.PackageID, pathes, modTimes)
		if err != nil {
			logger.Warnf("batch update package %v/%v: %v", p.bktName, p.pkgName, err)
			// On failure, clear the uploading flag so the objects are retried.
			c.lock.Lock()
			for _, obj := range p.upObjs {
				obj.cache.state.uploading = nil
			}
			c.lock.Unlock()
			continue
		}

		logger.Infof("update %v object in package %v/%v", len(p.upObjs), p.bktName, p.pkgName)

		// Record the successful sync: clear the uploading flag and mark the
		// meta revision uploaded (data revision 0 = content unchanged).
		c.lock.Lock()
		for _, obj := range p.upObjs {
			obj.cache.state.uploading = nil
			obj.cache.RevisionUploaded(0, obj.metaRevision)
		}
		c.lock.Unlock()
	}
}
func (c *Cache) doUploading(pkgs []*syncPackage) {
/// 1. 先尝试创建Package
var sucPkgs []*uploadingPackage
var failedPkgs []*uploadingPackage
var sucPkgs []*syncPackage
var failedPkgs []*syncPackage
for _, pkg := range pkgs {
p, err := db.DoTx21(c.db, c.db.Package().TryCreateAll, pkg.bktName, pkg.pkgName)
if err != nil {
@ -680,7 +823,6 @@ func (c *Cache) doUploading(pkgs []*uploadingPackage) {
}
}
c.lock.Unlock()
// 关闭文件必须在锁外
for _, pkg := range failedPkgs {
for _, obj := range pkg.upObjs {
obj.reader.Close()
@ -689,9 +831,9 @@ func (c *Cache) doUploading(pkgs []*uploadingPackage) {
/// 3. 开始上传每个Package
for _, p := range sucPkgs {
uploader, err := c.uploader.BeginUpdate(p.pkg.PackageID, 0, nil, nil)
upder, err := c.uploader.BeginUpdate(p.pkg.PackageID, 0, nil, nil)
if err != nil {
logger.Warnf("begin update package %v/%v: %v", p.bktName, p.pkgName, err)
logger.Warnf("begin upload package %v/%v: %v", p.bktName, p.pkgName, err)
// 取消上传状态
c.lock.Lock()
@ -719,7 +861,9 @@ func (c *Cache) doUploading(pkgs []*uploadingPackage) {
counter := io2.Counter(&rd)
err = uploader.Upload(clitypes.JoinObjectPath(o.pathComps[2:]...), counter)
err = upder.Upload(clitypes.JoinObjectPath(o.pathComps[2:]...), counter, uploader.UploadOption{
CreateTime: o.modTime,
})
if err != nil {
logger.Warnf("upload object %v: %v", o.pathComps, err)
upFailed++
@ -749,14 +893,14 @@ func (c *Cache) doUploading(pkgs []*uploadingPackage) {
newPath := clitypes.JoinObjectPath(o.cache.pathComps[2:]...)
if o.isDeleted {
uploader.CancelObject(oldPath)
upder.CancelObject(oldPath)
upCancel++
continue
}
// 如果对象移动到了另一个Package那么也要取消上传
if !lo2.ArrayEquals(o.pathComps[:2], o.cache.pathComps[:2]) {
uploader.CancelObject(oldPath)
upder.CancelObject(oldPath)
upCancel++
continue
@ -764,19 +908,19 @@ func (c *Cache) doUploading(pkgs []*uploadingPackage) {
// 只有仍在同Package内移动的对象才能直接重命名
if newPath != oldPath {
uploader.RenameObject(oldPath, newPath)
upder.RenameObject(oldPath, newPath)
upRename++
}
sucObjs = append(sucObjs, o)
}
_, err = uploader.Commit()
_, err = upder.Commit()
if err != nil {
logger.Warnf("commit update package %v/%v: %v", p.bktName, p.pkgName, err)
} else {
for _, obj := range sucObjs {
obj.cache.RevisionUploaded(obj.reader.revision)
obj.cache.RevisionUploaded(obj.reader.revision, obj.metaRevision)
}
upTime := time.Since(upStartTime)
@ -786,7 +930,6 @@ func (c *Cache) doUploading(pkgs []*uploadingPackage) {
c.lock.Unlock()
// 在Cache锁以外关闭文件。
// 关闭文件会影响refCount所以无论是上传失败还是上传成功都会在等待一段时间后才进行下一阶段的操作
for _, obj := range p.upObjs {
obj.reader.Close()

View File

@ -88,7 +88,7 @@ func loadCacheDirInfo(c *Cache, pathComps []string, dataFileInfo os.FileInfo) (*
return &CacheEntryInfo{
PathComps: pathComps,
Size: 0,
Mode: dataFileInfo.Mode(),
Perm: dataFileInfo.Mode(),
ModTime: dataFileInfo.ModTime(),
IsDir: true,
}, nil
@ -122,7 +122,7 @@ func (f *CacheDir) Info() CacheEntryInfo {
return CacheEntryInfo{
PathComps: f.pathComps,
Size: 0,
Mode: f.perm,
Perm: f.perm,
ModTime: f.modTime,
IsDir: true,
}

View File

@ -17,12 +17,32 @@ import (
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
)
// CacheLevel describes how much of a cache entry is currently loaded.
// Levels are ordered: a higher value means more resources are held.
type CacheLevel int

const (
	// LevelNotLoaded: nothing is loaded.
	LevelNotLoaded CacheLevel = iota
	// LevelReadOnly: cache data is complete on disk, but metadata was only
	// loaded once; the entry can be read but not modified.
	LevelReadOnly
	// LevelMetaLoaded: cache data is complete on disk and metadata is
	// loaded, but the file data itself is not loaded.
	LevelMetaLoaded
	// LevelComplete: cache data is complete and loaded into memory.
	LevelComplete
)

// cacheLevelNames maps CacheLevel values to display names. Declared at
// package scope so String does not allocate a fresh slice on every call.
var cacheLevelNames = [...]string{"NotLoaded", "ReadOnly", "MetaLoaded", "Complete"}

// String returns a human-readable name for the cache level. Out-of-range
// values return "Unknown" instead of panicking on a bad index.
func (l CacheLevel) String() string {
	if l < 0 || int(l) >= len(cacheLevelNames) {
		return "Unknown"
	}
	return cacheLevelNames[l]
}
type FileInfo struct {
// 文件总大小。可能会超过对应的远端文件的大小。
// 此大小可能与本地缓存文件大小也不同,需要定时将本地缓存文件大小修正到与这个值相同。
Size int64
// 如果大于0则代表有未提交的修改
Revision int
// 文件数据的版本号。如果大于0则代表有未提交的修改
DataRevision int
// 文件元数据的版本号
MetaRevision int
// 数据段列表,按照段开始位置从小到大排列
Segments []*Range
// 文件对应的对象ID仅在文件是一个缓存文件时才有值
@ -87,9 +107,14 @@ type CacheFile struct {
readers []*CacheFileHandle
writers []*CacheFileHandle
saveMetaChan chan any
noSaveMeta bool // 防止在Unload之后又保存了文件
stopSaveChan chan any
isDeleted bool
level CacheLevel
refCount int
freeTime time.Time
changeLevelTime time.Time
metaFile *os.File
dataFile *os.File
writeLock *sync.RWMutex
@ -99,11 +124,7 @@ type CacheFile struct {
}
type cacheState struct {
refCount int
freeTime time.Time
unloadTime time.Time
isLoaded bool
uploading *uploadingObject
uploading *uploadingObject
}
func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
@ -111,9 +132,9 @@ func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
dataPath := cache.GetCacheDataPath(pathComps...)
info := FileInfo{
Revision: 1,
ModTime: time.Now(),
Perm: 0777,
DataRevision: 1,
ModTime: time.Now(),
Perm: 0755,
}
infoData, err := serder.ObjectToJSON(info)
@ -149,15 +170,15 @@ func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
info: info,
rwLock: &sync.RWMutex{},
saveMetaChan: make(chan any, 1),
stopSaveChan: make(chan any),
level: LevelComplete,
metaFile: metaFile,
dataFile: dataFile,
writeLock: &sync.RWMutex{},
state: cacheState{
isLoaded: true,
},
state: cacheState{},
}
go ch.serving(ch.saveMetaChan)
go ch.serving(ch.saveMetaChan, ch.stopSaveChan)
return ch, nil
}
@ -204,7 +225,8 @@ func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
info.ModTime = stat.ModTime()
info.Perm = stat.Mode().Perm()
info.Segments = []*Range{{Position: 0, Length: info.Size}}
info.Revision = 1 // 未同步的文件视为已修改
info.MetaRevision = 1 // 未同步的文件视为已修改
info.DataRevision = 1
} else {
err = serder.JSONToObjectStream(metaFile, info)
@ -220,15 +242,15 @@ func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
info: *info,
rwLock: &sync.RWMutex{},
saveMetaChan: make(chan any, 1),
stopSaveChan: make(chan any),
level: LevelComplete,
metaFile: metaFile,
dataFile: dataFile,
writeLock: &sync.RWMutex{},
state: cacheState{
isLoaded: true,
},
state: cacheState{},
}
go ch.serving(ch.saveMetaChan)
go ch.serving(ch.saveMetaChan, ch.stopSaveChan)
return ch, nil
}
@ -278,15 +300,15 @@ func newCacheFileFromObject(cache *Cache, pathComps []string, obj *clitypes.Obje
remoteObj: obj,
rwLock: &sync.RWMutex{},
saveMetaChan: make(chan any, 1),
stopSaveChan: make(chan any),
level: LevelComplete,
metaFile: metaFile,
dataFile: dataFile,
writeLock: &sync.RWMutex{},
state: cacheState{
isLoaded: true,
},
state: cacheState{},
}
go ch.serving(ch.saveMetaChan)
go ch.serving(ch.saveMetaChan, ch.stopSaveChan)
return ch, nil
}
@ -305,7 +327,7 @@ func loadCacheFileInfo(cache *Cache, pathComps []string, dataFileInfo os.FileInf
return &CacheEntryInfo{
PathComps: pathComps,
Size: info.Size,
Mode: info.Perm,
Perm: info.Perm,
ModTime: info.ModTime,
IsDir: false,
}, nil
@ -318,96 +340,182 @@ func loadCacheFileInfo(cache *Cache, pathComps []string, dataFileInfo os.FileInf
return &CacheEntryInfo{
PathComps: pathComps,
Size: dataFileInfo.Size(),
Mode: dataFileInfo.Mode(),
Perm: dataFileInfo.Mode(),
ModTime: dataFileInfo.ModTime(),
IsDir: false,
}, nil
}
// 加载缓存文件。如果已经加载了,则无任何效果
func (f *CacheFile) Load() error {
// 增加一个引用计数。不应该被Cache之外的代码调用。
func (f *CacheFile) IncRef() {
f.rwLock.Lock()
defer f.rwLock.Unlock()
if f.isDeleted {
return fmt.Errorf("cache deleted")
}
metaPath := f.cache.GetCacheMetaPath(f.pathComps...)
dataPath := f.cache.GetCacheDataPath(f.pathComps...)
metaFile, err := os.OpenFile(metaPath, os.O_RDWR, 0644)
if err != nil {
return err
}
dataFile, err := os.OpenFile(dataPath, os.O_RDWR, 0644)
if err != nil {
metaFile.Close()
return err
}
f.saveMetaChan = make(chan any)
f.noSaveMeta = false
f.metaFile = metaFile
f.dataFile = dataFile
go f.serving(f.saveMetaChan)
return nil
f.refCount++
}
// 关闭缓存文件,保存元数据。但缓存对象依然会留在内存里,以备随时查询元数据。
//
// 只应该在引用计数为0时调用。
func (f *CacheFile) Unload() {
// 减少一个引用计数
func (f *CacheFile) Release() {
f.rwLock.Lock()
defer f.rwLock.Unlock()
if !f.isDeleted {
// TODO 日志
f.saveMeta(f.info)
f.refCount--
if f.refCount == 0 {
f.freeTime = time.Now()
}
// 防止在关闭缓存后又保存了文件
close(f.saveMetaChan)
f.saveMetaChan = nil
f.noSaveMeta = true
f.metaFile.Close()
f.dataFile.Close()
}
// 可在Unload状态下调用
func (f *CacheFile) RevisionUploaded(rev int) {
func (f *CacheFile) LevelDown(level CacheLevel) bool {
f.rwLock.Lock()
defer f.rwLock.Unlock()
if f.info.Revision == rev {
f.info.Revision = 0
if level >= f.level {
return true
}
if f.saveMetaChan != nil {
f.letSave()
// 缓存正在被使用时,不能降级
if f.refCount > 0 {
return false
}
switch f.level {
case LevelComplete:
f.dataFile.Close()
f.level = LevelMetaLoaded
if level >= f.level {
break
}
fallthrough
case LevelMetaLoaded:
if !f.isDeleted {
// TODO 日志
f.saveMeta(f.info)
}
// 这里会等待直到saveMeta线程退出
f.stopSaveChan <- nil
f.saveMetaChan = nil
f.stopSaveChan = nil
f.metaFile.Close()
f.level = LevelReadOnly
if level >= f.level {
break
}
fallthrough
case LevelReadOnly:
f.level = LevelNotLoaded
if level >= f.level {
break
}
fallthrough
case LevelNotLoaded:
}
f.changeLevelTime = time.Now()
return true
}
// LevelUp raises the cache level to at least the requested level, acquiring
// the resources each step requires (meta file + save goroutine, then data
// file). Returns true if the target level is already held or was reached.
// Returns false if opening a required file fails; levels acquired before the
// failure are kept. Unlike LevelDown, upgrading is allowed while the cache
// is referenced.
//
// NOTE(review): on a partial failure the function returns before
// changeLevelTime is refreshed even though f.level changed — confirm this is
// intended.
func (f *CacheFile) LevelUp(level CacheLevel) bool {
	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	if level <= f.level {
		return true
	}

	// Upgrading while the cache is in use is safe (no resources are torn down).
	// Each case raises the level one step and falls through to the next.
	switch f.level {
	case LevelNotLoaded:
		f.level = LevelReadOnly
		if level <= f.level {
			break
		}
		fallthrough

	case LevelReadOnly:
		// MetaLoaded additionally needs the meta file open and the periodic
		// metadata-save goroutine running.
		metaPath := f.cache.GetCacheMetaPath(f.pathComps...)
		metaFile, err := os.OpenFile(metaPath, os.O_RDWR, 0644)
		if err != nil {
			logger.Warnf("open meta file %v: %v", metaPath, err)
			return false
		}

		f.saveMetaChan = make(chan any, 1)
		f.stopSaveChan = make(chan any)
		f.metaFile = metaFile
		f.level = LevelMetaLoaded
		go f.serving(f.saveMetaChan, f.stopSaveChan)
		if level <= f.level {
			break
		}
		fallthrough

	case LevelMetaLoaded:
		// Complete additionally needs the data file open (created if missing).
		dataPath := f.cache.GetCacheDataPath(f.pathComps...)
		dataFile, err := os.OpenFile(dataPath, os.O_RDWR|os.O_CREATE, 0644)
		if err != nil {
			logger.Warnf("open data file %v: %v", dataPath, err)
			return false
		}

		f.dataFile = dataFile
		f.level = LevelComplete
		if level <= f.level {
			break
		}
		fallthrough

	case LevelComplete:
	}

	f.changeLevelTime = time.Now()
	return true
}
// RevisionUploaded records that the given data/meta revisions were synced to
// the remote. A counter is only cleared when it still equals the uploaded
// revision — if the file was modified again during the upload the counter has
// advanced, and it stays non-zero so the next scan re-syncs it. Pass 0 to
// leave a counter untouched.
func (f *CacheFile) RevisionUploaded(dataRev int, metaRev int) {
	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	if dataRev != 0 && f.info.DataRevision == dataRev {
		f.info.DataRevision = 0
	}
	if metaRev != 0 && f.info.MetaRevision == metaRev {
		f.info.MetaRevision = 0
	}

	// Persist the cleared revision counters to the meta file.
	f.letSave()
}
// 可在Unload状态下调用
func (f *CacheFile) Info() CacheEntryInfo {
return CacheEntryInfo{
PathComps: f.pathComps,
Size: f.info.Size,
Mode: f.info.Perm,
ModTime: f.info.ModTime,
IsDir: false,
}
}
func (f *CacheFile) Revision() int {
f.rwLock.RLock()
defer f.rwLock.RUnlock()
return f.info.Revision
return CacheEntryInfo{
PathComps: f.pathComps,
Size: f.info.Size,
Perm: f.info.Perm,
ModTime: f.info.ModTime,
IsDir: false,
MetaRevision: f.info.MetaRevision,
DataRevision: f.info.DataRevision,
RefCount: f.refCount,
FreeTime: f.freeTime,
Level: f.level,
ChangeLevelTime: f.changeLevelTime,
}
}
// 可在Unload状态下调用
func (f *CacheFile) Delete() {
f.writeLock.Lock()
defer f.writeLock.Unlock()
@ -420,7 +528,7 @@ func (f *CacheFile) Delete() {
os.Remove(metaPath)
os.Remove(dataPath)
// 可能是在被使用状态下删除也可能是在Unload状态下删除所以这里不关闭saveMetaChan而是设置isDeleted为true
// 不可能将isDeleted从true改为false所以这里不需要使用stopSaveChan来等待saveMeta线程退出
f.isDeleted = true
if f.saveMetaChan != nil {
@ -428,7 +536,6 @@ func (f *CacheFile) Delete() {
}
}
// 可在Unload状态下调用
func (f *CacheFile) Move(newPathComps []string) {
f.writeLock.Lock()
defer f.writeLock.Unlock()
@ -444,22 +551,18 @@ func (f *CacheFile) Move(newPathComps []string) {
}
// 打开一个写入句柄,同时支持读取
//
// 不可在Unload状态下调用
func (f *CacheFile) Open(flags uint32) *CacheFileHandle {
logger.Tracef("CacheFile.Open: %v, %#x", f.pathComps, flags)
f.cache.lock.Lock()
f.state.refCount++
f.cache.lock.Unlock()
f.rwLock.Lock()
defer f.rwLock.Unlock()
f.refCount++
h := &CacheFileHandle{
file: f,
remoteLock: &sync.Mutex{},
revision: f.info.Revision,
revision: f.info.DataRevision,
}
if flags&uint32(os.O_RDWR) == uint32(os.O_RDWR) {
@ -484,19 +587,17 @@ func (f *CacheFile) Open(flags uint32) *CacheFileHandle {
return h
}
// 打开一个读取句柄用于同步本地文件到远端。由于此方法会在扫描缓存时调用所以refCount增加时不需要加锁
//
// 不可在Unload状态下调用
// 打开一个读取句柄,用于同步本地文件到远端
func (f *CacheFile) OpenReadWhenScanning() *CacheFileHandle {
f.rwLock.Lock()
defer f.rwLock.Unlock()
f.state.refCount++
f.refCount++
h := &CacheFileHandle{
file: f,
remoteLock: &sync.Mutex{},
revision: f.info.Revision,
revision: f.info.DataRevision,
readable: true,
}
@ -508,19 +609,18 @@ func (f *CacheFile) OpenReadWhenScanning() *CacheFileHandle {
return h
}
// 不可在Unload状态下调用
func (f *CacheFile) SetModTime(modTime time.Time) error {
logger.Tracef("CacheFile.SetModTime: %v, %v", f.pathComps, modTime)
f.rwLock.Lock()
f.info.ModTime = modTime
f.info.MetaRevision++
f.rwLock.Unlock()
f.letSave()
return nil
}
// 不可在Unload状态下调用
func (f *CacheFile) Truncate(size int64) error {
logger.Tracef("CacheFile.Truncate: %v, %v", f.pathComps, size)
@ -546,7 +646,7 @@ func (f *CacheFile) Truncate(size int64) error {
f.info.Segments = AddRange(f.info.Segments, &Range{Position: f.info.Size, Length: size - f.info.Size})
}
if f.info.Size != size {
f.info.Revision++
f.info.DataRevision++
}
f.info.Size = size
@ -554,18 +654,7 @@ func (f *CacheFile) Truncate(size int64) error {
return nil
}
// 减少一个引用计数
func (f *CacheFile) Release() {
f.cache.lock.Lock()
defer f.cache.lock.Unlock()
f.state.refCount--
if f.state.refCount == 0 {
f.state.freeTime = time.Now()
}
}
func (f *CacheFile) serving(saveMetaChan chan any) {
func (f *CacheFile) serving(saveMetaChan chan any, stopSaveChan chan any) {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
@ -576,6 +665,9 @@ func (f *CacheFile) serving(saveMetaChan chan any) {
return
}
case <-stopSaveChan:
return
case <-ticker.C:
}
@ -587,11 +679,6 @@ func (f *CacheFile) serving(saveMetaChan chan any) {
break
}
// 停止保存元数据的线程
if f.noSaveMeta {
f.rwLock.RUnlock()
break
}
f.rwLock.RUnlock()
// TODO 错误日志
@ -771,7 +858,7 @@ func (h *CacheFileHandle) WriteAt(buf []byte, off int64) (int, error) {
h.file.info.Segments = AddRange(h.file.info.Segments, &Range{Position: off, Length: int64(writeLen)})
h.file.info.Size = math2.Max(h.file.info.Size, off+int64(writeLen))
h.file.info.Revision++
h.file.info.DataRevision++
h.file.letSave()
@ -789,16 +876,14 @@ func (f *CacheFileHandle) Close() error {
f.remote.Close()
}
f.file.cache.lock.Lock()
f.file.state.refCount--
if f.file.state.refCount == 0 {
f.file.state.freeTime = time.Now()
}
f.file.cache.lock.Unlock()
f.file.rwLock.Lock()
defer f.file.rwLock.Unlock()
f.file.refCount--
if f.file.refCount == 0 {
f.file.freeTime = time.Now()
}
if f.writeable {
f.file.writers = lo2.Remove(f.file.writers, f)
} else if f.readable {

View File

@ -7,6 +7,6 @@ type CacheStatus struct {
type CacheFileStatus struct {
Path string `json:"path"`
RefCount int `json:"refCount"`
IsLoaded bool `json:"isLoaded"`
Level string `json:"level"`
IsUploading bool `json:"isUploading"`
}

View File

@ -159,16 +159,21 @@ func newDir(vfs *Vfs, ctx context.Context, name string, parent FuseNode) (fuse.F
}
func newFile(vfs *Vfs, ctx context.Context, name string, parent FuseNode, flags uint32) (fuse.FileHandle, uint32, error) {
cache := vfs.cache.CreateFile(lo2.AppendNew(parent.PathComps(), name))
if cache == nil {
ch := vfs.cache.CreateFile(lo2.AppendNew(parent.PathComps(), name))
if ch == nil {
return nil, 0, fuse.ErrPermission
}
defer cache.Release()
defer ch.Release()
if !ch.LevelUp(cache.LevelComplete) {
return nil, 0, fuse.ErrIOError
}
// Open之后会给cache的引用计数额外+1即使cache先于FileHandle被关闭
// 也有有FileHandle的计数保持cache的有效性
fileNode := newFileFromCache(cache.Info(), vfs)
hd := cache.Open(flags)
fileNode := newFileFromCache(ch.Info(), vfs)
hd := ch.Open(flags)
return newFileHandle(fileNode, hd), flags, nil
}

View File

@ -17,7 +17,7 @@ type FuseBucket struct {
vfs *Vfs
bktName string
modTime time.Time
mode os.FileMode
perm os.FileMode
}
func newBucketFromCache(c cache.CacheEntryInfo, vfs *Vfs) fuse.FsDir {
@ -25,7 +25,7 @@ func newBucketFromCache(c cache.CacheEntryInfo, vfs *Vfs) fuse.FsDir {
vfs: vfs,
bktName: c.PathComps[len(c.PathComps)-1],
modTime: c.ModTime,
mode: c.Mode,
perm: c.Perm,
}
}
@ -41,8 +41,8 @@ func (r *FuseBucket) Size() int64 {
return 0
}
func (r *FuseBucket) Mode() os.FileMode {
return os.ModeDir | r.mode
func (r *FuseBucket) Perm() os.FileMode {
return r.perm
}
func (r *FuseBucket) ModTime() time.Time {

View File

@ -24,7 +24,7 @@ func newDirFromCache(ch cache.CacheEntryInfo, vfs *Vfs) *FuseDir {
vfs: vfs,
pathComps: ch.PathComps,
modTime: ch.ModTime,
mode: ch.Mode,
mode: ch.Perm,
}
}
@ -40,8 +40,8 @@ func (r *FuseDir) Size() int64 {
return 0
}
func (r *FuseDir) Mode() os.FileMode {
return os.ModeDir | r.mode
func (r *FuseDir) Perm() os.FileMode {
return r.mode
}
func (r *FuseDir) ModTime() time.Time {

View File

@ -15,7 +15,7 @@ type FuseFileNode struct {
pathComps []string
size int64
modTime time.Time
mode os.FileMode
perm os.FileMode
}
func newFileFromCache(info cache.CacheEntryInfo, vfs *Vfs) *FuseFileNode {
@ -24,7 +24,7 @@ func newFileFromCache(info cache.CacheEntryInfo, vfs *Vfs) *FuseFileNode {
pathComps: info.PathComps,
size: info.Size,
modTime: info.ModTime,
mode: info.Mode,
perm: info.Perm,
}
}
@ -34,7 +34,7 @@ func newFileFromObject(vfs *Vfs, pathComps []string, obj clitypes.Object) *FuseF
pathComps: pathComps,
size: obj.Size,
modTime: obj.UpdateTime,
mode: os.FileMode(0755), // TODO Object元数据中是没有保存权限的
perm: os.FileMode(0755), // TODO Object元数据中是没有保存权限的
}
}
@ -51,8 +51,8 @@ func (n *FuseFileNode) Size() int64 {
return info.Size
}
func (n *FuseFileNode) Mode() os.FileMode {
return n.mode
func (n *FuseFileNode) Perm() os.FileMode {
return n.perm
}
func (n *FuseFileNode) ModTime() time.Time {
@ -69,34 +69,46 @@ func (n *FuseFileNode) IsDir() bool {
}
func (n *FuseFileNode) Truncate(size uint64) error {
cacheFile := n.loadCacheFile()
if cacheFile == nil {
ch := n.loadCacheFile()
if ch == nil {
return fuse.ErrNotExists
}
defer cacheFile.Release()
defer ch.Release()
return cacheFile.Truncate(int64(size))
if !ch.LevelUp(cache.LevelComplete) {
return fuse.ErrIOError
}
return ch.Truncate(int64(size))
}
func (n *FuseFileNode) SetModTime(time time.Time) error {
cacheFile := n.loadCacheFile()
if cacheFile == nil {
ch := n.loadCacheFile()
if ch == nil {
return fuse.ErrNotExists
}
defer cacheFile.Release()
defer ch.Release()
return cacheFile.SetModTime(time)
if !ch.LevelUp(cache.LevelMetaLoaded) {
return fuse.ErrIOError
}
return ch.SetModTime(time)
}
func (n *FuseFileNode) Open(flags uint32) (fuse.FileHandle, uint32, error) {
cacheFile := n.loadCacheFile()
if cacheFile == nil {
ch := n.loadCacheFile()
if ch == nil {
// 如果文件不存在,也不进行创建,因为创建不应该调用这个接口
return nil, 0, fuse.ErrNotExists
}
defer cacheFile.Release()
defer ch.Release()
hd := cacheFile.Open(flags)
if !ch.LevelUp(cache.LevelComplete) {
return nil, 0, fuse.ErrIOError
}
hd := ch.Open(flags)
return newFileHandle(n, hd), flags, nil
}
@ -150,11 +162,10 @@ func (hd *FuseFileHandle) Sync() error {
return hd.chd.Sync()
}
func (hd *FuseFileHandle) Flush() error {
return nil
}
func (hd *FuseFileHandle) Close() error {
return hd.chd.Close()
}
func (hd *FuseFileHandle) Release() error {
// TODO 其他清理工作
return nil
}

View File

@ -14,7 +14,7 @@ type FusePackage struct {
bktName string
pkgName string
modTime time.Time
mode os.FileMode
perm os.FileMode
}
func newPackageFromCache(cache cache.CacheEntryInfo, vfs *Vfs) fuse.FsDir {
@ -24,7 +24,7 @@ func newPackageFromCache(cache cache.CacheEntryInfo, vfs *Vfs) fuse.FsDir {
bktName: pathComps[0],
pkgName: pathComps[1],
modTime: cache.ModTime,
mode: cache.Mode,
perm: cache.Perm,
}
}
@ -40,8 +40,8 @@ func (r *FusePackage) Size() int64 {
return 0
}
func (r *FusePackage) Mode() os.FileMode {
return os.ModeDir | r.mode
func (r *FusePackage) Perm() os.FileMode {
return r.perm
}
func (r *FusePackage) ModTime() time.Time {

View File

@ -34,8 +34,8 @@ func (r *FuseRoot) Size() int64 {
return 0
}
func (r *FuseRoot) Mode() os.FileMode {
return os.ModeDir | 0755
func (r *FuseRoot) Perm() os.FileMode {
return 0755
}
func (r *FuseRoot) ModTime() time.Time {

View File

@ -32,15 +32,23 @@ type CreateLoadResult struct {
Objects map[string]types.Object
}
func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) error {
uploadTime := time.Now()
func (u *CreateLoadUploader) Upload(pa string, stream io.Reader, opts ...UploadOption) error {
opt := UploadOption{}
if len(opts) > 0 {
opt = opts[0]
}
if opt.CreateTime.IsZero() {
opt.CreateTime = time.Now()
}
spaceIDs := make([]types.UserSpaceID, 0, len(u.targetSpaces))
ft := ioswitch2.FromTo{}
fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
ft.AddFrom(fromExec)
for i, space := range u.targetSpaces {
ft.AddTo(ioswitch2.NewToShardStore(*space.MasterHub, space, ioswitch2.RawStream(), "fileHash"))
ft.AddTo(ioswitch2.NewToShardStore(*space.MasterHub, space, ioswitch2.RawStream(), "shardInfo"))
ft.AddTo(ioswitch2.NewLoadToPublic(*space.MasterHub, space, path.Join(u.loadRoots[i], pa)))
spaceIDs = append(spaceIDs, space.UserSpace.UserSpaceID)
}
@ -64,12 +72,12 @@ func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) err
defer u.lock.Unlock()
// 记录上传结果
fileHash := ret["fileHash"].(*ops2.ShardInfoValue).Hash
shardInfo := ret["fileHash"].(*ops2.ShardInfoValue)
u.successes = append(u.successes, db.AddObjectEntry{
Path: pa,
Size: size,
FileHash: fileHash,
UploadTime: uploadTime,
Size: shardInfo.Size,
FileHash: shardInfo.Hash,
CreateTime: opt.CreateTime,
UserSpaceIDs: spaceIDs,
})
return nil

View File

@ -40,8 +40,19 @@ type UpdateResult struct {
Objects map[string]types.Object
}
func (w *UpdateUploader) Upload(pat string, stream io.Reader) error {
uploadTime := time.Now()
// UploadOption carries optional settings for a single upload.
type UploadOption struct {
	CreateTime time.Time // creation time recorded for the file; if zero, the time the upload starts is used
}
func (w *UpdateUploader) Upload(pat string, stream io.Reader, opts ...UploadOption) error {
opt := UploadOption{}
if len(opts) > 0 {
opt = opts[0]
}
if opt.CreateTime.IsZero() {
opt.CreateTime = time.Now()
}
ft := ioswitch2.NewFromTo()
fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
@ -76,7 +87,7 @@ func (w *UpdateUploader) Upload(pat string, stream io.Reader) error {
Path: pat,
Size: shardInfo.Size,
FileHash: shardInfo.Hash,
UploadTime: uploadTime,
CreateTime: opt.CreateTime,
UserSpaceIDs: []types.UserSpaceID{w.targetSpace.UserSpace.UserSpaceID},
})
return nil