v2.0 大版本升级,完全支持执行Sql语句集合,继承定时调度和消息调度。

This commit is contained in:
大石头 2020-04-19 21:36:47 +08:00
parent c7061167d4
commit 98cbadfc0a
17 changed files with 151 additions and 47 deletions

View File

@ -8,9 +8,9 @@
<Description>调度中心下发C#或Sql给蚂蚁代理执行</Description>
<Company>新生命开发团队</Company>
<Copyright>版权所有(C) 新生命开发团队 2020</Copyright>
<Version>1.0.2020.0111</Version>
<FileVersion>1.0.2020.0111</FileVersion>
<AssemblyVersion>1.0.*</AssemblyVersion>
<Version>2.0.2020.0419</Version>
<FileVersion>2.0.2020.0419</FileVersion>
<AssemblyVersion>2.0.*</AssemblyVersion>
<Deterministic>false</Deterministic>
<OutputPath>..\Bin\Agent</OutputPath>
<AppendTargetFrameworkToOutputPath>false</AppendTargetFrameworkToOutputPath>

View File

@ -45,8 +45,9 @@ namespace AntJob.Agent
};
// 添加作业处理器
sc.Handlers.Add(new CSharpHandler());
//sc.Handlers.Add(new CSharpHandler());
sc.Handlers.Add(new SqlHandler());
sc.Handlers.Add(new SqlMessage());
// 启动调度引擎,调度器内部多线程处理
sc.Start();

View File

@ -5,9 +5,9 @@
<Description>蚂蚁调度系统数据库结构</Description>
<Company>新生命开发团队</Company>
<Copyright>版权所有(C) 新生命开发团队 2020</Copyright>
<Version>1.0.2020.0110</Version>
<FileVersion>1.0.2020.0110</FileVersion>
<AssemblyVersion>1.0.*</AssemblyVersion>
<Version>2.0.2020.0419</Version>
<FileVersion>2.0.2020.0419</FileVersion>
<AssemblyVersion>2.0.*</AssemblyVersion>
<Deterministic>false</Deterministic>
<OutputPath>..\Bin</OutputPath>
<DocumentationFile>$(OutputPath)\$(TargetFramework)\$(AssemblyName).xml</DocumentationFile>

View File

@ -246,6 +246,9 @@ namespace AntJob.Data.Entity
/// <returns></returns>
public JobModel ToModel()
{
// 如果禁用,仅返回最简单的字段
if (!Enable) return new JobModel { Name = Name, Enable = Enable };
return new JobModel
{
Name = Name,
@ -254,6 +257,8 @@ namespace AntJob.Data.Entity
Start = Start,
End = End,
Data = Data,
Offset = Offset,
Step = Step,
BatchSize = BatchSize,
@ -306,8 +311,8 @@ namespace AntJob.Data.Entity
ti.CreateTime = DateTime.Now;
ti.UpdateTime = DateTime.Now;
// 如果有模板,则进行计算替换
if (!Data.IsNullOrEmpty()) ti.Data = TemplateHelper.Build(Data, ti.Start, ti.End);
//// 如果有模板,则进行计算替换
//if (!Data.IsNullOrEmpty()) ti.Data = TemplateHelper.Build(Data, ti.Start, ti.End);
ti.Insert();

View File

@ -5,9 +5,9 @@
<Description>分布式任务调度系统纯NET打造的重量级大数据实时计算平台万亿级调度经验积累</Description>
<Company>新生命开发团队</Company>
<Copyright>版权所有(C) 新生命开发团队 2020</Copyright>
<Version>1.0.2020.0409</Version>
<FileVersion>1.0.2020.0409</FileVersion>
<AssemblyVersion>1.0.*</AssemblyVersion>
<Version>2.0.2020.0419</Version>
<FileVersion>2.0.2020.0419</FileVersion>
<AssemblyVersion>2.0.*</AssemblyVersion>
<Deterministic>false</Deterministic>
<OutputPath>..\Bin</OutputPath>
<DocumentationFile>$(OutputPath)\$(TargetFramework)\$(AssemblyName).xml</DocumentationFile>

View File

@ -28,10 +28,28 @@ namespace AntJob
/// <returns></returns>
protected override Int32 Execute(JobContext ctx)
{
var sqls = ctx.Task.Data as String;
//var sqls = ctx.Task.Data as String;
var sqls = Job.Data;
sqls = TemplateHelper.Build(sqls, ctx.Task.Start, ctx.Task.End);
// 向调度中心返回解析后的Sql语句
ctx.Remark = sqls;
var sections = SqlSection.ParseAll(sqls);
if (sections.Length == 0) return -1;
var rs = ExecuteSql(sections, ctx);
return rs;
}
/// <summary>执行Sql集合</summary>
/// <param name="sections"></param>
/// <param name="ctx"></param>
/// <returns></returns>
public static Int32 ExecuteSql(SqlSection[] sections, JobContext ctx)
{
if (sections == null || sections.Length == 0) return -1;
var rs = 0;
ctx.Total = 0;

View File

@ -0,0 +1,47 @@
using System;
using AntJob.Data;
using AntJob.Extensions;
using AntJob.Handlers;
using NewLife.Data;
using XCode.DataAccessLayer;
namespace AntJob
{
/// <summary>SQL消息处理器使用消息匹配SQL语句</summary>
/// <remarks>
/// 应用型处理器,可直接使用
/// </remarks>
public class SqlMessage : MessageHandler
{
#region
/// <summary>实例化</summary>
public SqlMessage()
{
Topic = "Sql";
//Mode = JobModes.Message;
//var job = Job;
//job.BatchSize = 8;
}
#endregion
/// <summary>根据解码后的消息执行任务</summary>
/// <param name="ctx">上下文</param>
/// <returns></returns>
protected override Int32 Execute(JobContext ctx)
{
var msgs = ctx.Data as String[];
var sqls = Job.Data;
sqls = TemplateHelper.Build(sqls, msgs);
// 向调度中心返回解析后的Sql语句
ctx.Remark = sqls;
var sections = SqlSection.ParseAll(sqls);
if (sections.Length == 0) return -1;
var rs = SqlHandler.ExecuteSql(sections, ctx);
return rs;
}
}
}

View File

@ -7,9 +7,9 @@
<Description>分布式任务调度系统纯NET打造的重量级大数据实时计算平台万亿级调度经验积累</Description>
<Company>新生命开发团队</Company>
<Copyright>版权所有(C) 新生命开发团队 2020</Copyright>
<Version>1.0.2020.0110</Version>
<FileVersion>1.0.2020.0110</FileVersion>
<AssemblyVersion>1.0.*</AssemblyVersion>
<Version>2.0.2020.0419</Version>
<FileVersion>2.0.2020.0419</FileVersion>
<AssemblyVersion>2.0.*</AssemblyVersion>
<Deterministic>false</Deterministic>
<OutputPath>..\Bin\Server</OutputPath>
<AppendTargetFrameworkToOutputPath>false</AppendTargetFrameworkToOutputPath>

View File

@ -635,7 +635,7 @@ namespace AntJob.Server
{
var app = Session["App"] as App;
var ns = Session as INetSession;
var ip = ns.Remote.Host;
var ip = ns.Remote?.Host;
AppHistory.Create(app, action, success, remark, Local + "", ip);
}

View File

@ -6,9 +6,9 @@
<Description>分布式任务调度系统纯NET打造的重量级大数据实时计算平台万亿级调度经验积累</Description>
<Company>新生命开发团队</Company>
<Copyright>版权所有(C) 新生命开发团队 2020</Copyright>
<Version>1.0.2020.0110</Version>
<FileVersion>1.0.2020.0110</FileVersion>
<AssemblyVersion>1.0.*</AssemblyVersion>
<Version>2.0.2020.0419</Version>
<FileVersion>2.0.2020.0419</FileVersion>
<AssemblyVersion>2.0.*</AssemblyVersion>
<Deterministic>false</Deterministic>
<OutputPath>..\Bin\Web</OutputPath>
<AppendTargetFrameworkToOutputPath>false</AppendTargetFrameworkToOutputPath>

View File

@ -5,9 +5,9 @@
<Description>分布式任务调度系统纯NET打造的重量级大数据实时计算平台万亿级调度经验积累</Description>
<Company>新生命开发团队</Company>
<Copyright>版权所有(C) 新生命开发团队 2020</Copyright>
<Version>1.0.2020.0409</Version>
<FileVersion>1.0.2020.0409</FileVersion>
<AssemblyVersion>1.0.*</AssemblyVersion>
<Version>2.0.2020.0419</Version>
<FileVersion>2.0.2020.0419</FileVersion>
<AssemblyVersion>2.0.*</AssemblyVersion>
<Deterministic>false</Deterministic>
<OutputPath>..\Bin</OutputPath>
<DocumentationFile>$(OutputPath)\$(TargetFramework)\$(AssemblyName).xml</DocumentationFile>

View File

@ -34,6 +34,9 @@ namespace AntJob.Data
/// <summary>调度模式</summary>
JobModes Mode { get; set; }
/// <summary>数据</summary>
String Data { get; set; }
}
public partial class JobModel : IJob { }

View File

@ -54,6 +54,10 @@ namespace AntJob.Data
/// <summary>描述</summary>
[XmlAttribute]
public String Description { get; set; }
/// <summary>数据</summary>
[XmlAttribute]
public String Data { get; set; }
#endregion
#region

View File

@ -1,6 +1,4 @@
using System;
using System.Collections.Generic;
using System.Text;
using NewLife.Collections;
namespace AntJob.Data
@ -79,5 +77,37 @@ namespace AntJob.Data
// 左括号位置,右括号位置,格式化字符串
return new Tuple<Int32, Int32, String>(p1, p2, format);
}
/// <summary>使用消息数组处理模板</summary>
/// <param name="template"></param>
/// <param name="messages"></param>
/// <returns></returns>
public static String Build(String template, String[] messages)
{
if (template.IsNullOrEmpty()) return template;
var str = template;
var sb = Pool.StringBuilder.Get();
var p = 0;
while (true)
{
var p1 = str.IndexOf("{Message}", p);
if (p1 < 0)
{
sb.Append(str.Substring(p));
break;
}
// 准备替换
var val = messages.Join();
sb.Append(str.Substring(p, p1 - p));
sb.Append(val);
// 移动指针
p = p1 + "{Message}".Length;
}
return sb.Put(true);
}
}
}

View File

@ -7,9 +7,8 @@ using NewLife.Serialization;
namespace AntJob.Handlers
{
/// <summary>消息调度基类</summary>
/// <typeparam name="TModel">消息模型类</typeparam>
public abstract class MessageHandler<TModel> : Handler
/// <summary>消息调度基类消费的消息在Data中返回</summary>
public abstract class MessageHandler : Handler
{
#region
/// <summary>主题。设置后使用消费调度模式</summary>
@ -62,18 +61,18 @@ namespace AntJob.Handlers
var ss = ctx.Task.Data.ToJsonEntity<String[]>();
if (ss == null || ss.Length == 0) return;
// 消息作业特殊优待字符串不需要再次Json解码
if (typeof(TModel) == typeof(String))
{
ctx.Total = ss.Length;
ctx.Data = ss;
}
else
{
var ms = ss.Select(e => e.ToJsonEntity<TModel>()).ToList();
ctx.Total = ms.Count;
ctx.Data = ms;
}
//// 消息作业特殊优待字符串不需要再次Json解码
//if (typeof(TModel) == typeof(String))
//{
ctx.Total = ss.Length;
ctx.Data = ss;
//}
//else
//{
// var ms = ss.Select(e => e.ToJsonEntity<TModel>()).ToList();
// ctx.Total = ms.Count;
// ctx.Data = ms;
//}
Execute(ctx);
}
@ -84,12 +83,12 @@ namespace AntJob.Handlers
protected override Int32 Execute(JobContext ctx)
{
var count = 0;
foreach (var item in ctx.Data as IEnumerable)
foreach (String item in ctx.Data as IEnumerable)
{
//ctx.Key = item as String;
//ctx.Entity = item;
if (ProcessItem(ctx, (TModel)item)) count++;
if (ProcessItem(ctx, item)) count++;
}
return count;
@ -99,7 +98,7 @@ namespace AntJob.Handlers
/// <param name="ctx">上下文</param>
/// <param name="message">消息</param>
/// <returns></returns>
protected virtual Boolean ProcessItem(JobContext ctx, TModel message) => true;
protected virtual Boolean ProcessItem(JobContext ctx, String message) => true;
#endregion
}
}

View File

@ -98,7 +98,7 @@ namespace AntJob.Providers
if (rs.TryGetValue("Secret", out var secret))
{
set.Secret = secret + "";
set.SaveAsync();
set.Save();
}
Logined = true;

View File

@ -200,9 +200,6 @@ namespace AntJob.Providers
else
{
task.Status = JobStatus.;
if (ctx["Message"] is String msg) task.Message = msg;
task.Cost = (Int32)(ctx.Cost / 1000);
}
if (task.Message.IsNullOrEmpty()) task.Message = ctx.Remark;