[feat]支持跨应用发布消息。close: https://github.com/NewLifeX/AntJob/issues/8

This commit is contained in:
智能大石头 2025-04-02 08:20:12 +08:00
parent 2552475092
commit a981c85d14
6 changed files with 39 additions and 9 deletions

View File

@ -324,6 +324,13 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
var messages = model?.Messages?.Where(e => !e.IsNullOrEmpty()).Distinct().ToArray();
if (messages == null || messages.Length == 0) return 0;
var target = app;
if (!model.AppId.IsNullOrEmpty())
{
target = App.FindByName(model.AppId);
if (target == null) throw new XException($"找不到目标应用[{model.AppId}]");
}
// 去重过滤
if (model.Unique)
{
@ -333,14 +340,14 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
var msgs = new List<String>();
foreach (var item in messages)
{
var key = $"antjob:{app.ID}:{model.Topic}:{item}";
var key = $"antjob:{target.ID}:{model.Topic}:{item}";
if (_cacheProvider.Cache.Add(key, item, 2 * 3600)) msgs.Add(item);
}
messages = msgs.ToArray();
}
else
{
messages = AppMessage.Filter(app.ID, model.Topic, messages);
messages = AppMessage.Filter(target.ID, model.Topic, messages);
if (messages.Length == 0) return 0;
}
}
@ -352,14 +359,15 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
// 延迟需要基于任务开始时间,而不能用使用当前时间,防止回头跑数据时无法快速执行
var dTime = now.AddSeconds(model.DelayTime);
var jb = Job.FindByAppIDAndName(app.ID, model.Job);
//var jb = Job.FindByAppIDAndName(app.ID, model.Job);
var jb = app.Jobs.FirstOrDefault(e => e.Name == model.Job);
var snow = AppMessage.Meta.Factory.Snow;
foreach (var item in messages)
{
var jm = new AppMessage
{
Id = snow.NewId(),
AppID = app.ID,
AppID = target.ID,
JobID = jb == null ? 0 : jb.ID,
Topic = model.Topic,
Data = item,
@ -382,15 +390,15 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
if (total < 0) total = messages.Length;
if (total > 0)
{
var job2 = app.Jobs?.FirstOrDefault(e => e.Topic == model.Topic);
var job2 = target.Jobs?.FirstOrDefault(e => e.Topic == model.Topic);
if (job2 != null)
{
job2.MessageCount += total;
job2.SaveAsync();
}
app.MessageCount += total;
app.SaveAsync();
target.MessageCount += total;
target.SaveAsync();
}
return total;

View File

@ -227,6 +227,19 @@ public abstract class Handler : IExtend, ITracerFeature, ILogFeature
/// <returns></returns>
public virtual Int32 Produce(String topic, String[] messages, MessageOption option = null) => Provider.Produce(Job?.Name, topic, messages, option);
/// <summary>生产消息</summary>
/// <param name="appId">发布消息到目标应用。留空发布当前应用</param>
/// <param name="topic">主题</param>
/// <param name="messages">消息集合</param>
/// <param name="option">消息选项</param>
/// <returns></returns>
public virtual Int32 Produce(String appId, String topic, String[] messages, MessageOption option = null)
{
option ??= new();
option.AppId = appId;
return Provider.Produce(Job?.Name, topic, messages, option);
}
/// <summary>延迟执行,指定下一次执行时间</summary>
/// <param name="ctx"></param>
/// <param name="nextTime"></param>

View File

@ -3,6 +3,10 @@
/// <summary>生成消息的模型</summary>
public class ProduceModel
{
/// <summary>应用标识</summary>
/// <remarks>要发布消息的目标应用。默认为空,发布到当前应用</remarks>
public String AppId { get; set; }
/// <summary>作业名</summary>
public String Job { get; set; }

View File

@ -102,7 +102,7 @@ public class AntClient : ClientBase
/// <summary>设置作业。支持控制作业启停、数据时间、步进等参数</summary>
/// <param name="job"></param>
/// <returns></returns>
public IJob SetJob(IDictionary<String, Object> job) => InvokeAsync<JobModel>(nameof(SetJob), job).Result;
public IJob SetJob(IDictionary<String, Object> job) => Invoke<JobModel>(nameof(SetJob), job);
/// <summary>申请作业任务</summary>
/// <param name="job">作业</param>
@ -119,7 +119,7 @@ public class AntClient : ClientBase
/// <summary>生产消息</summary>
/// <param name="model">模型</param>
/// <returns></returns>
public Int32 Produce(ProduceModel model) => InvokeAsync<Int32>(nameof(Produce), model).Result;
public Int32 Produce(ProduceModel model) => Invoke<Int32>(nameof(Produce), model);
/// <summary>报告状态(进度、成功、错误)</summary>
/// <param name="task"></param>

View File

@ -8,4 +8,8 @@ public class MessageOption
/// <summary>消息去重。避免单个消息被重复生产</summary>
public Boolean Unique { get; set; }
/// <summary>应用标识</summary>
/// <remarks>要发布消息的目标应用。默认为空,发布到当前应用</remarks>
public String AppId { get; set; }
}

View File

@ -225,6 +225,7 @@ public class NetworkJobProvider(AntSetting setting) : JobProvider
{
model.DelayTime = option.DelayTime;
model.Unique = option.Unique;
model.AppId = option.AppId;
}
return Ant.Produce(model);