[break]Execute等方法设置为公开,方便对处理器代码逻辑进行单元测试

This commit is contained in:
大石头 2024-07-12 14:04:06 +08:00
parent 43c4b4385c
commit 5a2f98db5b
9 changed files with 30 additions and 20 deletions

View File

@ -9,8 +9,10 @@ namespace AntJob.Extensions;
/// <summary>数据处理作业(泛型)</summary>
/// <remarks>
/// 文档https://newlifex.com/blood/antjob
///
/// 定时调度只要达到时间片开头就可以跑,数据调度要求达到时间片末尾才可以跑。
/// 任务切片条件StartTime + Step + Offset &lt;= Now
/// 任务切片条件:DataTime + Step + Offset &lt;= Now
/// </remarks>
/// <typeparam name="TEntity"></typeparam>
public abstract class DataHandler<TEntity> : DataHandler where TEntity : Entity<TEntity>, new()
@ -22,7 +24,7 @@ public abstract class DataHandler<TEntity> : DataHandler where TEntity : Entity<
/// <summary>处理一批数据</summary>
/// <param name="ctx">上下文</param>
/// <returns></returns>
protected override Int32 Execute(JobContext ctx)
public override Int32 Execute(JobContext ctx)
{
var count = 0;
foreach (var item in ctx.Data as IEnumerable)
@ -35,14 +37,16 @@ public abstract class DataHandler<TEntity> : DataHandler where TEntity : Entity<
/// <param name="ctx">上下文</param>
/// <param name="entity"></param>
/// <returns></returns>
protected virtual Boolean ProcessItem(JobContext ctx, TEntity entity) => true;
public virtual Boolean ProcessItem(JobContext ctx, TEntity entity) => true;
#endregion
}
/// <summary>数据处理作业</summary>
/// <remarks>
/// 文档https://newlifex.com/blood/antjob
///
/// 定时调度只要达到时间片开头就可以跑,数据调度要求达到时间片末尾才可以跑。
/// 任务切片条件StartTime + Step + Offset &lt;= Now
/// 任务切片条件:DataTime + Step + Offset &lt;= Now
/// </remarks>
public abstract class DataHandler : Handler
{
@ -183,7 +187,7 @@ public abstract class DataHandler : Handler
/// <summary>处理一批数据</summary>
/// <param name="ctx">上下文</param>
/// <returns></returns>
protected override Int32 Execute(JobContext ctx)
public override Int32 Execute(JobContext ctx)
{
var count = 0;
foreach (var item in ctx.Data as IEnumerable)
@ -198,6 +202,6 @@ public abstract class DataHandler : Handler
/// <param name="ctx">上下文</param>
/// <param name="entity"></param>
/// <returns></returns>
protected virtual Boolean ProcessItem(JobContext ctx, IEntity entity) => true;
public virtual Boolean ProcessItem(JobContext ctx, IEntity entity) => true;
#endregion
}

View File

@ -24,7 +24,7 @@ public class SqlHandler : Handler
/// <summary>执行</summary>
/// <param name="ctx"></param>
/// <returns></returns>
protected override Int32 Execute(JobContext ctx)
public override Int32 Execute(JobContext ctx)
{
//var sqls = ctx.Task.Data as String;
var sqls = Job.Data;

View File

@ -20,7 +20,7 @@ public class SqlMessage : MessageHandler
/// <summary>根据解码后的消息执行任务</summary>
/// <param name="ctx">上下文</param>
/// <returns></returns>
protected override Int32 Execute(JobContext ctx)
public override Int32 Execute(JobContext ctx)
{
var msgs = ctx.Data as String[];
var sqls = Job.Data;

View File

@ -9,10 +9,16 @@ namespace AntJob;
/// <summary>处理器基类,每个作业一个处理器</summary>
/// <remarks>
/// 文档https://newlifex.com/blood/antjob
///
/// 每个作业一个处理器类,负责一个业务处理模块。
/// 例如在数据同步或数据清洗中,每张表就写一个处理器,如果一组数据表有共同特性,还可以为它们封装一个自己的处理器基类。
///
/// 定时调度只要当前时间达到时间片开头就可以跑,数据调度要求达到时间片末尾才可以跑。
///
/// 调度器控制方法Start|Stop|Acquire
/// 任务处理流程Process->OnProcess->Execute->OnFinish
/// 任务控制方法Produce|Delay
/// </remarks>
public abstract class Handler : IExtend, ITracerFeature, ILogFeature
{
@ -134,7 +140,7 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
/// <summary>处理一项新任务</summary>
/// <param name="task"></param>
public void Process(ITask task)
public virtual void Process(ITask task)
{
if (task == null) return;
@ -147,7 +153,7 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
};
// APM埋点
using var span = Schedule.Tracer?.NewSpan($"job:{Name}", task.Data ?? $"({task.DataTime.ToFullString()}, {task.End.ToFullString()})");
using var span = Schedule?.Tracer?.NewSpan($"job:{Name}", task.Data ?? $"({task.DataTime.ToFullString()}, {task.End.ToFullString()})");
result.TraceId = span?.TraceId;
// 较慢的作业,及时报告进度
@ -180,7 +186,7 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
OnFinish(ctx);
Schedule?.OnFinish(ctx);
ctx.Items.Clear();
//ctx.Items.Clear();
}
/// <summary>处理任务。内部分批处理</summary>
@ -209,19 +215,19 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
/// <summary>处理一批数据,一个任务内多次调用</summary>
/// <param name="ctx">上下文</param>
/// <returns></returns>
protected abstract Int32 Execute(JobContext ctx);
public abstract Int32 Execute(JobContext ctx);
/// <summary>生产消息</summary>
/// <param name="topic">主题</param>
/// <param name="messages">消息集合</param>
/// <param name="option">消息选项</param>
/// <returns></returns>
public Int32 Produce(String topic, String[] messages, MessageOption option = null) => Provider.Produce(Job?.Name, topic, messages, option);
public virtual Int32 Produce(String topic, String[] messages, MessageOption option = null) => Provider.Produce(Job?.Name, topic, messages, option);
/// <summary>延迟执行,指定下一次执行时间</summary>
/// <param name="ctx"></param>
/// <param name="nextTime"></param>
public void Delay(JobContext ctx, DateTime nextTime)
public virtual void Delay(JobContext ctx, DateTime nextTime)
{
ctx.Status = JobStatus.;
ctx.NextTime = nextTime;

View File

@ -26,7 +26,7 @@ public class CSharpHandler : Handler
/// <summary>执行</summary>
/// <param name="ctx"></param>
/// <returns></returns>
protected override Int32 Execute(JobContext ctx)
public override Int32 Execute(JobContext ctx)
{
var code = ctx.Data as String;
if (code.IsNullOrWhiteSpace()) return -1;

View File

@ -71,7 +71,7 @@ public abstract class MessageHandler : Handler
/// <summary>根据解码后的消息执行任务</summary>
/// <param name="ctx">上下文</param>
/// <returns></returns>
protected override Int32 Execute(JobContext ctx)
public override Int32 Execute(JobContext ctx)
{
var count = 0;
foreach (String item in ctx.Data as IEnumerable)
@ -86,6 +86,6 @@ public abstract class MessageHandler : Handler
/// <param name="ctx">上下文</param>
/// <param name="message">消息</param>
/// <returns></returns>
protected virtual Boolean ProcessItem(JobContext ctx, String message) => true;
public virtual Boolean ProcessItem(JobContext ctx, String message) => true;
#endregion
}

View File

@ -17,7 +17,7 @@ internal class BuildPatient : Handler
Job.Cron = "5 1/3 * * * ?";
}
protected override Int32 Execute(JobContext ctx)
public override Int32 Execute(JobContext ctx)
{
// 随机造几个病人
var count = Rand.Next(1, 9);

View File

@ -29,7 +29,7 @@ class BuildWill : DataHandler
return base.Start();
}
protected override Boolean ProcessItem(JobContext ctx, IEntity entity)
public override Boolean ProcessItem(JobContext ctx, IEntity entity)
{
var pi = entity as ZYBH0;

View File

@ -15,7 +15,7 @@ internal class HelloJob : Handler
Job.Cron = "7/30 * * * * ?";
}
protected override Int32 Execute(JobContext ctx)
public override Int32 Execute(JobContext ctx)
{
using var span = Tracer?.NewSpan("HelloJob", ctx.Task.DataTime);