[refactor] 重构IJobProvider,跟调度平台通信全部使用异步处理

This commit is contained in:
大石头 2025-07-16 16:04:07 +08:00
parent 1055413502
commit 4c6bd4c35d
16 changed files with 174 additions and 129 deletions

View File

@ -7,7 +7,7 @@
<Description>调度中心下发C#或Sql给蚂蚁代理执行</Description>
<Company>新生命开发团队</Company>
<Copyright>版权所有(C) 新生命开发团队 2025</Copyright>
<VersionPrefix>4.1</VersionPrefix>
<VersionPrefix>4.3</VersionPrefix>
<VersionSuffix>$([System.DateTime]::Now.ToString(`yyyy.MMdd`))</VersionSuffix>
<Version>$(VersionPrefix).$(VersionSuffix)</Version>
<FileVersion>$(Version)</FileVersion>

View File

@ -5,7 +5,7 @@
<Description>蚂蚁调度系统数据库结构</Description>
<Company>新生命开发团队</Company>
<Copyright>版权所有(C) 新生命开发团队 2025</Copyright>
<VersionPrefix>4.1</VersionPrefix>
<VersionPrefix>4.3</VersionPrefix>
<VersionSuffix>$([System.DateTime]::Now.ToString(`yyyy.MMdd`))</VersionSuffix>
<Version>$(VersionPrefix).$(VersionSuffix)</Version>
<FileVersion>$(Version)</FileVersion>

View File

@ -6,7 +6,7 @@
<Description>分布式任务调度系统纯NET打造的重量级大数据实时计算平台万亿级调度经验积累。</Description>
<Company>新生命开发团队</Company>
<Copyright>©2002-2025 NewLife</Copyright>
<VersionPrefix>4.2</VersionPrefix>
<VersionPrefix>4.3</VersionPrefix>
<VersionSuffix>$([System.DateTime]::Now.ToString(`yyyy.MMdd`))</VersionSuffix>
<Version>$(VersionPrefix).$(VersionSuffix)</Version>
<FileVersion>$(Version)</FileVersion>

View File

@ -98,12 +98,18 @@ public abstract class DataHandler : Handler
public String Selects { get; set; }
/// <summary>每次都只查第一页</summary>
public bool KeepFirstPage { get; set; }
public Boolean KeepFirstPage { get; set; }
#endregion
#region
/// <summary>实例化数据库处理器</summary>
public DataHandler() => Mode = JobModes.Data;
public DataHandler()
{
Mode = JobModes.Data;
// 逼着Init中查询最小时间
Job.DataTime = DateTime.MinValue;
}
#endregion
#region
@ -124,7 +130,7 @@ public abstract class DataHandler : Handler
var job = Job;
if (job.Step == 0) job.Step = 30;
//todo 如果DataTime为空则自动获取最小时间并设置到DataTime以减轻平台设置负担
// 如果DataTime为空则自动获取最小时间并设置到DataTime以减轻平台设置负担
// 获取最小数据时间
if (job.DataTime.Year < 2000)
@ -160,7 +166,7 @@ public abstract class DataHandler : Handler
#endregion
#region
/// <summary>处理任务。内部分批处理</summary>
/// <summary>处理任务。由Process执行内部分批Fetch数据并调用Execute</summary>
/// <param name="ctx"></param>
protected override void OnProcess(JobContext ctx)
{
@ -191,8 +197,9 @@ public abstract class DataHandler : Handler
// 批量处理
ctx.Success += Execute(ctx);
// 报告进度
// 较慢的作业,及时报告进度
ctx.Status = JobStatus.;
if (Speed < 10) Report(ctx, JobStatus.);
// 不满一批,结束
if (list != null && list.Count < ctx.Task.BatchSize) break;
@ -234,7 +241,7 @@ public abstract class DataHandler : Handler
if (!Where.IsNullOrEmpty()) exp &= Where;
var list = Factory.FindAll(exp, OrderBy, Selects, this.KeepFirstPage ? 0 : row, task.BatchSize);
var list = Factory.FindAll(exp, OrderBy, Selects, KeepFirstPage ? 0 : row, task.BatchSize);
// 取到数据,需要滑动窗口
if (list.Count > 0) row += list.Count;
@ -242,7 +249,7 @@ public abstract class DataHandler : Handler
return list;
}
/// <summary>处理一批数据</summary>
/// <summary>处理一批数据。由OnProcess执行</summary>
/// <param name="ctx">上下文</param>
/// <returns></returns>
public override Int32 Execute(JobContext ctx)
@ -256,7 +263,7 @@ public abstract class DataHandler : Handler
return count;
}
/// <summary>处理一个数据对象</summary>
/// <summary>处理一个数据对象。由Execute执行每个实体对象调用一次</summary>
/// <param name="ctx">上下文</param>
/// <param name="entity"></param>
/// <returns></returns>

View File

@ -7,7 +7,7 @@
<Description>分布式任务调度系统纯NET打造的重量级大数据实时计算平台万亿级调度经验积累</Description>
<Company>新生命开发团队</Company>
<Copyright>版权所有(C) 新生命开发团队 2025</Copyright>
<VersionPrefix>4.1</VersionPrefix>
<VersionPrefix>4.3</VersionPrefix>
<VersionSuffix>$([System.DateTime]::Now.ToString(`yyyy.MMdd`))</VersionSuffix>
<Version>$(VersionPrefix).$(VersionSuffix)</Version>
<FileVersion>$(Version)</FileVersion>

View File

@ -6,7 +6,7 @@
<Description>分布式任务调度系统纯NET打造的重量级大数据实时计算平台万亿级调度经验积累</Description>
<Company>新生命开发团队</Company>
<Copyright>版权所有(C) 新生命开发团队 2025</Copyright>
<VersionPrefix>4.1</VersionPrefix>
<VersionPrefix>4.3</VersionPrefix>
<VersionSuffix>$([System.DateTime]::Now.ToString(`yyyy.MMdd`))</VersionSuffix>
<Version>$(VersionPrefix).$(VersionSuffix)</Version>
<FileVersion>$(Version)</FileVersion>

View File

@ -6,7 +6,7 @@
<Description>分布式任务调度系统纯NET打造的重量级大数据实时计算平台万亿级调度经验积累。</Description>
<Company>新生命开发团队</Company>
<Copyright>©2002-2025 NewLife</Copyright>
<VersionPrefix>4.2</VersionPrefix>
<VersionPrefix>4.3</VersionPrefix>
<VersionSuffix>$([System.DateTime]::Now.ToString(`yyyy.MMdd`))</VersionSuffix>
<Version>$(VersionPrefix).$(VersionSuffix)</Version>
<FileVersion>$(Version)</FileVersion>

View File

@ -16,12 +16,16 @@ class AntJobWorker(Scheduler scheduler, IServiceProvider serviceProvider) : Back
scheduler.Log = serviceProvider.GetService<ILog>();
scheduler.Tracer = serviceProvider.GetService<ITracer>();
if (scheduler.Provider == null)
{
var set = serviceProvider.GetService<AntSetting>();
set ??= AntSetting.Current;
scheduler.Join(set);
}
var set = serviceProvider.GetService<AntSetting>();
set ??= AntSetting.Current;
scheduler.Setting = set;
//if (scheduler.Provider == null)
//{
// var set = serviceProvider.GetService<AntSetting>();
// set ??= AntSetting.Current;
// scheduler.Join(set);
//}
// 添加作业
//scheduler.AddHandler<HelloJob>();

View File

@ -45,7 +45,8 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
/// <summary>正在处理中的任务数</summary>
public Int32 Busy => _Busy;
private Int32 _speed;
/// <summary>处理速度。调度器可根据处理速度来调节</summary>
protected Int32 Speed { get; set; }
#endregion
#region
@ -65,7 +66,7 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
{
Name = GetType().Name.TrimEnd(nameof(Handler));
// 默认本月1号
// 默认今天
var now = DateTime.Now;
var job = new JobModel
{
@ -82,7 +83,7 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
#endregion
#region
/// <summary>初始化。作业处理器启动之前</summary>
/// <summary>初始化。作业处理器启动之前这里设置Job作业属性后将会提交给调度平台</summary>
public virtual void Init()
{
var job = Job;
@ -136,7 +137,7 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
/// </remarks>
/// <param name="count">要申请的任务个数</param>
/// <returns></returns>
public virtual ITask[] Acquire(Int32 count)
public virtual Task<ITask[]> Acquire(Int32 count)
{
var prv = Provider;
var job = Job;
@ -151,7 +152,7 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
/// <param name="task"></param>
internal void Prepare(ITask task) => Interlocked.Increment(ref _Busy);
/// <summary>处理一项新任务</summary>
/// <summary>处理一项新任务。每个作业任务的顶级函数由线程池执行内部调用OnProcess</summary>
/// <param name="task"></param>
public virtual void Process(ITask task)
{
@ -170,7 +171,7 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
result.TraceId = span?.TraceId;
// 较慢的作业,及时报告进度
if (_speed < 10) Report(ctx, JobStatus.);
if (Speed < 10) Report(ctx, JobStatus.);
var sw = Stopwatch.StartNew();
try
@ -194,15 +195,13 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
sw.Stop();
ctx.Cost = sw.Elapsed.TotalMilliseconds;
_speed = ctx.Speed;
Speed = ctx.Speed;
OnFinish(ctx);
Schedule?.OnFinish(ctx);
//ctx.Items.Clear();
}
/// <summary>处理任务。内部分批处理</summary>
/// <summary>处理任务。内部分批处理由Process执行内部调用Execute</summary>
/// <param name="ctx"></param>
protected virtual void OnProcess(JobContext ctx)
{
@ -221,7 +220,7 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
/// <summary>整个任务完成</summary>
/// <param name="ctx"></param>
protected virtual void OnFinish(JobContext ctx) => Provider?.Finish(ctx);
protected virtual void OnFinish(JobContext ctx) => Provider?.Finish(ctx).Wait();
#endregion
#region
@ -235,7 +234,7 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
/// <param name="messages">消息集合</param>
/// <param name="option">消息选项</param>
/// <returns></returns>
public virtual Int32 Produce(String topic, String[] messages, MessageOption option = null) => Provider.Produce(Job?.Name, topic, messages, option);
public virtual Task<Int32> Produce(String topic, String[] messages, MessageOption option = null) => Provider.Produce(Job?.Name, topic, messages, option);
/// <summary>生产消息</summary>
/// <param name="appId">发布消息到目标应用。留空发布当前应用</param>
@ -243,7 +242,7 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
/// <param name="messages">消息集合</param>
/// <param name="option">消息选项</param>
/// <returns></returns>
public virtual Int32 Produce(String appId, String topic, String[] messages, MessageOption option = null)
public virtual Task<Int32> Produce(String appId, String topic, String[] messages, MessageOption option = null)
{
option ??= new();
option.AppId = appId;

View File

@ -22,6 +22,7 @@ public abstract class MessageHandler : Handler
var job = Job;
job.BatchSize = 8;
job.DataTime = DateTime.MinValue;
}
#endregion
@ -41,7 +42,7 @@ public abstract class MessageHandler : Handler
/// </remarks>
/// <param name="count">要申请的任务个数</param>
/// <returns></returns>
public override ITask[] Acquire(Int32 count)
public override Task<ITask[]> Acquire(Int32 count)
{
// 消费模式设置Topic值
var prv = Provider;
@ -50,7 +51,7 @@ public abstract class MessageHandler : Handler
return prv.Acquire(job, Topic, count);
}
/// <summary>解码一批消息,处理任务</summary>
/// <summary>解码一批消息,由Process执行内部调用Execute处理任务</summary>
/// <param name="ctx"></param>
protected override void OnProcess(JobContext ctx)
{
@ -68,7 +69,7 @@ public abstract class MessageHandler : Handler
ctx.Success = Execute(ctx);
}
/// <summary>根据解码后的消息执行任务</summary>
/// <summary>根据解码后的消息执行任务。由OnProcess执行</summary>
/// <param name="ctx">上下文</param>
/// <returns></returns>
public override Int32 Execute(JobContext ctx)
@ -82,7 +83,7 @@ public abstract class MessageHandler : Handler
return count;
}
/// <summary>处理一个数据对象</summary>
/// <summary>处理一个数据对象。由Execute执行每条消息调用一次</summary>
/// <param name="ctx">上下文</param>
/// <param name="message">消息</param>
/// <returns></returns>

View File

@ -58,7 +58,7 @@ public class JobContext : IExtend
#region
/// <summary>处理速度</summary>
public Int32 Speed => (Cost <= 0 || Total == 0) ? 0 : (Int32)Math.Min(Total * 1000L / Cost, Int32.MaxValue);
public Int32 Speed => (Cost <= 0 || Total == 0) ? 0 : (Int32)Math.Round(Total * 1000L / Cost);
#endregion
#region

View File

@ -92,39 +92,39 @@ public class AntClient : ClientBase
#region
/// <summary>获取指定名称的作业</summary>
/// <returns></returns>
public IJob[] GetJobs() => InvokeAsync<JobModel[]>(nameof(GetJobs)).Result;
public async Task<IJob[]> GetJobs() => await InvokeAsync<JobModel[]>(nameof(GetJobs));
/// <summary>批量添加作业</summary>
/// <param name="jobs"></param>
/// <returns></returns>
public String[] AddJobs(IJob[] jobs) => InvokeAsync<String[]>(nameof(AddJobs), new { jobs }).Result;
public Task<String[]> AddJobs(IJob[] jobs) => InvokeAsync<String[]>(nameof(AddJobs), new { jobs });
/// <summary>设置作业。支持控制作业启停、数据时间、步进等参数</summary>
/// <param name="job"></param>
/// <returns></returns>
public IJob SetJob(IDictionary<String, Object> job) => Invoke<JobModel>(nameof(SetJob), job);
public async Task<IJob> SetJob(IDictionary<String, Object> job) => await InvokeAsync<JobModel>(nameof(SetJob), job);
/// <summary>申请作业任务</summary>
/// <param name="job">作业</param>
/// <param name="topic">主题</param>
/// <param name="count">要申请的任务个数</param>
/// <returns></returns>
public ITask[] Acquire(String job, String topic, Int32 count) => InvokeAsync<TaskModel[]>(nameof(Acquire), new AcquireModel
public async Task<ITask[]> Acquire(String job, String topic, Int32 count) => await InvokeAsync<TaskModel[]>(nameof(Acquire), new AcquireModel
{
Job = job,
Topic = topic,
Count = count,
}).Result;
});
/// <summary>生产消息</summary>
/// <param name="model">模型</param>
/// <returns></returns>
public Int32 Produce(ProduceModel model) => Invoke<Int32>(nameof(Produce), model);
public Task<Int32> Produce(ProduceModel model) => InvokeAsync<Int32>(nameof(Produce), model);
/// <summary>报告状态(进度、成功、错误)</summary>
/// <param name="task"></param>
/// <returns></returns>
public Boolean Report(ITaskResult task)
public async Task<Boolean> Report(ITaskResult task)
{
var retry = 3;
var lastex = new Exception();
@ -132,7 +132,7 @@ public class AntClient : ClientBase
{
try
{
return InvokeAsync<Boolean>(nameof(Report), task).Result;
return await InvokeAsync<Boolean>(nameof(Report), task);
}
catch (Exception ex)
{
@ -145,6 +145,6 @@ public class AntClient : ClientBase
/// <summary>获取当前应用的所有在线实例</summary>
/// <returns></returns>
public IPeer[] GetPeers() => InvokeAsync<PeerModel[]>(nameof(GetPeers)).Result;
public async Task<IPeer[]> GetPeers() => await InvokeAsync<PeerModel[]>(nameof(GetPeers));
#endregion
}

View File

@ -4,6 +4,9 @@ using NewLife;
using NewLife.Log;
using NewLife.Reflection;
using NewLife.Xml;
#if !NET45
using TaskEx = System.Threading.Tasks.Task;
#endif
namespace AntJob.Providers;
@ -22,7 +25,7 @@ public class FileJobProvider : JobProvider
}
/// <summary>开始</summary>
public override void Start()
public override Task Start()
{
var jf = _File = JobFile.Current;
@ -66,12 +69,12 @@ public class FileJobProvider : JobProvider
}
jf.Save();
base.Start();
return base.Start();
}
/// <summary>获取所有作业名称</summary>
/// <returns></returns>
public override IJob[] GetJobs()
public override Task<IJob[]> GetJobs()
{
var jf = _File = JobFile.Current;
@ -85,24 +88,24 @@ public class FileJobProvider : JobProvider
}
}
return list.ToArray();
return Task.FromResult(list.ToArray());
}
/// <summary>设置作业。支持控制作业启停、数据时间、步进等参数</summary>
/// <param name="job"></param>
/// <returns></returns>
public override IJob SetJob(IJob job) => null;
public override Task<IJob> SetJob(IJob job) => Task.FromResult((IJob)null);
/// <summary>申请任务</summary>
/// <param name="job">作业</param>
/// <param name="topic">主题</param>
/// <param name="count">要申请的任务个数</param>
/// <returns></returns>
public override ITask[] Acquire(IJob job, String topic, Int32 count)
public override Task<ITask[]> Acquire(IJob job, String topic, Int32 count)
{
var list = new List<ITask>();
if (!job.Enable) return list.ToArray();
if (!job.Enable) return Task.FromResult(list.ToArray());
// 当前时间减去偏移量,作为当前时间。数据抽取不许超过该时间
var now = DateTime.Now.AddSeconds(-job.Offset);
@ -155,12 +158,12 @@ public class FileJobProvider : JobProvider
_File.SaveAsync();
}
return list.ToArray();
return Task.FromResult(list.ToArray());
}
/// <summary>完成任务,每个任务只调用一次</summary>
/// <param name="ctx">上下文</param>
public override void Finish(JobContext ctx)
public override Task Finish(JobContext ctx)
{
var ex = ctx.Error?.GetTrue();
if (ex != null) XTrace.WriteException(ex);
@ -180,6 +183,8 @@ public class FileJobProvider : JobProvider
XTrace.WriteLine(msg);
}
return TaskEx.CompletedTask;
}
#region

View File

@ -1,6 +1,9 @@
using AntJob.Data;
using NewLife;
using NewLife.Log;
#if !NET45
using TaskEx = System.Threading.Tasks.Task;
#endif
namespace AntJob.Providers;
@ -11,26 +14,26 @@ public interface IJobProvider
Scheduler Schedule { get; set; }
/// <summary>开始工作</summary>
void Start();
Task Start();
/// <summary>停止工作</summary>
void Stop();
Task Stop();
/// <summary>获取所有作业。调度器定期获取以更新作业参数</summary>
/// <returns></returns>
IJob[] GetJobs();
Task<IJob[]> GetJobs();
/// <summary>设置作业。支持控制作业启停、数据时间、步进等参数</summary>
/// <param name="job"></param>
/// <returns></returns>
IJob SetJob(IJob job);
Task<IJob> SetJob(IJob job);
/// <summary>申请任务</summary>
/// <param name="job">作业</param>
/// <param name="topic">主题</param>
/// <param name="count">要申请的任务个数</param>
/// <returns></returns>
ITask[] Acquire(IJob job, String topic, Int32 count);
Task<ITask[]> Acquire(IJob job, String topic, Int32 count);
/// <summary>生产消息</summary>
/// <param name="job">作业</param>
@ -38,15 +41,15 @@ public interface IJobProvider
/// <param name="messages">消息集合</param>
/// <param name="option">消息选项</param>
/// <returns></returns>
Int32 Produce(String job, String topic, String[] messages, MessageOption option);
Task<Int32> Produce(String job, String topic, String[] messages, MessageOption option);
/// <summary>报告进度</summary>
/// <param name="ctx">上下文</param>
void Report(JobContext ctx);
Task Report(JobContext ctx);
/// <summary>完成任务</summary>
/// <param name="ctx">上下文</param>
void Finish(JobContext ctx);
Task Finish(JobContext ctx);
}
/// <summary>任务提供者基类</summary>
@ -56,26 +59,26 @@ public abstract class JobProvider : DisposeBase, IJobProvider, ITracerFeature, I
public Scheduler Schedule { get; set; }
/// <summary>开始工作</summary>
public virtual void Start() { }
public virtual Task Start() => TaskEx.CompletedTask;
/// <summary>停止工作</summary>
public virtual void Stop() { }
public virtual Task Stop() => TaskEx.CompletedTask;
/// <summary>获取所有作业名称</summary>
/// <returns></returns>
public abstract IJob[] GetJobs();
public abstract Task<IJob[]> GetJobs();
/// <summary>设置作业。支持控制作业启停、数据时间、步进等参数</summary>
/// <param name="job"></param>
/// <returns></returns>
public abstract IJob SetJob(IJob job);
public abstract Task<IJob> SetJob(IJob job);
/// <summary>申请任务</summary>
/// <param name="job">作业</param>
/// <param name="topic">主题</param>
/// <param name="count">要申请的任务个数</param>
/// <returns></returns>
public abstract ITask[] Acquire(IJob job, String topic, Int32 count);
public abstract Task<ITask[]> Acquire(IJob job, String topic, Int32 count);
/// <summary>生产消息</summary>
/// <param name="job">作业</param>
@ -83,15 +86,15 @@ public abstract class JobProvider : DisposeBase, IJobProvider, ITracerFeature, I
/// <param name="messages">消息集合</param>
/// <param name="option">消息选项</param>
/// <returns></returns>
public virtual Int32 Produce(String job, String topic, String[] messages, MessageOption option = null) => 0;
public virtual Task<Int32> Produce(String job, String topic, String[] messages, MessageOption option = null) => Task.FromResult(0);
/// <summary>报告进度,每个任务多次调用</summary>
/// <param name="ctx">上下文</param>
public virtual void Report(JobContext ctx) { }
public virtual Task Report(JobContext ctx) => TaskEx.CompletedTask;
/// <summary>完成任务,每个任务只调用一次</summary>
/// <param name="ctx">上下文</param>
public virtual void Finish(JobContext ctx) { }
public virtual Task Finish(JobContext ctx) => TaskEx.CompletedTask;
#region
/// <summary>性能跟踪器</summary>

View File

@ -24,7 +24,7 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
{
base.Dispose(disposing);
Stop();
Stop().Wait(5_000);
_timer.TryDispose();
_timer = null;
@ -33,7 +33,7 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
#region
/// <summary>开始</summary>
public override void Start()
public override async Task Start()
{
WriteLog("正在连接调度中心:{0}", setting.Server);
@ -43,10 +43,14 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
Tracer = Tracer,
Log = Log,
};
ant.Login().Wait();
await ant.Login();
// 断开前一个连接
Ant.TryDispose();
if (Ant != null)
{
await Ant.Logout("new Start");
Ant.TryDispose();
}
Ant = ant;
var bs = Schedule?.Handlers;
@ -97,9 +101,9 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
{
WriteLog("注册作业[{0}]{1}", list.Count, list.Join(",", e => e.Name));
var rs = Ant.AddJobs(list.ToArray());
var rs = await Ant.AddJobs(list.ToArray());
WriteLog("注册成功[{0}]{1}", rs?.Length, rs.Join());
WriteLog("新增作业[{0}]{1}", rs?.Length, rs.Join());
}
// 通信完成,改回来本地时间
@ -119,13 +123,17 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
}
/// <summary>停止</summary>
public override void Stop()
public override async Task Stop()
{
Ant?.Logout(nameof(Stop)).Wait(1_000);
var ant = Ant;
if (ant != null)
{
await ant.Logout(nameof(Stop));
// 断开前一个连接
Ant.TryDispose();
Ant = null;
// 断开前一个连接
ant.TryDispose();
Ant = null;
}
}
#endregion
@ -135,7 +143,7 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
private DateTime _NextGetJobs;
/// <summary>获取所有作业名称</summary>
/// <returns></returns>
public override IJob[] GetJobs()
public override async Task<IJob[]> GetJobs()
{
// 周期性获取,避免请求过快
var now = TimerX.Now;
@ -143,11 +151,10 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
{
_NextGetJobs = now.AddSeconds(5);
_jobs = Ant.GetJobs();
if (_jobs != null)
var jobs = await Ant.GetJobs();
if (jobs != null)
{
foreach (var job in _jobs)
foreach (var job in jobs)
{
// 通信约定UTC收到后需转为本地时间
job.DataTime = job.DataTime.ToLocalTime();
@ -156,8 +163,10 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
}
// 备份一份,用于比较
_baks = _jobs.Select(e => ((ICloneable)e).Clone() as IJob).ToArray();
_baks = jobs.Select(e => ((ICloneable)e).Clone() as IJob).ToArray();
}
_jobs = jobs;
}
return _jobs;
@ -166,7 +175,7 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
/// <summary>设置作业。支持控制作业启停、数据时间、步进等参数</summary>
/// <param name="job"></param>
/// <returns></returns>
public override IJob SetJob(IJob job)
public override Task<IJob> SetJob(IJob job)
{
var dic = job.ToDictionary();
var old = _baks?.FirstOrDefault(e => e.Name == job.Name);
@ -190,9 +199,9 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
/// <param name="topic">主题</param>
/// <param name="count">要申请的任务个数</param>
/// <returns></returns>
public override ITask[] Acquire(IJob job, String topic, Int32 count)
public override async Task<ITask[]> Acquire(IJob job, String topic, Int32 count)
{
var rs = Ant.Acquire(job.Name, topic, count);
var rs = await Ant.Acquire(job.Name, topic, count);
if (rs != null)
{
foreach (var task in rs)
@ -213,7 +222,7 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
/// <param name="messages">消息集合</param>
/// <param name="option">消息选项</param>
/// <returns></returns>
public override Int32 Produce(String job, String topic, String[] messages, MessageOption option = null)
public override async Task<Int32> Produce(String job, String topic, String[] messages, MessageOption option = null)
{
if (topic.IsNullOrEmpty() || messages == null || messages.Length < 1) return 0;
@ -230,14 +239,14 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
model.AppId = option.AppId;
}
return Ant.Produce(model);
return await Ant.Produce(model);
}
#endregion
#region
/// <summary>报告进度,每个任务多次调用</summary>
/// <param name="ctx">上下文</param>
public override void Report(JobContext ctx)
public override async Task Report(JobContext ctx)
{
// 不用上报抽取中
if (ctx.Status == JobStatus.) return;
@ -253,12 +262,12 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
if (ctx.NextTime.Year > 2000) task.NextTime = ctx.NextTime.ToUniversalTime();
Report(ctx.Handler.Job, task);
await Report(ctx.Handler.Job, task);
}
/// <summary>完成任务,每个任务只调用一次</summary>
/// <param name="ctx">上下文</param>
public override void Finish(JobContext ctx)
public override async Task Finish(JobContext ctx)
{
if (ctx?.Result is not TaskResult task) return;
@ -298,14 +307,14 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
task.Key = ctx.Key;
Report(ctx.Handler.Job, task);
await Report(ctx.Handler.Job, task);
}
private void Report(IJob job, ITaskResult task)
private async Task Report(IJob job, ITaskResult task)
{
try
{
Ant.Report(task);
await Ant.Report(task);
}
catch (Exception ex)
{
@ -317,9 +326,9 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
#region
private TimerX _timer;
private void DoCheckPeer(Object state)
private async Task DoCheckPeer(Object state)
{
var ps = Ant?.GetPeers();
var ps = await Ant?.GetPeers();
if (ps == null || ps.Length == 0) return;
var old = (Peers ?? []).ToList();

View File

@ -24,7 +24,10 @@ public class Scheduler : DisposeBase
/// <summary>服务提供者</summary>
public IServiceProvider ServiceProvider { get; set; }
private ICache _cache = MemoryCache.Default;
/// <summary>配置信息</summary>
public AntSetting Setting { get; set; }
private readonly ICache _cache = MemoryCache.Default;
#endregion
#region
@ -64,19 +67,30 @@ public class Scheduler : DisposeBase
/// <summary>加入调度中心从注册中心获取地址自动识别RPC/Http</summary>
/// <param name="set"></param>
/// <returns></returns>
[Obsolete("=>JoinAsync")]
public IJobProvider Join(AntSetting set)
{
JoinAsync(set).Wait();
return Provider;
}
/// <summary>加入调度中心从注册中心获取地址自动识别RPC/Http</summary>
/// <param name="set"></param>
/// <returns></returns>
public async Task JoinAsync(AntSetting set)
{
var server = set.Server;
var registry = ServiceProvider?.GetService<IRegistry>();
if (registry != null)
{
var rs = registry.ResolveAsync("AntServer").Result;
var svrs = registry.ResolveAddressAsync("AntServer").Result;
//var rs = registry.ResolveAsync("AntServer").Result;
var svrs = await registry.ResolveAddressAsync("AntServer");
if (svrs != null && svrs.Length > 0) server = svrs.Join();
}
if (server.IsNullOrEmpty()) return null;
if (server.IsNullOrEmpty()) return;
set.Server = server;
// 根据地址决定用Http还是RPC
@ -106,14 +120,15 @@ public class Scheduler : DisposeBase
Provider = rpc;
}
return Provider;
}
/// <summary>开始</summary>
/// <summary>开始调度。推荐使用StartAsync</summary>
[Obsolete("=>StartAsync")]
public void Start()
{
OnStart();
if (Setting != null) Join(Setting);
OnStart().Wait();
}
/// <summary>异步开始。使用定时器尝试连接服务端</summary>
@ -124,13 +139,15 @@ public class Scheduler : DisposeBase
private Boolean _inited;
private TimerX _timerStart;
private void CheckStart(Object state)
private async Task CheckStart(Object state)
{
if (!_inited)
{
try
{
OnStart();
if (Setting != null) await JoinAsync(Setting);
await OnStart();
_inited = true;
}
@ -143,7 +160,7 @@ public class Scheduler : DisposeBase
if (_inited) _timerStart.TryDispose();
}
private void OnStart()
private async Task OnStart()
{
// 从容器中获取所有服务
foreach (var item in ServiceProvider.GetServices<Handler>())
@ -176,10 +193,10 @@ public class Scheduler : DisposeBase
if (prv is ITracerFeature tf) tf.Tracer = Tracer;
if (prv is ILogFeature lf) lf.Log = Log;
prv.Start();
await prv.Start();
// 获取本应用在调度中心管理的所有作业
var jobs = prv.GetJobs() ?? [];
var jobs = await prv.GetJobs();
//if (jobs == null || jobs.Length == 0) throw new Exception("调度中心没有可用作业");
// 输出日志
@ -190,17 +207,16 @@ public class Scheduler : DisposeBase
{
handler.Schedule = this;
handler.Provider = prv;
handler.Tracer ??= Tracer;
handler.Log ??= Log;
// 查找作业参数,分配给处理器
var job = jobs.FirstOrDefault(e => e.Name == handler.Name);
var job = jobs?.FirstOrDefault(e => e.Name == handler.Name);
if (job == null || !job.Enable) continue;
if (job != null && job.Mode == 0) job.Mode = handler.Mode;
handler.Job = job;
handler.Tracer ??= Tracer;
handler.Log ??= Log;
try
{
handler.Start();
@ -223,8 +239,6 @@ public class Scheduler : DisposeBase
_timer.TryDispose();
_timer = null;
Provider?.Stop();
foreach (var handler in Handlers)
{
try
@ -236,27 +250,29 @@ public class Scheduler : DisposeBase
Log?.Error("作业[{0}]停止失败!{1}", handler.GetType().FullName, ex.Message);
}
}
Provider?.Stop().Wait();
}
/// <summary>任务调度</summary>
/// <returns></returns>
public Boolean Process()
public async Task<Boolean> Process()
{
var prv = Provider;
// 查询所有处理器
var hs = Handlers;
var handlers = Handlers;
// 拿到处理器对应的作业
var jobs = prv.GetJobs();
var jobs = await prv.GetJobs();
if (jobs == null) return false;
// 运行时动态往集合里面加处理器为了配合Sql+C#
CheckHandlers(prv, jobs, hs);
CheckHandlers(prv, jobs, handlers);
var flag = false;
// 遍历处理器,给空闲的增加任务
foreach (var handler in hs)
foreach (var handler in handlers)
{
var job = jobs.FirstOrDefault(e => e.Name == handler.Name);
// 找不到或者已停用
@ -302,7 +318,7 @@ public class Scheduler : DisposeBase
if (count > 0)
{
// 循环申请任务,喂饱处理器
var ts = handler.Acquire(count);
var ts = await handler.Acquire(count);
// 送给处理器处理
for (var i = 0; i < count && ts != null && i < ts.Length; i++)
@ -354,6 +370,7 @@ public class Scheduler : DisposeBase
handler.Log ??= Log;
handler.Tracer ??= Tracer;
handler.Init();
handler.Start();
handlers.Add(handler);
@ -381,10 +398,10 @@ public class Scheduler : DisposeBase
private TimerX _timer;
private void Loop(Object state)
private async Task Loop(Object state)
{
// 任务调度
var rs = Process();
var rs = await Process();
// 如果有数据,马上开始下一轮
if (rs) TimerX.Current.SetNext(-1);