优化应用在线插入表,避免高并发冲突错误

This commit is contained in:
智能大石头 2025-05-21 19:15:34 +08:00
parent 7597fa96df
commit 317ce967bd
5 changed files with 53 additions and 40 deletions

View File

@ -149,5 +149,10 @@ public partial class AppOnline : EntityBase<AppOnline>
UpdateTime = UpdateTime,
};
}
/// <summary>根据编码查询或添加</summary>
/// <param name="sessionid"></param>
/// <returns></returns>
public static AppOnline GetOrAdd(String sessionid) => GetOrAdd(sessionid, FindByInstance, k => new AppOnline { Instance = k });
#endregion
}

View File

@ -57,8 +57,8 @@ class AntService : IApi, IActionFilter
{
_App = app;
var ip = _Net.Remote.Host;
var online = _appService.GetOnline(app, ip);
var remote = _Net.Remote;
var online = _appService.GetOnline(app, remote + "", remote.Host);
online.UpdateTime = TimerX.Now;
online.SaveAsync();
}
@ -102,7 +102,8 @@ class AntService : IApi, IActionFilter
if (model.Code.IsNullOrEmpty()) throw new ArgumentNullException(nameof(model.Code));
var (app, online, rs) = _appService.Login(model, _Net.Remote.Host);
var remote = _Net.Remote;
var (app, online, rs) = _appService.Login(model, remote + "", remote.Host);
// 记录当前用户
Session["App"] = app;
@ -117,6 +118,9 @@ class AntService : IApi, IActionFilter
var app = Session["App"] as App;
var online = Session["AppOnline"] as AppOnline;
var remote = _Net.Remote;
online ??= _appService.GetOnline(app, remote + "", remote.Host);
return _appService.Logout(app, online, reason, _Net.Remote.Host);
}
@ -126,6 +130,9 @@ class AntService : IApi, IActionFilter
var app = Session["App"] as App;
var online = Session["AppOnline"] as AppOnline;
var remote = _Net.Remote;
online ??= _appService.GetOnline(app, remote + "", remote.Host);
return _appService.Ping(app, online, request, _Net.Remote.Host);
}
@ -196,8 +203,8 @@ class AntService : IApi, IActionFilter
{
if (task == null || task.ID == 0) throw new InvalidOperationException("无效操作 TaskID=" + task?.ID);
var ip = _Net.Remote.Host;
return _jobService.Report(_App, task, ip);
var remote = _Net.Remote;
return _jobService.Report(_App, task, remote + "", remote.Host);
}
#endregion
}

View File

@ -28,7 +28,7 @@ public class AppService
/// <summary>应用登录</summary>
/// <param name="model">模型</param>
/// <returns></returns>
public (App, AppOnline, LoginResponse) Login(LoginModel model, String ip)
public (App, AppOnline, LoginResponse) Login(LoginModel model, String sessionId, String ip)
{
if (model.Code.IsNullOrEmpty()) throw new ArgumentNullException(nameof(model.Code));
@ -66,8 +66,10 @@ public class AppService
app.Save();
// 应用上线
var online = CreateOnline(app, ip, model.ClientId);
if (!model.ClientId.IsNullOrEmpty()) sessionId = model.ClientId;
var online = GetOnline(app, sessionId, ip);
online.Name = model.Machine;
online.Client = model.ClientId;
online.ProcessId = model.ProcessId;
online.Version = model.Version;
online.CompileTime = compile;
@ -127,7 +129,7 @@ public class AppService
{
if (app != null)
{
online ??= GetOnline(app, ip);
//online ??= GetOnline(app, ip);
if (online != null)
{
WriteHistory(app, "注销", true, reason, ip);
@ -145,7 +147,7 @@ public class AppService
{
if (app != null)
{
online ??= GetOnline(app, ip);
//online ??= GetOnline(app, ip);
if (online != null)
{
if (request is PingRequest req)
@ -173,30 +175,23 @@ public class AppService
return olts.Select(e => e.ToModel()).ToArray();
}
AppOnline CreateOnline(App app, String ip, String clientId)
public AppOnline GetOnline(App app, String sessionId, String ip)
{
var online = GetOnline(app, ip);
online.Client = clientId;
online.UpdateIP = ip;
if (sessionId.IsNullOrEmpty()) sessionId = $"{app.Name}@{ip}";
var online = AppOnline.GetOrAdd(sessionId);
online.AppID = app.ID;
online.Server = Environment.MachineName;
return online;
}
public AppOnline GetOnline(App app, String ip)
{
var ins = $"{app.Name}@{ip}";
var online = AppOnline.FindByInstance(ins) ?? new AppOnline { Enable = true, CreateIP = ip };
online.AppID = app.ID;
online.Instance = ins;
if (online.CreateIP.IsNullOrEmpty()) online.CreateIP = ip;
online.UpdateIP = ip;
return online;
}
public void UpdateOnline(App app, JobTask ji, String ip)
public void UpdateOnline(App app, JobTask ji, String sessionId, String ip)
{
var online = GetOnline(app, ip);
var online = GetOnline(app, sessionId, ip);
online.Total += ji.Total;
online.Success += ji.Success;
online.Error += ji.Error;

View File

@ -4,6 +4,7 @@ using AntJob.Models;
using NewLife;
using NewLife.Caching;
using NewLife.Log;
using NewLife.Net;
using NewLife.Reflection;
using NewLife.Serialization;
using NewLife.Threading;
@ -133,7 +134,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
/// <summary>申请作业任务</summary>
/// <param name="model">模型</param>
/// <returns></returns>
public ITask[] Acquire(App app, AcquireModel model, String ip)
public ITask[] Acquire(App app, AcquireModel model, NetUri remote)
{
var jobName = model.Job?.Trim();
if (jobName.IsNullOrEmpty()) return [];
@ -159,14 +160,15 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
if (job.DataTime.Year <= 2000) throw new XException("作业[{0}/{1}]未设置数据时间", job.ID, job.Name);
// 应用在线,但可能禁止向其分配任务
var online = _appService.GetOnline(app, ip);
var ip = remote?.Host;
var online = _appService.GetOnline(app, remote + "", ip);
if (!online.Enable) return [];
var list = new List<JobTask>();
// 首先检查延迟任务和错误任务
CheckDelayTask(app, job, model.Count, list, ip);
CheckOldTask(app, job, model.Count, list, ip);
CheckDelayTask(app, job, model.Count, list, online);
CheckOldTask(app, job, model.Count, list, online);
// 错误项不够时,增加切片
if (list.Count < model.Count)
@ -244,7 +246,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
return rs.ToArray();
}
private void CheckDelayTask(App app, Job job, Int32 count, List<JobTask> list, String ip)
private void CheckDelayTask(App app, Job job, Int32 count, List<JobTask> list, AppOnline online)
{
// 获取下一次检查时间
var cache = _cacheProvider.Cache;
@ -260,10 +262,10 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
}
if (next <= now)
{
var online = _appService.GetOnline(app, ip);
//var online = _appService.GetOnline(app, ip);
next = now.AddSeconds(15);
list.AddRange(AcquireDelay(job, online.Server, ip, online.ProcessId, count, cache));
list.AddRange(AcquireDelay(job, online.Server, online.UpdateIP, online.ProcessId, count, cache));
if (list.Count > 0)
{
@ -277,7 +279,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
}
}
private void CheckOldTask(App app, Job job, Int32 count, List<JobTask> list, String ip)
private void CheckOldTask(App app, Job job, Int32 count, List<JobTask> list, AppOnline online)
{
// 每分钟检查一下错误任务和中断任务
var cache = _cacheProvider.Cache;
@ -286,10 +288,10 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
var next = cache.Get<DateTime>(nextKey);
if (next < now)
{
var online = _appService.GetOnline(app, ip);
//var online = _appService.GetOnline(app, ip);
next = now.AddSeconds(60);
list.AddRange(AcquireOld(job, online.Server, ip, online.ProcessId, count, cache));
list.AddRange(AcquireOld(job, online.Server, online.UpdateIP, online.ProcessId, count, cache));
if (list.Count > 0)
{
@ -409,7 +411,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
/// <summary>报告状态(进度、成功、错误)</summary>
/// <param name="result"></param>
/// <returns></returns>
public Boolean Report(App app, TaskResult result, String ip)
public Boolean Report(App app, TaskResult result, String sessionId, String ip)
{
if (result == null || result.ID == 0) throw new InvalidOperationException("无效操作 TaskID=" + result?.ID);
@ -442,7 +444,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
SetJobFinish(job, task);
// 记录状态
_appService.UpdateOnline(app, task, ip);
_appService.UpdateOnline(app, task, sessionId, ip);
}
else if (result.Status == JobStatus.)
{
@ -456,7 +458,7 @@ public class JobService(AppService appService, ICacheProvider cacheProvider, ITr
CheckMaxError(app, job);
// 记录状态
_appService.UpdateOnline(app, task, ip);
_appService.UpdateOnline(app, task, sessionId, ip);
}
else if (result.Status == JobStatus.)
{

View File

@ -11,6 +11,7 @@ using Microsoft.AspNetCore.Mvc.Controllers;
using Microsoft.AspNetCore.Mvc.Filters;
using NewLife;
using NewLife.Cube;
using NewLife.Data;
using NewLife.Log;
using NewLife.Remoting;
using NewLife.Remoting.Models;
@ -104,7 +105,8 @@ public class AntJobController : ControllerBase, IActionFilter
{
if (model.Code.IsNullOrEmpty()) throw new ArgumentNullException(nameof(model.Code));
var (app, online, rs) = _appService.Login(model, UserHost);
var sessionId = $"{model.Code}@{UserHost}";
var (app, online, rs) = _appService.Login(model, sessionId, UserHost);
return rs;
}
@ -124,7 +126,8 @@ public class AntJobController : ControllerBase, IActionFilter
// 密码模式
if (model.grant_type == "password")
{
var (app, online, rs) = _appService.Login(new LoginModel { Code = model.UserName, Secret = model.Password }, ip);
var sessionId = $"{model.UserName}@{ip}";
var (app, online, rs) = _appService.Login(new LoginModel { Code = model.UserName, Secret = model.Password }, sessionId, ip);
var tokenModel = _appService.IssueToken(app.Name, set);
@ -218,7 +221,8 @@ public class AntJobController : ControllerBase, IActionFilter
{
if (task == null || task.ID == 0) throw new InvalidOperationException("无效操作 TaskID=" + task?.ID);
return _jobService.Report(_App, task, UserHost);
var sessionId = $"{_App.Name}@{UserHost}";
return _jobService.Report(_App, task, sessionId, UserHost);
}
#endregion
}