MaxTask 并行度。每个实例允许多少个任务并行处理,多执行端时叠加。多执行端均分的效果并不好,反而导致启动时并行任务数过大。

This commit is contained in:
智能大石头 2025-07-29 13:34:24 +08:00
parent 4dce558592
commit f8c93898fc
4 changed files with 19 additions and 18 deletions

View File

@ -889,7 +889,7 @@
<td></td>
<td></td>
<td>N</td>
<td>一共允许多少个任务并行处理,多执行端时平均分配,确保该作业整体并行度</td>
<td>每个实例允许多少个任务并行处理,多执行端时叠加</td>
</tr>
<tr>

View File

@ -151,12 +151,12 @@ public partial class Job
public Int32 Offset { get => _Offset; set { if (OnPropertyChanging("Offset", value)) { _Offset = value; OnPropertyChanged("Offset"); } } }
private Int32 _MaxTask;
/// <summary>并行度。一共允许多少个任务并行处理,多执行端时平均分配,确保该作业整体并行度</summary>
/// <summary>并行度。每个实例允许多少个任务并行处理,多执行端时叠加</summary>
[Category("控制参数")]
[DisplayName("并行度")]
[Description("并行度。一共允许多少个任务并行处理,多执行端时平均分配,确保该作业整体并行度")]
[Description("并行度。每个实例允许多少个任务并行处理,多执行端时叠加")]
[DataObjectField(false, false, false, 0)]
[BindColumn("MaxTask", "并行度。一共允许多少个任务并行处理,多执行端时平均分配,确保该作业整体并行度", "")]
[BindColumn("MaxTask", "并行度。每个实例允许多少个任务并行处理,多执行端时叠加", "")]
public Int32 MaxTask { get => _MaxTask; set { if (OnPropertyChanging("MaxTask", value)) { _MaxTask = value; OnPropertyChanged("MaxTask"); } } }
private Int32 _MaxError;
@ -553,7 +553,7 @@ public partial class Job
/// <summary>偏移。距离AntServer当前时间的秒数避免因服务器之间的时间误差而错过部分数据秒</summary>
public static readonly Field Offset = FindByName("Offset");
/// <summary>并行度。一共允许多少个任务并行处理,多执行端时平均分配,确保该作业整体并行度</summary>
/// <summary>并行度。每个实例允许多少个任务并行处理,多执行端时叠加</summary>
public static readonly Field MaxTask = FindByName("MaxTask");
/// <summary>最大错误。连续错误达到最大错误数时停止</summary>
@ -685,7 +685,7 @@ public partial class Job
/// <summary>偏移。距离AntServer当前时间的秒数避免因服务器之间的时间误差而错过部分数据秒</summary>
public const String Offset = "Offset";
/// <summary>并行度。一共允许多少个任务并行处理,多执行端时平均分配,确保该作业整体并行度</summary>
/// <summary>并行度。每个实例允许多少个任务并行处理,多执行端时叠加</summary>
public const String MaxTask = "MaxTask";
/// <summary>最大错误。连续错误达到最大错误数时停止</summary>

View File

@ -135,7 +135,7 @@
<Column Name="Step" DataType="Int32" ItemType="TimeSpan" Description="步进。切分任务的时间区间,秒" />
<Column Name="BatchSize" DataType="Int32" Description="批大小。数据调度每次抽取数据的分页大小,或消息调度每次处理的消息数,定时调度不适用" />
<Column Name="Offset" DataType="Int32" ItemType="TimeSpan" Description="偏移。距离AntServer当前时间的秒数避免因服务器之间的时间误差而错过部分数据秒" />
<Column Name="MaxTask" DataType="Int32" Description="并行度。一共允许多少个任务并行处理,多执行端时平均分配,确保该作业整体并行度" Category="控制参数" />
<Column Name="MaxTask" DataType="Int32" Description="并行度。每个实例允许多少个任务并行处理,多执行端时叠加" Category="控制参数" />
<Column Name="MaxError" DataType="Int32" Description="最大错误。连续错误达到最大错误数时停止" Category="控制参数" />
<Column Name="MaxRetry" DataType="Int32" Description="最大重试。默认10次超过该次数后将不再重试" Category="控制参数" />
<Column Name="MaxTime" DataType="Int32" ItemType="TimeSpan" Description="最大执行时间。默认600秒超过该时间则认为执行器故障将会把该任务分配给其它执行器" Category="控制参数" />

View File

@ -304,17 +304,18 @@ public class Scheduler : DisposeBase
}
// 如果正在处理任务数没达到最大并行度,则继续安排任务
var max = job.MaxTask;
if (prv is NetworkJobProvider nprv)
{
// 如果是网络提供者,则根据在线节点数平分并行度
var ps = nprv.Peers;
if (ps != null && ps.Length > 0)
{
max = max < ps.Length ? 1 : (Int32)Math.Round((Double)max / ps.Length);
}
}
var count = max - handler.Busy;
//var max = job.MaxTask;
//if (prv is NetworkJobProvider nprv)
//{
// // 如果是网络提供者,则根据在线节点数平分并行度
// var ps = nprv.Peers;
// if (ps != null && ps.Length > 0)
// {
// max = max < ps.Length ? 1 : (Int32)Math.Round((Double)max / ps.Length);
// }
//}
//var count = max - handler.Busy;
var count = job.MaxTask - handler.Busy;
if (count > 0)
{
// 循环申请任务,喂饱处理器