Compare commits
3 Commits
257f5be922
...
0d87aa9963
Author | SHA1 | Date |
---|---|---|
|
0d87aa9963 | |
|
f8c93898fc | |
|
4dce558592 |
|
@ -889,7 +889,7 @@
|
|||
<td></td>
|
||||
<td></td>
|
||||
<td>N</td>
|
||||
<td>一共允许多少个任务并行处理,多执行端时平均分配,确保该作业整体并行度</td>
|
||||
<td>每个实例允许多少个任务并行处理,多执行端时叠加</td>
|
||||
</tr>
|
||||
|
||||
<tr>
|
||||
|
|
|
@ -151,12 +151,12 @@ public partial class Job
|
|||
public Int32 Offset { get => _Offset; set { if (OnPropertyChanging("Offset", value)) { _Offset = value; OnPropertyChanged("Offset"); } } }
|
||||
|
||||
private Int32 _MaxTask;
|
||||
/// <summary>并行度。一共允许多少个任务并行处理,多执行端时平均分配,确保该作业整体并行度</summary>
|
||||
/// <summary>并行度。每个实例允许多少个任务并行处理,多执行端时叠加</summary>
|
||||
[Category("控制参数")]
|
||||
[DisplayName("并行度")]
|
||||
[Description("并行度。一共允许多少个任务并行处理,多执行端时平均分配,确保该作业整体并行度")]
|
||||
[Description("并行度。每个实例允许多少个任务并行处理,多执行端时叠加")]
|
||||
[DataObjectField(false, false, false, 0)]
|
||||
[BindColumn("MaxTask", "并行度。一共允许多少个任务并行处理,多执行端时平均分配,确保该作业整体并行度", "")]
|
||||
[BindColumn("MaxTask", "并行度。每个实例允许多少个任务并行处理,多执行端时叠加", "")]
|
||||
public Int32 MaxTask { get => _MaxTask; set { if (OnPropertyChanging("MaxTask", value)) { _MaxTask = value; OnPropertyChanged("MaxTask"); } } }
|
||||
|
||||
private Int32 _MaxError;
|
||||
|
@ -553,7 +553,7 @@ public partial class Job
|
|||
/// <summary>偏移。距离AntServer当前时间的秒数,避免因服务器之间的时间误差而错过部分数据,秒</summary>
|
||||
public static readonly Field Offset = FindByName("Offset");
|
||||
|
||||
/// <summary>并行度。一共允许多少个任务并行处理,多执行端时平均分配,确保该作业整体并行度</summary>
|
||||
/// <summary>并行度。每个实例允许多少个任务并行处理,多执行端时叠加</summary>
|
||||
public static readonly Field MaxTask = FindByName("MaxTask");
|
||||
|
||||
/// <summary>最大错误。连续错误达到最大错误数时停止</summary>
|
||||
|
@ -685,7 +685,7 @@ public partial class Job
|
|||
/// <summary>偏移。距离AntServer当前时间的秒数,避免因服务器之间的时间误差而错过部分数据,秒</summary>
|
||||
public const String Offset = "Offset";
|
||||
|
||||
/// <summary>并行度。一共允许多少个任务并行处理,多执行端时平均分配,确保该作业整体并行度</summary>
|
||||
/// <summary>并行度。每个实例允许多少个任务并行处理,多执行端时叠加</summary>
|
||||
public const String MaxTask = "MaxTask";
|
||||
|
||||
/// <summary>最大错误。连续错误达到最大错误数时停止</summary>
|
||||
|
|
|
@ -135,7 +135,7 @@
|
|||
<Column Name="Step" DataType="Int32" ItemType="TimeSpan" Description="步进。切分任务的时间区间,秒" />
|
||||
<Column Name="BatchSize" DataType="Int32" Description="批大小。数据调度每次抽取数据的分页大小,或消息调度每次处理的消息数,定时调度不适用" />
|
||||
<Column Name="Offset" DataType="Int32" ItemType="TimeSpan" Description="偏移。距离AntServer当前时间的秒数,避免因服务器之间的时间误差而错过部分数据,秒" />
|
||||
<Column Name="MaxTask" DataType="Int32" Description="并行度。一共允许多少个任务并行处理,多执行端时平均分配,确保该作业整体并行度" Category="控制参数" />
|
||||
<Column Name="MaxTask" DataType="Int32" Description="并行度。每个实例允许多少个任务并行处理,多执行端时叠加" Category="控制参数" />
|
||||
<Column Name="MaxError" DataType="Int32" Description="最大错误。连续错误达到最大错误数时停止" Category="控制参数" />
|
||||
<Column Name="MaxRetry" DataType="Int32" Description="最大重试。默认10次,超过该次数后将不再重试" Category="控制参数" />
|
||||
<Column Name="MaxTime" DataType="Int32" ItemType="TimeSpan" Description="最大执行时间。默认600秒,超过该时间则认为执行器故障,将会把该任务分配给其它执行器" Category="控制参数" />
|
||||
|
|
|
@ -464,7 +464,8 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
|
|||
{
|
||||
task.Times++;
|
||||
task.Error++;
|
||||
//ji.Message = err.Message;
|
||||
|
||||
job.Error++;
|
||||
|
||||
SetJobError(job, task, online.UpdateIP);
|
||||
|
||||
|
@ -512,7 +513,6 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
|
|||
|
||||
job.Total += task.Total;
|
||||
job.Success += task.Success;
|
||||
//job.Error += task.Error;
|
||||
job.Times++;
|
||||
if (task.Status == JobStatus.错误) job.Error++;
|
||||
|
||||
|
|
|
@ -122,7 +122,7 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
|
|||
if (!Active) return false;
|
||||
|
||||
using var span = Tracer?.NewSpan($"job:{Name}:Stop", reason);
|
||||
WriteLog("停止工作");
|
||||
WriteLog("停止工作 {0}", reason);
|
||||
|
||||
Active = false;
|
||||
|
||||
|
|
|
@ -304,17 +304,18 @@ public class Scheduler : DisposeBase
|
|||
}
|
||||
|
||||
// 如果正在处理任务数没达到最大并行度,则继续安排任务
|
||||
var max = job.MaxTask;
|
||||
if (prv is NetworkJobProvider nprv)
|
||||
{
|
||||
// 如果是网络提供者,则根据在线节点数平分并行度
|
||||
var ps = nprv.Peers;
|
||||
if (ps != null && ps.Length > 0)
|
||||
{
|
||||
max = max < ps.Length ? 1 : (Int32)Math.Round((Double)max / ps.Length);
|
||||
}
|
||||
}
|
||||
var count = max - handler.Busy;
|
||||
//var max = job.MaxTask;
|
||||
//if (prv is NetworkJobProvider nprv)
|
||||
//{
|
||||
// // 如果是网络提供者,则根据在线节点数平分并行度
|
||||
// var ps = nprv.Peers;
|
||||
// if (ps != null && ps.Length > 0)
|
||||
// {
|
||||
// max = max < ps.Length ? 1 : (Int32)Math.Round((Double)max / ps.Length);
|
||||
// }
|
||||
//}
|
||||
//var count = max - handler.Busy;
|
||||
var count = job.MaxTask - handler.Busy;
|
||||
if (count > 0)
|
||||
{
|
||||
// 循环申请任务,喂饱处理器
|
||||
|
|
Loading…
Reference in New Issue