改进ITask,统一指定数据时间Time

This commit is contained in:
大石头 2024-04-22 19:27:37 +08:00
parent 723f05c7af
commit aadcc1a08e
12 changed files with 90 additions and 96 deletions

View File

@ -199,6 +199,7 @@ public partial class JobTask : EntityBase<JobTask>
{
ID = ID,
Time = Start,
Start = Start,
End = End,
//Offset = Offset,

View File

@ -147,7 +147,7 @@ public abstract class DataHandler : Handler
if (task == null) throw new ArgumentNullException(nameof(task), "没有设置数据抽取配置");
// 验证时间段
var start = task.Start;
var start = task.Time;
var end = task.End;
// 区间无效

View File

@ -28,7 +28,7 @@ public class SqlHandler : Handler
{
//var sqls = ctx.Task.Data as String;
var sqls = Job.Data;
sqls = TemplateHelper.Build(sqls, ctx.Task.Start, ctx.Task.End);
sqls = TemplateHelper.Build(sqls, ctx.Task.Time, ctx.Task.End);
// 向调度中心返回解析后的Sql语句
ctx.Remark = sqls;

View File

@ -1,23 +1,20 @@
using System;
namespace AntJob.Data;
namespace AntJob.Data
/// <summary>任务参数</summary>
public interface ITask
{
/// <summary>任务参数</summary>
public interface ITask
{
/// <summary>任务项编号</summary>
Int32 ID { get; set; }
/// <summary>任务项编号</summary>
Int32 ID { get; set; }
/// <summary>开始。大于等于</summary>
DateTime Start { get; set; }
/// <summary>数据时间。定时调度的执行时间点,或者数据调度的开始时间</summary>
DateTime Time { get; set; }
/// <summary>结束。小于</summary>
DateTime End { get; set; }
/// <summary>结束。小于</summary>
DateTime End { get; set; }
/// <summary>批大小</summary>
Int32 BatchSize { get; set; }
/// <summary>批大小</summary>
Int32 BatchSize { get; set; }
/// <summary>数据</summary>
String Data { get; set; }
}
/// <summary>数据</summary>
String Data { get; set; }
}

View File

@ -1,17 +1,14 @@
using System;
namespace AntJob.Data;
namespace AntJob.Data
/// <summary>任务结果</summary>
public interface ITaskResult
{
/// <summary>任务结果</summary>
public interface ITaskResult
{
/// <summary>任务项编号</summary>
Int32 ID { get; set; }
/// <summary>任务项编号</summary>
Int32 ID { get; set; }
/// <summary>状态</summary>
JobStatus Status { get; set; }
/// <summary>状态</summary>
JobStatus Status { get; set; }
/// <summary>消息内容。异常信息或其它任务消息</summary>
String Message { get; set; }
}
/// <summary>消息内容。异常信息或其它任务消息</summary>
String Message { get; set; }
}

View File

@ -1,30 +1,29 @@
using System.ComponentModel;
namespace AntJob.Data
namespace AntJob.Data;
/// <summary>作业模式</summary>
/// <remarks>定时调度只要达到时间片开头就可以跑,数据调度要求达到时间片末尾才可以跑</remarks>
//[Description("作业模式")]
public enum JobModes
{
/// <summary>作业模式</summary>
/// <remarks>定时调度只要达到时间片开头就可以跑,数据调度要求达到时间片末尾才可以跑</remarks>
//[Description("作业模式")]
public enum JobModes
{
/// <summary>数据调度</summary>
[Description("数据调度")]
Data = 1,
/// <summary>数据调度</summary>
[Description("数据调度")]
Data = 1,
/// <summary>定时调度</summary>
[Description("定时调度")]
Time = 2,
/// <summary>定时调度</summary>
[Description("定时调度")]
Time = 2,
/// <summary>消息调度</summary>
[Description("消息调度")]
Message = 3,
/// <summary>消息调度</summary>
[Description("消息调度")]
Message = 3,
///// <summary>C#调度</summary>
//[Description("C#调度")]
//CSharp = 4,
/// <summary>C#调度</summary>
[Description("C#调度")]
CSharp = 4,
///// <summary>SQL调度</summary>
//[Description("SQL调度")]
//Sql = 5,
}
/// <summary>SQL调度</summary>
[Description("SQL调度")]
Sql = 5,
}

View File

@ -1,25 +1,25 @@
using System;
namespace AntJob.Data;
namespace AntJob.Data
/// <summary>任务模型</summary>
public partial class TaskModel : ITask
{
/// <summary>任务模型</summary>
public partial class TaskModel : ITask
{
#region
/// <summary>编号</summary>
public Int32 ID { get; set; }
#region
/// <summary>编号</summary>
public Int32 ID { get; set; }
/// <summary>开始。大于等于</summary>
public DateTime Start { get; set; }
/// <summary>数据时间。定时调度的执行时间点,或者数据调度的开始时间</summary>
public DateTime Time { get; set; }
/// <summary>结束。小于,不等于</summary>
public DateTime End { get; set; }
/// <summary>开始。大于等于</summary>
public DateTime Start { get; set; }
/// <summary>批大小</summary>
public Int32 BatchSize { get; set; }
/// <summary>结束。小于,不等于</summary>
public DateTime End { get; set; }
/// <summary>数据</summary>
public String Data { get; set; }
#endregion
}
/// <summary>批大小</summary>
public Int32 BatchSize { get; set; }
/// <summary>数据</summary>
public String Data { get; set; }
#endregion
}

View File

@ -146,7 +146,7 @@ public abstract class Handler : IExtend
};
// APM埋点
var span = Schedule.Tracer?.NewSpan($"job:{Name}", task.Data ?? $"({task.Start.ToFullString()}, {task.End.ToFullString()})");
var span = Schedule.Tracer?.NewSpan($"job:{Name}", task.Data ?? $"({task.Time.ToFullString()}, {task.End.ToFullString()})");
ctx.Remark = span?.ToString();
var sw = Stopwatch.StartNew();

View File

@ -35,9 +35,6 @@ public class JobContext : IExtend
/// <summary>最后处理键值。由业务决定,便于分析问题</summary>
public String Key { get; set; }
///// <summary>当前处理对象</summary>
//public Object Entity { get; set; }
/// <summary>处理异常</summary>
public Exception Error { get; set; }

View File

@ -130,7 +130,7 @@ public class FileJobProvider : JobProvider
// 切分新任务
var set = new TaskModel
{
Start = start,
Time = start,
End = end,
//Step = job.Step,
//Offset = job.Offset,
@ -163,13 +163,16 @@ public class FileJobProvider : JobProvider
if (ctx.Total > 0)
{
var set = ctx.Task;
var time = set.Time;
var end = set.End;
var n = 0;
if (set.End > set.Start) n = (Int32)(set.End - set.Start).TotalSeconds;
var msg = $"{ctx.Handler.Name} 处理{ctx.Total:n0} 行,区间({set.Start} + {n}, {set.End:HH:mm:ss}";
if (end > time) n = (Int32)(end - time).TotalSeconds;
var msg = $"{ctx.Handler.Name} 处理{ctx.Total:n0} 行,区间({time} + {n}, {end:HH:mm:ss}";
if (ctx.Handler.Mode == JobModes.Time)
msg += $",耗时{ctx.Cost:n0}ms";
else
msg += $",速度{ctx.Speed:n0}tps耗时{ctx.Cost:n0}ms";
XTrace.WriteLine(msg);
}
}

View File

@ -27,6 +27,7 @@ v4版本亮点
2. []增强定时调度支持指定Cron表达式逐步替代Start+Step的恒定间隔定时调度
3. []提前生成任务,提前下发给执行器,到时间后马上执行,提高任务执行时间精度
4. []支持任务主动延迟,任务在执行中发现数据条件未满足时,可以向调度中心请求延迟一段时间后再执行,增加执行次数但不增加错误次数
5. []扩充调度模式常态化部署AntAgent正式把Sql调度和C#调度加入主线,将来增加数据抽取和数据推送等多种调度模式
# 功能特点
AntJob的核心是**蚂蚁算法****把任意大数据拆分成为小块,采用蚂蚁搬家策略计算每一块!**

View File

@ -2,28 +2,27 @@
using System.ComponentModel;
using AntJob;
namespace HisAgent
namespace HisAgent;
[DisplayName("定时欢迎")]
[Description("简单的定时任务")]
internal class HelloJob : Handler
{
[DisplayName("定时欢迎")]
[Description("简单的定时任务")]
internal class HelloJob : Handler
public HelloJob()
{
public HelloJob()
{
// 今天零点开始每10秒一次
var job = Job;
job.Start = DateTime.Today;
job.Step = 10;
}
// 今天零点开始每10秒一次
var job = Job;
job.Start = DateTime.Today;
job.Step = 10;
}
protected override Int32 Execute(JobContext ctx)
{
// 当前任务时间
var time = ctx.Task.Start;
WriteLog("新生命蚂蚁调度系统!当前任务时间:{0}", time);
protected override Int32 Execute(JobContext ctx)
{
// 当前任务时间
var time = ctx.Task.Time;
WriteLog("新生命蚂蚁调度系统!当前任务时间:{0}", time);
// 成功处理数据量
return 1;
}
// 成功处理数据量
return 1;
}
}