This commit is contained in:
songjc 2025-07-04 15:29:40 +08:00
commit 70dcc042ab
7 changed files with 23 additions and 470 deletions

View File

@ -1,60 +0,0 @@
package cmdline
/*
import (
"fmt"
"time"
"github.com/jedib0t/go-pretty/v6/table"
cdssdk "gitlink.org.cn/cloudream/jcs-pub/client/types"
)
func BucketListUserBuckets(ctx CommandContext) error {
buckets, err := ctx.Cmdline.Svc.BucketSvc().GetUserBuckets()
if err != nil {
return err
}
fmt.Printf("Find %d buckets for user %d:\n", len(buckets))
tb := table.NewWriter()
tb.AppendHeader(table.Row{"ID", "Name"})
for _, bucket := range buckets {
tb.AppendRow(table.Row{bucket.BucketID, bucket.Name})
}
fmt.Print(tb.Render())
return nil
}
func BucketCreateBucket(ctx CommandContext, bucketName string) error {
bucketID, err := ctx.Cmdline.Svc.BucketSvc().CreateBucket(bucketName, time.Now())
if err != nil {
return err
}
fmt.Printf("Create bucket %v success, id: %v", bucketName, bucketID)
return nil
}
func BucketDeleteBucket(ctx CommandContext, bucketID cdssdk.BucketID) error {
err := ctx.Cmdline.Svc.BucketSvc().DeleteBucket(bucketID)
if err != nil {
return err
}
fmt.Printf("Delete bucket %d success ", bucketID)
return nil
}
func init() {
commands.MustAdd(BucketListUserBuckets, "bucket", "ls")
commands.MustAdd(BucketCreateBucket, "bucket", "new")
commands.MustAdd(BucketDeleteBucket, "bucket", "delete")
}
*/

View File

@ -1,47 +0,0 @@
package cmdline
/*
import (
"fmt"
"time"
cdssdk "gitlink.org.cn/cloudream/jcs-pub/client/types"
)
func CacheMovePackage(ctx CommandContext, packageID cdssdk.PackageID, stgID cdssdk.StorageID) error {
startTime := time.Now()
defer func() {
fmt.Printf("%v\n", time.Since(startTime).Seconds())
}()
hubID, taskID, err := ctx.Cmdline.Svc.CacheSvc().StartCacheMovePackage(1, packageID, stgID)
if err != nil {
return fmt.Errorf("start cache moving package: %w", err)
}
for {
complete, err := ctx.Cmdline.Svc.CacheSvc().WaitCacheMovePackage(hubID, taskID, time.Second*10)
if complete {
if err != nil {
return fmt.Errorf("moving complete with: %w", err)
}
return nil
}
if err != nil {
return fmt.Errorf("wait moving: %w", err)
}
}
}
func CacheRemovePackage(ctx CommandContext, packageID cdssdk.PackageID, stgID cdssdk.StorageID) error {
return ctx.Cmdline.Svc.CacheSvc().CacheRemovePackage(packageID, stgID)
}
func init() {
commands.Add(CacheMovePackage, "cache", "move")
commands.Add(CacheRemovePackage, "cache", "remove")
}
*/

View File

@ -1,88 +0,0 @@
package cmdline
/*
import (
"fmt"
"strings"
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/lockprovider"
)
// DistLockLock 尝试获取分布式锁。
// ctx: 命令上下文,包含执行命令所需的服务和配置。
// lockData: 锁数据数组,每个元素包含锁的路径、名称和目标。
// 返回值: 获取锁失败时返回错误。
func DistLockLock(ctx CommandContext, lockData []string) error {
req := distlock.LockRequest{}
// 解析锁数据,填充请求结构体。
for _, lock := range lockData {
l, err := parseOneLock(lock)
if err != nil {
return fmt.Errorf("parse lock data %s failed, err: %w", lock, err)
}
req.Locks = append(req.Locks, l)
}
// 请求分布式锁。
reqID, err := ctx.Cmdline.Svc.DistLock.Acquire(req)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
fmt.Printf("%s\n", reqID)
return nil
}
// parseOneLock 解析单个锁数据。
// lockData: 待解析的锁数据,格式为"路径/名称@目标字符串"。
// 返回值: 解析得到的锁对象和可能的错误。
func parseOneLock(lockData string) (distlock.Lock, error) {
var lock distlock.Lock
// 解析锁的路径、名称和目标。
fullPathAndTarget := strings.Split(lockData, "@")
if len(fullPathAndTarget) != 2 {
return lock, fmt.Errorf("lock data must contains lock path, name and target")
}
pathAndName := strings.Split(fullPathAndTarget[0], "/")
if len(pathAndName) < 2 {
return lock, fmt.Errorf("lock data must contains lock path, name and target")
}
lock.Path = pathAndName[0 : len(pathAndName)-1]
lock.Name = pathAndName[len(pathAndName)-1]
// 解析目标字符串。
target := lockprovider.NewStringLockTarget()
comps := strings.Split(fullPathAndTarget[1], "/")
for _, comp := range comps {
target.Add(lo.Map(strings.Split(comp, "."), func(str string, index int) any { return str })...)
}
lock.Target = *target
return lock, nil
}
// DistLockUnlock 释放分布式锁。
// ctx: 命令上下文。
// reqID: 请求ID对应获取锁时返回的ID。
// 返回值: 释放锁失败时返回错误。
func DistLockUnlock(ctx CommandContext, reqID string) error {
ctx.Cmdline.Svc.DistLock.Release(reqID)
return nil
}
// 初始化命令行工具,注册分布式锁相关命令。
func init() {
commands.MustAdd(DistLockLock, "distlock", "lock")
commands.MustAdd(DistLockUnlock, "distlock", "unlock")
}
*/

View File

@ -1,69 +0,0 @@
package cmdline
/*
import (
"fmt"
"os"
"path/filepath"
"time"
cdssdk "gitlink.org.cn/cloudream/jcs-pub/client/types"
)
// 必须添加的命令函数,用于处理对象上传。
//
// ctx: 命令上下文,提供必要的服务和环境配置。
// packageID: 上传套餐的唯一标识。
// rootPath: 本地文件系统中待上传文件的根目录。
// storageAffinity: 偏好的节点ID列表上传任务可能会分配到这些节点上。
// 返回值: 执行过程中遇到的任何错误。
var _ = MustAddCmd(func(ctx CommandContext, packageID cdssdk.PackageID, rootPath string, spaceAffinity []cdssdk.UserSpaceID) error {
// 记录函数开始时间,用于计算执行时间。
startTime := time.Now()
defer func() {
// 打印函数执行时间。
fmt.Printf("%v\n", time.Since(startTime).Seconds())
}()
// 根据节点亲和性列表设置首选上传节点。
var storageAff cdssdk.UserSpaceID
if len(spaceAffinity) > 0 {
storageAff = spaceAffinity[0]
}
up, err := ctx.Cmdline.Svc.Uploader.BeginUpdate(packageID, storageAff, nil, nil)
if err != nil {
return fmt.Errorf("begin updating package: %w", err)
}
defer up.Abort()
err = filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error {
if err != nil {
return nil
}
if fi.IsDir() {
return nil
}
file, err := os.Open(fname)
if err != nil {
return err
}
defer file.Close()
return up.Upload(fname, file)
})
if err != nil {
return err
}
_, err = up.Commit()
if err != nil {
return fmt.Errorf("commit updating package: %w", err)
}
return nil
}, "obj", "upload")
*/

View File

@ -1,190 +0,0 @@
package cmdline
/*
import (
"fmt"
"io"
"os"
"path/filepath"
"time"
"github.com/jedib0t/go-pretty/v6/table"
cdssdk "gitlink.org.cn/cloudream/jcs-pub/client/types"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/iterator"
)
// PackageListBucketPackages 列出指定存储桶中的所有包裹。
//
// 参数:
//
// ctx - 命令上下文。
// bucketID - 存储桶ID。
//
// 返回值:
//
// error - 操作过程中发生的任何错误。
func PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID) error {
packages, err := ctx.Cmdline.Svc.BucketSvc().GetBucketPackages(bucketID)
if err != nil {
return err
}
fmt.Printf("Find %d packages in bucket %d for user %d:\n", len(packages), bucketID)
tb := table.NewWriter()
tb.AppendHeader(table.Row{"ID", "Name", "BucketID"})
for _, obj := range packages {
tb.AppendRow(table.Row{obj.PackageID, obj.Name, obj.BucketID})
}
fmt.Println(tb.Render())
return nil
}
// PackageDownloadPackage 下载指定包裹的所有文件到本地目录。
//
// 参数:
//
// ctx - 命令上下文。
// packageID - 包裹ID。
// outputDir - 输出目录路径。
//
// 返回值:
//
// error - 操作过程中发生的任何错误。
func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outputDir string) error {
startTime := time.Now()
defer func() {
fmt.Printf("%v\n", time.Since(startTime).Seconds())
}()
err := os.MkdirAll(outputDir, os.ModePerm)
if err != nil {
return fmt.Errorf("create output directory %s failed, err: %w", outputDir, err)
}
// 初始化文件下载迭代器
objIter, err := ctx.Cmdline.Svc.PackageSvc().DownloadPackage(packageID)
if err != nil {
return fmt.Errorf("download object failed, err: %w", err)
}
defer objIter.Close()
madeDirs := make(map[string]bool)
for {
objInfo, err := objIter.MoveNext()
if err == iterator.ErrNoMoreItem {
break
}
if err != nil {
return err
}
err = func() error {
defer objInfo.File.Close()
fullPath := filepath.Join(outputDir, objInfo.Object.Path)
dirPath := filepath.Dir(fullPath)
if !madeDirs[dirPath] {
if err := os.MkdirAll(dirPath, 0755); err != nil {
return fmt.Errorf("creating object dir: %w", err)
}
madeDirs[dirPath] = true
}
outputFile, err := os.Create(fullPath)
if err != nil {
return fmt.Errorf("creating object file: %w", err)
}
defer outputFile.Close()
_, err = io.Copy(outputFile, objInfo.File)
if err != nil {
return fmt.Errorf("copy object data to local file failed, err: %w", err)
}
return nil
}()
if err != nil {
return err
}
}
return nil
}
// PackageCreatePackage 在指定存储桶中创建新包裹。
//
// 参数:
//
// ctx - 命令上下文。
// bucketID - 存储桶ID。
// name - 包裹名称。
//
// 返回值:
//
// error - 操作过程中发生的任何错误。
func PackageCreatePackage(ctx CommandContext, bucketID cdssdk.BucketID, name string) error {
pkgID, err := ctx.Cmdline.Svc.PackageSvc().Create(bucketID, name)
if err != nil {
return err
}
fmt.Printf("%v\n", pkgID)
return nil
}
// PackageDeletePackage 删除指定的包裹。
//
// 参数:
//
// ctx - 命令上下文。
// packageID - 包裹ID。
//
// 返回值:
//
// error - 操作过程中发生的任何错误。
func PackageDeletePackage(ctx CommandContext, packageID cdssdk.PackageID) error {
err := ctx.Cmdline.Svc.PackageSvc().DeletePackage(packageID)
if err != nil {
return fmt.Errorf("delete package %d failed, err: %w", packageID, err)
}
return nil
}
// PackageGetCachedStorages 获取指定包裹的缓存节点信息。
//
// 参数:
//
// ctx - 命令上下文。
// packageID - 包裹ID。
//
// 返回值:
//
// error - 操作过程中发生的任何错误。
func PackageGetCachedStorages(ctx CommandContext, packageID cdssdk.PackageID) error {
resp, err := ctx.Cmdline.Svc.PackageSvc().GetCachedStorages(packageID)
fmt.Printf("resp: %v\n", resp)
if err != nil {
return fmt.Errorf("get package %d cached storages failed, err: %w", packageID, err)
}
return nil
}
// 初始化命令行工具的包相关命令。
func init() {
commands.MustAdd(PackageListBucketPackages, "pkg", "ls")
commands.MustAdd(PackageDownloadPackage, "pkg", "get")
commands.MustAdd(PackageCreatePackage, "pkg", "new")
commands.MustAdd(PackageDeletePackage, "pkg", "delete")
// 查询package缓存到哪些节点
commands.MustAdd(PackageGetCachedStorages, "pkg", "cached")
}
*/

View File

@ -16,7 +16,6 @@ import (
"gitlink.org.cn/cloudream/jcs-pub/client/internal/http"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/mount"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/repl"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/services"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/spacesyncer"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/ticktock"
@ -179,8 +178,8 @@ func serveHTTP(configPath string, opts serveHTTPOptions) {
defer spaceSync.Stop()
// 交互式命令行
rep := repl.New(db, tktk)
replCh := rep.Start()
// rep := repl.New(db, tktk)
// replCh := rep.Start()
// 挂载
mntCfg := config.Cfg().Mount
@ -224,7 +223,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) {
conColEvt := conColChan.Receive()
acStatEvt := acStatChan.Receive()
spaceSyncEvt := spaceSyncChan.Receive()
replEvt := replCh.Receive()
// replEvt := replCh.Receive()
httpEvt := httpChan.Receive()
mntEvt := mntChan.Receive()
@ -309,18 +308,18 @@ loop:
}
spaceSyncEvt = spaceSyncChan.Receive()
case e := <-replEvt.Chan():
if e.Err != nil {
logger.Errorf("receive repl event: %v", err)
break loop
}
// case e := <-replEvt.Chan():
// if e.Err != nil {
// logger.Errorf("receive repl event: %v", err)
// break loop
// }
switch e.Value.(type) {
case repl.ExitEvent:
logger.Info("exit by repl")
break loop
}
replEvt = replCh.Receive()
// switch e.Value.(type) {
// case repl.ExitEvent:
// logger.Info("exit by repl")
// break loop
// }
// replEvt = replCh.Receive()
case e := <-httpEvt.Chan():
if e.Err != nil {
@ -349,4 +348,7 @@ loop:
mntEvt = mntChan.Receive()
}
}
// TODO 优雅退出
os.Exit(0)
}

View File

@ -640,7 +640,7 @@ func (c *Cache) fastScan(lastScanPath []string) []string {
info := ch.Info()
if info.RefCount > 0 {
logger.Debugf("skip cache %v, refCount: %v", path, info.RefCount)
// logger.Debugf("skip cache %v, refCount: %v", path, info.RefCount)
return trie.VisitContinue
}
@ -799,6 +799,11 @@ func (c *Cache) scanningData() {
continue
}
// 按Readdir函数的说法不会存在len(e) == 0且err == nil的情况但实际发生了
if len(e) == 0 {
continue
}
if e[0].IsDir() {
child, err := os.Open(filepath.Join(lastNode.Name(), e[0].Name()))
if err != nil {