借助星尘注册中心交换蚂蚁调度服务器地址

This commit is contained in:
大石头 2022-09-21 17:17:21 +08:00
parent 3efcbb4447
commit dcc8021a0c
7 changed files with 310 additions and 297 deletions

View File

@ -2,41 +2,48 @@
using System.Threading;
using AntJob.Providers;
using NewLife.Log;
using NewLife.Model;
using Stardust;
namespace AntJob.Agent
namespace AntJob.Agent;
class Program
{
class Program
static void Main(String[] args)
{
static void Main(String[] args)
// 启用控制台日志,拦截所有异常
XTrace.UseConsole();
var services = ObjectContainer.Current;
var star = new StarFactory();
var set = AntSetting.Current;
// 实例化调度器
var scheduler = new Scheduler
{
// 启用控制台日志,拦截所有异常
XTrace.UseConsole();
var set = AntSetting.Current;
// 实例化调度器
var scheduler = new Scheduler();
ServiceProvider = services.BuildServiceProvider(),
// 使用分布式调度引擎替换默认的本地文件调度
scheduler.Provider = new NetworkJobProvider
Provider = new NetworkJobProvider
{
Debug = set.Debug,
Server = set.Server,
AppID = set.AppID,
Secret = set.Secret,
};
}
};
// 添加作业处理器
//sc.Handlers.Add(new CSharpHandler());
scheduler.AddHandler<SqlHandler>();
scheduler.AddHandler<SqlMessage>();
// 添加作业处理器
//sc.Handlers.Add(new CSharpHandler());
scheduler.AddHandler<SqlHandler>();
scheduler.AddHandler<SqlMessage>();
// 启动调度引擎,调度器内部多线程处理
scheduler.Start();
// 启动调度引擎,调度器内部多线程处理
scheduler.Start();
// 友好退出
//ObjectContainer.Current.BuildHost().Run();
Thread.Sleep(-1);
}
// 友好退出
//ObjectContainer.Current.BuildHost().Run();
Thread.Sleep(-1);
}
}

View File

@ -1,7 +1,6 @@
using System.Diagnostics;
using System.Net;
using AntJob.Data.Entity;
using Microsoft.Win32;
using NewLife;
using NewLife.Caching;
using NewLife.Log;
@ -15,7 +14,7 @@ public class Worker : IHostedService
{
private readonly IRegistry _registry;
public Worker(IRegistry registry) => _registry = registry;
public Worker(IServiceProvider provider) => _registry = provider.GetService<IRegistry>();
public async Task StartAsync(CancellationToken cancellationToken)
{
@ -53,7 +52,7 @@ public class Worker : IHostedService
_clearItemTimer = new TimerX(ClearItems, null, 10_000, 3600_000) { Async = true };
// 启用星尘注册中心,向注册中心注册服务,服务消费者将自动更新服务端地址列表
await _registry.RegisterAsync("Ant.Server", $"tcp://*:{server.Port}");
await _registry?.RegisterAsync("Ant.Server", $"tcp://*:{server.Port}");
}
public Task StopAsync(CancellationToken cancellationToken)

View File

@ -47,6 +47,7 @@
<ItemGroup>
<PackageReference Include="NewLife.Core" Version="10.0.2022.920-beta0209" />
<PackageReference Include="NewLife.Remoting" Version="2.0.2022.901" />
<PackageReference Include="NewLife.Stardust" Version="2.3.2022.917-beta1649" />
</ItemGroup>
<ItemGroup>

View File

@ -1,7 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using AntJob.Data;
using AntJob.Data;
using AntJob.Handlers;
using AntJob.Providers;
using NewLife;
@ -9,237 +6,253 @@ using NewLife.Log;
using NewLife.Model;
using NewLife.Reflection;
using NewLife.Threading;
using Stardust.Registry;
namespace AntJob
namespace AntJob;
/// <summary>作业调度器</summary>
public class Scheduler : DisposeBase
{
/// <summary>作业调度器</summary>
public class Scheduler : DisposeBase
#region
/// <summary>处理器集合</summary>
public List<Handler> Handlers { get; } = new List<Handler>();
/// <summary>作业提供者</summary>
public IJobProvider Provider { get; set; }
/// <summary>服务提供者</summary>
public IServiceProvider ServiceProvider { get; set; }
/// <summary>性能跟踪器</summary>
public ITracer Tracer { get; set; }
#endregion
#region
/// <summary>销毁</summary>
/// <param name="disposing"></param>
protected override void Dispose(Boolean disposing)
{
#region
/// <summary>处理器集合</summary>
public List<Handler> Handlers { get; } = new List<Handler>();
base.Dispose(disposing);
/// <summary>作业提供者</summary>
public IJobProvider Provider { get; set; }
Stop();
}
#endregion
/// <summary>性能跟踪器</summary>
public ITracer Tracer { get; set; }
#endregion
#region
/// <summary>添加处理器</summary>
/// <param name="handler"></param>
public void AddHandler(Handler handler) => Handlers.Add(handler);
#region
/// <summary>销毁</summary>
/// <param name="disposing"></param>
protected override void Dispose(Boolean disposing)
/// <summary>按类型添加处理器,支持依赖注入</summary>
/// <typeparam name="T"></typeparam>
public void AddHandler<T>() where T : Handler
{
var services = ObjectContainer.Current;
var prv = ObjectContainer.Provider;
services.AddTransient<T>();
Handlers.Add(prv.GetService<T>());
}
#endregion
#region
/// <summary>开始</summary>
public void Start()
{
// 检查本地添加的处理器
var hs = Handlers;
if (hs.Count == 0) throw new ArgumentNullException(nameof(Handlers), "没有可用处理器");
// 启动作业提供者
var prv = Provider;
prv ??= Provider = new FileJobProvider();
prv.Schedule ??= this;
// 从注册中心获取包
if (prv is NetworkJobProvider network && network.Server.IsNullOrEmpty())
{
base.Dispose(disposing);
Stop();
}
#endregion
#region
/// <summary>添加处理器</summary>
/// <param name="handler"></param>
public void AddHandler(Handler handler) => Handlers.Add(handler);
/// <summary>按类型添加处理器,支持依赖注入</summary>
/// <typeparam name="T"></typeparam>
public void AddHandler<T>() where T : Handler
{
var services = ObjectContainer.Current;
var prv = ObjectContainer.Provider;
services.AddTransient<T>();
Handlers.Add(prv.GetService<T>());
}
#endregion
#region
/// <summary>开始</summary>
public void Start()
{
// 检查本地添加的处理器
var hs = Handlers;
if (hs.Count == 0) throw new ArgumentNullException(nameof(Handlers), "没有可用处理器");
// 启动作业提供者
var prv = Provider;
if (prv == null) prv = Provider = new FileJobProvider();
if (prv.Schedule == null) prv.Schedule = this;
prv.Start();
// 获取本应用在调度中心管理的所有作业
var jobs = prv.GetJobs();
if (jobs == null || jobs.Length == 0) throw new Exception("调度中心没有可用作业");
// 输出日志
var msg = $"启动任务调度引擎[{prv}],作业[{hs.Count}]项,定时{Period}秒";
XTrace.WriteLine(msg);
// 设置日志
foreach (var handler in hs)
var registry = ServiceProvider?.GetService<IRegistry>();
if (registry != null)
{
handler.Schedule = this;
handler.Provider = prv;
var svrs = registry.ResolveAddressAsync("Ant.Server").Result;
// 查找作业参数,分配给处理器
var job = jobs.FirstOrDefault(e => e.Name == handler.Name);
if (job != null && job.Mode == 0) job.Mode = handler.Mode;
handler.Job = job;
handler.Log = XTrace.Log;
handler.Start();
}
// 定时执行
if (Period > 0) _timer = new TimerX(Loop, null, 100, Period * 1000, "Job") { Async = true };
}
/// <summary>停止</summary>
public void Stop()
{
_timer.TryDispose();
_timer = null;
Provider?.Stop();
foreach (var handler in Handlers)
{
handler.Stop();
if (svrs != null && svrs.Length > 0) network.Server = svrs.Join();
}
}
/// <summary>任务调度</summary>
/// <returns></returns>
public Boolean Process()
prv.Start();
// 获取本应用在调度中心管理的所有作业
var jobs = prv.GetJobs();
if (jobs == null || jobs.Length == 0) throw new Exception("调度中心没有可用作业");
// 输出日志
var msg = $"启动任务调度引擎[{prv}],作业[{hs.Count}]项,定时{Period}秒";
XTrace.WriteLine(msg);
// 设置日志
foreach (var handler in hs)
{
var prv = Provider;
handler.Schedule = this;
handler.Provider = prv;
// 查询所有处理器
var hs = Handlers;
// 查找作业参数,分配给处理器
var job = jobs.FirstOrDefault(e => e.Name == handler.Name);
if (job != null && job.Mode == 0) job.Mode = handler.Mode;
handler.Job = job;
// 拿到处理器对应的作业
var jobs = prv.GetJobs();
if (jobs == null) return false;
// 运行时动态往集合里面加处理器为了配合Sql+C#
CheckHandlers(prv, jobs, hs);
var flag = false;
// 遍历处理器,给空闲的增加任务
foreach (var handler in hs)
{
var job = jobs.FirstOrDefault(e => e.Name == handler.Name);
// 找不到或者已停用
if (job == null || !job.Enable)
{
if (handler.Active) handler.Stop();
continue;
}
// 可能外部添加的Worker并不完整
handler.Schedule = this;
handler.Provider = prv;
// 更新作业参数,并启动处理器
handler.Job = job;
if (job.Mode == 0) job.Mode = handler.Mode;
if (!handler.Active) handler.Start();
// 如果正在处理任务数没达到最大并行度,则继续安排任务
var max = job.MaxTask;
if (prv is NetworkJobProvider nprv)
{
// 如果是网络提供者,则根据在线节点数平分并行度
var ps = nprv.Peers;
if (ps != null && ps.Length > 0)
{
max = max < ps.Length ? 1 : (Int32)Math.Round((Double)max / ps.Length);
}
}
var count = max - handler.Busy;
if (count > 0)
{
// 循环申请任务,喂饱处理器
var ts = handler.Acquire(count);
// 送给处理器处理
for (var i = 0; i < count && ts != null && i < ts.Length; i++)
{
// 准备就绪增加Busy避免超额分配
handler.Prepare(ts[i]);
// 使用线程池调度避免Task排队影响使用
ThreadPool.QueueUserWorkItem(s => handler.Process(s as ITask), ts[i]);
}
if (ts != null && ts.Length > 0) flag = true;
}
}
return flag;
handler.Log = XTrace.Log;
handler.Start();
}
private void CheckHandlers(IJobProvider provider, IList<IJob> jobs, IList<Handler> handlers)
// 定时执行
if (Period > 0) _timer = new TimerX(Loop, null, 100, Period * 1000, "Job") { Async = true };
}
/// <summary>停止</summary>
public void Stop()
{
_timer.TryDispose();
_timer = null;
Provider?.Stop();
foreach (var handler in Handlers)
{
foreach (var job in jobs)
handler.Stop();
}
}
/// <summary>任务调度</summary>
/// <returns></returns>
public Boolean Process()
{
var prv = Provider;
// 查询所有处理器
var hs = Handlers;
// 拿到处理器对应的作业
var jobs = prv.GetJobs();
if (jobs == null) return false;
// 运行时动态往集合里面加处理器为了配合Sql+C#
CheckHandlers(prv, jobs, hs);
var flag = false;
// 遍历处理器,给空闲的增加任务
foreach (var handler in hs)
{
var job = jobs.FirstOrDefault(e => e.Name == handler.Name);
// 找不到或者已停用
if (job == null || !job.Enable)
{
var handler = handlers.FirstOrDefault(e => e.Name == job.Name);
if (handler == null && job.Enable && !job.ClassName.IsNullOrEmpty())
if (handler.Active) handler.Stop();
continue;
}
// 可能外部添加的Worker并不完整
handler.Schedule = this;
handler.Provider = prv;
// 更新作业参数,并启动处理器
handler.Job = job;
if (job.Mode == 0) job.Mode = handler.Mode;
if (!handler.Active) handler.Start();
// 如果正在处理任务数没达到最大并行度,则继续安排任务
var max = job.MaxTask;
if (prv is NetworkJobProvider nprv)
{
// 如果是网络提供者,则根据在线节点数平分并行度
var ps = nprv.Peers;
if (ps != null && ps.Length > 0)
{
XTrace.WriteLine("发现未知作业[{0}]@[{1}]", job.Name, job.ClassName);
try
max = max < ps.Length ? 1 : (Int32)Math.Round((Double)max / ps.Length);
}
}
var count = max - handler.Busy;
if (count > 0)
{
// 循环申请任务,喂饱处理器
var ts = handler.Acquire(count);
// 送给处理器处理
for (var i = 0; i < count && ts != null && i < ts.Length; i++)
{
// 准备就绪增加Busy避免超额分配
handler.Prepare(ts[i]);
// 使用线程池调度避免Task排队影响使用
ThreadPool.QueueUserWorkItem(s => handler.Process(s as ITask), ts[i]);
}
if (ts != null && ts.Length > 0) flag = true;
}
}
return flag;
}
private void CheckHandlers(IJobProvider provider, IList<IJob> jobs, IList<Handler> handlers)
{
foreach (var job in jobs)
{
var handler = handlers.FirstOrDefault(e => e.Name == job.Name);
if (handler == null && job.Enable && !job.ClassName.IsNullOrEmpty())
{
XTrace.WriteLine("发现未知作业[{0}]@[{1}]", job.Name, job.ClassName);
try
{
// 实例化一个处理器
var type = Type.GetType(job.ClassName);
if (type == null) type = handlers.Where(e => e.GetType().FullName == job.ClassName)?.FirstOrDefault()?.GetType();
if (type != null)
{
// 实例化一个处理器
var type = Type.GetType(job.ClassName);
if (type == null) type = handlers.Where(e => e.GetType().FullName == job.ClassName)?.FirstOrDefault()?.GetType();
if (type != null)
handler = type.CreateInstance() as Handler;
if (handler != null)
{
handler = type.CreateInstance() as Handler;
if (handler != null)
{
XTrace.WriteLine("添加新作业[{0}]@[{1}]", job.Name, job.ClassName);
XTrace.WriteLine("添加新作业[{0}]@[{1}]", job.Name, job.ClassName);
handler.Name = job.Name;
handler.Schedule = this;
handler.Provider = provider;
handler.Name = job.Name;
handler.Schedule = this;
handler.Provider = provider;
if (handler is MessageHandler messageHandler && !job.Topic.IsNullOrEmpty()) messageHandler.Topic = job.Topic;
if (handler is MessageHandler messageHandler && !job.Topic.IsNullOrEmpty()) messageHandler.Topic = job.Topic;
handler.Log = XTrace.Log;
handler.Start();
handler.Log = XTrace.Log;
handler.Start();
handlers.Add(handler);
}
handlers.Add(handler);
}
}
catch (Exception ex)
{
XTrace.WriteException(ex);
}
}
catch (Exception ex)
{
XTrace.WriteException(ex);
}
}
}
/// <summary>已完成</summary>
/// <param name="ctx"></param>
internal protected virtual void OnFinish(JobContext ctx) => _timer?.SetNext(-1);
#endregion
#region
/// <summary>定时轮询周期。默认5秒</summary>
public Int32 Period { get; set; } = 5;
private TimerX _timer;
private void Loop(Object state)
{
// 任务调度
var rs = Process();
// 如果有数据,马上开始下一轮
if (rs) TimerX.Current.SetNext(-1);
}
#endregion
}
/// <summary>已完成</summary>
/// <param name="ctx"></param>
internal protected virtual void OnFinish(JobContext ctx) => _timer?.SetNext(-1);
#endregion
#region
/// <summary>定时轮询周期。默认5秒</summary>
public Int32 Period { get; set; } = 5;
private TimerX _timer;
private void Loop(Object state)
{
// 任务调度
var rs = Process();
// 如果有数据,马上开始下一轮
if (rs) TimerX.Current.SetNext(-1);
}
#endregion
}

View File

@ -11,6 +11,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
<PackageReference Include="NewLife.Stardust.Extensions" Version="2.3.2022.917-beta1649" />
</ItemGroup>
<ItemGroup>

View File

@ -1,61 +1,55 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using AntJob;
using AntJob.Providers;
using Microsoft.Extensions.Hosting;
using NewLife;
using NewLife.Configuration;
using NewLife.Log;
namespace HisAgent
namespace HisAgent;
public class JobHost : BackgroundService
{
public class JobHost : BackgroundService
private Scheduler _scheduler;
private readonly IServiceProvider _serviceProvider;
public JobHost(IServiceProvider serviceProvider) => _serviceProvider = serviceProvider;
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
//private readonly IConfigProvider _config;
//private readonly ITracer _tracer;
private Scheduler _scheduler;
var set = AntSetting.Current;
public JobHost()
// 实例化调度器
var sc = new Scheduler
{
}
ServiceProvider = _serviceProvider,
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
var set = AntSetting.Current;
// 实例化调度器
var sc = new Scheduler
// 使用分布式调度引擎替换默认的本地文件调度
Provider = new NetworkJobProvider
{
// 使用分布式调度引擎替换默认的本地文件调度
Provider = new NetworkJobProvider
{
Server = set.Server,
AppID = set.AppID,
Secret = set.Secret,
Debug = false
}
};
Server = set.Server,
AppID = set.AppID,
Secret = set.Secret,
Debug = false
}
};
// 添加作业
sc.AddHandler<HelloJob>();
// 添加作业
sc.AddHandler<HelloJob>();
// 启动调度引擎,调度器内部多线程处理
sc.Start();
_scheduler = sc;
// 启动调度引擎,调度器内部多线程处理
sc.Start();
_scheduler = sc;
return Task.CompletedTask;
}
return Task.CompletedTask;
}
public override Task StopAsync(CancellationToken cancellationToken)
{
_scheduler.TryDispose();
_scheduler = null;
public override Task StopAsync(CancellationToken cancellationToken)
{
_scheduler.TryDispose();
_scheduler = null;
return Task.CompletedTask;
}
return Task.CompletedTask;
}
}

View File

@ -1,35 +1,33 @@
using System;
using AntJob;
using AntJob.Providers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NewLife.Log;
namespace HisAgent
namespace HisAgent;
class Program
{
class Program
static void Main(string[] args)
{
static void Main(string[] args)
{
XTrace.UseConsole();
XTrace.UseConsole();
CreateHostBuilder(args).Build().Run();
}
/// <summary></summary>
/// <param name="args"></param>
/// <returns></returns>
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) => ConfigureServices(services));
/// <summary></summary>
/// <param name="hostBuilderContext"></param>
/// <param name="services"></param>
public static void ConfigureServices(IServiceCollection services)
{
// 添加后台调度服务
services.AddHostedService<JobHost>();
}
CreateHostBuilder(args).Build().Run();
}
}
/// <summary></summary>
/// <param name="args"></param>
/// <returns></returns>
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) => ConfigureServices(services));
/// <summary></summary>
/// <param name="hostBuilderContext"></param>
/// <param name="services"></param>
public static void ConfigureServices(IServiceCollection services)
{
services.AddStardust();
// 添加后台调度服务
services.AddHostedService<JobHost>();
}
}