增加申请任务的埋点
This commit is contained in:
parent
c7bb9165d7
commit
a10f2c5808
|
@ -37,7 +37,7 @@
|
|||
<None Remove="Build.tt" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<PackageReference Include="NewLife.XCode" Version="11.13.2024.601" />
|
||||
<PackageReference Include="NewLife.XCode" Version="11.13.2024.606" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<None Update="Build.log">
|
||||
|
|
|
@ -43,7 +43,7 @@
|
|||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="NewLife.XCode" Version="11.13.2024.601" />
|
||||
<PackageReference Include="NewLife.XCode" Version="11.13.2024.606" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\AntJob\AntJob.csproj" />
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
using AntJob.Data;
|
||||
using System;
|
||||
using AntJob.Data;
|
||||
using AntJob.Data.Entity;
|
||||
using AntJob.Models;
|
||||
using NewLife;
|
||||
|
@ -11,10 +12,11 @@ using XCode.DataAccessLayer;
|
|||
|
||||
namespace AntJob.Server.Services;
|
||||
|
||||
public class JobService(AppService appService, ICacheProvider cacheProvider, ILog log)
|
||||
public class JobService(AppService appService, ICacheProvider cacheProvider, ITracer tracer, ILog log)
|
||||
{
|
||||
private readonly AppService _appService = appService;
|
||||
private readonly ICacheProvider _cacheProvider = cacheProvider;
|
||||
private readonly ITracer _tracer = tracer;
|
||||
private readonly ILog _log = log;
|
||||
|
||||
#region 业务
|
||||
|
@ -379,6 +381,8 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ILo
|
|||
}
|
||||
else if (result.Status == JobStatus.延迟)
|
||||
{
|
||||
using var span = _tracer?.NewSpan("Delay", new { job.Name, task.DataTime, NextTime = result.NextTime.ToLocalTime() });
|
||||
|
||||
task.Times++;
|
||||
|
||||
// 延迟任务的下一次执行时间
|
||||
|
@ -397,6 +401,8 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ILo
|
|||
|
||||
private void SetJobFinish(Job job, JobTask task)
|
||||
{
|
||||
using var span = _tracer?.NewSpan(nameof(SetJobFinish), new { job.Name, task.DataTime });
|
||||
|
||||
job.Total += task.Total;
|
||||
job.Success += task.Success;
|
||||
job.Error += task.Error;
|
||||
|
@ -421,6 +427,8 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ILo
|
|||
|
||||
private JobError SetJobError(Job job, JobTask task)
|
||||
{
|
||||
using var span = _tracer?.NewSpan(nameof(SetJobError), new { job.Name, task.DataTime });
|
||||
|
||||
var err = new JobError
|
||||
{
|
||||
AppID = job.AppID,
|
||||
|
@ -482,8 +490,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ILo
|
|||
var step = job.Step;
|
||||
if (step <= 0) step = 30;
|
||||
|
||||
//// 全局锁,确保单个作业只有一个线程在分配作业
|
||||
//using var ck = cache.AcquireLock($"Job:{ID}", 5_000);
|
||||
using var span = _tracer?.NewSpan(nameof(Acquire), new { job.Name, server, ip, pid, count });
|
||||
|
||||
using var ts = Job.Meta.CreateTrans();
|
||||
var start = job.DataTime;
|
||||
|
@ -520,7 +527,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ILo
|
|||
{
|
||||
var ti2 = cache.Get<JobTask>(key);
|
||||
XTrace.WriteLine("[{0}]重复切片:{1}", key, ti2?.ToJson());
|
||||
using var span = DefaultTracer.Instance?.NewSpan($"job:AcquireDuplicate", ti2);
|
||||
using var span2 = DefaultTracer.Instance?.NewSpan($"job:AcquireDuplicate", ti2);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -547,6 +554,9 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ILo
|
|||
ts.Commit();
|
||||
}
|
||||
|
||||
// 记录任务数
|
||||
span?.AppendTag(null, list.Count);
|
||||
|
||||
return list;
|
||||
}
|
||||
|
||||
|
@ -634,8 +644,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ILo
|
|||
/// <returns></returns>
|
||||
public IList<JobTask> AcquireOld(Job job, String server, String ip, Int32 pid, Int32 count, ICache cache)
|
||||
{
|
||||
//// 全局锁,确保单个作业只有一个线程在分配作业
|
||||
//using var ck = cache.AcquireLock($"Job:{ID}", 5_000);
|
||||
using var span = _tracer?.NewSpan(nameof(AcquireOld), new { job.Name, server, ip, pid, count });
|
||||
|
||||
using var ts = Job.Meta.CreateTrans();
|
||||
var list = new List<JobTask>();
|
||||
|
@ -662,7 +671,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ILo
|
|||
task.Server = server;
|
||||
task.ProcessID = Interlocked.Increment(ref _idxBatch);
|
||||
task.Client = $"{ip}@{pid}";
|
||||
task.Status = JobStatus.处理中;
|
||||
task.Status = JobStatus.就绪;
|
||||
//task.CreateTime = DateTime.Now;
|
||||
task.UpdateTime = DateTime.Now;
|
||||
}
|
||||
|
@ -671,6 +680,9 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ILo
|
|||
|
||||
ts.Commit();
|
||||
|
||||
// 记录任务数
|
||||
span?.AppendTag(null, list.Count);
|
||||
|
||||
return list;
|
||||
}
|
||||
|
||||
|
@ -699,8 +711,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ILo
|
|||
var now = DateTime.Now;
|
||||
if (job.MessageCount == 0 && job.UpdateTime.AddMinutes(2) > now) return list;
|
||||
|
||||
//// 全局锁,确保单个作业只有一个线程在分配作业
|
||||
//using var ck = cache.AcquireLock($"Job:{ID}", 5_000);
|
||||
using var span = _tracer?.NewSpan(nameof(AcquireMessage), new { job.Name, topic, server, ip, pid, count });
|
||||
|
||||
using var ts = Job.Meta.CreateTrans();
|
||||
var size = job.BatchSize;
|
||||
|
@ -729,7 +740,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ILo
|
|||
Server = server,
|
||||
ProcessID = Interlocked.Increment(ref _idxBatch),
|
||||
Client = $"{ip}@{pid}",
|
||||
Status = JobStatus.处理中,
|
||||
Status = JobStatus.就绪,
|
||||
CreateTime = DateTime.Now,
|
||||
UpdateTime = DateTime.Now
|
||||
};
|
||||
|
@ -764,6 +775,9 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ILo
|
|||
|
||||
ts.Commit();
|
||||
|
||||
// 记录任务数
|
||||
span?.AppendTag(null, list.Count);
|
||||
|
||||
return list;
|
||||
}
|
||||
#endregion
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="NewLife.XCode" Version="11.13.2024.601" />
|
||||
<PackageReference Include="NewLife.XCode" Version="11.13.2024.606" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
|
Loading…
Reference in New Issue