diff --git a/client/internal/cmdline/migrate.go b/client/internal/cmdline/migrate.go index f734255..849a3ac 100644 --- a/client/internal/cmdline/migrate.go +++ b/client/internal/cmdline/migrate.go @@ -25,8 +25,6 @@ func init() { } func migrate(configPath string) { - // TODO 将create_database.sql的内容逐渐移动到这里来 - err := config.Init(configPath) if err != nil { fmt.Println(err) diff --git a/client/internal/http/server.go b/client/internal/http/server.go index ccb75a7..cf397c3 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -54,12 +54,6 @@ func (s *Server) Start() *ServerEventChan { logger.Infof("start serving http at: %s", s.cfg.Listen) err := s.httpSrv.ListenAndServe() - if err != nil { - logger.Infof("http stopped with error: %s", err.Error()) - } else { - logger.Infof("http stopped") - } - s.eventChan.Send(ExitEvent{Err: err}) }() return s.eventChan diff --git a/client/internal/mount/vfs/fuse_bucket.go b/client/internal/mount/vfs/fuse_bucket.go index f34052c..8a1ed78 100644 --- a/client/internal/mount/vfs/fuse_bucket.go +++ b/client/internal/mount/vfs/fuse_bucket.go @@ -68,8 +68,6 @@ func (r *FuseBucket) Child(ctx context.Context, name string) (fuse.FsEntry, erro ca := r.vfs.cache.Stat(childPathComps) if ca == nil { - // TODO UserID - pkg, err := r.vfs.db.Package().GetByFullName(r.vfs.db.DefCtx(), r.bktName, name) if err == nil { dir := r.vfs.cache.LoadDir(childPathComps, &cache.CreateDirOption{ @@ -156,7 +154,6 @@ func (r *FuseBucket) NewDir(ctx context.Context, name string) (fuse.FsDir, error return nil, fuse.ErrPermission } - // TODO 用户ID,失败了可以打个日志 // TODO 生成系统事件 // 不关注创建是否成功,仅尝试一下 r.vfs.db.DoTx(func(tx db.SQLContext) error { diff --git a/client/internal/mount/vfs/fuse_root.go b/client/internal/mount/vfs/fuse_root.go index 98c9a03..08b7e9d 100644 --- a/client/internal/mount/vfs/fuse_root.go +++ b/client/internal/mount/vfs/fuse_root.go @@ -142,7 +142,6 @@ func (r *FuseRoot) NewDir(ctx context.Context, name string) (fuse.FsDir, error) return nil, fuse.ErrPermission } - // TODO 用户ID,失败了可以打个日志 // TODO 生成系统事件 // 不关注创建是否成功,仅尝试一下 r.vfs.db.Bucket().Create(r.vfs.db.DefCtx(), name, cache.ModTime()) diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 32af672..69aaab7 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -78,7 +78,6 @@ func (svc *ObjectService) GetByPath(req api.ObjectListByPath) (api.ObjectListByP func (svc *ObjectService) GetByIDs(objectIDs []types.ObjectID) ([]*types.Object, error) { var ret []*types.Object err := svc.DB.DoTx(func(tx db.SQLContext) error { - // TODO 应该检查用户是否有每一个Object所在Package的权限 objs, err := svc.DB.Object().BatchGet(tx, objectIDs) if err != nil { return err @@ -252,7 +251,6 @@ func (svc *ObjectService) Move(movings []api.MovingObject) ([]types.ObjectID, er } func (svc *ObjectService) Download(req downloader.DownloadReqeust) (*downloader.Downloading, error) { - // TODO 检查用户ID iter := svc.Downloader.DownloadObjects([]downloader.DownloadReqeust{req}) // 初始化下载过程 @@ -408,7 +406,6 @@ func (svc *ObjectService) Clone(clonings []api.CloningObject) ([]*types.Object, var evt []*datamap.BodyNewOrUpdateObject - // TODO 要检查用户是否有Object、Package的权限 cloningMap := make(map[types.PackageID]*PackageClonings) for i, cloning := range clonings { pkg, ok := cloningMap[cloning.NewPackageID] diff --git a/client/internal/services/package.go b/client/internal/services/package.go index 3d6756f..e6bcce6 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -48,7 +48,6 @@ func (svc *PackageService) Create(bucketID types.BucketID, name string) (types.P } func (svc *PackageService) DownloadPackage(packageID types.PackageID) (downloader.DownloadIterator, error) { - // TODO 检查用户ID return svc.Downloader.DownloadPackage(packageID), nil } diff --git a/common/assets/confs/hub.config.json b/common/assets/confs/hub.config.json index a15c7ff..ec4437e 100644 --- a/common/assets/confs/hub.config.json +++ b/common/assets/confs/hub.config.json @@ -8,6 +8,9 @@ "rpc": { "listen": "127.0.0.1:5010" }, + "http": { + "listen": "127.0.0.1:5110" + }, "coordinatorRPC": { "address": "127.0.0.1:5009" }, diff --git a/common/pkgs/grpc/config.go b/common/pkgs/grpc/config.go deleted file mode 100644 index f3dfb54..0000000 --- a/common/pkgs/grpc/config.go +++ /dev/null @@ -1,12 +0,0 @@ -package grpc - -import "fmt" - -type Config struct { - IP string `json:"ip"` - Port int `json:"port"` -} - -func (c *Config) MakeListenAddress() string { - return fmt.Sprintf("%s:%d", c.IP, c.Port) -} diff --git a/common/pkgs/grpc/hub/client.go b/common/pkgs/grpc/hub/client.go deleted file mode 100644 index 0bfd25e..0000000 --- a/common/pkgs/grpc/hub/client.go +++ /dev/null @@ -1,206 +0,0 @@ -package hub - -import ( - "context" - "fmt" - "io" - - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/utils/serder" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -type Client struct { - con *grpc.ClientConn - cli HubClient -} - -func NewClient(addr string) (*Client, error) { - con, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return nil, err - } - - return &Client{ - con: con, - cli: NewHubClient(con), - }, nil -} - -func (c *Client) ExecuteIOPlan(ctx context.Context, plan exec.Plan) error { - data, err := serder.ObjectToJSONEx(plan) - if err != nil { - return err - } - - _, err = c.cli.ExecuteIOPlan(ctx, &ExecuteIOPlanReq{ - Plan: string(data), - }) - return err -} - -type grpcStreamReadCloser struct { - io.ReadCloser - stream Hub_GetStreamClient - cancelFn context.CancelFunc - readingData []byte - recvEOF bool -} - -func (s *grpcStreamReadCloser) Read(p []byte) (int, error) { - if len(s.readingData) == 0 && !s.recvEOF { - resp, err := s.stream.Recv() - if err != nil { - return 0, err - } - - if resp.Type == StreamDataPacketType_Data { - s.readingData = resp.Data - - } else if resp.Type == StreamDataPacketType_EOF { - s.readingData = resp.Data - s.recvEOF = true - - } else { - return 0, fmt.Errorf("unsupported packt type: %v", resp.Type) - } - } - - cnt := copy(p, s.readingData) - s.readingData = s.readingData[cnt:] - - if len(s.readingData) == 0 && s.recvEOF { - return cnt, io.EOF - } - - return cnt, nil -} - -func (s *grpcStreamReadCloser) Close() error { - s.cancelFn() - - return nil -} - -func (c *Client) SendStream(ctx context.Context, planID exec.PlanID, varID exec.VarID, str io.Reader) error { - sendCli, err := c.cli.SendStream(ctx) - if err != nil { - return err - } - - err = sendCli.Send(&StreamDataPacket{ - Type: StreamDataPacketType_SendArgs, - PlanID: string(planID), - VarID: int32(varID), - }) - if err != nil { - return fmt.Errorf("sending first stream packet: %w", err) - } - - buf := make([]byte, 1024*64) - for { - rd, err := str.Read(buf) - if err == io.EOF { - err := sendCli.Send(&StreamDataPacket{ - Type: StreamDataPacketType_EOF, - Data: buf[:rd], - }) - if err != nil { - return fmt.Errorf("sending EOF packet: %w", err) - } - - _, err = sendCli.CloseAndRecv() - if err != nil { - return fmt.Errorf("receiving response: %w", err) - } - - return nil - } - - if err != nil { - return fmt.Errorf("reading stream data: %w", err) - } - - err = sendCli.Send(&StreamDataPacket{ - Type: StreamDataPacketType_Data, - Data: buf[:rd], - }) - if err != nil { - return fmt.Errorf("sending data packet: %w", err) - } - } -} - -func (c *Client) GetStream(ctx context.Context, planID exec.PlanID, varID exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) { - ctx, cancel := context.WithCancel(ctx) - - sdata, err := serder.ObjectToJSONEx(signal) - if err != nil { - cancel() - return nil, err - } - - stream, err := c.cli.GetStream(ctx, &GetStreamReq{ - PlanID: string(planID), - VarID: int32(varID), - SignalID: int32(signalID), - Signal: string(sdata), - }) - if err != nil { - cancel() - return nil, fmt.Errorf("request grpc failed, err: %w", err) - } - - return &grpcStreamReadCloser{ - stream: stream, - cancelFn: cancel, - }, nil -} - -func (c *Client) SendVar(ctx context.Context, planID exec.PlanID, id exec.VarID, value exec.VarValue) error { - data, err := serder.ObjectToJSONEx(value) - if err != nil { - return err - } - - _, err = c.cli.SendVar(ctx, &SendVarReq{ - PlanID: string(planID), - VarID: int32(id), - VarValue: string(data), - }) - return err -} - -func (c *Client) GetVar(ctx context.Context, planID exec.PlanID, varID exec.VarID, signalID exec.VarID, signal exec.VarValue) (exec.VarValue, error) { - sdata, err := serder.ObjectToJSONEx(signal) - if err != nil { - return nil, err - } - - resp, err := c.cli.GetVar(ctx, &GetVarReq{ - PlanID: string(planID), - VarID: int32(varID), - SignalID: int32(signalID), - Signal: string(sdata), - }) - if err != nil { - return nil, err - } - - getVar, err := serder.JSONToObjectEx[exec.VarValue]([]byte(resp.Var)) - if err != nil { - return nil, err - } - - return getVar, nil -} - -func (c *Client) Ping() error { - _, err := c.cli.Ping(context.Background(), &PingReq{}) - return err -} - -func (c *Client) Close() { - c.con.Close() -} diff --git a/common/pkgs/grpc/hub/hub.pb.go b/common/pkgs/grpc/hub/hub.pb.go deleted file mode 100644 index e60d843..0000000 --- a/common/pkgs/grpc/hub/hub.pb.go +++ /dev/null @@ -1,983 +0,0 @@ -// 使用的语法版本 - -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.34.2 -// protoc v4.22.3 -// source: pkgs/grpc/hub/hub.proto - -package hub - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type StreamDataPacketType int32 - -const ( - StreamDataPacketType_EOF StreamDataPacketType = 0 - StreamDataPacketType_Data StreamDataPacketType = 1 - StreamDataPacketType_SendArgs StreamDataPacketType = 2 -) - -// Enum value maps for StreamDataPacketType. -var ( - StreamDataPacketType_name = map[int32]string{ - 0: "EOF", - 1: "Data", - 2: "SendArgs", - } - StreamDataPacketType_value = map[string]int32{ - "EOF": 0, - "Data": 1, - "SendArgs": 2, - } -) - -func (x StreamDataPacketType) Enum() *StreamDataPacketType { - p := new(StreamDataPacketType) - *p = x - return p -} - -func (x StreamDataPacketType) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (StreamDataPacketType) Descriptor() protoreflect.EnumDescriptor { - return file_pkgs_grpc_hub_hub_proto_enumTypes[0].Descriptor() -} - -func (StreamDataPacketType) Type() protoreflect.EnumType { - return &file_pkgs_grpc_hub_hub_proto_enumTypes[0] -} - -func (x StreamDataPacketType) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use StreamDataPacketType.Descriptor instead. -func (StreamDataPacketType) EnumDescriptor() ([]byte, []int) { - return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{0} -} - -type ExecuteIOPlanReq struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Plan string `protobuf:"bytes,1,opt,name=Plan,proto3" json:"Plan,omitempty"` -} - -func (x *ExecuteIOPlanReq) Reset() { - *x = ExecuteIOPlanReq{} - if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ExecuteIOPlanReq) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ExecuteIOPlanReq) ProtoMessage() {} - -func (x *ExecuteIOPlanReq) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ExecuteIOPlanReq.ProtoReflect.Descriptor instead. -func (*ExecuteIOPlanReq) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{0} -} - -func (x *ExecuteIOPlanReq) GetPlan() string { - if x != nil { - return x.Plan - } - return "" -} - -type ExecuteIOPlanResp struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *ExecuteIOPlanResp) Reset() { - *x = ExecuteIOPlanResp{} - if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ExecuteIOPlanResp) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ExecuteIOPlanResp) ProtoMessage() {} - -func (x *ExecuteIOPlanResp) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ExecuteIOPlanResp.ProtoReflect.Descriptor instead. -func (*ExecuteIOPlanResp) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{1} -} - -// 文件数据。注意:只在Type为Data或EOF的时候,Data字段才能有数据 -type FileDataPacket struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Type StreamDataPacketType `protobuf:"varint,1,opt,name=Type,proto3,enum=StreamDataPacketType" json:"Type,omitempty"` - Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"` -} - -func (x *FileDataPacket) Reset() { - *x = FileDataPacket{} - if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *FileDataPacket) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*FileDataPacket) ProtoMessage() {} - -func (x *FileDataPacket) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use FileDataPacket.ProtoReflect.Descriptor instead. -func (*FileDataPacket) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{2} -} - -func (x *FileDataPacket) GetType() StreamDataPacketType { - if x != nil { - return x.Type - } - return StreamDataPacketType_EOF -} - -func (x *FileDataPacket) GetData() []byte { - if x != nil { - return x.Data - } - return nil -} - -// 注:EOF时data也可能有数据 -type StreamDataPacket struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Type StreamDataPacketType `protobuf:"varint,1,opt,name=Type,proto3,enum=StreamDataPacketType" json:"Type,omitempty"` - PlanID string `protobuf:"bytes,2,opt,name=PlanID,proto3" json:"PlanID,omitempty"` - VarID int32 `protobuf:"varint,3,opt,name=VarID,proto3" json:"VarID,omitempty"` - Data []byte `protobuf:"bytes,4,opt,name=Data,proto3" json:"Data,omitempty"` -} - -func (x *StreamDataPacket) Reset() { - *x = StreamDataPacket{} - if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *StreamDataPacket) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*StreamDataPacket) ProtoMessage() {} - -func (x *StreamDataPacket) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[3] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use StreamDataPacket.ProtoReflect.Descriptor instead. -func (*StreamDataPacket) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{3} -} - -func (x *StreamDataPacket) GetType() StreamDataPacketType { - if x != nil { - return x.Type - } - return StreamDataPacketType_EOF -} - -func (x *StreamDataPacket) GetPlanID() string { - if x != nil { - return x.PlanID - } - return "" -} - -func (x *StreamDataPacket) GetVarID() int32 { - if x != nil { - return x.VarID - } - return 0 -} - -func (x *StreamDataPacket) GetData() []byte { - if x != nil { - return x.Data - } - return nil -} - -type SendStreamResp struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *SendStreamResp) Reset() { - *x = SendStreamResp{} - if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[4] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *SendStreamResp) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*SendStreamResp) ProtoMessage() {} - -func (x *SendStreamResp) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[4] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use SendStreamResp.ProtoReflect.Descriptor instead. -func (*SendStreamResp) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{4} -} - -type GetStreamReq struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - PlanID string `protobuf:"bytes,1,opt,name=PlanID,proto3" json:"PlanID,omitempty"` - VarID int32 `protobuf:"varint,2,opt,name=VarID,proto3" json:"VarID,omitempty"` - SignalID int32 `protobuf:"varint,3,opt,name=SignalID,proto3" json:"SignalID,omitempty"` - Signal string `protobuf:"bytes,4,opt,name=Signal,proto3" json:"Signal,omitempty"` -} - -func (x *GetStreamReq) Reset() { - *x = GetStreamReq{} - if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[5] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *GetStreamReq) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*GetStreamReq) ProtoMessage() {} - -func (x *GetStreamReq) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[5] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use GetStreamReq.ProtoReflect.Descriptor instead. -func (*GetStreamReq) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{5} -} - -func (x *GetStreamReq) GetPlanID() string { - if x != nil { - return x.PlanID - } - return "" -} - -func (x *GetStreamReq) GetVarID() int32 { - if x != nil { - return x.VarID - } - return 0 -} - -func (x *GetStreamReq) GetSignalID() int32 { - if x != nil { - return x.SignalID - } - return 0 -} - -func (x *GetStreamReq) GetSignal() string { - if x != nil { - return x.Signal - } - return "" -} - -type SendVarReq struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - PlanID string `protobuf:"bytes,1,opt,name=PlanID,proto3" json:"PlanID,omitempty"` - VarID int32 `protobuf:"varint,2,opt,name=VarID,proto3" json:"VarID,omitempty"` - VarValue string `protobuf:"bytes,3,opt,name=VarValue,proto3" json:"VarValue,omitempty"` -} - -func (x *SendVarReq) Reset() { - *x = SendVarReq{} - if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[6] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *SendVarReq) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*SendVarReq) ProtoMessage() {} - -func (x *SendVarReq) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[6] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use SendVarReq.ProtoReflect.Descriptor instead. -func (*SendVarReq) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{6} -} - -func (x *SendVarReq) GetPlanID() string { - if x != nil { - return x.PlanID - } - return "" -} - -func (x *SendVarReq) GetVarID() int32 { - if x != nil { - return x.VarID - } - return 0 -} - -func (x *SendVarReq) GetVarValue() string { - if x != nil { - return x.VarValue - } - return "" -} - -type SendVarResp struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *SendVarResp) Reset() { - *x = SendVarResp{} - if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[7] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *SendVarResp) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*SendVarResp) ProtoMessage() {} - -func (x *SendVarResp) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[7] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use SendVarResp.ProtoReflect.Descriptor instead. -func (*SendVarResp) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{7} -} - -type GetVarReq struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - PlanID string `protobuf:"bytes,1,opt,name=PlanID,proto3" json:"PlanID,omitempty"` - VarID int32 `protobuf:"varint,2,opt,name=VarID,proto3" json:"VarID,omitempty"` - SignalID int32 `protobuf:"varint,3,opt,name=SignalID,proto3" json:"SignalID,omitempty"` - Signal string `protobuf:"bytes,4,opt,name=Signal,proto3" json:"Signal,omitempty"` -} - -func (x *GetVarReq) Reset() { - *x = GetVarReq{} - if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[8] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *GetVarReq) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*GetVarReq) ProtoMessage() {} - -func (x *GetVarReq) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[8] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use GetVarReq.ProtoReflect.Descriptor instead. -func (*GetVarReq) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{8} -} - -func (x *GetVarReq) GetPlanID() string { - if x != nil { - return x.PlanID - } - return "" -} - -func (x *GetVarReq) GetVarID() int32 { - if x != nil { - return x.VarID - } - return 0 -} - -func (x *GetVarReq) GetSignalID() int32 { - if x != nil { - return x.SignalID - } - return 0 -} - -func (x *GetVarReq) GetSignal() string { - if x != nil { - return x.Signal - } - return "" -} - -type GetVarResp struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Var string `protobuf:"bytes,1,opt,name=Var,proto3" json:"Var,omitempty"` -} - -func (x *GetVarResp) Reset() { - *x = GetVarResp{} - if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[9] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *GetVarResp) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*GetVarResp) ProtoMessage() {} - -func (x *GetVarResp) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[9] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use GetVarResp.ProtoReflect.Descriptor instead. -func (*GetVarResp) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{9} -} - -func (x *GetVarResp) GetVar() string { - if x != nil { - return x.Var - } - return "" -} - -type PingReq struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *PingReq) Reset() { - *x = PingReq{} - if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[10] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *PingReq) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*PingReq) ProtoMessage() {} - -func (x *PingReq) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[10] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use PingReq.ProtoReflect.Descriptor instead. -func (*PingReq) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{10} -} - -type PingResp struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *PingResp) Reset() { - *x = PingResp{} - if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[11] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *PingResp) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*PingResp) ProtoMessage() {} - -func (x *PingResp) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[11] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use PingResp.ProtoReflect.Descriptor instead. -func (*PingResp) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{11} -} - -var File_pkgs_grpc_hub_hub_proto protoreflect.FileDescriptor - -var file_pkgs_grpc_hub_hub_proto_rawDesc = []byte{ - 0x0a, 0x17, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x68, 0x75, 0x62, 0x2f, - 0x68, 0x75, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x26, 0x0a, 0x10, 0x45, 0x78, 0x65, - 0x63, 0x75, 0x74, 0x65, 0x49, 0x4f, 0x50, 0x6c, 0x61, 0x6e, 0x52, 0x65, 0x71, 0x12, 0x12, 0x0a, - 0x04, 0x50, 0x6c, 0x61, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x50, 0x6c, 0x61, - 0x6e, 0x22, 0x13, 0x0a, 0x11, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x49, 0x4f, 0x50, 0x6c, - 0x61, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x22, 0x4f, 0x0a, 0x0e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, - 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x29, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, - 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x54, - 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x7f, 0x0a, 0x10, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x29, 0x0a, 0x04, 0x54, - 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x53, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, - 0x52, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x12, 0x14, - 0x0a, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x56, - 0x61, 0x72, 0x49, 0x44, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x10, 0x0a, 0x0e, 0x53, 0x65, 0x6e, 0x64, - 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x70, 0x0a, 0x0c, 0x47, 0x65, - 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, - 0x61, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, - 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x05, 0x52, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x12, 0x1a, 0x0a, 0x08, 0x53, 0x69, 0x67, 0x6e, - 0x61, 0x6c, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x53, 0x69, 0x67, 0x6e, - 0x61, 0x6c, 0x49, 0x44, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x22, 0x56, 0x0a, 0x0a, - 0x53, 0x65, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, - 0x61, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, - 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x05, 0x52, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x12, 0x1a, 0x0a, 0x08, 0x56, 0x61, 0x72, 0x56, - 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x56, 0x61, 0x72, 0x56, - 0x61, 0x6c, 0x75, 0x65, 0x22, 0x0d, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x52, - 0x65, 0x73, 0x70, 0x22, 0x6d, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x56, 0x61, 0x72, 0x52, 0x65, 0x71, - 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x72, 0x49, - 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x12, 0x1a, - 0x0a, 0x08, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, - 0x52, 0x08, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x49, 0x44, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x69, - 0x67, 0x6e, 0x61, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x53, 0x69, 0x67, 0x6e, - 0x61, 0x6c, 0x22, 0x1e, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x56, 0x61, 0x72, 0x52, 0x65, 0x73, 0x70, - 0x12, 0x10, 0x0a, 0x03, 0x56, 0x61, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x56, - 0x61, 0x72, 0x22, 0x09, 0x0a, 0x07, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x22, 0x0a, 0x0a, - 0x08, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x2a, 0x37, 0x0a, 0x14, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, - 0x65, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f, 0x46, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x61, - 0x74, 0x61, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x41, 0x72, 0x67, 0x73, - 0x10, 0x02, 0x32, 0x94, 0x02, 0x0a, 0x03, 0x48, 0x75, 0x62, 0x12, 0x38, 0x0a, 0x0d, 0x45, 0x78, - 0x65, 0x63, 0x75, 0x74, 0x65, 0x49, 0x4f, 0x50, 0x6c, 0x61, 0x6e, 0x12, 0x11, 0x2e, 0x45, 0x78, - 0x65, 0x63, 0x75, 0x74, 0x65, 0x49, 0x4f, 0x50, 0x6c, 0x61, 0x6e, 0x52, 0x65, 0x71, 0x1a, 0x12, - 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x49, 0x4f, 0x50, 0x6c, 0x61, 0x6e, 0x52, 0x65, - 0x73, 0x70, 0x22, 0x00, 0x12, 0x34, 0x0a, 0x0a, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x12, 0x11, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, - 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x0f, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, 0x12, 0x31, 0x0a, 0x09, 0x47, 0x65, - 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x0d, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x1a, 0x11, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, - 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, 0x12, 0x26, 0x0a, - 0x07, 0x53, 0x65, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x12, 0x0b, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x56, - 0x61, 0x72, 0x52, 0x65, 0x71, 0x1a, 0x0c, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x52, - 0x65, 0x73, 0x70, 0x22, 0x00, 0x12, 0x23, 0x0a, 0x06, 0x47, 0x65, 0x74, 0x56, 0x61, 0x72, 0x12, - 0x0a, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x61, 0x72, 0x52, 0x65, 0x71, 0x1a, 0x0b, 0x2e, 0x47, 0x65, - 0x74, 0x56, 0x61, 0x72, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x12, 0x1d, 0x0a, 0x04, 0x50, 0x69, - 0x6e, 0x67, 0x12, 0x08, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x1a, 0x09, 0x2e, 0x50, - 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x68, - 0x75, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_pkgs_grpc_hub_hub_proto_rawDescOnce sync.Once - file_pkgs_grpc_hub_hub_proto_rawDescData = file_pkgs_grpc_hub_hub_proto_rawDesc -) - -func file_pkgs_grpc_hub_hub_proto_rawDescGZIP() []byte { - file_pkgs_grpc_hub_hub_proto_rawDescOnce.Do(func() { - file_pkgs_grpc_hub_hub_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkgs_grpc_hub_hub_proto_rawDescData) - }) - return file_pkgs_grpc_hub_hub_proto_rawDescData -} - -var file_pkgs_grpc_hub_hub_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_pkgs_grpc_hub_hub_proto_msgTypes = make([]protoimpl.MessageInfo, 12) -var file_pkgs_grpc_hub_hub_proto_goTypes = []any{ - (StreamDataPacketType)(0), // 0: StreamDataPacketType - (*ExecuteIOPlanReq)(nil), // 1: ExecuteIOPlanReq - (*ExecuteIOPlanResp)(nil), // 2: ExecuteIOPlanResp - (*FileDataPacket)(nil), // 3: FileDataPacket - (*StreamDataPacket)(nil), // 4: StreamDataPacket - (*SendStreamResp)(nil), // 5: SendStreamResp - (*GetStreamReq)(nil), // 6: GetStreamReq - (*SendVarReq)(nil), // 7: SendVarReq - (*SendVarResp)(nil), // 8: SendVarResp - (*GetVarReq)(nil), // 9: GetVarReq - (*GetVarResp)(nil), // 10: GetVarResp - (*PingReq)(nil), // 11: PingReq - (*PingResp)(nil), // 12: PingResp -} -var file_pkgs_grpc_hub_hub_proto_depIdxs = []int32{ - 0, // 0: FileDataPacket.Type:type_name -> StreamDataPacketType - 0, // 1: StreamDataPacket.Type:type_name -> StreamDataPacketType - 1, // 2: Hub.ExecuteIOPlan:input_type -> ExecuteIOPlanReq - 4, // 3: Hub.SendStream:input_type -> StreamDataPacket - 6, // 4: Hub.GetStream:input_type -> GetStreamReq - 7, // 5: Hub.SendVar:input_type -> SendVarReq - 9, // 6: Hub.GetVar:input_type -> GetVarReq - 11, // 7: Hub.Ping:input_type -> PingReq - 2, // 8: Hub.ExecuteIOPlan:output_type -> ExecuteIOPlanResp - 5, // 9: Hub.SendStream:output_type -> SendStreamResp - 4, // 10: Hub.GetStream:output_type -> StreamDataPacket - 8, // 11: Hub.SendVar:output_type -> SendVarResp - 10, // 12: Hub.GetVar:output_type -> GetVarResp - 12, // 13: Hub.Ping:output_type -> PingResp - 8, // [8:14] is the sub-list for method output_type - 2, // [2:8] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name -} - -func init() { file_pkgs_grpc_hub_hub_proto_init() } -func file_pkgs_grpc_hub_hub_proto_init() { - if File_pkgs_grpc_hub_hub_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_pkgs_grpc_hub_hub_proto_msgTypes[0].Exporter = func(v any, i int) any { - switch v := v.(*ExecuteIOPlanReq); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkgs_grpc_hub_hub_proto_msgTypes[1].Exporter = func(v any, i int) any { - switch v := v.(*ExecuteIOPlanResp); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkgs_grpc_hub_hub_proto_msgTypes[2].Exporter = func(v any, i int) any { - switch v := v.(*FileDataPacket); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkgs_grpc_hub_hub_proto_msgTypes[3].Exporter = func(v any, i int) any { - switch v := v.(*StreamDataPacket); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkgs_grpc_hub_hub_proto_msgTypes[4].Exporter = func(v any, i int) any { - switch v := v.(*SendStreamResp); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkgs_grpc_hub_hub_proto_msgTypes[5].Exporter = func(v any, i int) any { - switch v := v.(*GetStreamReq); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkgs_grpc_hub_hub_proto_msgTypes[6].Exporter = func(v any, i int) any { - switch v := v.(*SendVarReq); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkgs_grpc_hub_hub_proto_msgTypes[7].Exporter = func(v any, i int) any { - switch v := v.(*SendVarResp); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkgs_grpc_hub_hub_proto_msgTypes[8].Exporter = func(v any, i int) any { - switch v := v.(*GetVarReq); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkgs_grpc_hub_hub_proto_msgTypes[9].Exporter = func(v any, i int) any { - switch v := v.(*GetVarResp); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkgs_grpc_hub_hub_proto_msgTypes[10].Exporter = func(v any, i int) any { - switch v := v.(*PingReq); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkgs_grpc_hub_hub_proto_msgTypes[11].Exporter = func(v any, i int) any { - switch v := v.(*PingResp); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_pkgs_grpc_hub_hub_proto_rawDesc, - NumEnums: 1, - NumMessages: 12, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_pkgs_grpc_hub_hub_proto_goTypes, - DependencyIndexes: file_pkgs_grpc_hub_hub_proto_depIdxs, - EnumInfos: file_pkgs_grpc_hub_hub_proto_enumTypes, - MessageInfos: file_pkgs_grpc_hub_hub_proto_msgTypes, - }.Build() - File_pkgs_grpc_hub_hub_proto = out.File - file_pkgs_grpc_hub_hub_proto_rawDesc = nil - file_pkgs_grpc_hub_hub_proto_goTypes = nil - file_pkgs_grpc_hub_hub_proto_depIdxs = nil -} diff --git a/common/pkgs/grpc/hub/hub.proto b/common/pkgs/grpc/hub/hub.proto deleted file mode 100644 index c943357..0000000 --- a/common/pkgs/grpc/hub/hub.proto +++ /dev/null @@ -1,75 +0,0 @@ -// 使用的语法版本 -syntax = "proto3"; - -// 生成的go文件包 -option go_package = ".;hub";//grpc这里生效了 - - - -message ExecuteIOPlanReq { - string Plan = 1; -} - -message ExecuteIOPlanResp { -} - -enum StreamDataPacketType { - EOF = 0; - Data = 1; - SendArgs = 2; -} -// 文件数据。注意:只在Type为Data或EOF的时候,Data字段才能有数据 -message FileDataPacket { - StreamDataPacketType Type = 1; - bytes Data = 2; -} - -// 注:EOF时data也可能有数据 -message StreamDataPacket { - StreamDataPacketType Type = 1; - string PlanID = 2; - int32 VarID = 3; - bytes Data = 4; -} - -message SendStreamResp {} - -message GetStreamReq { - string PlanID = 1; - int32 VarID = 2; - int32 SignalID = 3; - string Signal = 4; -} - -message SendVarReq { - string PlanID = 1; - int32 VarID = 2; - string VarValue = 3; -} -message SendVarResp {} - -message GetVarReq { - string PlanID = 1; - int32 VarID = 2; - int32 SignalID = 3; - string Signal = 4; -} -message GetVarResp { - string Var = 1; -} - -message PingReq {} -message PingResp {} - -service Hub { - rpc ExecuteIOPlan(ExecuteIOPlanReq) returns(ExecuteIOPlanResp){} - - rpc SendStream(stream StreamDataPacket)returns(SendStreamResp){} - rpc GetStream(GetStreamReq)returns(stream StreamDataPacket){} - - rpc SendVar(SendVarReq)returns(SendVarResp){} - rpc GetVar(GetVarReq)returns(GetVarResp){} - - rpc Ping(PingReq) returns(PingResp){} -} - diff --git a/common/pkgs/grpc/hub/hub_grpc.pb.go b/common/pkgs/grpc/hub/hub_grpc.pb.go deleted file mode 100644 index af595c9..0000000 --- a/common/pkgs/grpc/hub/hub_grpc.pb.go +++ /dev/null @@ -1,358 +0,0 @@ -// 使用的语法版本 - -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. -// versions: -// - protoc-gen-go-grpc v1.3.0 -// - protoc v4.22.3 -// source: pkgs/grpc/hub/hub.proto - -package hub - -import ( - context "context" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 - -const ( - Hub_ExecuteIOPlan_FullMethodName = "/Hub/ExecuteIOPlan" - Hub_SendStream_FullMethodName = "/Hub/SendStream" - Hub_GetStream_FullMethodName = "/Hub/GetStream" - Hub_SendVar_FullMethodName = "/Hub/SendVar" - Hub_GetVar_FullMethodName = "/Hub/GetVar" - Hub_Ping_FullMethodName = "/Hub/Ping" -) - -// HubClient is the client API for Hub service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type HubClient interface { - ExecuteIOPlan(ctx context.Context, in *ExecuteIOPlanReq, opts ...grpc.CallOption) (*ExecuteIOPlanResp, error) - SendStream(ctx context.Context, opts ...grpc.CallOption) (Hub_SendStreamClient, error) - GetStream(ctx context.Context, in *GetStreamReq, opts ...grpc.CallOption) (Hub_GetStreamClient, error) - SendVar(ctx context.Context, in *SendVarReq, opts ...grpc.CallOption) (*SendVarResp, error) - GetVar(ctx context.Context, in *GetVarReq, opts ...grpc.CallOption) (*GetVarResp, error) - Ping(ctx context.Context, in *PingReq, opts ...grpc.CallOption) (*PingResp, error) -} - -type hubClient struct { - cc grpc.ClientConnInterface -} - -func NewHubClient(cc grpc.ClientConnInterface) HubClient { - return &hubClient{cc} -} - -func (c *hubClient) ExecuteIOPlan(ctx context.Context, in *ExecuteIOPlanReq, opts ...grpc.CallOption) (*ExecuteIOPlanResp, error) { - out := new(ExecuteIOPlanResp) - err := c.cc.Invoke(ctx, Hub_ExecuteIOPlan_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *hubClient) SendStream(ctx context.Context, opts ...grpc.CallOption) (Hub_SendStreamClient, error) { - stream, err := c.cc.NewStream(ctx, &Hub_ServiceDesc.Streams[0], Hub_SendStream_FullMethodName, opts...) - if err != nil { - return nil, err - } - x := &hubSendStreamClient{stream} - return x, nil -} - -type Hub_SendStreamClient interface { - Send(*StreamDataPacket) error - CloseAndRecv() (*SendStreamResp, error) - grpc.ClientStream -} - -type hubSendStreamClient struct { - grpc.ClientStream -} - -func (x *hubSendStreamClient) Send(m *StreamDataPacket) error { - return x.ClientStream.SendMsg(m) -} - -func (x *hubSendStreamClient) CloseAndRecv() (*SendStreamResp, error) { - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - m := new(SendStreamResp) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *hubClient) GetStream(ctx context.Context, in *GetStreamReq, opts ...grpc.CallOption) (Hub_GetStreamClient, error) { - stream, err := c.cc.NewStream(ctx, &Hub_ServiceDesc.Streams[1], Hub_GetStream_FullMethodName, opts...) - if err != nil { - return nil, err - } - x := &hubGetStreamClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type Hub_GetStreamClient interface { - Recv() (*StreamDataPacket, error) - grpc.ClientStream -} - -type hubGetStreamClient struct { - grpc.ClientStream -} - -func (x *hubGetStreamClient) Recv() (*StreamDataPacket, error) { - m := new(StreamDataPacket) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *hubClient) SendVar(ctx context.Context, in *SendVarReq, opts ...grpc.CallOption) (*SendVarResp, error) { - out := new(SendVarResp) - err := c.cc.Invoke(ctx, Hub_SendVar_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *hubClient) GetVar(ctx context.Context, in *GetVarReq, opts ...grpc.CallOption) (*GetVarResp, error) { - out := new(GetVarResp) - err := c.cc.Invoke(ctx, Hub_GetVar_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *hubClient) Ping(ctx context.Context, in *PingReq, opts ...grpc.CallOption) (*PingResp, error) { - out := new(PingResp) - err := c.cc.Invoke(ctx, Hub_Ping_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// HubServer is the server API for Hub service. -// All implementations must embed UnimplementedHubServer -// for forward compatibility -type HubServer interface { - ExecuteIOPlan(context.Context, *ExecuteIOPlanReq) (*ExecuteIOPlanResp, error) - SendStream(Hub_SendStreamServer) error - GetStream(*GetStreamReq, Hub_GetStreamServer) error - SendVar(context.Context, *SendVarReq) (*SendVarResp, error) - GetVar(context.Context, *GetVarReq) (*GetVarResp, error) - Ping(context.Context, *PingReq) (*PingResp, error) - mustEmbedUnimplementedHubServer() -} - -// UnimplementedHubServer must be embedded to have forward compatible implementations. -type UnimplementedHubServer struct { -} - -func (UnimplementedHubServer) ExecuteIOPlan(context.Context, *ExecuteIOPlanReq) (*ExecuteIOPlanResp, error) { - return nil, status.Errorf(codes.Unimplemented, "method ExecuteIOPlan not implemented") -} -func (UnimplementedHubServer) SendStream(Hub_SendStreamServer) error { - return status.Errorf(codes.Unimplemented, "method SendStream not implemented") -} -func (UnimplementedHubServer) GetStream(*GetStreamReq, Hub_GetStreamServer) error { - return status.Errorf(codes.Unimplemented, "method GetStream not implemented") -} -func (UnimplementedHubServer) SendVar(context.Context, *SendVarReq) (*SendVarResp, error) { - return nil, status.Errorf(codes.Unimplemented, "method SendVar not implemented") -} -func (UnimplementedHubServer) GetVar(context.Context, *GetVarReq) (*GetVarResp, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetVar not implemented") -} -func (UnimplementedHubServer) Ping(context.Context, *PingReq) (*PingResp, error) { - return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") -} -func (UnimplementedHubServer) mustEmbedUnimplementedHubServer() {} - -// UnsafeHubServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to HubServer will -// result in compilation errors. -type UnsafeHubServer interface { - mustEmbedUnimplementedHubServer() -} - -func RegisterHubServer(s grpc.ServiceRegistrar, srv HubServer) { - s.RegisterService(&Hub_ServiceDesc, srv) -} - -func _Hub_ExecuteIOPlan_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ExecuteIOPlanReq) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(HubServer).ExecuteIOPlan(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Hub_ExecuteIOPlan_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HubServer).ExecuteIOPlan(ctx, req.(*ExecuteIOPlanReq)) - } - return interceptor(ctx, in, info, handler) -} - -func _Hub_SendStream_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(HubServer).SendStream(&hubSendStreamServer{stream}) -} - -type Hub_SendStreamServer interface { - SendAndClose(*SendStreamResp) error - Recv() (*StreamDataPacket, error) - grpc.ServerStream -} - -type hubSendStreamServer struct { - grpc.ServerStream -} - -func (x *hubSendStreamServer) SendAndClose(m *SendStreamResp) error { - return x.ServerStream.SendMsg(m) -} - -func (x *hubSendStreamServer) Recv() (*StreamDataPacket, error) { - m := new(StreamDataPacket) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func _Hub_GetStream_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(GetStreamReq) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(HubServer).GetStream(m, &hubGetStreamServer{stream}) -} - -type Hub_GetStreamServer interface { - Send(*StreamDataPacket) error - grpc.ServerStream -} - -type hubGetStreamServer struct { - grpc.ServerStream -} - -func (x *hubGetStreamServer) Send(m *StreamDataPacket) error { - return x.ServerStream.SendMsg(m) -} - -func _Hub_SendVar_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(SendVarReq) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(HubServer).SendVar(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Hub_SendVar_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HubServer).SendVar(ctx, req.(*SendVarReq)) - } - return interceptor(ctx, in, info, handler) -} - -func _Hub_GetVar_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(GetVarReq) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(HubServer).GetVar(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Hub_GetVar_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HubServer).GetVar(ctx, req.(*GetVarReq)) - } - return interceptor(ctx, in, info, handler) -} - -func _Hub_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(PingReq) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(HubServer).Ping(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Hub_Ping_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HubServer).Ping(ctx, req.(*PingReq)) - } - return interceptor(ctx, in, info, handler) -} - -// Hub_ServiceDesc is the grpc.ServiceDesc for Hub service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var Hub_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "Hub", - HandlerType: (*HubServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "ExecuteIOPlan", - Handler: _Hub_ExecuteIOPlan_Handler, - }, - { - MethodName: "SendVar", - Handler: _Hub_SendVar_Handler, - }, - { - MethodName: "GetVar", - Handler: _Hub_GetVar_Handler, - }, - { - MethodName: "Ping", - Handler: _Hub_Ping_Handler, - }, - }, - Streams: []grpc.StreamDesc{ - { - StreamName: "SendStream", - Handler: _Hub_SendStream_Handler, - ClientStreams: true, - }, - { - StreamName: "GetStream", - Handler: _Hub_GetStream_Handler, - ServerStreams: true, - }, - }, - Metadata: "pkgs/grpc/hub/hub.proto", -} diff --git a/common/pkgs/grpc/hub/pool.go b/common/pkgs/grpc/hub/pool.go deleted file mode 100644 index 5831124..0000000 --- a/common/pkgs/grpc/hub/pool.go +++ /dev/null @@ -1,60 +0,0 @@ -package hub - -import ( - "fmt" - sync "sync" -) - -type PoolConfig struct { -} - -type PoolClient struct { - *Client - owner *Pool -} - -func (c *PoolClient) Close() { - c.owner.Release(c) -} - -type Pool struct { - grpcCfg *PoolConfig - shareds map[string]*PoolClient - lock sync.Mutex -} - -func NewPool(grpcCfg *PoolConfig) *Pool { - return &Pool{ - grpcCfg: grpcCfg, - shareds: make(map[string]*PoolClient), - } -} - -// 获取一个GRPC客户端。由于事先不能知道所有hub的GRPC配置信息,所以只能让调用者把建立连接所需的配置都传递进来, -// Pool来决定要不要新建客户端。 -func (p *Pool) Acquire(ip string, port int) (*PoolClient, error) { - addr := fmt.Sprintf("%s:%d", ip, port) - - p.lock.Lock() - defer p.lock.Unlock() - - cli, ok := p.shareds[addr] - if !ok { - c, err := NewClient(addr) - if err != nil { - return nil, err - } - cli = &PoolClient{ - Client: c, - owner: p, - } - p.shareds[addr] = cli - } - - return cli, nil - -} - -func (p *Pool) Release(cli *PoolClient) { - // TODO 释放长时间未使用的client -} diff --git a/common/pkgs/rpc/coordinator/pool.go b/common/pkgs/rpc/coordinator/pool.go index 28fbbd1..829c077 100644 --- a/common/pkgs/rpc/coordinator/pool.go +++ b/common/pkgs/rpc/coordinator/pool.go @@ -54,6 +54,10 @@ func (p *Pool) Get() *Client { } p.grpcCon = con + + } else if con.stopClosing != nil { + close(con.stopClosing) + con.stopClosing = nil } con.refCount++ @@ -86,8 +90,8 @@ func (p *Pool) release() { p.lock.Lock() defer p.lock.Unlock() - if grpcCon.refCount == 0 { - grpcCon.grpcCon.Close() + if p.grpcCon.refCount == 0 { + p.grpcCon.grpcCon.Close() p.grpcCon = nil } } diff --git a/common/pkgs/rpc/hub/pool.go b/common/pkgs/rpc/hub/pool.go index 71875f1..6710a94 100644 --- a/common/pkgs/rpc/hub/pool.go +++ b/common/pkgs/rpc/hub/pool.go @@ -59,6 +59,9 @@ func (p *Pool) Get(ip string, port int) *Client { } p.grpcCons[ga] = con + } else if con.stopClosing != nil { + close(con.stopClosing) + con.stopClosing = nil } con.refCount++ @@ -96,6 +99,11 @@ func (p *Pool) release(addr grpcAddr) { p.lock.Lock() defer p.lock.Unlock() + grpcCon := p.grpcCons[addr] + if grpcCon == nil { + return + } + if grpcCon.refCount == 0 { grpcCon.grpcCon.Close() delete(p.grpcCons, addr) diff --git a/common/pkgs/rpc/server.go b/common/pkgs/rpc/server.go index 30b03da..54746f5 100644 --- a/common/pkgs/rpc/server.go +++ b/common/pkgs/rpc/server.go @@ -4,6 +4,7 @@ import ( "net" "gitlink.org.cn/cloudream/common/pkgs/async" + "gitlink.org.cn/cloudream/common/pkgs/logger" "google.golang.org/grpc" ) @@ -40,6 +41,7 @@ func NewServerBase(cfg Config, srvImpl any, svcDesc *grpc.ServiceDesc) *ServerBa func (s *ServerBase) Start() *ServerEventChan { ch := async.NewUnboundChannel[RPCServerEvent]() go func() { + logger.Infof("start serving rpc at: %v", s.cfg.Listen) lis, err := net.Listen("tcp", s.cfg.Listen) if err != nil { ch.Send(&ExitEvent{Err: err}) diff --git a/coordinator/internal/cmd/migrate.go b/coordinator/internal/cmd/migrate.go index d4e6663..7c21527 100644 --- a/coordinator/internal/cmd/migrate.go +++ b/coordinator/internal/cmd/migrate.go @@ -25,8 +25,6 @@ func init() { } func migrate(configPath string) { - // TODO 将create_database.sql的内容逐渐移动到这里来 - err := config.Init(configPath) if err != nil { fmt.Println(err) diff --git a/hub/internal/cmd/serve.go b/hub/internal/cmd/serve.go index 764761a..8936eb8 100644 --- a/hub/internal/cmd/serve.go +++ b/hub/internal/cmd/serve.go @@ -27,21 +27,26 @@ import ( func init() { var configPath string - var httpAddr string - - cmd := &cobra.Command{ + var opt serveOptions + cmd := cobra.Command{ Use: "serve", - Short: "start jcs-pub hub service", + Short: "start serving hub", Run: func(cmd *cobra.Command, args []string) { - serve(configPath, httpAddr) + serve(configPath, opt) }, } - cmd.Flags().StringVarP(&configPath, "config", "c", "", "path to config file") - cmd.Flags().StringVarP(&httpAddr, "http", "", "127.0.0.1:8890", "http listen address") - RootCmd.AddCommand(cmd) + cmd.Flags().StringVarP(&configPath, "config", "c", "", "config file path") + cmd.Flags().BoolVarP(&opt.DisableHTTP, "no-http", "", false, "disable http server") + cmd.Flags().StringVarP(&opt.HTTPListenAddr, "http", "", "", "http listen address, will override config file") + RootCmd.AddCommand(&cmd) } -func serve(configPath string, httpAddr string) { +type serveOptions struct { + DisableHTTP bool + HTTPListenAddr string +} + +func serve(configPath string, opts serveOptions) { err := config.Init(configPath) if err != nil { fmt.Printf("init config failed, err: %s", err.Error()) @@ -67,12 +72,18 @@ func serve(configPath string, httpAddr string) { // 初始化执行器 worker := exec.NewWorker() - // 初始化HTTP服务 - httpSvr, err := http.NewServer(httpAddr, http.NewService(&worker, stgPool)) - if err != nil { - logger.Fatalf("new http server failed, err: %s", err.Error()) + // HTTP接口 + httpCfg := config.Cfg().HTTP + if !opts.DisableHTTP && httpCfg != nil && httpCfg.Enabled { + if opts.HTTPListenAddr != "" { + httpCfg.Listen = opts.HTTPListenAddr + } + } else { + httpCfg = nil } - go serveHTTP(httpSvr) + httpSvr := http.NewServer(httpCfg, http.NewService(&worker, stgPool)) + httpChan := httpSvr.Start() + defer httpSvr.Stop() // 启动访问统计服务 // acStat := accessstat.NewAccessStat(accessstat.Config{ @@ -90,7 +101,8 @@ func serve(configPath string, httpAddr string) { logger.Errorf("new sysevent publisher: %v", err) os.Exit(1) } - go servePublisher(evtPub) + evtPubChan := evtPub.Start() + defer evtPub.Stop() // 初始化定时任务执行器 tktk := ticktock.New(config.Cfg().TickTock, config.Cfg().ID, stgPool) @@ -103,11 +115,36 @@ func serve(configPath string, httpAddr string) { defer rpcSvr.Stop() /// 开始监听各个模块的事件 + evtPubEvt := evtPubChan.Receive() rpcEvt := rpcSvrChan.Receive() + httpEvt := httpChan.Receive() loop: for { select { + case e := <-evtPubEvt.Chan(): + if e.Err != nil { + logger.Errorf("receive publisher event: %v", err) + break loop + } + + switch val := e.Value.(type) { + case sysevent.PublishError: + logger.Errorf("publishing event: %v", val) + + case sysevent.PublisherExited: + if val.Err != nil { + logger.Errorf("publisher exited with error: %v", val.Err) + } else { + logger.Info("publisher exited") + } + break loop + + case sysevent.OtherError: + logger.Errorf("sysevent: %v", val) + } + evtPubEvt = evtPubChan.Receive() + case e := <-rpcEvt.Chan(): if e.Err != nil { logger.Errorf("receive rpc event: %v", e.Err) @@ -124,6 +161,19 @@ loop: break loop } rpcEvt = rpcSvrChan.Receive() + + case e := <-httpEvt.Chan(): + if e.Err != nil { + logger.Errorf("receive http event: %v", err) + break loop + } + + switch e := e.Value.(type) { + case http.ExitEvent: + logger.Infof("http server exited, err: %v", e.Err) + break loop + } + httpEvt = httpChan.Receive() } } @@ -144,53 +194,3 @@ func downloadHubConfig() coormq.GetHubConfigResp { return *cfgResp } - -func servePublisher(evtPub *sysevent.Publisher) { - logger.Info("start serving sysevent publisher") - - ch := evtPub.Start() - -loop: - for { - val, err := ch.Receive().Wait(context.Background()) - if err != nil { - logger.Errorf("sysevent publisher stopped with error: %s", err.Error()) - break - } - - switch val := val.(type) { - case sysevent.PublishError: - logger.Errorf("publishing event: %v", val) - - case sysevent.PublisherExited: - if val.Err != nil { - logger.Errorf("publisher exited with error: %v", val.Err) - } else { - logger.Info("publisher exited") - } - break loop - - case sysevent.OtherError: - logger.Errorf("sysevent: %v", val) - } - } - logger.Info("sysevent publisher stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) -} - -func serveHTTP(server *http.Server) { - logger.Info("start serving http") - - err := server.Serve() - - if err != nil { - logger.Errorf("http stopped with error: %s", err.Error()) - } - - logger.Info("http stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) -} diff --git a/hub/internal/config/config.go b/hub/internal/config/config.go index 6ec80ae..db51a77 100644 --- a/hub/internal/config/config.go +++ b/hub/internal/config/config.go @@ -8,6 +8,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" + "gitlink.org.cn/cloudream/jcs-pub/hub/internal/http" "gitlink.org.cn/cloudream/jcs-pub/hub/internal/ticktock" ) @@ -15,6 +16,7 @@ type Config struct { ID cortypes.HubID `json:"id"` Local stgglb.LocalMachineInfo `json:"local"` RPC rpc.Config `json:"rpc"` + HTTP *http.Config `json:"http"` CoordinatorRPC corrpc.PoolConfig `json:"coordinatorRPC"` Logger log.Config `json:"logger"` RabbitMQ mq.Config `json:"rabbitMQ"` diff --git a/hub/internal/http/config.go b/hub/internal/http/config.go new file mode 100644 index 0000000..26cc5ff --- /dev/null +++ b/hub/internal/http/config.go @@ -0,0 +1,6 @@ +package http + +type Config struct { + Enabled bool `json:"enabled"` + Listen string `json:"listen"` +} diff --git a/hub/internal/http/server.go b/hub/internal/http/server.go index db861f2..2af9f51 100644 --- a/hub/internal/http/server.go +++ b/hub/internal/http/server.go @@ -1,46 +1,76 @@ package http import ( + "context" + "net/http" + "github.com/gin-gonic/gin" + "gitlink.org.cn/cloudream/common/pkgs/async" "gitlink.org.cn/cloudream/common/pkgs/logger" hubapi "gitlink.org.cn/cloudream/jcs-pub/hub/sdk/api" ) +type ServerEventChan = async.UnboundChannel[ServerEvent] + +type ServerEvent interface { + IsServerEvent() bool +} + +type ExitEvent struct { + ServerEvent + Err error +} + type Server struct { - engine *gin.Engine - listenAddr string - svc *Service + cfg *Config + httpSrv *http.Server + svc *Service + eventChan *ServerEventChan } -func NewServer(listenAddr string, svc *Service) (*Server, error) { - engine := gin.New() - +func NewServer(cfg *Config, svc *Service) *Server { return &Server{ - engine: engine, - listenAddr: listenAddr, - svc: svc, - }, nil + cfg: cfg, + svc: svc, + eventChan: async.NewUnboundChannel[ServerEvent](), + } } -func (s *Server) Serve() error { - s.initRouters() +func (s *Server) Start() *ServerEventChan { + go func() { + if s.cfg == nil { + return + } - logger.Infof("start serving http at: %s", s.listenAddr) - err := s.engine.Run(s.listenAddr) + engine := gin.New() + s.httpSrv = &http.Server{ + Addr: s.cfg.Listen, + Handler: engine, + } - if err != nil { - logger.Infof("http stopped with error: %s", err.Error()) - return err + s.initRouters(engine) + + logger.Infof("start serving http at: %s", s.cfg.Listen) + + err := s.httpSrv.ListenAndServe() + s.eventChan.Send(ExitEvent{Err: err}) + }() + return s.eventChan +} + +func (s *Server) Stop() { + if s.httpSrv == nil { + s.eventChan.Send(ExitEvent{}) + return } - logger.Infof("http stopped") - return nil + s.httpSrv.Shutdown(context.Background()) } -func (s *Server) initRouters() { - s.engine.GET(hubapi.GetStreamPath, s.IOSvc().GetStream) - s.engine.POST(hubapi.SendStreamPath, s.IOSvc().SendStream) - s.engine.POST(hubapi.ExecuteIOPlanPath, s.IOSvc().ExecuteIOPlan) - s.engine.POST(hubapi.SendVarPath, s.IOSvc().SendVar) - s.engine.GET(hubapi.GetVarPath, s.IOSvc().GetVar) +func (s *Server) initRouters(engine *gin.Engine) { + engine.GET(hubapi.GetStreamPath, s.IOSvc().GetStream) + engine.POST(hubapi.SendStreamPath, s.IOSvc().SendStream) + engine.POST(hubapi.ExecuteIOPlanPath, s.IOSvc().ExecuteIOPlan) + engine.POST(hubapi.SendVarPath, s.IOSvc().SendVar) + engine.GET(hubapi.GetVarPath, s.IOSvc().GetVar) } diff --git a/hub/sdk/api/hub_io.go b/hub/sdk/api/hub_io.go index d013575..5b9ceeb 100644 --- a/hub/sdk/api/hub_io.go +++ b/hub/sdk/api/hub_io.go @@ -12,8 +12,6 @@ import ( "gitlink.org.cn/cloudream/common/utils/serder" ) -// TODO2 重新梳理代码 - const GetStreamPath = "/hubIO/getStream" type GetStreamReq struct {