diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index bfef944..e5c1043 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -98,13 +98,8 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { hubMeta := metaCacheHost.AddHubMeta() conMeta := metaCacheHost.AddConnectivity() - // 分布式锁 - distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) - if err != nil { - logger.Warnf("new distlock service failed, err: %s", err.Error()) - os.Exit(1) - } - go serveDistLock(distlockSvc) + // 公共锁 + publock := distlock.NewService() // 访问统计 acStat := accessstat.NewAccessStat(accessstat.Config{ @@ -124,10 +119,10 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgPool, strgSel, db) // 上传器 - uploader := uploader.NewUploader(distlockSvc, &conCol, stgPool, stgMeta, db) + uploader := uploader.NewUploader(publock, &conCol, stgPool, stgMeta, db) // 定时任务 - tktk := ticktock.New(config.Cfg().TickTock, db, stgMeta, stgPool, evtPub) + tktk := ticktock.New(config.Cfg().TickTock, db, stgMeta, stgPool, evtPub, publock) tktk.Start() defer tktk.Stop() @@ -148,7 +143,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { mntChan := mnt.Start() defer mnt.Stop() - svc := services.NewService(distlockSvc, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt) // HTTP接口 httpCfg := config.Cfg().HTTP @@ -250,18 +245,3 @@ loop: } } } - -func serveDistLock(svc *distlock.Service) { - logger.Info("start serving distlock") - - err := svc.Serve() - - if err != nil { - logger.Errorf("distlock stopped with error: %s", err.Error()) - } - - logger.Info("distlock stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) -} diff --git a/client/internal/cmdline/vfstest.go b/client/internal/cmdline/vfstest.go index 7673d2a..8e739a5 100644 --- a/client/internal/cmdline/vfstest.go +++ b/client/internal/cmdline/vfstest.go @@ -91,13 +91,8 @@ func vfsTest(configPath string, opts serveHTTPOptions) { hubMeta := metaCacheHost.AddHubMeta() conMeta := metaCacheHost.AddConnectivity() - // 分布式锁 - distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) - if err != nil { - logger.Warnf("new distlock service failed, err: %s", err.Error()) - os.Exit(1) - } - go serveDistLock(distlockSvc) + // 公共锁 + publock := distlock.NewService() // 访问统计 acStat := accessstat.NewAccessStat(accessstat.Config{ @@ -117,7 +112,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgPool, strgSel, db) // 上传器 - uploader := uploader.NewUploader(distlockSvc, &conCol, stgPool, stgMeta, db) + uploader := uploader.NewUploader(publock, &conCol, stgPool, stgMeta, db) // 挂载 mntCfg := config.Cfg().Mount @@ -132,7 +127,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { mntChan := mnt.Start() defer mnt.Stop() - svc := services.NewService(distlockSvc, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt) // HTTP接口 httpCfg := config.Cfg().HTTP diff --git a/client/internal/config/config.go b/client/internal/config/config.go index 8d4c95c..c2c519a 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -1,7 +1,6 @@ package config import ( - "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/utils/config" @@ -22,7 +21,6 @@ type Config struct { Logger logger.Config `json:"logger"` DB db.Config `json:"db"` RabbitMQ mq.Config `json:"rabbitMQ"` - DistLock distlock.Config `json:"distlock"` Connectivity connectivity.Config `json:"connectivity"` Downloader downloader.Config `json:"downloader"` DownloadStrategy strategy.Config `json:"downloadStrategy"` diff --git a/client/internal/downloader/iterator.go b/client/internal/downloader/iterator.go index fa12bce..a4f30d3 100644 --- a/client/internal/downloader/iterator.go +++ b/client/internal/downloader/iterator.go @@ -29,7 +29,7 @@ type downloadSpaceInfo struct { } type DownloadContext struct { - Distlock *distlock.Service + PubLock *distlock.Service } type DownloadObjectIterator struct { OnClosing func() diff --git a/client/internal/services/service.go b/client/internal/services/service.go index 75e2ddb..164f9e8 100644 --- a/client/internal/services/service.go +++ b/client/internal/services/service.go @@ -1,7 +1,6 @@ package services import ( - "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/jcs-pub/client/internal/accessstat" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" @@ -9,12 +8,13 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount" "gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" ) // Service 结构体封装了分布锁服务和任务管理服务。 type Service struct { - DistLock *distlock.Service + PubLock *distlock.Service Downloader *downloader.Downloader AccessStat *accessstat.AccessStat Uploader *uploader.Uploader @@ -26,7 +26,7 @@ type Service struct { } func NewService( - distlock *distlock.Service, + publock *distlock.Service, downloader *downloader.Downloader, accStat *accessstat.AccessStat, uploder *uploader.Uploader, @@ -37,7 +37,7 @@ func NewService( mount *mount.Mount, ) *Service { return &Service{ - DistLock: distlock, + PubLock: publock, Downloader: downloader, AccessStat: accStat, Uploader: uploder, diff --git a/client/internal/services/user_space.go b/client/internal/services/user_space.go index f90ce03..9c1eb72 100644 --- a/client/internal/services/user_space.go +++ b/client/internal/services/user_space.go @@ -15,6 +15,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" hubmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub" @@ -95,17 +96,13 @@ func (svc *UserSpaceService) LoadPackage(packageID clitypes.PackageID, userspace } } - // TODO2 加锁 - // mutex, err := reqbuilder.NewBuilder(). - // // 保护在userspace目录中下载的文件 - // UserSpace().Buzy(userspaceID). - // // 保护下载文件时同时保存到IPFS的文件 - // Shard().Buzy(userspaceID). - // MutexLock(svc.DistLock) - // if err != nil { - // return fmt.Errorf("acquire locks failed, err: %w", err) - // } - // defer mutex.Unlock() + mutex, err := reqbuilder.NewBuilder(). + Shard().Buzy(userspaceID). + MutexLock(svc.PubLock) + if err != nil { + return fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() // 记录访问统计 for _, obj := range details { diff --git a/client/internal/ticktock/change_redundancy.go b/client/internal/ticktock/change_redundancy.go index ee82f7f..a10f895 100644 --- a/client/internal/ticktock/change_redundancy.go +++ b/client/internal/ticktock/change_redundancy.go @@ -9,6 +9,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder" ) const ( @@ -119,37 +120,34 @@ func (j *ChangeRedundancy) changeOne(ctx *changeRedundancyContext, pkg clitypes. } lastObjID = objs[len(objs)-1].Object.ObjectID + reen := ctx.ticktock.pubLock.BeginReentrant() + var allUpdatings []db.UpdatingObjectRedundancy var allSysEvts []datamap.SysEventBody ctx.mostBlockStgIDs = j.summaryRepObjectBlockUserSpaces(ctx, objs, 2) - // // TODO 加锁 - // builder := reqbuilder.NewBuilder() - // for _, storage := range newRepStgs { - // builder.Shard().Buzy(storage.Storage.Storage.StorageID) - // } - // for _, storage := range newECStgs { - // builder.Shard().Buzy(storage.Storage.Storage.StorageID) - // } - // mutex, err := builder.MutexLock(execCtx.Args.DistLock) - // if err != nil { - // log.Warnf("acquiring dist lock: %s", err.Error()) - // return - // } - // defer mutex.Unlock() - var willShrinks []clitypes.ObjectDetail for _, obj := range objs { - newRed, selectedStorages := j.chooseRedundancy(ctx, obj) + newRed, selectedSpaces := j.chooseRedundancy(ctx, obj) // 冗余策略不需要调整,就检查是否需要收缩 if newRed == nil { willShrinks = append(willShrinks, obj) continue } - updating, evt, err := j.doChangeRedundancy(ctx, obj, newRed, selectedStorages) + reqBlder := reqbuilder.NewBuilder() + for _, space := range selectedSpaces { + reqBlder.Shard().Buzy(space.UserSpace.UserSpace.UserSpaceID) + } + err := reen.Lock(reqBlder.Build()) + if err != nil { + log.WithField("ObjectID", obj.Object.ObjectID).Warnf("acquire lock: %s", err.Error()) + continue + } + + updating, evt, err := j.doChangeRedundancy(ctx, obj, newRed, selectedSpaces) if updating != nil { allUpdatings = append(allUpdatings, *updating) } @@ -158,24 +156,27 @@ func (j *ChangeRedundancy) changeOne(ctx *changeRedundancyContext, pkg clitypes. } if err != nil { log.WithField("ObjectID", obj.Object.ObjectID).Warnf("%s, its redundancy wont be changed", err.Error()) + continue } } - udpatings, sysEvts, err := j.doRedundancyShrink(ctx, pkg, willShrinks) + udpatings, sysEvts, err := j.doRedundancyShrink(ctx, pkg, willShrinks, reen) if err != nil { log.Warnf("redundancy shrink: %s", err.Error()) - return err + } else { + allUpdatings = append(allUpdatings, udpatings...) + allSysEvts = append(allSysEvts, sysEvts...) } - allUpdatings = append(allUpdatings, udpatings...) - allSysEvts = append(allSysEvts, sysEvts...) if len(allUpdatings) > 0 { err := db.DoTx10(db2, db2.Object().BatchUpdateRedundancy, allUpdatings) if err != nil { + reen.Unlock() log.Warnf("update object redundancy: %s", err.Error()) return err } } + reen.Unlock() for _, e := range allSysEvts { ctx.ticktock.evtPub.Publish(e) diff --git a/client/internal/ticktock/redundancy_shrink.go b/client/internal/ticktock/redundancy_shrink.go index 68908e1..ce49d07 100644 --- a/client/internal/ticktock/redundancy_shrink.go +++ b/client/internal/ticktock/redundancy_shrink.go @@ -18,12 +18,14 @@ import ( clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/consts" "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" ) -func (t *ChangeRedundancy) doRedundancyShrink(execCtx *changeRedundancyContext, pkg clitypes.PackageDetail, objs []clitypes.ObjectDetail) ([]db.UpdatingObjectRedundancy, []datamap.SysEventBody, error) { +func (t *ChangeRedundancy) doRedundancyShrink(execCtx *changeRedundancyContext, pkg clitypes.PackageDetail, objs []clitypes.ObjectDetail, reen *distlock.Reentrant) ([]db.UpdatingObjectRedundancy, []datamap.SysEventBody, error) { log := logger.WithType[ChangeRedundancy]("TickTock") var readerStgIDs []clitypes.UserSpaceID @@ -78,7 +80,7 @@ func (t *ChangeRedundancy) doRedundancyShrink(execCtx *changeRedundancyContext, sysEvents = append(sysEvents, t.generateSysEventForECObject(solu, obj)...) } - ioSwRets, err := t.executePlans(execCtx, planBld, planningStgIDs) + ioSwRets, err := t.executePlans(execCtx, planBld, planningStgIDs, reen) if err != nil { log.Warn(err.Error()) return nil, nil, fmt.Errorf("execute plans: %w", err) @@ -904,17 +906,15 @@ func (t *ChangeRedundancy) generateSysEventForECObject(solu annealingSolution, o return []datamap.SysEventBody{transEvt, distEvt} } -func (t *ChangeRedundancy) executePlans(ctx *changeRedundancyContext, planBld *exec.PlanBuilder, planningStgIDs map[clitypes.UserSpaceID]bool) (map[string]exec.VarValue, error) { - // TODO 统一加锁,有重复也没关系 - // lockBld := reqbuilder.NewBuilder() - // for id := range planningStgIDs { - // lockBld.Shard().Buzy(id) - // } - // lock, err := lockBld.MutexLock(ctx.Args.DistLock) - // if err != nil { - // return nil, fmt.Errorf("acquiring distlock: %w", err) - // } - // defer lock.Unlock() +func (t *ChangeRedundancy) executePlans(ctx *changeRedundancyContext, planBld *exec.PlanBuilder, planningSpaceIDs map[clitypes.UserSpaceID]bool, reen *distlock.Reentrant) (map[string]exec.VarValue, error) { + reqBlder := reqbuilder.NewBuilder() + for id, _ := range planningSpaceIDs { + reqBlder.Shard().Buzy(id) + } + err := reen.Lock(reqBlder.Build()) + if err != nil { + return nil, fmt.Errorf("locking shard resources: %w", err) + } wg := sync.WaitGroup{} diff --git a/client/internal/ticktock/shardstore_gc.go b/client/internal/ticktock/shardstore_gc.go index be2f03b..9bea1c6 100644 --- a/client/internal/ticktock/shardstore_gc.go +++ b/client/internal/ticktock/shardstore_gc.go @@ -11,6 +11,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/types" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder" hubmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub" ) @@ -30,18 +31,6 @@ func (j *ShardStoreGC) Execute(t *TickTock) { log.Debugf("job end, time: %v", time.Since(startTime)) }() - // TODO 加锁 - // // 使用分布式锁进行资源锁定 - // mutex, err := reqbuilder.NewBuilder(). - // // 执行IPFS垃圾回收 - // Shard().GC(j.StorageID). - // MutexLock(execCtx.Args.DistLock) - // if err != nil { - // log.Warnf("acquire locks failed, err: %s", err.Error()) - // return - // } - // defer mutex.Unlock() - spaceIDs, err := t.db.UserSpace().GetAllIDs(t.db.DefCtx()) if err != nil { log.Warnf("getting user space ids: %v", err) @@ -63,11 +52,17 @@ func (j *ShardStoreGC) Execute(t *TickTock) { } func (j *ShardStoreGC) gcOne(t *TickTock, space *types.UserSpaceDetail) error { + mutex, err := reqbuilder.NewBuilder().Shard().GC(space.UserSpace.UserSpaceID).MutexLock(t.pubLock) + if err != nil { + return fmt.Errorf("acquire lock: %w", err) + } + defer mutex.Unlock() + db2 := t.db // 收集需要进行垃圾回收的文件哈希值 var allFileHashes []types.FileHash - err := db2.DoTx(func(tx db.SQLContext) error { + err = db2.DoTx(func(tx db.SQLContext) error { blocks, err := db2.ObjectBlock().GetByUserSpaceID(tx, space.UserSpace.UserSpaceID) if err != nil { return fmt.Errorf("getting object blocks by hub id: %w", err) diff --git a/client/internal/ticktock/ticktock.go b/client/internal/ticktock/ticktock.go index 46db68d..7d7d61d 100644 --- a/client/internal/ticktock/ticktock.go +++ b/client/internal/ticktock/ticktock.go @@ -7,6 +7,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" ) @@ -29,9 +30,10 @@ type TickTock struct { spaceMeta *metacache.UserSpaceMeta stgPool *pool.Pool evtPub *sysevent.Publisher + pubLock *distlock.Service } -func New(cfg Config, db *db.DB, spaceMeta *metacache.UserSpaceMeta, stgPool *pool.Pool, evtPub *sysevent.Publisher) *TickTock { +func New(cfg Config, db *db.DB, spaceMeta *metacache.UserSpaceMeta, stgPool *pool.Pool, evtPub *sysevent.Publisher, pubLock *distlock.Service) *TickTock { sch, _ := gocron.NewScheduler() t := &TickTock{ cfg: cfg, @@ -41,6 +43,7 @@ func New(cfg Config, db *db.DB, spaceMeta *metacache.UserSpaceMeta, stgPool *poo spaceMeta: spaceMeta, stgPool: stgPool, evtPub: evtPub, + pubLock: pubLock, } t.initJobs() return t diff --git a/client/internal/uploader/create_load.go b/client/internal/uploader/create_load.go index fad0e1e..05f1cb6 100644 --- a/client/internal/uploader/create_load.go +++ b/client/internal/uploader/create_load.go @@ -11,6 +11,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" @@ -21,10 +22,10 @@ type CreateLoadUploader struct { targetSpaces []types.UserSpaceDetail loadRoots []string uploader *Uploader - // distlock *distlock.Mutex - successes []db.AddObjectEntry - lock sync.Mutex - commited bool + pubLock *distlock.Mutex + successes []db.AddObjectEntry + lock sync.Mutex + commited bool } type CreateLoadResult struct { @@ -92,7 +93,7 @@ func (u *CreateLoadUploader) Commit() (CreateLoadResult, error) { } u.commited = true - // defer u.distlock.Unlock() + defer u.pubLock.Unlock() var addedObjs []types.Object err := u.uploader.db.DoTx(func(tx db.SQLContext) error { @@ -125,7 +126,8 @@ func (u *CreateLoadUploader) Abort() { } u.commited = true - // u.distlock.Unlock() + u.pubLock.Unlock() - // TODO 可以考虑删除PackageID + db2 := u.uploader.db + db.DoTx10(db2, db2.Package().DeleteComplete, u.pkg.PackageID) } diff --git a/client/internal/uploader/update.go b/client/internal/uploader/update.go index 1d30bd4..bf9a857 100644 --- a/client/internal/uploader/update.go +++ b/client/internal/uploader/update.go @@ -12,16 +12,17 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" ) type UpdateUploader struct { - uploader *Uploader - pkgID types.PackageID - targetSpace types.UserSpaceDetail - // distMutex *distlock.Mutex + uploader *Uploader + pkgID types.PackageID + targetSpace types.UserSpaceDetail + pubLock *distlock.Mutex loadToSpaces []types.UserSpaceDetail loadToPath []string successes []db.AddObjectEntry @@ -125,7 +126,7 @@ func (w *UpdateUploader) Commit() (UpdateResult, error) { } w.commited = true - // defer w.distMutex.Unlock() + defer w.pubLock.Unlock() var addedObjs []types.Object err := w.uploader.db.DoTx(func(tx db.SQLContext) error { @@ -157,5 +158,5 @@ func (w *UpdateUploader) Abort() { } w.commited = true - // w.distMutex.Unlock() + w.pubLock.Unlock() } diff --git a/client/internal/uploader/uploader.go b/client/internal/uploader/uploader.go index dc98b04..569428b 100644 --- a/client/internal/uploader/uploader.go +++ b/client/internal/uploader/uploader.go @@ -18,6 +18,7 @@ import ( stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" @@ -25,16 +26,16 @@ import ( ) type Uploader struct { - distlock *distlock.Service + pubLock *distlock.Service connectivity *connectivity.Collector stgPool *pool.Pool spaceMeta *metacache.UserSpaceMeta db *db.DB } -func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgPool *pool.Pool, spaceMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader { +func NewUploader(pubLock *distlock.Service, connectivity *connectivity.Collector, stgPool *pool.Pool, spaceMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader { return &Uploader{ - distlock: distlock, + pubLock: pubLock, connectivity: connectivity, stgPool: stgPool, spaceMeta: spaceMeta, @@ -93,20 +94,17 @@ func (u *Uploader) BeginUpdate(pkgID clitypes.PackageID, affinity clitypes.UserS target := u.chooseUploadStorage(uploadSpaces, affinity) - // TODO2 加锁 - // 给上传节点的IPFS加锁 - // TODO 考虑加Object的Create锁 // 防止上传的副本被清除 - // distMutex, err := reqbuilder.NewBuilder().Shard().Buzy(target.Space.Storage.StorageID).MutexLock(u.distlock) - // if err != nil { - // return nil, fmt.Errorf("acquire distlock: %w", err) - // } + pubLock, err := reqbuilder.NewBuilder().Shard().Buzy(target.Space.UserSpace.UserSpaceID).MutexLock(u.pubLock) + if err != nil { + return nil, fmt.Errorf("acquire lock: %w", err) + } return &UpdateUploader{ - uploader: u, - pkgID: pkgID, - targetSpace: target.Space, - // distMutex: distMutex, + uploader: u, + pkgID: pkgID, + targetSpace: target.Space, + pubLock: pubLock, loadToSpaces: loadToSpaces, loadToPath: loadToPath, }, nil @@ -158,23 +156,21 @@ func (u *Uploader) BeginCreateLoad(bktID clitypes.BucketID, pkgName string, load return nil, fmt.Errorf("create package: %w", err) } - // TODO2 加锁 - // reqBld := reqbuilder.NewBuilder() - // for _, stg := range spacesStgs { - // reqBld.Shard().Buzy(stg.Storage.StorageID) - // reqBld.Storage().Buzy(stg.Storage.StorageID) - // } - // lock, err := reqBld.MutexLock(u.distlock) - // if err != nil { - // return nil, fmt.Errorf("acquire distlock: %w", err) - // } + reqBld := reqbuilder.NewBuilder() + for _, stg := range spacesStgs { + reqBld.Shard().Buzy(stg.UserSpace.UserSpaceID) + } + lock, err := reqBld.MutexLock(u.pubLock) + if err != nil { + return nil, fmt.Errorf("acquire lock: %w", err) + } return &CreateLoadUploader{ pkg: pkg, targetSpaces: spacesStgs, loadRoots: loadToPath, uploader: u, - // distlock: lock, + pubLock: lock, }, nil } @@ -236,12 +232,11 @@ func (u *Uploader) UploadPart(objID clitypes.ObjectID, index int, stream io.Read space = u.chooseUploadStorage(userStgs, 0).Space } - // TODO2 加锁 - // lock, err := reqbuilder.NewBuilder().Shard().Buzy(space.Storage.StorageID).MutexLock(u.distlock) - // if err != nil { - // return fmt.Errorf("acquire distlock: %w", err) - // } - // defer lock.Unlock() + lock, err := reqbuilder.NewBuilder().Shard().Buzy(space.UserSpace.UserSpaceID).MutexLock(u.pubLock) + if err != nil { + return fmt.Errorf("acquire lock: %w", err) + } + defer lock.Unlock() ft := ioswitch2.NewFromTo() fromDrv, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream()) diff --git a/common/README.md b/common/README.md index 5459cf5..2dab922 100644 --- a/common/README.md +++ b/common/README.md @@ -12,7 +12,6 @@ - `pkgs`:一些相对独立的功能模块。 - `cmd`:公用的业务逻辑,比如上传Package和下载Package。 - `db`:数据库的数据结构和操作函数。 - - `distlock`:分布式锁服务,核心机制使用的是`common/pkgs/distlock`,增加了根据存储系统的业务需求设计的锁。 - `ec`:纠删码的库。 - `grpc`:存放proto文件,以及使用protogen工具生成的代码文件。 - `ioswitch`:IOSwitch模块。 diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json index 7bb8027..3915e04 100644 --- a/common/assets/confs/client.config.json +++ b/common/assets/confs/client.config.json @@ -28,14 +28,6 @@ "retryInterval": 5000 } }, - "distlock": { - "etcdAddress": "127.0.0.1:2379", - "etcdUsername": "", - "etcdPassword": "", - "etcdLockLeaseTimeSec": 5, - "randomReleasingDelayMs": 3000, - "serviceDescription": "I am a client" - }, "connectivity": { "testInterval": 300 }, diff --git a/common/assets/confs/hub.config.json b/common/assets/confs/hub.config.json index 5d002a9..0fa8b4f 100644 --- a/common/assets/confs/hub.config.json +++ b/common/assets/confs/hub.config.json @@ -25,14 +25,6 @@ "retryInterval": 5000 } }, - "distlock": { - "etcdAddress": "127.0.0.1:2379", - "etcdUsername": "", - "etcdPassword": "", - "etcdLockLeaseTimeSec": 5, - "randomReleasingDelayMs": 3000, - "serviceDescription": "I am a hub" - }, "connectivity": { "testInterval": 300 } diff --git a/common/assets/confs/scanner.config.json b/common/assets/confs/scanner.config.json deleted file mode 100644 index e8c8531..0000000 --- a/common/assets/confs/scanner.config.json +++ /dev/null @@ -1,35 +0,0 @@ -{ - "accessStatHistoryAmount": 0.8, - "ecFileSizeThreshold": 104857600, - "hubUnavailableSeconds": 300, - "logger": { - "output": "file", - "outputFileName": "scanner", - "outputDirectory": "log", - "level": "debug" - }, - "db": { - "address": "127.0.0.1:3306", - "account": "", - "password": "", - "databaseName": "cloudream" - }, - "rabbitMQ": { - "address": "127.0.0.1:5672", - "account": "", - "password": "", - "vhost": "/", - "param": { - "retryNum": 5, - "retryInterval": 5000 - } - }, - "distlock": { - "etcdAddress": "127.0.0.1:2379", - "etcdUsername": "", - "etcdPassword": "", - "etcdLockLeaseTimeSec": 5, - "randomReleasingDelayMs": 3000, - "serviceDescription": "I am a scanner" - } -} \ No newline at end of file diff --git a/common/pkgs/distlock/lockprovider/empty_target.go b/common/pkgs/distlock/lockprovider/empty_target.go new file mode 100644 index 0000000..cb53571 --- /dev/null +++ b/common/pkgs/distlock/lockprovider/empty_target.go @@ -0,0 +1,14 @@ +package lockprovider + +import "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/types" + +type EmptyTarget struct{} + +func NewEmptyTarget() *EmptyTarget { + return &EmptyTarget{} +} + +func (e *EmptyTarget) Equals(other types.LockTarget) bool { + _, ok := other.(*EmptyTarget) + return ok +} diff --git a/common/pkgs/distlock/lockprovider/lock_compatibility_table.go b/common/pkgs/distlock/lockprovider/lock_compatibility_table.go index 951e708..7c1730a 100644 --- a/common/pkgs/distlock/lockprovider/lock_compatibility_table.go +++ b/common/pkgs/distlock/lockprovider/lock_compatibility_table.go @@ -4,7 +4,7 @@ import ( "fmt" "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/types" ) const ( @@ -16,7 +16,7 @@ const ( type HasSuchLockFn = func() bool // LockCompatibilitySpecialFn 判断锁与指定的锁名是否兼容 -type LockCompatibilitySpecialFn func(lock distlock.Lock, testLockName string) bool +type LockCompatibilitySpecialFn func(lock types.Lock, testLockName string) bool type LockCompatibilityType string @@ -95,7 +95,7 @@ func (t *LockCompatibilityTable) Row(comps ...LockCompatibility) error { return nil } -func (t *LockCompatibilityTable) Test(lock distlock.Lock) error { +func (t *LockCompatibilityTable) Test(lock types.Lock) error { row, ok := lo.Find(t.rows, func(row LockCompatibilityTableRow) bool { return lock.Name == row.LockName }) if !ok { return fmt.Errorf("unknow lock name %s", lock.Name) @@ -108,13 +108,13 @@ func (t *LockCompatibilityTable) Test(lock distlock.Lock) error { if c.Type == LOCK_COMPATIBILITY_UNCOMPATIBLE { if t.rows[i].HasSuchLockFn() { - return distlock.NewLockTargetBusyError(t.rows[i].LockName) + return types.NewLockTargetBusyError(t.rows[i].LockName) } } if c.Type == LOCK_COMPATIBILITY_SPECIAL { if !c.SpecialFn(lock, t.rows[i].LockName) { - return distlock.NewLockTargetBusyError(t.rows[i].LockName) + return types.NewLockTargetBusyError(t.rows[i].LockName) } } } diff --git a/common/pkgs/distlock/lockprovider/lock_compatibility_table_test.go b/common/pkgs/distlock/lockprovider/lock_compatibility_table_test.go index 052be72..05993ad 100644 --- a/common/pkgs/distlock/lockprovider/lock_compatibility_table_test.go +++ b/common/pkgs/distlock/lockprovider/lock_compatibility_table_test.go @@ -4,7 +4,7 @@ import ( "testing" . "github.com/smartystreets/goconvey/convey" - "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/types" ) func Test_LockCompatibilityTable(t *testing.T) { @@ -18,22 +18,22 @@ func Test_LockCompatibilityTable(t *testing.T) { comp := LockCompatible() uncp := LockUncompatible() - spcl := LockSpecial(func(lock distlock.Lock, testLockName string) bool { return true }) + spcl := LockSpecial(func(lock types.Lock, testLockName string) bool { return true }) table.Row(comp, comp, comp) table.Row(comp, uncp, comp) table.Row(comp, comp, spcl) - err := table.Test(distlock.Lock{ + err := table.Test(types.Lock{ Name: "l1", }) So(err, ShouldBeNil) - err = table.Test(distlock.Lock{ + err = table.Test(types.Lock{ Name: "l2", }) So(err, ShouldNotBeNil) - err = table.Test(distlock.Lock{ + err = table.Test(types.Lock{ Name: "l3", }) So(err, ShouldBeNil) diff --git a/common/pkgs/distlock/lockprovider/metadata_lock.go b/common/pkgs/distlock/lockprovider/metadata_lock.go deleted file mode 100644 index cdb6f16..0000000 --- a/common/pkgs/distlock/lockprovider/metadata_lock.go +++ /dev/null @@ -1,122 +0,0 @@ -package lockprovider - -import ( - "fmt" - - "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/common/utils/lo2" -) - -const ( - MetadataLockPathPrefix = "Metadata" - MetadataCreateLock = "Create" -) - -type metadataElementLock struct { - target StringLockTarget - requestIDs []string -} - -type MetadataLock struct { - createReqIDs []*metadataElementLock - - lockCompatibilityTable LockCompatibilityTable -} - -func NewMetadataLock() *MetadataLock { - - metadataLock := MetadataLock{ - lockCompatibilityTable: LockCompatibilityTable{}, - } - - compTable := &metadataLock.lockCompatibilityTable - - compTable. - Column(MetadataCreateLock, func() bool { return len(metadataLock.createReqIDs) > 0 }) - trgt := LockSpecial(func(lock distlock.Lock, testLockName string) bool { - strTar := lock.Target.(StringLockTarget) - return lo.NoneBy(metadataLock.createReqIDs, func(other *metadataElementLock) bool { return strTar.IsConflict(&other.target) }) - }) - - compTable.MustRow(trgt) - - return &metadataLock -} - -// CanLock 判断这个锁能否锁定成功 -func (l *MetadataLock) CanLock(lock distlock.Lock) error { - return l.lockCompatibilityTable.Test(lock) -} - -// 锁定 -func (l *MetadataLock) Lock(reqID string, lock distlock.Lock) error { - switch lock.Name { - case MetadataCreateLock: - l.createReqIDs = l.addElementLock(lock, l.createReqIDs, reqID) - - default: - return fmt.Errorf("unknow lock name: %s", lock.Name) - } - - return nil -} - -func (l *MetadataLock) addElementLock(lock distlock.Lock, locks []*metadataElementLock, reqID string) []*metadataElementLock { - strTarget := lock.Target.(StringLockTarget) - lck, ok := lo.Find(locks, func(l *metadataElementLock) bool { return strTarget.IsConflict(&l.target) }) - if !ok { - lck = &metadataElementLock{ - target: strTarget, - } - locks = append(locks, lck) - } - - lck.requestIDs = append(lck.requestIDs, reqID) - return locks -} - -// 解锁 -func (l *MetadataLock) Unlock(reqID string, lock distlock.Lock) error { - switch lock.Name { - case MetadataCreateLock: - l.createReqIDs = l.removeElementLock(lock, l.createReqIDs, reqID) - - default: - return fmt.Errorf("unknow lock name: %s", lock.Name) - } - - return nil -} - -func (l *MetadataLock) removeElementLock(lock distlock.Lock, locks []*metadataElementLock, reqID string) []*metadataElementLock { - strTarget := lock.Target.(StringLockTarget) - lck, index, ok := lo.FindIndexOf(locks, func(l *metadataElementLock) bool { return strTarget.IsConflict(&l.target) }) - if !ok { - return locks - } - - lck.requestIDs = lo2.Remove(lck.requestIDs, reqID) - - if len(lck.requestIDs) == 0 { - locks = lo2.RemoveAt(locks, index) - } - - return locks -} - -// GetTargetString 将锁对象序列化为字符串,方便存储到ETCD -func (l *MetadataLock) GetTargetString(target any) (string, error) { - tar := target.(StringLockTarget) - return StringLockTargetToString(&tar) -} - -// ParseTargetString 解析字符串格式的锁对象数据 -func (l *MetadataLock) ParseTargetString(targetStr string) (any, error) { - return StringLockTargetFromString(targetStr) -} - -// Clear 清除内部所有状态 -func (l *MetadataLock) Clear() { - l.createReqIDs = nil -} diff --git a/common/pkgs/distlock/lockprovider/shard_store.go b/common/pkgs/distlock/lockprovider/shard_store.go index f5aaaa1..740d05c 100644 --- a/common/pkgs/distlock/lockprovider/shard_store.go +++ b/common/pkgs/distlock/lockprovider/shard_store.go @@ -3,8 +3,8 @@ package lockprovider import ( "fmt" - "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/types" ) const ( @@ -27,7 +27,7 @@ func NewShardStoreLock() *ShardStoreLock { } // CanLock 判断这个锁能否锁定成功 -func (l *ShardStoreLock) CanLock(lock distlock.Lock) error { +func (l *ShardStoreLock) CanLock(lock types.Lock) error { nodeLock, ok := l.stgLocks[lock.Path[ShardStoreStorageIDPathIndex]] if !ok { // 不能直接返回nil,因为如果锁数据的格式不对,也不能获取锁。 @@ -39,7 +39,7 @@ func (l *ShardStoreLock) CanLock(lock distlock.Lock) error { } // 锁定。在内部可以不用判断能否加锁,外部需要保证调用此函数前调用了CanLock进行检查 -func (l *ShardStoreLock) Lock(reqID string, lock distlock.Lock) error { +func (l *ShardStoreLock) Lock(reqID types.RequestID, lock types.Lock) error { stgID := lock.Path[ShardStoreStorageIDPathIndex] nodeLock, ok := l.stgLocks[stgID] @@ -52,7 +52,7 @@ func (l *ShardStoreLock) Lock(reqID string, lock distlock.Lock) error { } // 解锁 -func (l *ShardStoreLock) Unlock(reqID string, lock distlock.Lock) error { +func (l *ShardStoreLock) Unlock(reqID types.RequestID, lock types.Lock) error { stgID := lock.Path[ShardStoreStorageIDPathIndex] nodeLock, ok := l.stgLocks[stgID] @@ -63,25 +63,14 @@ func (l *ShardStoreLock) Unlock(reqID string, lock distlock.Lock) error { return nodeLock.Unlock(reqID, lock) } -// GetTargetString 将锁对象序列化为字符串,方便存储到ETCD -func (l *ShardStoreLock) GetTargetString(target any) (string, error) { - tar := target.(StringLockTarget) - return StringLockTargetToString(&tar) -} - -// ParseTargetString 解析字符串格式的锁对象数据 -func (l *ShardStoreLock) ParseTargetString(targetStr string) (any, error) { - return StringLockTargetFromString(targetStr) -} - // Clear 清除内部所有状态 func (l *ShardStoreLock) Clear() { l.stgLocks = make(map[string]*ShardStoreStorageLock) } type ShardStoreStorageLock struct { - buzyReqIDs []string - gcReqIDs []string + buzyReqIDs []types.RequestID + gcReqIDs []types.RequestID lockCompatibilityTable *LockCompatibilityTable } @@ -107,12 +96,12 @@ func NewShardStoreStorageLock() *ShardStoreStorageLock { } // CanLock 判断这个锁能否锁定成功 -func (l *ShardStoreStorageLock) CanLock(lock distlock.Lock) error { +func (l *ShardStoreStorageLock) CanLock(lock types.Lock) error { return l.lockCompatibilityTable.Test(lock) } // 锁定 -func (l *ShardStoreStorageLock) Lock(reqID string, lock distlock.Lock) error { +func (l *ShardStoreStorageLock) Lock(reqID types.RequestID, lock types.Lock) error { switch lock.Name { case ShardStoreBuzyLock: l.buzyReqIDs = append(l.buzyReqIDs, reqID) @@ -126,7 +115,7 @@ func (l *ShardStoreStorageLock) Lock(reqID string, lock distlock.Lock) error { } // 解锁 -func (l *ShardStoreStorageLock) Unlock(reqID string, lock distlock.Lock) error { +func (l *ShardStoreStorageLock) Unlock(reqID types.RequestID, lock types.Lock) error { switch lock.Name { case ShardStoreBuzyLock: l.buzyReqIDs = lo2.Remove(l.buzyReqIDs, reqID) diff --git a/common/pkgs/distlock/lockprovider/shard_store_test.go b/common/pkgs/distlock/lockprovider/shard_store_test.go index c38854d..cd88d8f 100644 --- a/common/pkgs/distlock/lockprovider/shard_store_test.go +++ b/common/pkgs/distlock/lockprovider/shard_store_test.go @@ -4,25 +4,25 @@ import ( "testing" . "github.com/smartystreets/goconvey/convey" - "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/types" ) func Test_ShardStoreLock(t *testing.T) { cases := []struct { title string - initLocks []distlock.Lock - doLock distlock.Lock + initLocks []types.Lock + doLock types.Lock wantOK bool }{ { title: "同节点,同一个Buzy锁", - initLocks: []distlock.Lock{ + initLocks: []types.Lock{ { Path: []string{ShardStoreLockPathPrefix, "hub1"}, Name: ShardStoreBuzyLock, }, }, - doLock: distlock.Lock{ + doLock: types.Lock{ Path: []string{ShardStoreLockPathPrefix, "hub1"}, Name: ShardStoreBuzyLock, }, @@ -30,13 +30,13 @@ func Test_ShardStoreLock(t *testing.T) { }, { title: "同节点,同一个GC锁", - initLocks: []distlock.Lock{ + initLocks: []types.Lock{ { Path: []string{ShardStoreLockPathPrefix, "hub1"}, Name: ShardStoreGCLock, }, }, - doLock: distlock.Lock{ + doLock: types.Lock{ Path: []string{ShardStoreLockPathPrefix, "hub1"}, Name: ShardStoreGCLock, }, @@ -44,17 +44,17 @@ func Test_ShardStoreLock(t *testing.T) { }, { title: "同时设置Buzy和GC", - initLocks: []distlock.Lock{ + initLocks: []types.Lock{ { Path: []string{ShardStoreLockPathPrefix, "hub1"}, Name: ShardStoreBuzyLock, - Target: *NewStringLockTarget(), + Target: NewStringLockTarget(), }, }, - doLock: distlock.Lock{ + doLock: types.Lock{ Path: []string{ShardStoreLockPathPrefix, "hub1"}, Name: ShardStoreGCLock, - Target: *NewStringLockTarget(), + Target: NewStringLockTarget(), }, wantOK: false, }, @@ -80,7 +80,7 @@ func Test_ShardStoreLock(t *testing.T) { Convey("解锁", t, func() { ipfsLock := NewShardStoreLock() - lock := distlock.Lock{ + lock := types.Lock{ Path: []string{ShardStoreLockPathPrefix, "hub1"}, Name: ShardStoreBuzyLock, } @@ -92,7 +92,7 @@ func Test_ShardStoreLock(t *testing.T) { ipfsLock.Unlock("req1", lock) - lock = distlock.Lock{ + lock = types.Lock{ Path: []string{ShardStoreLockPathPrefix, "hub1"}, Name: ShardStoreGCLock, } diff --git a/common/pkgs/distlock/lockprovider/storage_lock.go b/common/pkgs/distlock/lockprovider/storage_lock.go deleted file mode 100644 index 8e2f5bf..0000000 --- a/common/pkgs/distlock/lockprovider/storage_lock.go +++ /dev/null @@ -1,140 +0,0 @@ -package lockprovider - -import ( - "fmt" - - "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/common/utils/lo2" -) - -const ( - StorageLockPathPrefix = "Storage" - StorageHubIDPathIndex = 1 - StorageBuzyLock = "Buzy" - StorageGCLock = "GC" -) - -type StorageLock struct { - nodeLocks map[string]*StorageNodeLock - dummyLock *StorageNodeLock -} - -func NewStorageLock() *StorageLock { - return &StorageLock{ - nodeLocks: make(map[string]*StorageNodeLock), - dummyLock: NewStorageNodeLock(), - } -} - -// CanLock 判断这个锁能否锁定成功 -func (l *StorageLock) CanLock(lock distlock.Lock) error { - nodeLock, ok := l.nodeLocks[lock.Path[StorageHubIDPathIndex]] - if !ok { - // 不能直接返回nil,因为如果锁数据的格式不对,也不能获取锁。 - // 这里使用一个空Provider来进行检查。 - return l.dummyLock.CanLock(lock) - } - - return nodeLock.CanLock(lock) -} - -// 锁定。在内部可以不用判断能否加锁,外部需要保证调用此函数前调用了CanLock进行检查 -func (l *StorageLock) Lock(reqID string, lock distlock.Lock) error { - hubID := lock.Path[StorageHubIDPathIndex] - - nodeLock, ok := l.nodeLocks[hubID] - if !ok { - nodeLock = NewStorageNodeLock() - l.nodeLocks[hubID] = nodeLock - } - - return nodeLock.Lock(reqID, lock) -} - -// 解锁 -func (l *StorageLock) Unlock(reqID string, lock distlock.Lock) error { - hubID := lock.Path[StorageHubIDPathIndex] - - nodeLock, ok := l.nodeLocks[hubID] - if !ok { - return nil - } - - return nodeLock.Unlock(reqID, lock) -} - -// GetTargetString 将锁对象序列化为字符串,方便存储到ETCD -func (l *StorageLock) GetTargetString(target any) (string, error) { - tar := target.(StringLockTarget) - return StringLockTargetToString(&tar) -} - -// ParseTargetString 解析字符串格式的锁对象数据 -func (l *StorageLock) ParseTargetString(targetStr string) (any, error) { - return StringLockTargetFromString(targetStr) -} - -// Clear 清除内部所有状态 -func (l *StorageLock) Clear() { - l.nodeLocks = make(map[string]*StorageNodeLock) -} - -type StorageNodeLock struct { - buzyReqIDs []string - gcReqIDs []string - - lockCompatibilityTable *LockCompatibilityTable -} - -func NewStorageNodeLock() *StorageNodeLock { - compTable := &LockCompatibilityTable{} - - StorageLock := StorageNodeLock{ - lockCompatibilityTable: compTable, - } - - compTable. - Column(StorageBuzyLock, func() bool { return len(StorageLock.buzyReqIDs) > 0 }). - Column(StorageGCLock, func() bool { return len(StorageLock.gcReqIDs) > 0 }) - - comp := LockCompatible() - uncp := LockUncompatible() - - compTable.MustRow(comp, uncp) - compTable.MustRow(uncp, comp) - - return &StorageLock -} - -// CanLock 判断这个锁能否锁定成功 -func (l *StorageNodeLock) CanLock(lock distlock.Lock) error { - return l.lockCompatibilityTable.Test(lock) -} - -// 锁定 -func (l *StorageNodeLock) Lock(reqID string, lock distlock.Lock) error { - switch lock.Name { - case StorageBuzyLock: - l.buzyReqIDs = append(l.buzyReqIDs, reqID) - case StorageGCLock: - l.gcReqIDs = append(l.gcReqIDs, reqID) - default: - return fmt.Errorf("unknow lock name: %s", lock.Name) - } - - return nil -} - -// 解锁 -func (l *StorageNodeLock) Unlock(reqID string, lock distlock.Lock) error { - switch lock.Name { - case StorageBuzyLock: - l.buzyReqIDs = lo2.Remove(l.buzyReqIDs, reqID) - case StorageGCLock: - l.gcReqIDs = lo2.Remove(l.gcReqIDs, reqID) - default: - return fmt.Errorf("unknow lock name: %s", lock.Name) - } - - return nil -} diff --git a/common/pkgs/distlock/lockprovider/string_lock_target.go b/common/pkgs/distlock/lockprovider/string_lock_target.go index 729dc85..fce3f17 100644 --- a/common/pkgs/distlock/lockprovider/string_lock_target.go +++ b/common/pkgs/distlock/lockprovider/string_lock_target.go @@ -5,6 +5,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/utils/serder" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/types" ) type StringLockTarget struct { @@ -43,6 +44,25 @@ func (t *StringLockTarget) IsConflict(other *StringLockTarget) bool { return false } +func (t *StringLockTarget) Equals(other types.LockTarget) bool { + st, ok := other.(*StringLockTarget) + if !ok { + return false + } + + if len(t.Components) != len(st.Components) { + return false + } + + for i := 0; i < len(t.Components); i++ { + if !t.Components[i].IsEquals(&st.Components[i]) { + return false + } + } + + return true +} + type StringLockTargetComponet struct { Values []string `json:"values"` } diff --git a/common/pkgs/distlock/mutex.go b/common/pkgs/distlock/mutex.go new file mode 100644 index 0000000..9a9ac80 --- /dev/null +++ b/common/pkgs/distlock/mutex.go @@ -0,0 +1,15 @@ +package distlock + +import ( + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/types" +) + +type Mutex struct { + svc *Service + lockReq types.LockRequest + lockReqID types.RequestID +} + +func (m *Mutex) Unlock() { + m.svc.release(m.lockReqID, m.lockReq) +} diff --git a/common/pkgs/distlock/reentrant.go b/common/pkgs/distlock/reentrant.go new file mode 100644 index 0000000..7243d7c --- /dev/null +++ b/common/pkgs/distlock/reentrant.go @@ -0,0 +1,53 @@ +package distlock + +import "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/types" + +type Reentrant struct { + svc *Service + reqs []types.LockRequest + locked []*Mutex +} + +func (r *Reentrant) Lock(req types.LockRequest, opt ...AcquireOptionFn) error { + var willLock []types.Lock + +loop: + for _, lock := range req.Locks { + for _, req := range r.reqs { + for _, locked := range req.Locks { + if locked.Equals(lock) { + continue loop + } + } + } + + willLock = append(willLock, lock) + } + + if len(willLock) == 0 { + return nil + } + + newReq := types.LockRequest{ + Reason: req.Reason, + Locks: willLock, + } + + m, err := r.svc.Acquire(newReq, opt...) + if err != nil { + return err + } + + r.reqs = append(r.reqs, newReq) + r.locked = append(r.locked, m) + + return nil +} + +func (r *Reentrant) Unlock() { + for i := len(r.reqs) - 1; i >= 0; i-- { + r.locked[i].Unlock() + } + r.locked = nil + r.reqs = nil +} diff --git a/common/pkgs/distlock/reqbuilder/lock_request_builder.go b/common/pkgs/distlock/reqbuilder/lock_request_builder.go index 9acbde3..c06e096 100644 --- a/common/pkgs/distlock/reqbuilder/lock_request_builder.go +++ b/common/pkgs/distlock/reqbuilder/lock_request_builder.go @@ -1,30 +1,29 @@ package reqbuilder import ( - "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/types" ) type LockRequestBuilder struct { - locks []distlock.Lock + locks []types.Lock } func NewBuilder() *LockRequestBuilder { return &LockRequestBuilder{} } -func (b *LockRequestBuilder) Build() distlock.LockRequest { - return distlock.LockRequest{ +func (b *LockRequestBuilder) IsEmpty() bool { + return len(b.locks) == 0 +} + +func (b *LockRequestBuilder) Build() types.LockRequest { + return types.LockRequest{ Locks: lo2.ArrayClone(b.locks), } } -func (b *LockRequestBuilder) MutexLock(svc *distlock.Service) (*distlock.Mutex, error) { - mutex := distlock.NewMutex(svc, b.Build()) - err := mutex.Lock() - if err != nil { - return nil, err - } - - return mutex, nil +func (b *LockRequestBuilder) MutexLock(svc *distlock.Service, opt ...distlock.AcquireOptionFn) (*distlock.Mutex, error) { + return svc.Acquire(b.Build(), opt...) } diff --git a/common/pkgs/distlock/reqbuilder/metadata.go b/common/pkgs/distlock/reqbuilder/metadata.go deleted file mode 100644 index da2c199..0000000 --- a/common/pkgs/distlock/reqbuilder/metadata.go +++ /dev/null @@ -1,17 +0,0 @@ -package reqbuilder - -import ( - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/lockprovider" -) - -type MetadataLockReqBuilder struct { - *LockRequestBuilder -} - -func (b *LockRequestBuilder) Metadata() *MetadataLockReqBuilder { - return &MetadataLockReqBuilder{LockRequestBuilder: b} -} - -func (b *MetadataLockReqBuilder) makePath(tableName string) []string { - return []string{lockprovider.MetadataLockPathPrefix, tableName} -} diff --git a/common/pkgs/distlock/reqbuilder/metadata_object.go b/common/pkgs/distlock/reqbuilder/metadata_object.go deleted file mode 100644 index fac2328..0000000 --- a/common/pkgs/distlock/reqbuilder/metadata_object.go +++ /dev/null @@ -1,24 +0,0 @@ -package reqbuilder - -import ( - "gitlink.org.cn/cloudream/common/pkgs/distlock" - clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/lockprovider" -) - -type MetadataObjectLockReqBuilder struct { - *MetadataLockReqBuilder -} - -func (b *MetadataLockReqBuilder) Object() *MetadataObjectLockReqBuilder { - return &MetadataObjectLockReqBuilder{MetadataLockReqBuilder: b} -} - -func (b *MetadataObjectLockReqBuilder) CreateOne(packageID clitypes.PackageID, objectPath string) *MetadataObjectLockReqBuilder { - b.locks = append(b.locks, distlock.Lock{ - Path: b.makePath("Object"), - Name: lockprovider.MetadataCreateLock, - Target: *lockprovider.NewStringLockTarget().Add(packageID, objectPath), - }) - return b -} diff --git a/common/pkgs/distlock/reqbuilder/shard_store.go b/common/pkgs/distlock/reqbuilder/shard_store.go index a156582..a6b5c06 100644 --- a/common/pkgs/distlock/reqbuilder/shard_store.go +++ b/common/pkgs/distlock/reqbuilder/shard_store.go @@ -3,9 +3,9 @@ package reqbuilder import ( "strconv" - "gitlink.org.cn/cloudream/common/pkgs/distlock" + clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/lockprovider" - cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/types" ) type ShardStoreLockReqBuilder struct { @@ -15,24 +15,24 @@ type ShardStoreLockReqBuilder struct { func (b *LockRequestBuilder) Shard() *ShardStoreLockReqBuilder { return &ShardStoreLockReqBuilder{LockRequestBuilder: b} } -func (b *ShardStoreLockReqBuilder) Buzy(stgID cortypes.StorageID) *ShardStoreLockReqBuilder { - b.locks = append(b.locks, distlock.Lock{ - Path: b.makePath(stgID), +func (b *ShardStoreLockReqBuilder) Buzy(spaceID clitypes.UserSpaceID) *ShardStoreLockReqBuilder { + b.locks = append(b.locks, types.Lock{ + Path: b.makePath(spaceID), Name: lockprovider.ShardStoreBuzyLock, - Target: *lockprovider.NewStringLockTarget(), + Target: lockprovider.NewEmptyTarget(), }) return b } -func (b *ShardStoreLockReqBuilder) GC(stgID cortypes.StorageID) *ShardStoreLockReqBuilder { - b.locks = append(b.locks, distlock.Lock{ - Path: b.makePath(stgID), +func (b *ShardStoreLockReqBuilder) GC(spaceID clitypes.UserSpaceID) *ShardStoreLockReqBuilder { + b.locks = append(b.locks, types.Lock{ + Path: b.makePath(spaceID), Name: lockprovider.ShardStoreGCLock, - Target: *lockprovider.NewStringLockTarget(), + Target: lockprovider.NewEmptyTarget(), }) return b } -func (b *ShardStoreLockReqBuilder) makePath(hubID cortypes.StorageID) []string { +func (b *ShardStoreLockReqBuilder) makePath(hubID clitypes.UserSpaceID) []string { return []string{lockprovider.ShardStoreLockPathPrefix, strconv.FormatInt(int64(hubID), 10)} } diff --git a/common/pkgs/distlock/reqbuilder/storage.go b/common/pkgs/distlock/reqbuilder/storage.go deleted file mode 100644 index 43179b9..0000000 --- a/common/pkgs/distlock/reqbuilder/storage.go +++ /dev/null @@ -1,39 +0,0 @@ -package reqbuilder - -import ( - "strconv" - - "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/lockprovider" - cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" -) - -type StorageLockReqBuilder struct { - *LockRequestBuilder -} - -func (b *LockRequestBuilder) Storage() *StorageLockReqBuilder { - return &StorageLockReqBuilder{LockRequestBuilder: b} -} - -func (b *StorageLockReqBuilder) Buzy(storageID cortypes.StorageID) *StorageLockReqBuilder { - b.locks = append(b.locks, distlock.Lock{ - Path: b.makePath(storageID), - Name: lockprovider.StorageBuzyLock, - Target: *lockprovider.NewStringLockTarget(), - }) - return b -} - -func (b *StorageLockReqBuilder) GC(storageID cortypes.StorageID) *StorageLockReqBuilder { - b.locks = append(b.locks, distlock.Lock{ - Path: b.makePath(storageID), - Name: lockprovider.StorageGCLock, - Target: *lockprovider.NewStringLockTarget(), - }) - return b -} - -func (b *StorageLockReqBuilder) makePath(storageID cortypes.StorageID) []string { - return []string{lockprovider.StorageLockPathPrefix, strconv.FormatInt(int64(storageID), 10)} -} diff --git a/common/pkgs/distlock/service.go b/common/pkgs/distlock/service.go index 3308028..bb922c4 100644 --- a/common/pkgs/distlock/service.go +++ b/common/pkgs/distlock/service.go @@ -1,62 +1,194 @@ package distlock import ( - "gitlink.org.cn/cloudream/common/pkgs/distlock" + "context" + "fmt" + "sync" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/trie" + "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/lockprovider" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/types" ) -type Service = distlock.Service +type AcquireOption struct { + Timeout time.Duration +} -type Mutex = distlock.Mutex +type AcquireOptionFn func(opt *AcquireOption) -func NewService(cfg *distlock.Config) (*distlock.Service, error) { - srv, err := distlock.NewService(cfg, initProviders()) +func WithTimeout(timeout time.Duration) AcquireOptionFn { + return func(opt *AcquireOption) { + opt.Timeout = timeout + } +} + +type Service struct { + lock *sync.Mutex + provdersTrie *trie.Trie[types.LockProvider] + acquirings []*acquireInfo + nextReqID int64 +} + +func NewService() *Service { + svc := &Service{ + lock: &sync.Mutex{}, + provdersTrie: trie.NewTrie[types.LockProvider](), + } + + svc.provdersTrie.Create([]any{lockprovider.ShardStoreLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewShardStoreLock() + return svc +} + +type acquireInfo struct { + Request types.LockRequest + Callback *future.SetValueFuture[types.RequestID] + LastErr error +} + +func (svc *Service) Acquire(req types.LockRequest, opts ...AcquireOptionFn) (*Mutex, error) { + var opt = AcquireOption{ + Timeout: time.Second * 10, + } + for _, fn := range opts { + fn(&opt) + } + + ctx := context.Background() + if opt.Timeout != 0 { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, opt.Timeout) + defer cancel() + } + + // 就地检测锁是否可用 + svc.lock.Lock() + defer svc.lock.Unlock() + + reqID, err := svc.tryAcquireOne(req) if err != nil { return nil, err } - return srv, nil + if reqID != "" { + return &Mutex{ + svc: svc, + lockReq: req, + lockReqID: reqID, + }, nil + } + + // 就地检测失败,那么就需要异步等待锁可用 + info := &acquireInfo{ + Request: req, + Callback: future.NewSetValue[types.RequestID](), + } + svc.acquirings = append(svc.acquirings, info) + + // 等待的时候不加锁 + svc.lock.Unlock() + reqID, err = info.Callback.Wait(ctx) + svc.lock.Lock() + + if err == nil { + return &Mutex{ + svc: svc, + lockReq: req, + lockReqID: reqID, + }, nil + } + + if err != future.ErrCanceled { + lo2.Remove(svc.acquirings, info) + return nil, err + } + + // 如果第一次等待是超时错误,那么在锁里再尝试获取一次结果 + reqID, err = info.Callback.TryGetValue() + if err == nil { + return &Mutex{ + svc: svc, + lockReq: req, + lockReqID: reqID, + }, nil + } + + lo2.Remove(svc.acquirings, info) + return nil, err } -func initProviders() []distlock.PathProvider { - var provs []distlock.PathProvider - - provs = append(provs, initMetadataLockProviders()...) - - provs = append(provs, initShardLockProviders()...) - - provs = append(provs, initStorageLockProviders()...) - - return provs -} - -func initMetadataLockProviders() []distlock.PathProvider { - return []distlock.PathProvider{ - distlock.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Hub"), - distlock.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Storage"), - distlock.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "User"), - distlock.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "UserBucket"), - distlock.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "UserHub"), - distlock.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "UserStorage"), - distlock.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Bucket"), - distlock.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Object"), - distlock.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Package"), - distlock.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "ObjectRep"), - distlock.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "ObjectBlock"), - distlock.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Cache"), - distlock.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Location"), +func (s *Service) BeginReentrant() *Reentrant { + return &Reentrant{ + svc: s, } } -func initShardLockProviders() []distlock.PathProvider { - return []distlock.PathProvider{ - distlock.NewPathProvider(lockprovider.NewShardStoreLock(), lockprovider.ShardStoreLockPathPrefix, trie.WORD_ANY), +func (s *Service) release(reqID types.RequestID, req types.LockRequest) { + s.lock.Lock() + defer s.lock.Unlock() + + s.releaseRequest(reqID, req) + s.tryAcquirings() +} + +func (a *Service) tryAcquirings() { + for i := 0; i < len(a.acquirings); i++ { + req := a.acquirings[i] + + reqID, err := a.tryAcquireOne(req.Request) + if err != nil { + req.LastErr = err + continue + } + + req.Callback.SetValue(reqID) + a.acquirings[i] = nil + } + + a.acquirings = lo2.RemoveAllDefault(a.acquirings) +} + +func (s *Service) tryAcquireOne(req types.LockRequest) (types.RequestID, error) { + err := s.testOneRequest(req) + if err != nil { + return "", err + } + + reqID := types.RequestID(fmt.Sprintf("%d", s.nextReqID)) + s.nextReqID++ + + s.applyRequest(reqID, req) + return reqID, nil +} + +func (s *Service) testOneRequest(req types.LockRequest) error { + for _, lock := range req.Locks { + n, ok := s.provdersTrie.WalkEnd(lock.Path) + if !ok || n.Value == nil { + return fmt.Errorf("lock provider not found for path %v", lock.Path) + } + + err := n.Value.CanLock(lock) + if err != nil { + return err + } + } + + return nil +} + +func (s *Service) applyRequest(reqID types.RequestID, req types.LockRequest) { + for _, lock := range req.Locks { + p, _ := s.provdersTrie.WalkEnd(lock.Path) + p.Value.Lock(reqID, lock) } } -func initStorageLockProviders() []distlock.PathProvider { - return []distlock.PathProvider{ - distlock.NewPathProvider(lockprovider.NewStorageLock(), lockprovider.StorageLockPathPrefix, trie.WORD_ANY), +func (s *Service) releaseRequest(reqID types.RequestID, req types.LockRequest) { + for _, lock := range req.Locks { + p, _ := s.provdersTrie.WalkEnd(lock.Path) + p.Value.Unlock(reqID, lock) } } diff --git a/common/pkgs/distlock/types/models.go b/common/pkgs/distlock/types/models.go new file mode 100644 index 0000000..c946cc1 --- /dev/null +++ b/common/pkgs/distlock/types/models.go @@ -0,0 +1,60 @@ +package types + +import ( + "fmt" + + "gitlink.org.cn/cloudream/common/utils/lo2" +) + +type RequestID string + +type Lock struct { + Path []string // 锁路径,存储的是路径的每一部分 + Name string // 锁名 + Target LockTarget // 锁对象,由具体的Provider去解析 +} + +func (b *Lock) Equals(other Lock) bool { + return lo2.ArrayEquals(b.Path, other.Path) && b.Name == other.Name && b.Target.Equals(other.Target) +} + +type LockTarget interface { + Equals(other LockTarget) bool +} + +type LockRequest struct { + Reason string + Locks []Lock +} + +func (b *LockRequest) Add(lock Lock) { + b.Locks = append(b.Locks, lock) +} + +type LockProvider interface { + // CanLock 判断这个锁能否锁定成功 + CanLock(lock Lock) error + + // Lock 锁定。由于同一个锁请求内的锁不检查冲突,因此这个函数必须支持有冲突的锁进行锁定。 + Lock(reqID RequestID, lock Lock) error + + // 解锁 + Unlock(reqID RequestID, lock Lock) error + + // Clear 清除内部所有状态 + Clear() +} + +type LockTargetBusyError struct { + lockName string +} + +func (e *LockTargetBusyError) Error() string { + return fmt.Sprintf("the lock object is locked by %s", e.lockName) +} + +func NewLockTargetBusyError(lockName string) *LockTargetBusyError { + return &LockTargetBusyError{ + lockName: lockName, + } +} diff --git a/deploy/arm64/yaml/config/client.config.json b/deploy/arm64/yaml/config/client.config.json index df81039..1761eba 100644 --- a/deploy/arm64/yaml/config/client.config.json +++ b/deploy/arm64/yaml/config/client.config.json @@ -17,15 +17,6 @@ "password": "123456", "vhost": "/" }, - "ipfs": null, - "distlock": { - "etcdAddress": "127.0.0.1:2379", - "etcdUsername": "", - "etcdPassword": "", - "etcdLockLeaseTimeSec": 5, - "randomReleasingDelayMs": 3000, - "serviceDescription": "I am a client" - }, "connectivity": { "testInterval": 300 }, diff --git a/deploy/arm64/yaml/config/agent-{{NODE_NAME}}.config.json b/deploy/arm64/yaml/config/hub-{{NODE_NAME}}.config.json similarity index 73% rename from deploy/arm64/yaml/config/agent-{{NODE_NAME}}.config.json rename to deploy/arm64/yaml/config/hub-{{NODE_NAME}}.config.json index c57fe24..625ff2a 100644 --- a/deploy/arm64/yaml/config/agent-{{NODE_NAME}}.config.json +++ b/deploy/arm64/yaml/config/hub-{{NODE_NAME}}.config.json @@ -25,14 +25,6 @@ "ipfs": { "address": "127.0.0.1:5001" }, - "distlock": { - "etcdAddress": "127.0.0.1:2379", - "etcdUsername": "", - "etcdPassword": "", - "etcdLockLeaseTimeSec": 5, - "randomReleasingDelayMs": 3000, - "serviceDescription": "I am a agent" - }, "connectivity": { "testInterval": 300 }, diff --git a/deploy/arm64/yaml/config/scanner.config.json b/deploy/arm64/yaml/config/scanner.config.json deleted file mode 100644 index e84b602..0000000 --- a/deploy/arm64/yaml/config/scanner.config.json +++ /dev/null @@ -1,30 +0,0 @@ -{ - "ecFileSizeThreshold": 104857600, - "nodeUnavailableSeconds": 300, - "logger": { - "output": "file", - "outputFileName": "scanner", - "outputDirectory": "log", - "level": "debug" - }, - "db": { - "address": "127.0.0.1:3306", - "account": "root", - "password": "123456", - "databaseName": "cloudream" - }, - "rabbitMQ": { - "address": "127.0.0.1:5672", - "account": "cloudream", - "password": "123456", - "vhost": "/" - }, - "distlock": { - "etcdAddress": "127.0.0.1:2379", - "etcdUsername": "", - "etcdPassword": "", - "etcdLockLeaseTimeSec": 5, - "randomReleasingDelayMs": 3000, - "serviceDescription": "I am a scanner" - } -} \ No newline at end of file diff --git a/deploy/x86/yaml/config/client.config.json b/deploy/x86/yaml/config/client.config.json index df81039..1761eba 100644 --- a/deploy/x86/yaml/config/client.config.json +++ b/deploy/x86/yaml/config/client.config.json @@ -17,15 +17,6 @@ "password": "123456", "vhost": "/" }, - "ipfs": null, - "distlock": { - "etcdAddress": "127.0.0.1:2379", - "etcdUsername": "", - "etcdPassword": "", - "etcdLockLeaseTimeSec": 5, - "randomReleasingDelayMs": 3000, - "serviceDescription": "I am a client" - }, "connectivity": { "testInterval": 300 }, diff --git a/deploy/x86/yaml/config/agent-{{NODE_NAME}}.config.json b/deploy/x86/yaml/config/hub-{{NODE_NAME}}.config.json similarity index 67% rename from deploy/x86/yaml/config/agent-{{NODE_NAME}}.config.json rename to deploy/x86/yaml/config/hub-{{NODE_NAME}}.config.json index 61c323f..1d85e65 100644 --- a/deploy/x86/yaml/config/agent-{{NODE_NAME}}.config.json +++ b/deploy/x86/yaml/config/hub-{{NODE_NAME}}.config.json @@ -22,17 +22,6 @@ "password": "123456", "vhost": "/" }, - "ipfs": { - "address": "127.0.0.1:5001" - }, - "distlock": { - "etcdAddress": "127.0.0.1:2379", - "etcdUsername": "", - "etcdPassword": "", - "etcdLockLeaseTimeSec": 5, - "randomReleasingDelayMs": 3000, - "serviceDescription": "I am a agent" - }, "connectivity": { "testInterval": 300 }, diff --git a/deploy/x86/yaml/config/scanner.config.json b/deploy/x86/yaml/config/scanner.config.json deleted file mode 100644 index e84b602..0000000 --- a/deploy/x86/yaml/config/scanner.config.json +++ /dev/null @@ -1,30 +0,0 @@ -{ - "ecFileSizeThreshold": 104857600, - "nodeUnavailableSeconds": 300, - "logger": { - "output": "file", - "outputFileName": "scanner", - "outputDirectory": "log", - "level": "debug" - }, - "db": { - "address": "127.0.0.1:3306", - "account": "root", - "password": "123456", - "databaseName": "cloudream" - }, - "rabbitMQ": { - "address": "127.0.0.1:5672", - "account": "cloudream", - "password": "123456", - "vhost": "/" - }, - "distlock": { - "etcdAddress": "127.0.0.1:2379", - "etcdUsername": "", - "etcdPassword": "", - "etcdLockLeaseTimeSec": 5, - "randomReleasingDelayMs": 3000, - "serviceDescription": "I am a scanner" - } -} \ No newline at end of file diff --git a/hub/internal/cmd/serve.go b/hub/internal/cmd/serve.go index 89193c8..8ec6246 100644 --- a/hub/internal/cmd/serve.go +++ b/hub/internal/cmd/serve.go @@ -16,7 +16,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock" hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/grpc/hub" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" @@ -130,12 +129,6 @@ func serve(configPath string, httpAddr string) { // }) // go serveAccessStat(acStat) - // 初始化分布式锁服务 - distlock, err := distlock.NewService(&config.Cfg().DistLock) - if err != nil { - logger.Fatalf("new ipfs failed, err: %s", err.Error()) - } - // 初始化系统事件发布器 evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &datamap.SourceHub{ HubID: hubCfg.Hub.HubID, @@ -173,8 +166,6 @@ func serve(configPath string, httpAddr string) { hubrpc.RegisterHubServer(s, grpcsvc.NewService(&worker, stgPool)) go serveGRPC(s, lis) - go serveDistLock(distlock) - foever := make(chan struct{}) <-foever } @@ -312,42 +303,3 @@ func serveHTTP(server *http.Server) { // TODO 仅简单结束了程序 os.Exit(1) } - -func serveDistLock(svc *distlock.Service) { - logger.Info("start serving distlock") - - err := svc.Serve() - - if err != nil { - logger.Errorf("distlock stopped with error: %s", err.Error()) - } - - logger.Info("distlock stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) -} - -// func serveAccessStat(svc *accessstat.AccessStat) { -// logger.Info("start serving access stat") - -// ch := svc.Start() -// loop: -// for { -// val, err := ch.Receive() -// if err != nil { -// logger.Errorf("access stat stopped with error: %v", err) -// break -// } - -// switch val := val.(type) { -// case error: -// logger.Errorf("access stat stopped with error: %v", val) -// break loop -// } -// } -// logger.Info("access stat stopped") - -// // TODO 仅简单结束了程序 -// os.Exit(1) -// } diff --git a/hub/internal/config/config.go b/hub/internal/config/config.go index 3a126e1..f832ea3 100644 --- a/hub/internal/config/config.go +++ b/hub/internal/config/config.go @@ -1,7 +1,6 @@ package config import ( - "gitlink.org.cn/cloudream/common/pkgs/distlock" log "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" c "gitlink.org.cn/cloudream/common/utils/config" @@ -17,7 +16,6 @@ type Config struct { GRPC *grpc.Config `json:"grpc"` Logger log.Config `json:"logger"` RabbitMQ mq.Config `json:"rabbitMQ"` - DistLock distlock.Config `json:"distlock"` Connectivity connectivity.Config `json:"connectivity"` }