更新client

This commit is contained in:
智能大石头 2025-05-22 11:27:12 +08:00
parent 3f579c0150
commit 7e8acd943b
1 changed files with 23 additions and 17 deletions

View File

@ -178,11 +178,13 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
var server = online.Name;
var pid = online.ProcessId;
//var topic = ps["topic"] + "";
var client = online.Client;
if (client.IsNullOrEmpty()) client = online.Instance;
switch (job.Mode)
{
case JobModes.Message:
list.AddRange(AcquireMessage(job, model.Topic, server, ip, pid, model.Count - list.Count, _cacheProvider.Cache));
list.AddRange(AcquireMessage(job, model.Topic, server, client, model.Count - list.Count, _cacheProvider.Cache));
break;
case JobModes.Time:
{
@ -191,7 +193,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
{
// 申请任务前,不能再查数据库,那样子会导致多线程脏读,从而出现多客户端分到相同任务的情况
//jb = Job.FindByKey(jb.ID);
list.AddRange(Acquire(job, server, ip, pid, model.Count - list.Count, _cacheProvider.Cache));
list.AddRange(Acquire(job, server, client, model.Count - list.Count, _cacheProvider.Cache));
}
break;
}
@ -205,7 +207,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
{
// 申请任务前,不能再查数据库,那样子会导致多线程脏读,从而出现多客户端分到相同任务的情况
//jb = Job.FindByKey(jb.ID);
list.AddRange(Acquire(job, server, ip, pid, model.Count - list.Count, _cacheProvider.Cache));
list.AddRange(Acquire(job, server, client, model.Count - list.Count, _cacheProvider.Cache));
}
}
break;
@ -264,9 +266,11 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
if (next <= now)
{
//var online = _appService.GetOnline(app, ip);
var client = online.Client;
if (client.IsNullOrEmpty()) client = online.Instance;
next = now.AddSeconds(15);
list.AddRange(AcquireDelay(job, online.Server, online.UpdateIP, online.ProcessId, count, cache));
list.AddRange(AcquireDelay(job, online.Server, client, count, cache));
if (list.Count > 0)
{
@ -290,9 +294,11 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
if (next < now)
{
//var online = _appService.GetOnline(app, ip);
var client = online.Client;
if (client.IsNullOrEmpty()) client = online.Instance;
next = now.AddSeconds(60);
list.AddRange(AcquireOld(job, online.Server, online.UpdateIP, online.ProcessId, count, cache));
list.AddRange(AcquireOld(job, online.Server, client, count, cache));
if (list.Count > 0)
{
@ -578,13 +584,13 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
/// <param name="count">要申请的任务个数</param>
/// <param name="cache">缓存对象</param>
/// <returns></returns>
public IList<JobTask> Acquire(Job job, String server, String ip, Int32 pid, Int32 count, ICache cache)
public IList<JobTask> Acquire(Job job, String server, String client, Int32 count, ICache cache)
{
var list = new List<JobTask>();
if (!job.Enable) return list;
using var span = _tracer?.NewSpan(nameof(Acquire), new { job.Name, server, ip, pid, count });
using var span = _tracer?.NewSpan(nameof(Acquire), new { job.Name, server, client, count });
using var ts = Job.Meta.CreateTrans();
var start = job.DataTime;
@ -609,7 +615,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
Server = server,
ProcessID = Interlocked.Increment(ref _idxBatch),
Client = $"{ip}@{pid}",
Client = client,
Status = JobStatus.,
CreateTime = DateTime.Now,
UpdateTime = DateTime.Now
@ -716,9 +722,9 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
/// <param name="count">要申请的任务个数</param>
/// <param name="cache">缓存对象</param>
/// <returns></returns>
public IList<JobTask> AcquireDelay(Job job, String server, String ip, Int32 pid, Int32 count, ICache cache)
public IList<JobTask> AcquireDelay(Job job, String server, String client, Int32 count, ICache cache)
{
using var span = _tracer?.NewSpan(nameof(AcquireDelay), new { job.Name, server, ip, pid, count });
using var span = _tracer?.NewSpan(nameof(AcquireDelay), new { job.Name, server, client, count });
using var ts = Job.Meta.CreateTrans();
@ -729,7 +735,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
{
task.Server = server;
task.ProcessID = Interlocked.Increment(ref _idxBatch);
task.Client = $"{ip}@{pid}";
task.Client = client;
task.Status = JobStatus.;
//task.CreateTime = DateTime.Now;
task.UpdateTime = DateTime.Now;
@ -751,9 +757,9 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
/// <param name="count">要申请的任务个数</param>
/// <param name="cache">缓存对象</param>
/// <returns></returns>
public IList<JobTask> AcquireOld(Job job, String server, String ip, Int32 pid, Int32 count, ICache cache)
public IList<JobTask> AcquireOld(Job job, String server, String client, Int32 count, ICache cache)
{
using var span = _tracer?.NewSpan(nameof(AcquireOld), new { job.Name, server, ip, pid, count });
using var span = _tracer?.NewSpan(nameof(AcquireOld), new { job.Name, server, client, count });
using var ts = Job.Meta.CreateTrans();
var list = new List<JobTask>();
@ -782,7 +788,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
{
task.Server = server;
task.ProcessID = Interlocked.Increment(ref _idxBatch);
task.Client = $"{ip}@{pid}";
task.Client = client;
task.Status = JobStatus.;
//task.CreateTime = DateTime.Now;
task.UpdateTime = DateTime.Now;
@ -806,7 +812,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
/// <param name="count">要申请的任务个数</param>
/// <param name="cache">缓存对象</param>
/// <returns></returns>
public IList<JobTask> AcquireMessage(Job job, String topic, String server, String ip, Int32 pid, Int32 count, ICache cache)
public IList<JobTask> AcquireMessage(Job job, String topic, String server, String client, Int32 count, ICache cache)
{
// 消费消息时,保存主题
if (job.Topic != topic)
@ -823,7 +829,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
var now = DateTime.Now;
if (job.MessageCount == 0 && job.UpdateTime.AddMinutes(2) > now) return list;
using var span = _tracer?.NewSpan(nameof(AcquireMessage), new { job.Name, topic, server, ip, pid, count });
using var span = _tracer?.NewSpan(nameof(AcquireMessage), new { job.Name, topic, server, client, count });
using var ts = Job.Meta.CreateTrans();
var size = job.BatchSize;
@ -851,7 +857,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
Server = server,
ProcessID = Interlocked.Increment(ref _idxBatch),
Client = $"{ip}@{pid}",
Client = client,
Status = JobStatus.,
CreateTime = DateTime.Now,
UpdateTime = DateTime.Now