diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index 7ab7c62..99f1c01 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -194,7 +194,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { mntChan := mnt.Start() defer mnt.Stop() - svc := services.NewService(publock, dlder, acStat, uploader, strgSel, spaceMeta, db, evtPub, mnt, stgPool, spaceSync) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, spaceMeta, db, evtPub, mnt, stgPool, spaceSync, tktk) // HTTP接口 httpCfgJSON := config.Cfg().HTTP diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index 551f01d..07b1d94 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -185,7 +185,7 @@ func test(configPath string) { spaceSyncChan := spaceSync.Start() defer spaceSync.Stop() - svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, nil, stgPool, spaceSync) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, nil, stgPool, spaceSync, nil) go func() { doTest(svc) diff --git a/client/internal/cmdline/vfstest.go b/client/internal/cmdline/vfstest.go index a9b4993..d4e85f4 100644 --- a/client/internal/cmdline/vfstest.go +++ b/client/internal/cmdline/vfstest.go @@ -177,7 +177,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { mntChan := mnt.Start() defer mnt.Stop() - svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt, stgPool, spaceSync) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt, stgPool, spaceSync, nil) // HTTP接口 httpCfgJSON := config.Cfg().HTTP diff --git a/client/internal/http/v1/server.go b/client/internal/http/v1/server.go index 1b6567e..f8736d5 100644 --- a/client/internal/http/v1/server.go +++ b/client/internal/http/v1/server.go @@ -81,4 +81,7 @@ func (s *Server) InitRouters(rt gin.IRoutes, ah *auth.Auth) { rt.GET(cliapi.MountDumpStatusPath, certAuth, s.Mount().DumpStatus) rt.POST(cliapi.MountStartReclaimSpacePath, certAuth, s.Mount().StartReclaimSpace) + + rt.GET(cliapi.TickTockListJobsPath, certAuth, s.TickTock().ListJobs) + rt.POST(cliapi.TickTockRunJobPath, certAuth, s.TickTock().RunJob) } diff --git a/client/internal/http/v1/ticktock.go b/client/internal/http/v1/ticktock.go new file mode 100644 index 0000000..4bb4130 --- /dev/null +++ b/client/internal/http/v1/ticktock.go @@ -0,0 +1,46 @@ +package http + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types" + cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" + "gitlink.org.cn/cloudream/jcs-pub/common/ecode" +) + +type TickTockService struct { + *Server +} + +func (s *Server) TickTock() *TickTockService { + return &TickTockService{s} +} + +func (s *TickTockService) ListJobs(ctx *gin.Context) { + var req cliapi.TickTockListJobs + if err := ctx.ShouldBindQuery(&req); err != nil { + ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "%v", err)) + return + } + + names := s.svc.TickTock.GetJobNames() + ctx.JSON(http.StatusOK, types.OK(cliapi.TickTockListJobsResp{ + Jobs: names, + })) +} + +func (s *TickTockService) RunJob(ctx *gin.Context) { + var req cliapi.TickTockRunJob + if err := ctx.ShouldBindJSON(&req); err != nil { + ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "%v", err)) + return + } + + if !s.svc.TickTock.RunNow(req.Name) { + ctx.JSON(http.StatusOK, types.Failed(ecode.DataNotFound, "job %s not found", req.Name)) + return + } + + ctx.JSON(http.StatusOK, types.OK(cliapi.TickTockRunJobResp{})) +} diff --git a/client/internal/services/service.go b/client/internal/services/service.go index fca0cb4..16cbf7b 100644 --- a/client/internal/services/service.go +++ b/client/internal/services/service.go @@ -8,6 +8,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount" "gitlink.org.cn/cloudream/jcs-pub/client/internal/spacesyncer" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/ticktock" "gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" @@ -27,6 +28,7 @@ type Service struct { Mount *mount.Mount StgPool *pool.Pool SpaceSyncer *spacesyncer.SpaceSyncer + TickTock *ticktock.TickTock } func NewService( @@ -41,6 +43,7 @@ func NewService( mount *mount.Mount, stgPool *pool.Pool, spaceSyncer *spacesyncer.SpaceSyncer, + tickTock *ticktock.TickTock, ) *Service { return &Service{ PubLock: publock, @@ -54,5 +57,6 @@ func NewService( Mount: mount, StgPool: stgPool, SpaceSyncer: spaceSyncer, + TickTock: tickTock, } } diff --git a/client/internal/ticktock/ticktock.go b/client/internal/ticktock/ticktock.go index 55831ed..bd10e1a 100644 --- a/client/internal/ticktock/ticktock.go +++ b/client/internal/ticktock/ticktock.go @@ -62,14 +62,15 @@ func (t *TickTock) GetJobNames() []string { return lo.Keys(t.jobs) } -func (t *TickTock) RunNow(jobName string) { +func (t *TickTock) RunNow(jobName string) bool { j, ok := t.jobs[jobName] if !ok { logger.Warnf("job %s not found", jobName) - return + return false } j.cronJob.RunNow() + return true } func (t *TickTock) addJob(job Job, duration gocron.JobDefinition) { diff --git a/client/sdk/api/v1/ticktock.go b/client/sdk/api/v1/ticktock.go new file mode 100644 index 0000000..a02b9fe --- /dev/null +++ b/client/sdk/api/v1/ticktock.go @@ -0,0 +1,55 @@ +package api + +import ( + "net/http" + + "gitlink.org.cn/cloudream/common/sdks" +) + +type TickTockService struct { + *Client +} + +func (c *Client) TickTock() *TickTockService { + return &TickTockService{c} +} + +const TickTockListJobsPath = "/tickTock/listJobs" + +type TickTockListJobs struct{} + +func (r *TickTockListJobs) MakeParam() *sdks.RequestParam { + return sdks.MakeQueryParam(http.MethodGet, TickTockListJobsPath, r) +} + +type TickTockListJobsResp struct { + Jobs []string `json:"jobs"` +} + +func (r *TickTockListJobsResp) ParseResponse(resp *http.Response) error { + return sdks.ParseCodeDataJSONResponse(resp, r) +} + +func (c *TickTockService) ListJobs(req TickTockListJobs) (*TickTockListJobsResp, error) { + return JSONAPI(&c.cfg, c.httpCli, &req, &TickTockListJobsResp{}) +} + +const TickTockRunJobPath = "/tickTock/runJob" + +type TickTockRunJob struct { + Name string `json:"name"` +} + +func (r *TickTockRunJob) MakeParam() *sdks.RequestParam { + return sdks.MakeJSONParam(http.MethodPost, TickTockRunJobPath, r) +} + +type TickTockRunJobResp struct{} + +func (r *TickTockRunJobResp) ParseResponse(resp *http.Response) error { + return sdks.ParseCodeDataJSONResponse(resp, r) +} + +func (c *TickTockService) RunJob(req TickTockRunJob) (*TickTockRunJobResp, error) { + return JSONAPI(&c.cfg, c.httpCli, &req, &TickTockRunJobResp{}) +} diff --git a/common/pkgs/storage/s3/base_store.go b/common/pkgs/storage/s3/base_store.go index 516ddc5..38ac555 100644 --- a/common/pkgs/storage/s3/base_store.go +++ b/common/pkgs/storage/s3/base_store.go @@ -14,6 +14,7 @@ import ( s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/common/utils/math2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) @@ -173,24 +174,25 @@ func (s *BaseStore) CleanTemps() { marker = resp.NextMarker } - if len(deletes) == 0 { - return - } + for len(deletes) > 0 { + cnt := math2.Min(500, len(deletes)) + resp, err := s.cli.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{ + Bucket: aws.String(s.Bucket), + Delete: &s3types.Delete{ + Objects: deletes[:cnt], + }, + }) + if err != nil { + log.Warnf("delete temp files: %v", err) + return + } - resp, err := s.cli.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{ - Bucket: aws.String(s.Bucket), - Delete: &s3types.Delete{ - Objects: deletes, - }, - }) - if err != nil { - log.Warnf("delete temp files: %v", err) - return - } + for _, del := range resp.Deleted { + obj := deleteObjs[*del.Key] + log.Infof("remove unused temp file %v, size: %v, last mod time: %v", *obj.Key, *obj.Size, *obj.LastModified) + } - for _, del := range resp.Deleted { - obj := deleteObjs[*del.Key] - log.Infof("remove unused temp file %v, size: %v, last mod time: %v", *obj.Key, *obj.Size, *obj.LastModified) + deletes = deletes[cnt:] } } diff --git a/common/pkgs/storage/s3/shard_store.go b/common/pkgs/storage/s3/shard_store.go index 12e376f..f4d38f8 100644 --- a/common/pkgs/storage/s3/shard_store.go +++ b/common/pkgs/storage/s3/shard_store.go @@ -8,6 +8,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/math2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) @@ -181,12 +182,14 @@ func (s *ShardStore) GC(avaiables []clitypes.FileHash) error { marker = resp.NextMarker } - cnt := 0 - if len(deletes) > 0 { - resp, err := s.cli.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{ + totalCnt := len(deletes) + for len(deletes) > 0 { + cnt := math2.Min(500, len(deletes)) + + _, err := s.cli.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{ Bucket: aws.String(s.Bucket), Delete: &s3types.Delete{ - Objects: deletes, + Objects: deletes[:cnt], }, }) if err != nil { @@ -194,10 +197,10 @@ func (s *ShardStore) GC(avaiables []clitypes.FileHash) error { return err } - cnt = len(resp.Deleted) + deletes = deletes[cnt:] } - s.getLogger().Infof("purge %d files", cnt) + s.getLogger().Infof("purge %d files", totalCnt) // TODO 无法保证原子性,所以删除失败只打日志 return nil } diff --git a/jcsctl/cmd/admin/admin.go b/jcsctl/cmd/admin/admin.go new file mode 100644 index 0000000..3960753 --- /dev/null +++ b/jcsctl/cmd/admin/admin.go @@ -0,0 +1,15 @@ +package admin + +import ( + "github.com/spf13/cobra" + "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd" +) + +var AdminCmd = &cobra.Command{ + Use: "admin", + Aliases: []string{"adm"}, +} + +func init() { + cmd.RootCmd.AddCommand(AdminCmd) +} diff --git a/jcsctl/cmd/admin/ticktock/ls.go b/jcsctl/cmd/admin/ticktock/ls.go new file mode 100644 index 0000000..88394da --- /dev/null +++ b/jcsctl/cmd/admin/ticktock/ls.go @@ -0,0 +1,38 @@ +package ticktock + +import ( + "fmt" + + "github.com/spf13/cobra" + cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" + "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd" +) + +func init() { + var opt lsOpt + cmd := cobra.Command{ + Use: "ls", + Args: cobra.ExactArgs(0), + RunE: func(c *cobra.Command, args []string) error { + ctx := cmd.GetCmdCtx(c) + return ls(c, ctx, opt, args) + }, + } + TickTockCmd.AddCommand(&cmd) +} + +type lsOpt struct { +} + +func ls(c *cobra.Command, ctx *cmd.CommandContext, opt lsOpt, args []string) error { + resp, err := ctx.Client.TickTock().ListJobs(cliapi.TickTockListJobs{}) + if err != nil { + return fmt.Errorf("list jobs : %v", err) + } + + for _, job := range resp.Jobs { + fmt.Println(job) + } + + return nil +} diff --git a/jcsctl/cmd/admin/ticktock/run.go b/jcsctl/cmd/admin/ticktock/run.go new file mode 100644 index 0000000..c610102 --- /dev/null +++ b/jcsctl/cmd/admin/ticktock/run.go @@ -0,0 +1,36 @@ +package ticktock + +import ( + "fmt" + + "github.com/spf13/cobra" + cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" + "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd" +) + +func init() { + var opt runOpt + cmd := cobra.Command{ + Use: "run ", + Args: cobra.ExactArgs(1), + RunE: func(c *cobra.Command, args []string) error { + ctx := cmd.GetCmdCtx(c) + return run(c, ctx, opt, args) + }, + } + TickTockCmd.AddCommand(&cmd) +} + +type runOpt struct { +} + +func run(c *cobra.Command, ctx *cmd.CommandContext, opt runOpt, args []string) error { + _, err := ctx.Client.TickTock().RunJob(cliapi.TickTockRunJob{ + Name: args[0], + }) + if err != nil { + return fmt.Errorf("run job %v: %v", args[0], err) + } + + return nil +} diff --git a/jcsctl/cmd/admin/ticktock/ticktock.go b/jcsctl/cmd/admin/ticktock/ticktock.go new file mode 100644 index 0000000..ccfd79b --- /dev/null +++ b/jcsctl/cmd/admin/ticktock/ticktock.go @@ -0,0 +1,15 @@ +package ticktock + +import ( + "github.com/spf13/cobra" + "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd/admin" +) + +var TickTockCmd = &cobra.Command{ + Use: "ticktock", + Aliases: []string{"tktk"}, +} + +func init() { + admin.AdminCmd.AddCommand(TickTockCmd) +} diff --git a/jcsctl/cmd/all/all.go b/jcsctl/cmd/all/all.go index a0a7580..da6dbaf 100644 --- a/jcsctl/cmd/all/all.go +++ b/jcsctl/cmd/all/all.go @@ -1,6 +1,8 @@ package all import ( + _ "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd/admin" + _ "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd/admin/ticktock" _ "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd/bucket" _ "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd/geto" _ "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd/getp"