申请任务时,从在线记录获取机器名和进程ID

This commit is contained in:
大石头 2020-01-11 17:52:55 +08:00
parent 38a7600cec
commit 9bce1132bc
8 changed files with 53 additions and 33 deletions

View File

@ -58,6 +58,14 @@ namespace AntJob.Data.Entity
[BindColumn("Name", "名称。机器名称", "", Master = true)]
public String Name { get { return _Name; } set { if (OnPropertyChanging(__.Name, value)) { _Name = value; OnPropertyChanged(__.Name); } } }
private Int32 _ProcessId;
/// <summary>进程。进程Id</summary>
[DisplayName("进程")]
[Description("进程。进程Id")]
[DataObjectField(false, false, false, 0)]
[BindColumn("ProcessId", "进程。进程Id", "")]
public Int32 ProcessId { get { return _ProcessId; } set { if (OnPropertyChanging(__.ProcessId, value)) { _ProcessId = value; OnPropertyChanged(__.ProcessId); } } }
private String _Version;
/// <summary>版本。客户端</summary>
[DisplayName("版本")]
@ -186,6 +194,7 @@ namespace AntJob.Data.Entity
case __.Instance : return _Instance;
case __.Client : return _Client;
case __.Name : return _Name;
case __.ProcessId : return _ProcessId;
case __.Version : return _Version;
case __.CompileTime : return _CompileTime;
case __.Server : return _Server;
@ -212,6 +221,7 @@ namespace AntJob.Data.Entity
case __.Instance : _Instance = Convert.ToString(value); break;
case __.Client : _Client = Convert.ToString(value); break;
case __.Name : _Name = Convert.ToString(value); break;
case __.ProcessId : _ProcessId = value.ToInt(); break;
case __.Version : _Version = Convert.ToString(value); break;
case __.CompileTime : _CompileTime = value.ToDateTime(); break;
case __.Server : _Server = Convert.ToString(value); break;
@ -251,6 +261,9 @@ namespace AntJob.Data.Entity
/// <summary>名称。机器名称</summary>
public static readonly Field Name = FindByName(__.Name);
/// <summary>进程。进程Id</summary>
public static readonly Field ProcessId = FindByName(__.ProcessId);
/// <summary>版本。客户端</summary>
public static readonly Field Version = FindByName(__.Version);
@ -314,6 +327,9 @@ namespace AntJob.Data.Entity
/// <summary>名称。机器名称</summary>
public const String Name = "Name";
/// <summary>进程。进程Id</summary>
public const String ProcessId = "ProcessId";
/// <summary>版本。客户端</summary>
public const String Version = "Version";
@ -378,6 +394,9 @@ namespace AntJob.Data.Entity
/// <summary>名称。机器名称</summary>
String Name { get; set; }
/// <summary>进程。进程Id</summary>
Int32 ProcessId { get; set; }
/// <summary>版本。客户端</summary>
String Version { get; set; }

View File

@ -33,6 +33,7 @@
<Column Name="Instance" DataType="String" Description="实例。IP加端口" />
<Column Name="Client" DataType="String" Description="客户端。IP加进程" />
<Column Name="Name" DataType="String" Master="True" Description="名称。机器名称" />
<Column Name="ProcessId" DataType="Int32" Description="进程。进程Id" />
<Column Name="Version" DataType="String" Description="版本。客户端" />
<Column Name="CompileTime" DataType="DateTime" Description="编译时间" />
<Column Name="Server" DataType="String" Description="服务端。客户端登录到哪个服务端IP加端口" />

View File

@ -284,6 +284,8 @@ namespace AntJob.Server
if (jb == null) throw new XException($"应用[{app.ID}/{app.Name}]下未找到作业[{job}]");
if (jb.Step == 0 || jb.Start.Year <= 2000) throw new XException("作业[{0}/{1}]未设置开始时间或步进", jb.ID, jb.Name);
var online = GetOnline(app, Session as INetSession);
var list = new List<JobTask>();
// 每分钟检查一下错误任务和中断任务
@ -293,8 +295,8 @@ namespace AntJob.Server
if (list.Count < count)
{
var ps = ControllerContext.Current.Parameters;
var server = ps["server"] + "";
var pid = ps["pid"].ToInt();
var server = online.Name;
var pid = online.ProcessId;
var topic = ps["topic"] + "";
var ip = (Session as INetSession).Remote.Host;
@ -305,6 +307,8 @@ namespace AntJob.Server
break;
case JobModes.Data:
case JobModes.Alarm:
case JobModes.CSharp:
case JobModes.Sql:
default:
{
// 如果能够切片,则查询数据库后进入,避免缓存导致重复
@ -320,7 +324,6 @@ namespace AntJob.Server
}
// 记录状态
var online = GetOnline(app, Session as INetSession);
online.Tasks += list.Count;
online.SaveAsync();
@ -664,6 +667,7 @@ namespace AntJob.Server
var online = GetOnline(app, ns);
online.Client = $"{(ip.IsNullOrEmpty() ? machine : ip)}@{pid}";
online.Name = machine;
online.ProcessId = pid;
online.UpdateIP = ip;
//online.Version = version;

View File

@ -102,7 +102,7 @@ namespace AntJob
/// <remarks>
/// 业务应用根据使用场景可重载Acquire并返回空来阻止创建新任务
/// </remarks>
/// <param name="data">扩展数据。服务器、进程等信息</param>
/// <param name="data">扩展数据。Topic等信息</param>
/// <param name="count">要申请的任务个数</param>
/// <returns></returns>
public virtual ITask[] Acquire(IDictionary<String, Object> data, Int32 count = 1)

View File

@ -41,12 +41,13 @@ namespace AntJob.Handlers
/// <remarks>
/// 业务应用根据使用场景可重载Acquire并返回空来阻止创建新任务
/// </remarks>
/// <param name="data">扩展数据。服务器、进程等信息</param>
/// <param name="data">扩展数据。Topic等信息</param>
/// <param name="count">要申请的任务个数</param>
/// <returns></returns>
public override ITask[] Acquire(IDictionary<String, Object> data, Int32 count = 1)
{
// 消费模式设置Topic值
if (data == null) data = new Dictionary<String, Object>();
if (!Topic.IsNullOrEmpty()) data[nameof(Topic)] = Topic;
return base.Acquire(data, count);

View File

@ -24,7 +24,7 @@ namespace AntJob.Providers
/// <summary>申请任务</summary>
/// <param name="job">作业</param>
/// <param name="data">扩展数据</param>
/// <param name="data">扩展数据。Topic等信息</param>
/// <param name="count">要申请的任务个数</param>
/// <returns></returns>
ITask[] Acquire(IJob job, IDictionary<String, Object> data, Int32 count = 1);

View File

@ -63,23 +63,23 @@ namespace AntJob.Providers
Ant.TryDispose();
Ant = ant;
var ws = Schedule?.Jobs;
var bs = Schedule?.Jobs;
//var jobs = GetJobs(ws.Select(e => e.Name).ToArray());
var list = new List<IJob>();
foreach (var wrk in ws)
foreach (var handler in bs)
{
var job = wrk.Job ?? new JobModel();
var job = handler.Job ?? new JobModel();
job.Name = wrk.Name;
job.Mode = wrk.Mode;
job.Name = handler.Name;
job.Mode = handler.Mode;
// 描述
if (job is JobModel job2)
{
var dis = wrk.GetType().GetDisplayName();
var dis = handler.GetType().GetDisplayName();
if (!dis.IsNullOrEmpty()) job2.DisplayName = dis;
var des = wrk.GetType().GetDescription();
var des = handler.GetType().GetDescription();
if (!des.IsNullOrEmpty()) job2.Description = des;
}

View File

@ -103,15 +103,10 @@ namespace AntJob
public Boolean Process()
{
var prv = Provider;
var extend = new Dictionary<String, Object>
{
["server"] = Environment.MachineName,
["pid"] = System.Diagnostics.Process.GetCurrentProcess().Id
};
// 查询所有处理器和被依赖的作业
var ws = Jobs;
var names = ws.Select(e => e.Name).ToList();
var bs = Jobs;
var names = bs.Select(e => e.Name).ToList();
names = names.Distinct().ToList();
// 拿到处理器对应的作业
@ -120,40 +115,40 @@ namespace AntJob
var flag = false;
// 遍历处理器,给空闲的增加任务
foreach (var wrk in ws)
foreach (var handler in bs)
{
var job = jobs.FirstOrDefault(e => e.Name == wrk.Name);
var job = jobs.FirstOrDefault(e => e.Name == handler.Name);
// 找不到或者已停用
if (job == null || !job.Enable)
{
if (wrk.Active) wrk.Stop();
if (handler.Active) handler.Stop();
continue;
}
// 可能外部添加的Worker并不完整
wrk.Schedule = this;
wrk.Provider = prv;
handler.Schedule = this;
handler.Provider = prv;
// 更新作业参数,并启动处理器
wrk.Job = job;
if (job.Mode == 0) job.Mode = wrk.Mode;
if (!wrk.Active) wrk.Start();
handler.Job = job;
if (job.Mode == 0) job.Mode = handler.Mode;
if (!handler.Active) handler.Start();
// 如果正在处理任务数没达到最大并行度,则继续安排任务
if (wrk.Busy < job.MaxTask)
if (handler.Busy < job.MaxTask)
{
// 循环申请任务,喂饱处理器
var count = job.MaxTask - wrk.Busy;
var ts = wrk.Acquire(extend, count);
var count = job.MaxTask - handler.Busy;
var ts = handler.Acquire(null, count);
// 送给处理器处理
for (var i = 0; i < count && ts != null && i < ts.Length; i++)
{
// 准备就绪增加Busy避免超额分配
wrk.Prepare(ts[i]);
handler.Prepare(ts[i]);
// 使用线程池调度避免Task排队影响使用
ThreadPoolX.QueueUserWorkItem(wrk.Process, ts[i]);
ThreadPoolX.QueueUserWorkItem(handler.Process, ts[i]);
}
if (ts != null && ts.Length > 0) flag = true;