新增Http客户端作业提供者,对接AntWeb的AntJob接口

This commit is contained in:
大石头 2024-01-18 11:04:42 +08:00
parent 52385ae8cf
commit c6949c787f
8 changed files with 365 additions and 47 deletions

View File

@ -18,16 +18,18 @@ var scheduler = new Scheduler
{
ServiceProvider = services.BuildServiceProvider(),
// 使用分布式调度引擎替换默认的本地文件调度
Provider = new NetworkJobProvider
{
Debug = set.Debug,
Server = set.Server,
AppID = set.AppID,
Secret = set.Secret,
}
//// 使用分布式调度引擎替换默认的本地文件调度
//Provider = new NetworkJobProvider
//{
// Debug = set.Debug,
// Server = set.Server,
// AppID = set.AppID,
// Secret = set.Secret,
//}
};
scheduler.Join(set.Server, set.AppID, set.Secret, set.Debug);
// 添加作业处理器
//sc.Handlers.Add(new CSharpHandler());
scheduler.AddHandler<SqlHandler>();

View File

@ -1,6 +1,5 @@
using AntJob.Server;
using AntJob.Server.Services;
using NewLife;
using NewLife.Caching;
using NewLife.Caching.Services;
using NewLife.Log;
@ -12,10 +11,7 @@ using XCode;
XTrace.UseConsole();
var services = ObjectContainer.Current;
var star = services.AddStardust();
if (star.Server.IsNullOrEmpty()) services.AddSingleton<ITracer>(DefaultTracer.Instance ??= new DefaultTracer());
services.AddSingleton(XTrace.Log);
services.AddStardust();
// 默认数据目录
var set = NewLife.Setting.Current;
@ -32,6 +28,8 @@ if (set2.IsNew)
set2.Save();
}
services.AddSingleton(AntJobSetting.Current);
// 分布式缓存锚定配置中心RedisCache若无配置则使用本地MemoryCache
// 集群部署时务必使用RedisCache内部将使用Redis实现分布式锁
services.AddSingleton<ICacheProvider, RedisCacheProvider>();

View File

@ -19,6 +19,8 @@ using IActionFilter = Microsoft.AspNetCore.Mvc.Filters.IActionFilter;
namespace AntJob.Web.Controllers;
[ApiController]
[Route("[controller]")]
public class AntJobController : ControllerBase, IActionFilter
{
/// <summary>令牌</summary>

View File

@ -1,8 +1,5 @@
using AntJob.Server.Services;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using AntJob.Server;
using AntJob.Server.Services;
using NewLife.Cube;
using XCode;
@ -26,6 +23,8 @@ public class Startup
set.Save();
}
services.AddSingleton(AntJobSetting.Current);
services.AddSingleton<AppService>();
services.AddSingleton<JobService>();
@ -64,5 +63,6 @@ public class Startup
// 启用星尘注册中心,向注册中心注册服务,服务消费者将自动更新服务端地址列表
app.RegisterService("AntWeb", null, env.EnvironmentName);
app.RegisterService("AntServer", null, env.EnvironmentName);
}
}

View File

@ -0,0 +1,258 @@
using AntJob.Data;
using AntJob.Handlers;
using AntJob.Models;
using NewLife;
using NewLife.Log;
using NewLife.Remoting;
using NewLife.Threading;
namespace AntJob.Providers;
/// <summary>Http任务提供者</summary>
public class HttpJobProvider : JobProvider
{
#region
/// <summary>调试,打开编码日志</summary>
public Boolean Debug { get; set; }
/// <summary>调度中心地址</summary>
public String Server { get; set; }
/// <summary>应用编号</summary>
public String AppId { get; set; }
/// <summary>应用密钥</summary>
public String Secret { get; set; }
/// <summary>客户端</summary>
public ApiHttpClient Client { get; set; }
/// <summary>邻居伙伴。用于应用判断自身有多少个实例在运行</summary>
public IPeer[] Peers { get; private set; }
/// <summary>性能跟踪器</summary>
public ITracer Tracer { get; set; }
#endregion
#region
/// <summary>销毁</summary>
/// <param name="disposing"></param>
protected override void Dispose(Boolean disposing)
{
base.Dispose(disposing);
_timer.TryDispose();
_timer = null;
}
#endregion
#region
/// <summary>初始化</summary>
public void Init()
{
var svr = Server?.Split(",").Where(e => e.StartsWithIgnoreCase("http://", "https://")).Join(",");
// 使用配置中心账号
var ant = new ApiHttpClient(svr)
{
Tracer = Tracer,
};
// 断开前一个连接
Client.TryDispose();
Client = ant;
}
/// <summary>开始</summary>
public override void Start()
{
if (Client == null) Init();
var bs = Schedule?.Handlers;
// 遍历所有处理器,添加作业到调度中心
//var jobs = GetJobs(ws.Select(e => e.Name).ToArray());
var list = new List<IJob>();
foreach (var handler in bs)
{
var job = handler.Job ?? new JobModel();
job.Name = handler.Name;
job.ClassName = handler.GetType().FullName;
job.Mode = handler.Mode;
// 描述
if (job is JobModel job2)
{
var dis = handler.GetType().GetDisplayName();
if (!dis.IsNullOrEmpty()) job2.DisplayName = dis;
var des = handler.GetType().GetDescription();
if (!des.IsNullOrEmpty()) job2.Description = des;
if (handler is MessageHandler mhandler) job2.Topic = mhandler.Topic;
}
list.Add(job);
}
if (list.Count > 0) Client.Post<Object>("/AntJob/AddJobs", new { jobs = list.ToArray() });
// 定时更新邻居
_timer = new TimerX(DoCheckPeer, null, 1_000, 30_000) { Async = true };
}
/// <summary>停止</summary>
public override void Stop()
{
// 断开前一个连接
Client.TryDispose();
Client = null;
}
#endregion
#region
private IJob[] _jobs;
private DateTime _NextGetJobs;
/// <summary>获取所有作业名称</summary>
/// <returns></returns>
public override IJob[] GetJobs()
{
// 周期性获取,避免请求过快
var now = TimerX.Now;
if (_jobs == null || _NextGetJobs <= now)
{
_NextGetJobs = now.AddSeconds(5);
_jobs = Client.Get<JobModel[]>("/AntJob/GetJobs");
}
return _jobs;
}
/// <summary>申请任务</summary>
/// <param name="job">作业</param>
/// <param name="topic">主题</param>
/// <param name="count">要申请的任务个数</param>
/// <returns></returns>
public override ITask[] Acquire(IJob job, String topic, Int32 count) => Client.Post<TaskModel[]>("/AntJob/Acquire", new AcquireModel { Job = job.Name, Topic = topic, Count = count });
/// <summary>生产消息</summary>
/// <param name="job">作业</param>
/// <param name="topic">主题</param>
/// <param name="messages">消息集合</param>
/// <param name="option">消息选项</param>
/// <returns></returns>
public override Int32 Produce(String job, String topic, String[] messages, MessageOption option = null)
{
if (topic.IsNullOrEmpty() || messages == null || messages.Length < 1) return 0;
var model = new ProduceModel
{
Job = job,
Topic = topic,
Messages = messages,
};
if (option != null)
{
model.DelayTime = option.DelayTime;
model.Unique = option.Unique;
}
return Client.Post<Int32>("/AntJob/Product", model);
}
#endregion
#region
/// <summary>报告进度,每个任务多次调用</summary>
/// <param name="ctx">上下文</param>
public override void Report(JobContext ctx)
{
// 不用上报抽取中
if (ctx.Status == JobStatus.) return;
if (ctx?.Result is not TaskResult task) return;
// 区分抽取和处理
task.Status = ctx.Status;
task.Speed = ctx.Speed;
task.Total = ctx.Total;
task.Success = ctx.Success;
Report(ctx.Handler.Job, task);
}
/// <summary>完成任务,每个任务只调用一次</summary>
/// <param name="ctx">上下文</param>
public override void Finish(JobContext ctx)
{
if (ctx?.Result is not TaskResult task) return;
task.Speed = ctx.Speed;
task.Total = ctx.Total;
task.Success = ctx.Success;
task.Times++;
// 区分正常完成还是错误终止
if (ctx.Error != null)
{
task.Error++;
task.Status = JobStatus.;
var ex = ctx.Error?.GetTrue();
if (ex != null)
{
var msg = ctx.Error.GetMessage();
if (msg.Contains("Exception:")) msg = msg.Substring("Exception:").Trim();
task.Message = msg;
}
}
else
{
task.Status = JobStatus.;
task.Cost = (Int32)Math.Round(ctx.Cost / 1000);
}
if (task.Message.IsNullOrEmpty()) task.Message = ctx.Remark;
task.Key = ctx.Key;
Report(ctx.Handler.Job, task);
}
private void Report(IJob job, ITaskResult task)
{
try
{
Client.Post<Boolean>("/AntJob/Report", task);
}
catch (Exception ex)
{
XTrace.WriteLine("[{0}]的[{1}]状态报告失败!{2}", job, task.Status, ex.GetTrue().Message);
}
}
#endregion
#region
private TimerX _timer;
private void DoCheckPeer(Object state)
{
var ps = Client?.Get<PeerModel[]>("/AntJob/GetPeers");
if (ps == null || ps.Length == 0) return;
var old = (Peers ?? new IPeer[0]).ToList();
foreach (var item in ps)
{
var pr = old.FirstOrDefault(e => e.Instance == item.Instance);
if (pr == null)
XTrace.WriteLine("[{0}]上线!{1}", item.Instance, item.Machine);
else
old.Remove(pr);
}
foreach (var item in old)
{
XTrace.WriteLine("[{0}]下线!{1}", item.Instance, item.Machine);
}
Peers = ps;
}
#endregion
}

View File

@ -18,7 +18,7 @@ public class NetworkJobProvider : JobProvider
public String Server { get; set; }
/// <summary>应用编号</summary>
public String AppID { get; set; }
public String AppId { get; set; }
/// <summary>应用密钥</summary>
public String Secret { get; set; }
@ -54,7 +54,7 @@ public class NetworkJobProvider : JobProvider
// 使用配置中心账号
var ant = new AntClient(svr)
{
UserName = AppID,
UserName = AppId,
Password = Secret,
Tracer = Tracer,
};

View File

@ -5,8 +5,10 @@ using NewLife;
using NewLife.Log;
using NewLife.Model;
using NewLife.Reflection;
using NewLife.Remoting;
using NewLife.Threading;
using Stardust.Registry;
using static System.Net.WebRequestMethods;
namespace AntJob;
@ -56,6 +58,60 @@ public class Scheduler : DisposeBase
#endregion
#region
/// <summary>加入调度中心从注册中心获取地址自动识别RPC/Http</summary>
/// <param name="server"></param>
/// <param name="appId"></param>
/// <param name="secret"></param>
/// <param name="debug"></param>
/// <returns></returns>
public IJobProvider Join(String server, String appId, String secret, Boolean debug = false)
{
var registry = ServiceProvider?.GetService<IRegistry>();
if (registry != null)
{
var svrs = registry.ResolveAddressAsync("AntServer").Result;
if (svrs != null && svrs.Length > 0) server = svrs.Join();
}
if (server.IsNullOrEmpty()) return null;
// 根据地址决定用Http还是RPC
var servers = server.Split(",");
if (servers.Any(e => e.StartsWithIgnoreCase("http://", "https://")))
{
var http = new HttpJobProvider
{
Debug = debug,
Server = server,
AppId = appId,
Secret = secret,
};
// 如果有注册中心,则使用注册中心的服务发现
if (registry != null)
{
//http.Client = registry.CreateForService("AntServer") as ApiHttpClient;
//http.Client.RoundRobin = false;
}
Provider = http;
}
else
{
var rpc = new NetworkJobProvider
{
Debug = debug,
Server = server,
AppId = appId,
Secret = secret,
};
Provider = rpc;
}
return Provider;
}
/// <summary>开始</summary>
public void Start()
{
@ -71,22 +127,22 @@ public class Scheduler : DisposeBase
prv ??= Provider = new FileJobProvider();
prv.Schedule ??= this;
if (prv is NetworkJobProvider network)
{
network.Tracer ??= Tracer;
//if (prv is NetworkJobProvider network)
//{
// network.Tracer ??= Tracer;
// 从注册中心获取服务端地址,优先于本地配置文件
if (network.Server.IsNullOrEmpty() || network.Server.EqualIgnoreCase(AntSetting.Current.Server))
{
// 从注册中心获取包
var registry = ServiceProvider?.GetService<IRegistry>();
if (registry != null)
{
var svrs = registry.ResolveAddressAsync("AntServer").Result;
if (svrs != null && svrs.Length > 0) network.Server = svrs.Join();
}
}
}
// // 从注册中心获取服务端地址,优先于本地配置文件
// if (network.Server.IsNullOrEmpty() || network.Server.EqualIgnoreCase(AntSetting.Current.Server))
// {
// // 从注册中心获取包
// var registry = ServiceProvider?.GetService<IRegistry>();
// if (registry != null)
// {
// var svrs = registry.ResolveAddressAsync("AntServer").Result;
// if (svrs != null && svrs.Length > 0) network.Server = svrs.Join();
// }
// }
//}
prv.Start();

View File

@ -20,27 +20,29 @@ public class JobHost : BackgroundService
var set = AntSetting.Current;
// 实例化调度器
var sc = new Scheduler
var scheduler = new Scheduler
{
ServiceProvider = _serviceProvider,
// 使用分布式调度引擎替换默认的本地文件调度
Provider = new NetworkJobProvider
{
Server = set.Server,
AppID = set.AppID,
Secret = set.Secret,
Debug = false
}
//// 使用分布式调度引擎替换默认的本地文件调度
//Provider = new NetworkJobProvider
//{
// Server = set.Server,
// AppID = set.AppID,
// Secret = set.Secret,
// Debug = false
//}
};
scheduler.Join(set.Server, set.AppID, set.Secret, set.Debug);
// 添加作业
sc.AddHandler<HelloJob>();
scheduler.AddHandler<HelloJob>();
// 启动调度引擎,调度器内部多线程处理
sc.Start();
_scheduler = sc;
scheduler.Start();
_scheduler = scheduler;
return Task.CompletedTask;
}