定时调度不再依赖Step步进

This commit is contained in:
大石头 2024-06-24 17:50:13 +08:00
parent c9075409c4
commit c43ab732c8
5 changed files with 77 additions and 63 deletions

View File

@ -1318,13 +1318,13 @@
<tr>
<td>MsgCount</td>
<td>费消</td>
<td>消息</td>
<td>Int32</td>
<td></td>
<td></td>
<td></td>
<td>N</td>
<td></td>
<td>消费消息数</td>
</tr>
<tr>

View File

@ -2,6 +2,7 @@
using System.Collections.Generic;
using NewLife;
using NewLife.Data;
using NewLife.Threading;
using XCode;
namespace AntJob.Data.Entity;
@ -37,7 +38,8 @@ public partial class Job : EntityBase<Job>
// throw new ArgumentNullException(nameof(Data), $"{Mode}调度模式要求设置Data模板");
// 参数默认值
if (Step == 0) Step = 5;
var step = Step;
if (step == 0) step = Step = 5;
if (MaxRetain == 0) MaxRetain = 3;
if (MaxIdle == 0) MaxIdle = GetDefaultIdle();
@ -56,14 +58,14 @@ public partial class Job : EntityBase<Job>
// 定时任务自动生成Cron
if (Mode == JobModes.Time && Cron.IsNullOrEmpty())
{
if (Step < 60)
Cron = $"0 */{Step} * * * ?";
else if (Step % 86400 == 0 && Step / 86400 < 30)
Cron = $"0 0 0 0/{Step / 86400} * ?";
else if (Step % 3600 == 0 && Step / 3600 < 24)
Cron = $"0 0 0/{Step / 3600} * * ?";
else if (Step % 60 == 0 && Step / 60 < 60)
Cron = $"0 0/{Step / 60} * * * ?";
if (step < 60)
Cron = $"0 */{step} * * * ?";
else if (step % 86400 == 0 && step / 86400 < 30)
Cron = $"0 0 0 0/{step / 86400} * ?";
else if (step % 3600 == 0 && step / 3600 < 24)
Cron = $"0 0 0/{step / 3600} * * ?";
else if (step % 60 == 0 && step / 60 < 60)
Cron = $"0 0/{step / 60} * * * ?";
}
var app = App;
@ -222,6 +224,37 @@ public partial class Job : EntityBase<Job>
return false;
}
/// <summary>获取下一次执行时间</summary>
/// <returns></returns>
public DateTime GetNext()
{
var step = Step;
if (step <= 0) step = 30;
switch (Mode)
{
case JobModes.Data:
break;
case JobModes.Time:
if (!Cron.IsNullOrEmpty())
{
var cron = new Cron(Cron);
var time = DataTime.Year > 2000 ? DataTime : DateTime.Now;
return cron.GetNext(time);
}
else
{
return DataTime.AddSeconds(step);
}
case JobModes.Message:
break;
default:
break;
}
return DateTime.MinValue;
}
/// <summary>重置任务,让它从新开始工作</summary>
/// <param name="days">重置到多少天之前</param>
/// <param name="stime">开始时间优先级低于days</param>

View File

@ -145,11 +145,11 @@ public partial class JobTask
public JobStatus Status { get => _Status; set { if (OnPropertyChanging("Status", value)) { _Status = value; OnPropertyChanged("Status"); } } }
private Int32 _MsgCount;
/// <summary>消费消息数</summary>
[DisplayName("消费消")]
[Description("消费消息数")]
/// <summary>消息。消费消息数</summary>
[DisplayName("消息")]
[Description("消息。消费消息数")]
[DataObjectField(false, false, false, 0)]
[BindColumn("MsgCount", "消费消息数", "")]
[BindColumn("MsgCount", "消息。消费消息数", "")]
public Int32 MsgCount { get => _MsgCount; set { if (OnPropertyChanging("MsgCount", value)) { _MsgCount = value; OnPropertyChanged("MsgCount"); } } }
private String _Server;
@ -378,7 +378,7 @@ public partial class JobTask
/// <summary>状态</summary>
public static readonly Field Status = FindByName("Status");
/// <summary>消费消息数</summary>
/// <summary>消息。消费消息数</summary>
public static readonly Field MsgCount = FindByName("MsgCount");
/// <summary>服务器</summary>
@ -462,7 +462,7 @@ public partial class JobTask
/// <summary>状态</summary>
public const String Status = "Status";
/// <summary>消费消息数</summary>
/// <summary>消息。消费消息数</summary>
public const String MsgCount = "MsgCount";
/// <summary>服务器</summary>

View File

@ -180,7 +180,7 @@
<Column Name="Cost" DataType="Int32" Description="耗时。秒,执行端计算的执行时间" />
<Column Name="FullCost" DataType="Int32" Description="全部耗时。秒,从任务发放到执行完成的时间" />
<Column Name="Status" DataType="Int32" Description="状态" Type="JobStatus" />
<Column Name="MsgCount" DataType="Int32" Description="消费消息数" />
<Column Name="MsgCount" DataType="Int32" Description="消息。消费消息数" />
<Column Name="Server" DataType="String" Description="服务器" />
<Column Name="ProcessID" DataType="Int32" Description="进程" />
<Column Name="Key" DataType="String" Description="最后键。Handler内记录作为样本的数据" />

View File

@ -82,14 +82,8 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
// 添加定时作业时,计算下一次执行时间
if (job.ID == 0 && job.Mode == JobModes.Time)
{
if (job.Step <= 0 && job.Cron.IsNullOrEmpty()) continue;
// 计算下一次执行时间
var cron = new Cron(job.Cron);
var time = job.DataTime.Year > 2000 ? job.DataTime : DateTime.Now;
job.DataTime = cron.GetNext(time);
if (job.Step <= 0) job.Step = 30;
var next = job.GetNext();
if (next.Year > 2000) job.DataTime = next;
}
if (job.Save() != 0)
@ -130,7 +124,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
job = Job.FindByAppIDAndName(app.ID, jobName);
if (job == null) throw new XException($"应用[{app.ID}/{app.Name}]下未找到作业[{jobName}]");
if (job.Step == 0 || job.DataTime.Year <= 2000) throw new XException("作业[{0}/{1}]未设置数据时间或步进", job.ID, job.Name);
if (job.DataTime.Year <= 2000) throw new XException("作业[{0}/{1}]未设置数据时间", job.ID, job.Name);
var online = _appService.GetOnline(app, ip);
@ -170,7 +164,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
default:
{
// 如果能够切片,则查询数据库后进入,避免缓存导致重复
if (TrySplit(job, job.DataTime, job.Step, out var end))
if (TrySplit(job, job.DataTime, out var end))
{
// 申请任务前,不能再查数据库,那样子会导致多线程脏读,从而出现多客户端分到相同任务的情况
//jb = Job.FindByKey(jb.ID);
@ -181,6 +175,15 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
}
}
if (list.Count > 0)
{
job.LastStatus = JobStatus.;
job.LastTime = DateTime.Now;
job.UpdateTime = DateTime.Now;
job.Save();
}
// 记录状态
online.Tasks += list.Count;
online.SaveAsync();
@ -450,6 +453,10 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
task.Update();
job.LastStatus = result.Status;
job.LastTime = DateTime.Now;
job.SaveAsync();
return true;
}
@ -528,7 +535,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
/// <summary>用于表示切片批次的序号</summary>
private Int32 _idxBatch;
/// <summary>申请任务分片</summary>
/// <summary>申请任务分片(时间调度&数据调度)</summary>
/// <param name="server">申请任务的服务端</param>
/// <param name="ip">申请任务的IP</param>
/// <param name="pid">申请任务的服务端进程ID</param>
@ -541,9 +548,6 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
if (!job.Enable) return list;
var step = job.Step;
if (step <= 0) step = 30;
using var span = _tracer?.NewSpan(nameof(Acquire), new { job.Name, server, ip, pid, count });
using var ts = Job.Meta.CreateTrans();
@ -552,7 +556,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
{
var end = DateTime.MinValue;
if (job.Mode == JobModes.Time && !TrySplitTime(job, true, out end) ||
job.Mode != JobModes.Time && !TrySplit(job, start, step, out end))
job.Mode != JobModes.Time && !TrySplit(job, start, out end))
break;
// 创建新的任务
@ -596,18 +600,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
start = end;
}
if (list.Count > 0)
{
// 任务需要ID不能批量插入优化
//list.Insert(null);
job.LastStatus = JobStatus.;
job.LastTime = DateTime.Now;
job.UpdateTime = DateTime.Now;
job.Save();
ts.Commit();
}
ts.Commit();
// 记录任务数
span?.AppendTag(null, list.Count);
@ -615,7 +608,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
return list;
}
/// <summary>尝试分割时间片</summary>
/// <summary>尝试分割时间片(时间调度)</summary>
/// <param name="job"></param>
/// <param name="end"></param>
/// <returns></returns>
@ -637,32 +630,19 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
// 时间区间判断
if (end.Year > 2000 && start >= end) return false;
// 计算下一次执行时间
if (modify)
{
// 计算下一次执行时间
if (!job.Cron.IsNullOrEmpty())
{
var cron = new Cron(job.Cron);
var time = job.DataTime.Year > 2000 ? job.DataTime : DateTime.Now;
end = job.DataTime = cron.GetNext(time);
}
else
{
var step = job.Step;
if (step <= 0) step = 30;
end = job.DataTime = job.DataTime.AddSeconds(step);
}
}
end = job.DataTime = job.GetNext();
return true;
}
/// <summary>尝试分割时间片</summary>
/// <summary>尝试分割时间片(数据调度)</summary>
/// <param name="start"></param>
/// <param name="step"></param>
/// <param name="end"></param>
/// <returns></returns>
public Boolean TrySplit(Job job, DateTime start, Int32 step, out DateTime end)
public Boolean TrySplit(Job job, DateTime start, out DateTime end)
{
// 当前时间减去偏移量,作为当前时间。数据抽取不许超过该时间
var now = DateTime.Now.AddSeconds(-job.Offset);
@ -674,6 +654,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
// 开始时间和结束时间是否越界
if (start >= now) return false;
var step = job.Step;
if (step <= 0) step = 30;
// 必须严格要求按照步进大小分片除非有合适的End