fix: ADD received message larger than max

This commit is contained in:
qiwang 2025-03-06 10:21:12 +08:00
parent da6ab53b96
commit 4ed1f08d31
9 changed files with 1368 additions and 1122 deletions

View File

@ -153,6 +153,7 @@ type (
GetResourceFlavorsResp_Status = modelarts.GetResourceFlavorsResp_Status
GetTrainingJobLogsPreviewReq = modelarts.GetTrainingJobLogsPreviewReq
GetTrainingJobLogsPreviewResp = modelarts.GetTrainingJobLogsPreviewResp
GetTrainingJobLogsStreamResp = modelarts.GetTrainingJobLogsStreamResp
GetVisualizationJobParam = modelarts.GetVisualizationJobParam
GetVisualizationJobReq = modelarts.GetVisualizationJobReq
GetVisualizationJobResp = modelarts.GetVisualizationJobResp

View File

@ -153,6 +153,7 @@ type (
GetResourceFlavorsResp_Status = modelarts.GetResourceFlavorsResp_Status
GetTrainingJobLogsPreviewReq = modelarts.GetTrainingJobLogsPreviewReq
GetTrainingJobLogsPreviewResp = modelarts.GetTrainingJobLogsPreviewResp
GetTrainingJobLogsStreamResp = modelarts.GetTrainingJobLogsStreamResp
GetVisualizationJobParam = modelarts.GetVisualizationJobParam
GetVisualizationJobReq = modelarts.GetVisualizationJobReq
GetVisualizationJobResp = modelarts.GetVisualizationJobResp
@ -393,6 +394,7 @@ type (
GetAiEnginesList(ctx context.Context, in *ListAiEnginesReq, opts ...grpc.CallOption) (*ListAiEnginesResp, error)
// 查询训练作业指定任务的日志(预览)
GetTrainingJobLogsPreview(ctx context.Context, in *GetTrainingJobLogsPreviewReq, opts ...grpc.CallOption) (*GetTrainingJobLogsPreviewResp, error)
GetTrainingJobLogStream(ctx context.Context, in *GetTrainingJobLogsPreviewReq, opts ...grpc.CallOption) (modelarts.ModelArtsService_GetTrainingJobLogStreamClient, error)
// export task
ExportTask(ctx context.Context, in *ExportTaskReq, opts ...grpc.CallOption) (*ExportTaskDataResp, error)
GetExportTasksOfDataset(ctx context.Context, in *GetExportTasksOfDatasetReq, opts ...grpc.CallOption) (*GetExportTasksOfDatasetResp, error)
@ -575,6 +577,11 @@ func (m *defaultModelArtsService) GetTrainingJobLogsPreview(ctx context.Context,
return client.GetTrainingJobLogsPreview(ctx, in, opts...)
}
func (m *defaultModelArtsService) GetTrainingJobLogStream(ctx context.Context, in *GetTrainingJobLogsPreviewReq, opts ...grpc.CallOption) (modelarts.ModelArtsService_GetTrainingJobLogStreamClient, error) {
client := modelarts.NewModelArtsServiceClient(m.cli.Conn())
return client.GetTrainingJobLogStream(ctx, in, opts...)
}
// export task
func (m *defaultModelArtsService) ExportTask(ctx context.Context, in *ExportTaskReq, opts ...grpc.CallOption) (*ExportTaskDataResp, error) {
client := modelarts.NewModelArtsServiceClient(m.cli.Conn())

View File

@ -46,6 +46,7 @@ func (l *GetTrainingJobLogsPreviewLogic) GetTrainingJobLogsPreview(in *modelarts
return nil, err
}
json.Unmarshal(*body, &resp)
println(resp.Content)
return &resp, nil
}

View File

@ -0,0 +1,60 @@
package modelartsservicelogic
import (
"context"
"fmt"
"gitlink.org.cn/JointCloud/pcm-modelarts/internal/util"
"k8s.io/apimachinery/pkg/util/json"
"gitlink.org.cn/JointCloud/pcm-modelarts/internal/svc"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"github.com/zeromicro/go-zero/core/logx"
)
type GetTrainingJobLogStreamLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewGetTrainingJobLogStreamLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetTrainingJobLogStreamLogic {
return &GetTrainingJobLogStreamLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
func (l *GetTrainingJobLogStreamLogic) GetTrainingJobLogStream(in *modelarts.GetTrainingJobLogsPreviewReq, stream modelarts.ModelArtsService_GetTrainingJobLogStreamServer) error {
// todo: add your logic here and delete this line
var resp modelarts.GetTrainingJobLogsStreamResp
platform, err := util.GetModelArtsConfWithPlatform(in.Platform)
if err != nil {
return err
}
url := fmt.Sprintf("%sv2/%s/training-jobs/%s/tasks/%s/logs/preview",
platform.Endpoint,
platform.ProjectId,
in.TrainingJobId,
in.TaskId)
body, err := util.SendRequest("GET", url,
nil, in.Platform)
if err != nil {
logx.Errorf("查询训练作业指定任务的日志(预览)失败,请求url: %s\n err%v", url, err)
return err
}
json.Unmarshal(*body, &resp)
println(resp.Content)
logEntry := &modelarts.GetTrainingJobLogsStreamResp{
Message: resp.Content,
}
if err := stream.Send(logEntry); err != nil {
return err
}
return nil
}

View File

@ -148,6 +148,11 @@ func (s *ModelArtsServiceServer) GetTrainingJobLogsPreview(ctx context.Context,
return l.GetTrainingJobLogsPreview(in)
}
func (s *ModelArtsServiceServer) GetTrainingJobLogStream(in *modelarts.GetTrainingJobLogsPreviewReq, stream modelarts.ModelArtsService_GetTrainingJobLogStreamServer) error {
l := modelartsservicelogic.NewGetTrainingJobLogStreamLogic(stream.Context(), s.svcCtx)
return l.GetTrainingJobLogStream(in, stream)
}
// export task
func (s *ModelArtsServiceServer) ExportTask(ctx context.Context, in *modelarts.ExportTaskReq) (*modelarts.ExportTaskDataResp, error) {
l := modelartsservicelogic.NewExportTaskLogic(ctx, s.svcCtx)

File diff suppressed because it is too large Load Diff

View File

@ -40,6 +40,7 @@ const (
ModelArtsService_GetTrainingJobFlavors_FullMethodName = "/modelarts.ModelArtsService/GetTrainingJobFlavors"
ModelArtsService_GetAiEnginesList_FullMethodName = "/modelarts.ModelArtsService/GetAiEnginesList"
ModelArtsService_GetTrainingJobLogsPreview_FullMethodName = "/modelarts.ModelArtsService/GetTrainingJobLogsPreview"
ModelArtsService_GetTrainingJobLogStream_FullMethodName = "/modelarts.ModelArtsService/GetTrainingJobLogStream"
ModelArtsService_ExportTask_FullMethodName = "/modelarts.ModelArtsService/ExportTask"
ModelArtsService_GetExportTasksOfDataset_FullMethodName = "/modelarts.ModelArtsService/GetExportTasksOfDataset"
ModelArtsService_GetExportTaskStatusOfDataset_FullMethodName = "/modelarts.ModelArtsService/GetExportTaskStatusOfDataset"
@ -121,6 +122,7 @@ type ModelArtsServiceClient interface {
GetAiEnginesList(ctx context.Context, in *ListAiEnginesReq, opts ...grpc.CallOption) (*ListAiEnginesResp, error)
// 查询训练作业指定任务的日志(预览)
GetTrainingJobLogsPreview(ctx context.Context, in *GetTrainingJobLogsPreviewReq, opts ...grpc.CallOption) (*GetTrainingJobLogsPreviewResp, error)
GetTrainingJobLogStream(ctx context.Context, in *GetTrainingJobLogsPreviewReq, opts ...grpc.CallOption) (ModelArtsService_GetTrainingJobLogStreamClient, error)
//export task
ExportTask(ctx context.Context, in *ExportTaskReq, opts ...grpc.CallOption) (*ExportTaskDataResp, error)
GetExportTasksOfDataset(ctx context.Context, in *GetExportTasksOfDatasetReq, opts ...grpc.CallOption) (*GetExportTasksOfDatasetResp, error)
@ -363,6 +365,38 @@ func (c *modelArtsServiceClient) GetTrainingJobLogsPreview(ctx context.Context,
return out, nil
}
func (c *modelArtsServiceClient) GetTrainingJobLogStream(ctx context.Context, in *GetTrainingJobLogsPreviewReq, opts ...grpc.CallOption) (ModelArtsService_GetTrainingJobLogStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &ModelArtsService_ServiceDesc.Streams[0], ModelArtsService_GetTrainingJobLogStream_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &modelArtsServiceGetTrainingJobLogStreamClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type ModelArtsService_GetTrainingJobLogStreamClient interface {
Recv() (*GetTrainingJobLogsStreamResp, error)
grpc.ClientStream
}
type modelArtsServiceGetTrainingJobLogStreamClient struct {
grpc.ClientStream
}
func (x *modelArtsServiceGetTrainingJobLogStreamClient) Recv() (*GetTrainingJobLogsStreamResp, error) {
m := new(GetTrainingJobLogsStreamResp)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *modelArtsServiceClient) ExportTask(ctx context.Context, in *ExportTaskReq, opts ...grpc.CallOption) (*ExportTaskDataResp, error) {
out := new(ExportTaskDataResp)
err := c.cc.Invoke(ctx, ModelArtsService_ExportTask_FullMethodName, in, out, opts...)
@ -706,6 +740,7 @@ type ModelArtsServiceServer interface {
GetAiEnginesList(context.Context, *ListAiEnginesReq) (*ListAiEnginesResp, error)
// 查询训练作业指定任务的日志(预览)
GetTrainingJobLogsPreview(context.Context, *GetTrainingJobLogsPreviewReq) (*GetTrainingJobLogsPreviewResp, error)
GetTrainingJobLogStream(*GetTrainingJobLogsPreviewReq, ModelArtsService_GetTrainingJobLogStreamServer) error
//export task
ExportTask(context.Context, *ExportTaskReq) (*ExportTaskDataResp, error)
GetExportTasksOfDataset(context.Context, *GetExportTasksOfDatasetReq) (*GetExportTasksOfDatasetResp, error)
@ -819,6 +854,9 @@ func (UnimplementedModelArtsServiceServer) GetAiEnginesList(context.Context, *Li
func (UnimplementedModelArtsServiceServer) GetTrainingJobLogsPreview(context.Context, *GetTrainingJobLogsPreviewReq) (*GetTrainingJobLogsPreviewResp, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetTrainingJobLogsPreview not implemented")
}
func (UnimplementedModelArtsServiceServer) GetTrainingJobLogStream(*GetTrainingJobLogsPreviewReq, ModelArtsService_GetTrainingJobLogStreamServer) error {
return status.Errorf(codes.Unimplemented, "method GetTrainingJobLogStream not implemented")
}
func (UnimplementedModelArtsServiceServer) ExportTask(context.Context, *ExportTaskReq) (*ExportTaskDataResp, error) {
return nil, status.Errorf(codes.Unimplemented, "method ExportTask not implemented")
}
@ -1309,6 +1347,27 @@ func _ModelArtsService_GetTrainingJobLogsPreview_Handler(srv interface{}, ctx co
return interceptor(ctx, in, info, handler)
}
func _ModelArtsService_GetTrainingJobLogStream_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(GetTrainingJobLogsPreviewReq)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(ModelArtsServiceServer).GetTrainingJobLogStream(m, &modelArtsServiceGetTrainingJobLogStreamServer{stream})
}
type ModelArtsService_GetTrainingJobLogStreamServer interface {
Send(*GetTrainingJobLogsStreamResp) error
grpc.ServerStream
}
type modelArtsServiceGetTrainingJobLogStreamServer struct {
grpc.ServerStream
}
func (x *modelArtsServiceGetTrainingJobLogStreamServer) Send(m *GetTrainingJobLogsStreamResp) error {
return x.ServerStream.SendMsg(m)
}
func _ModelArtsService_ExportTask_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ExportTaskReq)
if err := dec(in); err != nil {
@ -2127,7 +2186,13 @@ var ModelArtsService_ServiceDesc = grpc.ServiceDesc{
Handler: _ModelArtsService_GetFile_Handler,
},
},
Streams: []grpc.StreamDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "GetTrainingJobLogStream",
Handler: _ModelArtsService_GetTrainingJobLogStream_Handler,
ServerStreams: true,
},
},
Metadata: "pcm-modelarts.proto",
}

View File

@ -2547,6 +2547,13 @@ message GetTrainingJobLogsPreviewResp {
int32 full_size = 3; //
}
message GetTrainingJobLogsStreamResp {
string content = 1; //n兆n兆n兆的日志>2022/03/01 00:00:00 (GMT+08:00)contextcontent
int32 current_size =2; //5
int32 full_size = 3; //
string message =4;
}
/******************File first*************************/
message GetFileReq{
string path = 1;
@ -2604,7 +2611,7 @@ service ModelArtsService {
rpc GetAiEnginesList(ListAiEnginesReq) returns (ListAiEnginesResp);
//
rpc GetTrainingJobLogsPreview(GetTrainingJobLogsPreviewReq) returns(GetTrainingJobLogsPreviewResp);
rpc GetTrainingJobLogStream(GetTrainingJobLogsPreviewReq) returns(stream GetTrainingJobLogsStreamResp);
//export task
rpc ExportTask(ExportTaskReq) returns (ExportTaskDataResp);
rpc GetExportTasksOfDataset(GetExportTasksOfDatasetReq) returns (GetExportTasksOfDatasetResp);

View File

@ -33,8 +33,6 @@ func startGoZeroServer(wg *sync.WaitGroup) {
ctx := svc.NewServiceContext(util.C)
s := zrpc.MustNewServer(util.C.RpcServerConf, func(grpcServer *grpc.Server) {
grpc.MaxRecvMsgSize(10 * 1024 * 1024) // 设置最大接收消息大小为10MB
grpc.MaxSendMsgSize(10 * 1024 * 1024) // 设置最大发送消息大小为 10MB
modelarts.RegisterModelArtsServiceServer(grpcServer, modelartsserviceServer.NewModelArtsServiceServer(ctx))
modelarts.RegisterImagesServiceServer(grpcServer, imagesserviceServer.NewImagesServiceServer(ctx))
if util.C.Mode == service.DevMode || util.C.Mode == service.TestMode {