成功向两个topc生产消息;

select id as topic_roleId, id as topic_myId from role  updatetime<'{End}'
This commit is contained in:
大石头 2020-04-19 23:19:48 +08:00
parent 98cbadfc0a
commit fc01893ead
6 changed files with 90 additions and 17 deletions

View File

@ -30,6 +30,16 @@
<DebugType>full</DebugType>
<DebugSymbols>true</DebugSymbols>
</PropertyGroup>
<ItemGroup>
<None Remove="appsettings.json" />
</ItemGroup>
<ItemGroup>
<Content Include="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<ExcludeFromSingleFile>true</ExcludeFromSingleFile>
<CopyToPublishDirectory>PreserveNewest</CopyToPublishDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<PackageReference Include="NewLife.Agent" Version="8.7.2020.417-beta" />

View File

@ -0,0 +1,18 @@
{
"Logging": {
"LogLevel": {
"Default": "Warning"
}
},
"AllowedHosts": "*",
"ConnectionStrings": {
"Membership": {
"connectionString": "Data Source=..\\Data\\Membership.db",
"providerName": "SQLite"
},
"Log": {
"connectionString": "Data Source=..\\Data\\Log.db",
"providerName": "SQLite"
},
}
}

View File

@ -1,6 +1,8 @@
using System;
using System.Linq;
using AntJob.Data;
using AntJob.Extensions;
using AntJob.Providers;
using NewLife.Data;
using XCode.DataAccessLayer;
@ -37,7 +39,7 @@ namespace AntJob
var sections = SqlSection.ParseAll(sqls);
if (sections.Length == 0) return -1;
var rs = ExecuteSql(sections, ctx);
var rs = ExecuteSql(sections, ctx, (section, dt) => SqlMessage.ProcessMessage(dt, ctx));
return rs;
}
@ -45,8 +47,9 @@ namespace AntJob
/// <summary>执行Sql集合</summary>
/// <param name="sections"></param>
/// <param name="ctx"></param>
/// <param name="callback"></param>
/// <returns></returns>
public static Int32 ExecuteSql(SqlSection[] sections, JobContext ctx)
public static Int32 ExecuteSql(SqlSection[] sections, JobContext ctx, Action<SqlSection, DbTable> callback = null)
{
if (sections == null || sections.Length == 0) return -1;
@ -62,19 +65,23 @@ namespace AntJob
{
// 按顺序执行处理Sql语句
DbTable dt = null;
foreach (var item in sections)
foreach (var section in sections)
{
switch (item.Action)
switch (section.Action)
{
case SqlActions.Query:
dt = item.Query();
dt = section.Query();
if (dt != null) ctx.Total += dt.Rows.Count;
// 处理生产消息
callback?.Invoke(section, dt);
break;
case SqlActions.Execute:
rs += item.Execute();
rs += section.Execute();
break;
case SqlActions.Insert:
rs += item.BatchInsert(dt);
rs += section.BatchInsert(dt);
break;
default:
break;

View File

@ -1,9 +1,11 @@
using System;
using System.Linq;
using System.Threading;
using AntJob.Data;
using AntJob.Extensions;
using AntJob.Handlers;
using AntJob.Providers;
using NewLife.Data;
using XCode.DataAccessLayer;
namespace AntJob
{
@ -15,14 +17,7 @@ namespace AntJob
{
#region
/// <summary>实例化</summary>
public SqlMessage()
{
Topic = "Sql";
//Mode = JobModes.Message;
//var job = Job;
//job.BatchSize = 8;
}
public SqlMessage() => Topic = "Sql";//Mode = JobModes.Message;//var job = Job;//job.BatchSize = 8;
#endregion
/// <summary>根据解码后的消息执行任务</summary>
@ -36,12 +31,35 @@ namespace AntJob
// 向调度中心返回解析后的Sql语句
ctx.Remark = sqls;
// 分解Sql语句得到片段数组
var sections = SqlSection.ParseAll(sqls);
if (sections.Length == 0) return -1;
var rs = SqlHandler.ExecuteSql(sections, ctx);
// 依次执行Sql片段数组。遇到query时可能需要生产消息
var rs = SqlHandler.ExecuteSql(sections, ctx, (section, dt) => ProcessMessage(dt, ctx));
return rs;
}
internal static void ProcessMessage(DbTable dt, JobContext ctx)
{
if (dt == null || dt.Columns == null || dt.Columns.Length == 0 || dt.Rows == null || dt.Rows.Count == 0) return;
// select id as topic_roleId, id as topic_myId from role where updatetime>='{Start}' and updatetime<'{End}'
for (var i = 0; i < dt.Columns.Length; i++)
{
var col = dt.Columns[i];
if (col.StartsWithIgnoreCase("topic_"))
{
var topic = col.Substring("topic_".Length);
var messages = dt.Rows.Select(e => "{0}".F(e[i])).Distinct().ToArray();
if (messages.Length > 0)
{
ctx.Handler.Produce(topic, messages, new MessageOption { Unique = true });
}
}
}
}
}
}

View File

@ -45,11 +45,20 @@ namespace AntJob.Extensions
// 两个换行隔开片段
var ss = sqls.Split(new[] { "\r\n\r", "\r\r", "\n\n" }, StringSplitOptions.RemoveEmptyEntries);
var connName = "";
foreach (var item in ss)
{
var section = new SqlSection();
section.Parse(item);
// 如果当前片段未指定连接名,则使用上一个
if (section.ConnName.IsNullOrEmpty())
section.ConnName = connName;
else
connName = section.ConnName;
if (section.ConnName.IsNullOrEmpty()) throw new Exception("未指定连接名!");
list.Add(section);
}

View File

@ -9,6 +9,15 @@
var fields = ViewBag.Fields as IList<FieldItem>;
var set = ViewBag.PageSetting as PageSetting;
}
<style>
.OverFlow {
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
max-width: 400px;
}
</style>
<table class="table table-bordered table-hover table-striped table-condensed">
<thead>
<tr>
@ -20,6 +29,7 @@
<th class="text-center"><a href="@Html.Raw(page.GetSortUrl("AppID"))">应用</a></th>
<th class="text-center"><a href="@Html.Raw(page.GetSortUrl("JobID"))">作业</a></th>
<th class="text-center" title="主题。区分作业下多种资源"><a href="@Html.Raw(page.GetSortUrl("Topic"))">主题</a></th>
<th class="text-center">内容</th>
<th class="text-center" style="min-width:134px;"><a href="@Html.Raw(page.GetSortUrl("CreateTime"))">创建时间</a></th>
<th class="text-center" style="min-width:134px;"><a href="@Html.Raw(page.GetSortUrl("UpdateTime"))">更新时间</a></th>
@if (this.Has(PermissionFlags.Detail, PermissionFlags.Update, PermissionFlags.Delete))
@ -40,6 +50,7 @@
<td><a href="App?ID=@entity.AppID">@entity.AppName</a></td>
<td><a href="Job?ID=@entity.JobID">@entity.JobName</a></td>
<td>@entity.Topic</td>
<td class="OverFlow" title="@entity.Data">@entity.Data</td>
<td>@entity.CreateTime.ToFullString("")</td>
<td>@entity.UpdateTime.ToFullString("")</td>
@if (this.Has(PermissionFlags.Detail, PermissionFlags.Update, PermissionFlags.Delete))