[feat]应用消息增加延迟时间,用于实现延迟消息,该时间到达后才能消费消息

This commit is contained in:
智能大石头 2024-09-03 12:27:15 +08:00
parent e4abb1f6c7
commit 16c77f4b07
5 changed files with 42 additions and 5 deletions

View File

@ -1713,6 +1713,17 @@
<td>可以是Json数据比如StatID</td>
</tr>
<tr>
<td>DelayTime</td>
<td>延迟时间</td>
<td>DateTime</td>
<td></td>
<td></td>
<td></td>
<td></td>
<td>延迟到该时间执行</td>
</tr>
<tr>
<td>TraceId</td>
<td>追踪</td>

View File

@ -119,7 +119,7 @@ public partial class AppMessage : EntityBase<AppMessage>
/// <returns></returns>
public static IList<AppMessage> GetTopic(Int32 appid, String topic, DateTime endTime, Int32 count)
{
return FindAll(_.AppID == appid & _.Topic == topic & _.Id <= Meta.Factory.Snow.GetId(endTime), _.Id.Asc(), null, 0, count);
return FindAll(_.AppID == appid & _.Topic == topic & _.DelayTime <= endTime, _.Id.Asc(), null, 0, count);
}
/// <summary>去重过滤</summary>

View File

@ -18,6 +18,7 @@ namespace AntJob.Data.Entity;
[DataObject]
[Description("应用消息。消息调度,某些作业负责生产消息,供其它作业进行消费处理")]
[BindIndex("IX_AppMessage_AppID_Topic_UpdateTime", false, "AppID,Topic,UpdateTime")]
[BindIndex("IX_AppMessage_AppID_Topic_DelayTime", false, "AppID,Topic,DelayTime")]
[BindTable("AppMessage", Description = "应用消息。消息调度,某些作业负责生产消息,供其它作业进行消费处理", ConnName = "Ant", DbType = DatabaseType.None)]
public partial class AppMessage
{
@ -62,6 +63,14 @@ public partial class AppMessage
[BindColumn("Data", "数据。可以是Json数据比如StatID", "")]
public String Data { get => _Data; set { if (OnPropertyChanging("Data", value)) { _Data = value; OnPropertyChanged("Data"); } } }
private DateTime _DelayTime;
/// <summary>延迟时间。延迟到该时间执行</summary>
[DisplayName("延迟时间")]
[Description("延迟时间。延迟到该时间执行")]
[DataObjectField(false, false, true, 0)]
[BindColumn("DelayTime", "延迟时间。延迟到该时间执行", "")]
public DateTime DelayTime { get => _DelayTime; set { if (OnPropertyChanging("DelayTime", value)) { _DelayTime = value; OnPropertyChanged("DelayTime"); } } }
private String _TraceId;
/// <summary>追踪。链路追踪用于APM性能追踪定位还原该事件的调用链</summary>
[Category("扩展")]
@ -121,6 +130,7 @@ public partial class AppMessage
"JobID" => _JobID,
"Topic" => _Topic,
"Data" => _Data,
"DelayTime" => _DelayTime,
"TraceId" => _TraceId,
"CreateIP" => _CreateIP,
"CreateTime" => _CreateTime,
@ -137,6 +147,7 @@ public partial class AppMessage
case "JobID": _JobID = value.ToInt(); break;
case "Topic": _Topic = Convert.ToString(value); break;
case "Data": _Data = Convert.ToString(value); break;
case "DelayTime": _DelayTime = value.ToDateTime(); break;
case "TraceId": _TraceId = Convert.ToString(value); break;
case "CreateIP": _CreateIP = Convert.ToString(value); break;
case "CreateTime": _CreateTime = value.ToDateTime(); break;
@ -177,6 +188,16 @@ public partial class AppMessage
return Find(_.Id == id);
}
/// <summary>根据应用查找</summary>
/// <param name="appId">应用</param>
/// <returns>实体列表</returns>
public static IList<AppMessage> FindAllByAppID(Int32 appId)
{
if (appId < 0) return [];
return FindAll(_.AppID == appId);
}
#endregion
#region
@ -209,6 +230,9 @@ public partial class AppMessage
/// <summary>数据。可以是Json数据比如StatID</summary>
public static readonly Field Data = FindByName("Data");
/// <summary>延迟时间。延迟到该时间执行</summary>
public static readonly Field DelayTime = FindByName("DelayTime");
/// <summary>追踪。链路追踪用于APM性能追踪定位还原该事件的调用链</summary>
public static readonly Field TraceId = FindByName("TraceId");
@ -245,6 +269,9 @@ public partial class AppMessage
/// <summary>数据。可以是Json数据比如StatID</summary>
public const String Data = "Data";
/// <summary>延迟时间。延迟到该时间执行</summary>
public const String DelayTime = "DelayTime";
/// <summary>追踪。链路追踪用于APM性能追踪定位还原该事件的调用链</summary>
public const String TraceId = "TraceId";

View File

@ -230,6 +230,7 @@
<Column Name="JobID" DataType="Int32" Map="Job@ID@$" Description="作业。生产消息的作业" />
<Column Name="Topic" DataType="String" Description="主题。区分作业下多种消息" />
<Column Name="Data" DataType="String" Length="2000" Description="数据。可以是Json数据比如StatID" />
<Column Name="DelayTime" DataType="DateTime" Description="延迟时间。延迟到该时间执行" />
<Column Name="TraceId" DataType="String" Length="200" Description="追踪。链路追踪用于APM性能追踪定位还原该事件的调用链" Category="扩展" />
<Column Name="CreateIP" DataType="String" Description="创建地址" Category="扩展" />
<Column Name="CreateTime" DataType="DateTime" Description="创建时间" Category="扩展" />
@ -238,6 +239,7 @@
</Columns>
<Indexes>
<Index Columns="AppID,Topic,UpdateTime" />
<Index Columns="AppID,Topic,DelayTime" />
</Indexes>
</Table>
</Tables>

View File

@ -361,10 +361,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
jm.CreateTime = jm.UpdateTime = now;
if (model.DelayTime > 0)
{
jm.UpdateTime = dTime;
}
if (model.DelayTime > 0) jm.DelayTime = dTime;
ms.Add(jm);
}