diff --git a/AntJob.Agent/AntJob.Agent.csproj b/AntJob.Agent/AntJob.Agent.csproj index bc9ba57..5dd594b 100644 --- a/AntJob.Agent/AntJob.Agent.csproj +++ b/AntJob.Agent/AntJob.Agent.csproj @@ -30,6 +30,16 @@ full true + + + + + + PreserveNewest + true + PreserveNewest + + diff --git a/AntJob.Agent/appsettings.json b/AntJob.Agent/appsettings.json new file mode 100644 index 0000000..cbafde4 --- /dev/null +++ b/AntJob.Agent/appsettings.json @@ -0,0 +1,18 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Warning" + } + }, + "AllowedHosts": "*", + "ConnectionStrings": { + "Membership": { + "connectionString": "Data Source=..\\Data\\Membership.db", + "providerName": "SQLite" + }, + "Log": { + "connectionString": "Data Source=..\\Data\\Log.db", + "providerName": "SQLite" + }, + } +} diff --git a/AntJob.Extensions/SqlHandler.cs b/AntJob.Extensions/SqlHandler.cs index c210714..187be39 100644 --- a/AntJob.Extensions/SqlHandler.cs +++ b/AntJob.Extensions/SqlHandler.cs @@ -1,6 +1,8 @@ using System; +using System.Linq; using AntJob.Data; using AntJob.Extensions; +using AntJob.Providers; using NewLife.Data; using XCode.DataAccessLayer; @@ -37,7 +39,7 @@ namespace AntJob var sections = SqlSection.ParseAll(sqls); if (sections.Length == 0) return -1; - var rs = ExecuteSql(sections, ctx); + var rs = ExecuteSql(sections, ctx, (section, dt) => SqlMessage.ProcessMessage(dt, ctx)); return rs; } @@ -45,8 +47,9 @@ namespace AntJob /// 执行Sql集合 /// /// + /// /// - public static Int32 ExecuteSql(SqlSection[] sections, JobContext ctx) + public static Int32 ExecuteSql(SqlSection[] sections, JobContext ctx, Action callback = null) { if (sections == null || sections.Length == 0) return -1; @@ -62,19 +65,23 @@ namespace AntJob { // 按顺序执行处理Sql语句 DbTable dt = null; - foreach (var item in sections) + foreach (var section in sections) { - switch (item.Action) + switch (section.Action) { case SqlActions.Query: - dt = item.Query(); + dt = section.Query(); if (dt != null) ctx.Total += dt.Rows.Count; + + // 处理生产消息 + callback?.Invoke(section, dt); + break; case SqlActions.Execute: - rs += item.Execute(); + rs += section.Execute(); break; case SqlActions.Insert: - rs += item.BatchInsert(dt); + rs += section.BatchInsert(dt); break; default: break; diff --git a/AntJob.Extensions/SqlMessage.cs b/AntJob.Extensions/SqlMessage.cs index 5ee2026..92f7ea1 100644 --- a/AntJob.Extensions/SqlMessage.cs +++ b/AntJob.Extensions/SqlMessage.cs @@ -1,9 +1,11 @@ using System; +using System.Linq; +using System.Threading; using AntJob.Data; using AntJob.Extensions; using AntJob.Handlers; +using AntJob.Providers; using NewLife.Data; -using XCode.DataAccessLayer; namespace AntJob { @@ -15,14 +17,7 @@ namespace AntJob { #region 构造 /// 实例化 - public SqlMessage() - { - Topic = "Sql"; - //Mode = JobModes.Message; - - //var job = Job; - //job.BatchSize = 8; - } + public SqlMessage() => Topic = "Sql";//Mode = JobModes.Message;//var job = Job;//job.BatchSize = 8; #endregion /// 根据解码后的消息执行任务 @@ -36,12 +31,35 @@ namespace AntJob // 向调度中心返回解析后的Sql语句 ctx.Remark = sqls; + // 分解Sql语句得到片段数组 var sections = SqlSection.ParseAll(sqls); if (sections.Length == 0) return -1; - var rs = SqlHandler.ExecuteSql(sections, ctx); + // 依次执行Sql片段数组。遇到query时,可能需要生产消息 + var rs = SqlHandler.ExecuteSql(sections, ctx, (section, dt) => ProcessMessage(dt, ctx)); return rs; } + + internal static void ProcessMessage(DbTable dt, JobContext ctx) + { + if (dt == null || dt.Columns == null || dt.Columns.Length == 0 || dt.Rows == null || dt.Rows.Count == 0) return; + + // select id as topic_roleId, id as topic_myId from role where updatetime>='{Start}' and updatetime<'{End}' + + for (var i = 0; i < dt.Columns.Length; i++) + { + var col = dt.Columns[i]; + if (col.StartsWithIgnoreCase("topic_")) + { + var topic = col.Substring("topic_".Length); + var messages = dt.Rows.Select(e => "{0}".F(e[i])).Distinct().ToArray(); + if (messages.Length > 0) + { + ctx.Handler.Produce(topic, messages, new MessageOption { Unique = true }); + } + } + } + } } } \ No newline at end of file diff --git a/AntJob.Extensions/SqlSection.cs b/AntJob.Extensions/SqlSection.cs index a4a45ba..0c8ce71 100644 --- a/AntJob.Extensions/SqlSection.cs +++ b/AntJob.Extensions/SqlSection.cs @@ -45,11 +45,20 @@ namespace AntJob.Extensions // 两个换行隔开片段 var ss = sqls.Split(new[] { "\r\n\r", "\r\r", "\n\n" }, StringSplitOptions.RemoveEmptyEntries); + var connName = ""; foreach (var item in ss) { var section = new SqlSection(); section.Parse(item); + // 如果当前片段未指定连接名,则使用上一个 + if (section.ConnName.IsNullOrEmpty()) + section.ConnName = connName; + else + connName = section.ConnName; + + if (section.ConnName.IsNullOrEmpty()) throw new Exception("未指定连接名!"); + list.Add(section); } diff --git a/AntJob.Web/Areas/Ant/Views/AppMessage/_List_Data.cshtml b/AntJob.Web/Areas/Ant/Views/AppMessage/_List_Data.cshtml index ef9b4aa..74e0d2b 100644 --- a/AntJob.Web/Areas/Ant/Views/AppMessage/_List_Data.cshtml +++ b/AntJob.Web/Areas/Ant/Views/AppMessage/_List_Data.cshtml @@ -9,6 +9,15 @@ var fields = ViewBag.Fields as IList; var set = ViewBag.PageSetting as PageSetting; } + @@ -20,6 +29,7 @@ + @if (this.Has(PermissionFlags.Detail, PermissionFlags.Update, PermissionFlags.Delete)) @@ -40,6 +50,7 @@ + @if (this.Has(PermissionFlags.Detail, PermissionFlags.Update, PermissionFlags.Delete))
应用 作业 主题内容 创建时间 更新时间@entity.AppName @entity.JobName @entity.Topic@entity.Data @entity.CreateTime.ToFullString("") @entity.UpdateTime.ToFullString("")