[feat] StarAgent启动后从配置中心获取PluginServer,并通过AgentInfo提供给本地应用

This commit is contained in:
大石头 2023-10-10 15:10:45 +08:00
parent a1c23ed709
commit 59e5f39359
8 changed files with 106 additions and 69 deletions

View File

@ -371,7 +371,14 @@ internal class MyService : ServiceBase, IServiceProvider
if (_factory == null)
{
var server = StarSetting.Server;
if (!server.IsNullOrEmpty()) _factory = new StarFactory(server, "StarAgent", null);
if (!server.IsNullOrEmpty())
{
_factory = new StarFactory(server, "StarAgent", null);
// 激活配置中心获取PluginServer
var config = _factory.GetConfig();
if (config != null) ThreadPoolX.QueueUserWorkItem(() => config.LoadAll());
}
}
}

View File

@ -82,6 +82,13 @@ public class StarService : DisposeBase, IApi
ai.Services = Manager?.Services.Select(e => e.Name).ToArray();
ai.Code = AgentSetting.Code;
// 返回插件服务器地址
var core = NewLife.Setting.Current;
if (!core.PluginServer.IsNullOrEmpty() && !core.PluginServer.Contains("x.newlifex.com"))
{
ai.PluginServer = core.PluginServer;
}
return ai;
}

View File

@ -28,13 +28,13 @@ public class AppClient : ApiHttpClient, ICommandClient, IRegistry, IEventProvide
public String AppId { get; set; }
/// <summary>应用名</summary>
public String AppName { get; set; }
public String? AppName { get; set; }
/// <summary>实例。应用可能多实例部署ip@proccessid</summary>
public String ClientId { get; set; }
public String? ClientId { get; set; }
/// <summary>节点编码</summary>
public String NodeCode { get; set; }
public String? NodeCode { get; set; }
/// <summary>WebSocket长连接。建立长连接后可以实时感知配置更新和注册服务更新默认false</summary>
public Boolean UseWebSocket { get; set; }
@ -49,8 +49,8 @@ public class AppClient : ApiHttpClient, ICommandClient, IRegistry, IEventProvide
/// <summary>最大失败数。超过该数时新的数据将被抛弃默认120</summary>
public Int32 MaxFails { get; set; } = 120;
private AppInfo _appInfo;
private readonly String _version;
private AppInfo? _appInfo;
private readonly String? _version;
/// <summary>已发布服务,记录下来,定时注册刷新</summary>
private readonly ConcurrentDictionary<String, PublishServiceInfo> _publishServices = new();
@ -146,10 +146,10 @@ public class AppClient : ApiHttpClient, ICommandClient, IRegistry, IEventProvide
StartTimer();
}
private String _appName;
private String? _appName;
/// <summary>注册</summary>
/// <returns></returns>
public async Task<Object> Register()
public async Task<Object?> Register()
{
try
{
@ -174,7 +174,7 @@ public class AppClient : ApiHttpClient, ICommandClient, IRegistry, IEventProvide
catch (Exception ex)
{
if (ex is HttpRequestException)
Log?.Info("注册异常[{0}] {1}", Source, ex.GetTrue().Message);
Log?.Info("注册异常[{0}] {1}", Source, ex.GetTrue()?.Message);
else
Log?.Info(ex.ToString());
@ -340,7 +340,7 @@ public class AppClient : ApiHttpClient, ICommandClient, IRegistry, IEventProvide
#endregion
#region
private TimerX _timer;
private TimerX? _timer;
private void StartTimer()
{
if (_timer == null)
@ -428,8 +428,8 @@ public class AppClient : ApiHttpClient, ICommandClient, IRegistry, IEventProvide
}
#if NET45_OR_GREATER || NETCOREAPP || NETSTANDARD
private WebSocket _websocket;
private CancellationTokenSource _source;
private WebSocket? _websocket;
private CancellationTokenSource? _source;
private async Task DoPull(WebSocket socket, CancellationToken cancellationToken)
{
DefaultSpan.Current = null;
@ -523,7 +523,7 @@ public class AppClient : ApiHttpClient, ICommandClient, IRegistry, IEventProvide
/// <summary>上报服务调用结果</summary>
/// <param name="model"></param>
/// <returns></returns>
public virtual async Task<Object> CommandReply(CommandReplyModel model) => await PostAsync<Object>("App/CommandReply", model);
public virtual async Task<Object?> CommandReply(CommandReplyModel model) => await PostAsync<Object>("App/CommandReply", model);
#endregion
#region
@ -597,7 +597,7 @@ public class AppClient : ApiHttpClient, ICommandClient, IRegistry, IEventProvide
/// <param name="tag">特性标签</param>
/// <param name="health">健康监测接口地址</param>
/// <returns></returns>
public async Task<PublishServiceInfo> RegisterAsync(String serviceName, String address, String tag = null, String health = null)
public async Task<PublishServiceInfo> RegisterAsync(String serviceName, String address, String? tag = null, String? health = null)
{
if (address == null) throw new ArgumentNullException(nameof(address));
@ -619,7 +619,7 @@ public class AppClient : ApiHttpClient, ICommandClient, IRegistry, IEventProvide
/// <param name="tag">特性标签</param>
/// <param name="health">健康监测接口地址</param>
/// <returns></returns>
public PublishServiceInfo Register(String serviceName, Func<String> addressCallback, String tag = null, String health = null)
public PublishServiceInfo Register(String serviceName, Func<String> addressCallback, String? tag = null, String? health = null)
{
if (addressCallback == null) throw new ArgumentNullException(nameof(addressCallback));

View File

@ -8,26 +8,25 @@ using NewLife.Serialization;
using Stardust.Models;
using Stardust.Registry;
using Stardust.Services;
using static System.Net.WebRequestMethods;
namespace Stardust.Configs;
internal class StarHttpConfigProvider : HttpConfigProvider
{
public ConfigInfo ConfigInfo { get; set; }
public ConfigInfo? ConfigInfo { get; set; }
const String REGISTRY = "$Registry:";
private Boolean _useWorkerId;
protected override IDictionary<String, Object> GetAll()
protected override IDictionary<String, Object?>? GetAll()
{
try
{
var rs = base.GetAll();
var inf = Info;
if (rs != null && rs.Count > 0)
if (rs != null && rs.Count > 0 && inf != null)
{
var inf = Info;
var ci = ConfigInfo = JsonHelper.Convert<ConfigInfo>(inf);
if (ci != null && ci.Configs != null && ci.Configs.Count > 0)
@ -56,7 +55,9 @@ internal class StarHttpConfigProvider : HttpConfigProvider
{
var asm = AssemblyX.Entry;
var set = NewLife.Setting.Current;
if (!svr.IsNullOrEmpty() && !svr.EqualIgnoreCase(set.PluginServer) && !asm.Name.EqualIgnoreCase("StarWeb", "StarServer"))
if (!svr.IsNullOrEmpty() &&
!svr.EqualIgnoreCase(set.PluginServer) &&
(asm == null || !asm.Name.EqualIgnoreCase("StarWeb", "StarServer")))
{
XTrace.WriteLine("插件服务器PluginServer变更为 {0}", svr);
set.PluginServer = svr;
@ -86,7 +87,7 @@ internal class StarHttpConfigProvider : HttpConfigProvider
/// <param name="key"></param>
/// <param name="createOnMiss"></param>
/// <returns></returns>
protected override IConfigSection Find(String key, Boolean createOnMiss)
protected override IConfigSection? Find(String key, Boolean createOnMiss)
{
if (key.StartsWithIgnoreCase(REGISTRY))
{

View File

@ -1,14 +1,13 @@
using System.ComponentModel;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Net;
using System.Net.Http;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using NewLife;
using NewLife.Http;
using NewLife.Log;
using NewLife.Messaging;
using NewLife.Net;
using NewLife.Remoting;
using Stardust.Models;
#if NET45_OR_GREATER || NETCOREAPP || NETSTANDARD
@ -22,16 +21,16 @@ public class LocalStarClient
{
#region
/// <summary>本机代理信息</summary>
public AgentInfo Info { get; private set; }
public AgentInfo? Info { get; private set; }
/// <summary>本地服务端地址</summary>
public String Server { get; set; }
public String? Server { get; set; }
/// <summary>本地星尘代理服务地址</summary>
public static Int32 Port { get; set; } = 5500;
private AgentInfo _local;
private ApiClient _client;
private AgentInfo? _local;
private ApiClient? _client;
#endregion
#region
@ -44,6 +43,9 @@ public class LocalStarClient
#endregion
#region
#if !NET40
[MemberNotNull(nameof(_client))]
#endif
private void Init()
{
if (_client != null) return;
@ -63,7 +65,7 @@ public class LocalStarClient
/// <summary>获取信息</summary>
/// <returns></returns>
public AgentInfo GetInfo()
public AgentInfo? GetInfo()
{
//!!! 通过进程名判断是否存在可能会误判因为获取其它dotnet进程命令行需要管理员权限
//// 检测进程是否存在,如果进程都不存在,没必要获取信息
@ -94,7 +96,7 @@ public class LocalStarClient
/// <summary>获取信息</summary>
/// <returns></returns>
public async Task<AgentInfo> GetInfoAsync()
public async Task<AgentInfo?> GetInfoAsync()
{
Init();
@ -363,7 +365,7 @@ public class LocalStarClient
/// <param name="url">zip包下载源</param>
/// <param name="version">版本号</param>
/// <param name="target">目标目录</param>
public static Task ProbeAsync(String url = null, String version = null, String target = null)
public static Task ProbeAsync(String? url = null, String? version = null, String? target = null)
{
return TaskEx.Run(() =>
{
@ -470,6 +472,6 @@ public class LocalStarClient
/// <summary>写日志</summary>
/// <param name="format"></param>
/// <param name="args"></param>
public void WriteLog(String format, params Object[] args) => Log?.Info(format, args);
public void WriteLog(String format, params Object?[] args) => Log?.Info(format, args);
#endregion
}

View File

@ -14,28 +14,31 @@ public class AgentInfo
public Int32 ProcessId { get; set; }
/// <summary>进程名称</summary>
public String ProcessName { get; set; }
public String? ProcessName { get; set; }
/// <summary>版本</summary>
public String Version { get; set; }
public String? Version { get; set; }
/// <summary>文件路径</summary>
public String FileName { get; set; }
public String? FileName { get; set; }
/// <summary>命令参数</summary>
public String Arguments { get; set; }
public String? Arguments { get; set; }
/// <summary>本地IP地址</summary>
public String IP { get; set; }
public String? IP { get; set; }
/// <summary>服务端地址</summary>
public String Server { get; set; }
public String? Server { get; set; }
/// <summary>插件服务器</summary>
public String? PluginServer { get; set; }
/// <summary>节点编码</summary>
public String Code { get; set; }
public String? Code { get; set; }
/// <summary>应用服务</summary>
public String[] Services { get; set; }
public String[]? Services { get; set; }
#endregion
#region

View File

@ -19,22 +19,22 @@ public class StarTracer : DefaultTracer
{
#region
/// <summary>应用标识</summary>
public String AppId { get; set; }
public String? AppId { get; set; }
/// <summary>应用名</summary>
public String AppName { get; set; }
public String? AppName { get; set; }
/// <summary>实例。应用可能多实例部署ip@proccessid</summary>
public String ClientId { get; set; }
public String? ClientId { get; set; }
/// <summary>最大失败数。超过该数时新的数据将被抛弃默认120</summary>
public Int32 MaxFails { get; set; } = 120;
/// <summary>要排除的操作名</summary>
public String[] Excludes { get; set; }
public String[]? Excludes { get; set; }
/// <summary>Api客户端</summary>
public IApiClient Client { get; set; }
public IApiClient? Client { get; set; }
/// <summary>剔除埋点调用自己。默认true</summary>
public Boolean TrimSelf { get; set; } = true;
@ -285,7 +285,7 @@ public class StarTracer : DefaultTracer
/// <summary>全局注册星尘性能追踪器</summary>
/// <param name="server">星尘监控中心地址,为空时自动从本地探测</param>
/// <returns></returns>
public static StarTracer Register(String server = null)
public static StarTracer? Register(String? server = null)
{
if (server.IsNullOrEmpty())
{

View File

@ -1,5 +1,6 @@
using System.Collections;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Reflection;
using NewLife;
using NewLife.Caching;
@ -32,37 +33,37 @@ public class StarFactory : DisposeBase
{
#region
/// <summary>服务器地址</summary>
public String Server { get; set; }
public String? Server { get; set; }
/// <summary>应用</summary>
public String AppId { get; set; }
public String? AppId { get; set; }
/// <summary>应用名</summary>
public String AppName { get; set; }
public String? AppName { get; set; }
/// <summary>应用密钥</summary>
public String Secret { get; set; }
public String? Secret { get; set; }
/// <summary>实例。应用可能多实例部署ip@proccessid</summary>
public String ClientId { get; set; }
public String? ClientId { get; set; }
///// <summary>服务名</summary>
//public String ServiceName { get; set; }
/// <summary>客户端</summary>
public IApiClient Client => _client;
public IApiClient? Client => _client;
/// <summary>应用客户端</summary>
public AppClient App => _client;
public AppClient? App => _client;
/// <summary>配置信息。从配置中心返回的信息头</summary>
public ConfigInfo ConfigInfo => (_config as StarHttpConfigProvider)?.ConfigInfo;
public ConfigInfo? ConfigInfo => (_config as StarHttpConfigProvider)?.ConfigInfo;
/// <summary>本地星尘代理</summary>
public LocalStarClient Local { get; private set; }
private AppClient _client;
private TokenHttpFilter _tokenFilter;
private AppClient? _client;
private TokenHttpFilter? _tokenFilter;
#endregion
#region
@ -184,6 +185,17 @@ public class StarFactory : DisposeBase
}
else
XTrace.WriteLine("星尘探测StarAgent Not Found, Cost={0}ms", sw.ElapsedMilliseconds);
if (inf != null & !inf.PluginServer.IsNullOrEmpty())
{
var core = NewLife.Setting.Current;
if (!inf.PluginServer.EqualIgnoreCase(core.PluginServer))
{
XTrace.WriteLine("插件服务器PluginServer变更为 {0}", inf.PluginServer);
core.PluginServer = inf.PluginServer;
core.Save();
}
}
}
catch (Exception ex)
{
@ -238,6 +250,9 @@ public class StarFactory : DisposeBase
ioc.TryAddSingleton(typeof(ICacheProvider), typeof(CacheProvider));
}
#if !NET40
[MemberNotNullWhen(true, nameof(_client))]
#endif
private Boolean Valid()
{
//if (Server.IsNullOrEmpty()) throw new ArgumentNullException(nameof(Server));
@ -288,7 +303,7 @@ public class StarFactory : DisposeBase
#region
private StarTracer _tracer;
/// <summary>监控中心</summary>
public ITracer Tracer
public ITracer? Tracer
{
get
{
@ -305,7 +320,9 @@ public class StarFactory : DisposeBase
private void InitTracer()
{
XTrace.WriteLine("初始化星尘监控中心采样并定期上报应用性能埋点数据包括Api接口、Http请求、数据库操作、Redis操作等。可用于监控系统健康状态分析分布式系统的性能瓶颈。");
if (Server.IsNullOrEmpty()) return;
XTrace.WriteLine("星尘监控中心采样并定期上报应用性能埋点数据包括Api接口、Http请求、数据库操作、Redis操作等。可用于监控系统健康状态分析分布式系统的性能瓶颈。");
var tracer = new StarTracer(Server)
{
@ -329,7 +346,7 @@ public class StarFactory : DisposeBase
/// <remarks>
/// 文档 https://newlifex.com/blood/stardust_configcenter
/// </remarks>
public IConfigProvider Config
public IConfigProvider? Config
{
get
{
@ -337,7 +354,7 @@ public class StarFactory : DisposeBase
{
if (!Valid()) return null;
XTrace.WriteLine("初始化星尘配置中心,提供集中配置管理能力自动从配置中心加载配置数据包括XCode数据库连接。配置中心同时支持分配应用实例的唯一WorkerId确保Snowflake算法能够生成绝对唯一的雪花Id");
XTrace.WriteLine("星尘配置中心:提供集中配置管理能力自动从配置中心加载配置数据包括XCode数据库连接。配置中心同时支持分配应用实例的唯一WorkerId确保Snowflake算法能够生成绝对唯一的雪花Id");
var config = new StarHttpConfigProvider
{
@ -376,13 +393,13 @@ public class StarFactory : DisposeBase
/// <summary>获取复合配置提供者</summary>
/// <returns></returns>
public IConfigProvider GetConfig() => _configProvider ?? Config;
public IConfigProvider? GetConfig() => _configProvider ?? Config;
#endregion
#region
private Boolean _initService;
/// <summary>注册中心,服务注册与发现</summary>
public IRegistry Service
public IRegistry? Service
{
get
{
@ -393,7 +410,7 @@ public class StarFactory : DisposeBase
_initService = true;
//_appClient = _client as AppClient;
XTrace.WriteLine("初始化星尘注册中心,提供服务注册与发布能力");
XTrace.WriteLine("星尘注册中心:提供服务注册与发布能力");
}
return _client;
@ -404,13 +421,13 @@ public class StarFactory : DisposeBase
/// <param name="serviceName"></param>
/// <param name="tag"></param>
/// <returns></returns>
public IApiClient CreateForService(String serviceName, String tag = null) => TaskEx.Run(() => CreateForServiceAsync(serviceName, tag)).Result;
public IApiClient CreateForService(String serviceName, String? tag = null) => TaskEx.Run(() => CreateForServiceAsync(serviceName, tag)).Result;
/// <summary>为指定服务创建客户端,从星尘注册中心获取服务地址。单例,应避免频繁创建客户端</summary>
/// <param name="serviceName"></param>
/// <param name="tag"></param>
/// <returns></returns>
public Task<IApiClient> CreateForServiceAsync(String serviceName, String tag = null) => Service.CreateForServiceAsync(serviceName, tag);
public Task<IApiClient> CreateForServiceAsync(String serviceName, String? tag = null) => Service.CreateForServiceAsync(serviceName, tag);
/// <summary>发布服务</summary>
/// <param name="serviceName">服务名</param>
@ -418,14 +435,14 @@ public class StarFactory : DisposeBase
/// <param name="tag">特性标签</param>
/// <param name="health">健康监测接口地址</param>
/// <returns></returns>
public Task<PublishServiceInfo> RegisterAsync(String serviceName, String address, String tag = null, String health = null) => Service.RegisterAsync(serviceName, address, tag, health);
public Task<PublishServiceInfo> RegisterAsync(String serviceName, String address, String? tag = null, String? health = null) => Service.RegisterAsync(serviceName, address, tag, health);
/// <summary>消费得到服务地址信息</summary>
/// <param name="serviceName">服务名</param>
/// <param name="minVersion">最小版本</param>
/// <param name="tag">特性标签。只要包含该特性的服务提供者</param>
/// <returns></returns>
public Task<String[]> ResolveAddressAsync(String serviceName, String minVersion = null, String tag = null) => Service.ResolveAddressAsync(serviceName, minVersion, tag);
public Task<String[]> ResolveAddressAsync(String serviceName, String? minVersion = null, String? tag = null) => Service.ResolveAddressAsync(serviceName, minVersion, tag);
#endregion
#region
@ -436,7 +453,7 @@ public class StarFactory : DisposeBase
/// <param name="expire"></param>
/// <param name="timeout"></param>
/// <returns></returns>
public async Task<Int32> SendNodeCommand(String nodeCode, String command, String argument = null, Int32 expire = 3600, Int32 timeout = 5)
public async Task<Int32> SendNodeCommand(String nodeCode, String command, String? argument = null, Int32 expire = 3600, Int32 timeout = 5)
{
if (!Valid()) return -1;
@ -457,7 +474,7 @@ public class StarFactory : DisposeBase
/// <param name="expire"></param>
/// <param name="timeout"></param>
/// <returns></returns>
public async Task<Int32> SendAppCommand(String appId, String command, String argument = null, Int32 expire = 3600, Int32 timeout = 5)
public async Task<Int32> SendAppCommand(String appId, String command, String? argument = null, Int32 expire = 3600, Int32 timeout = 5)
{
if (!Valid()) return -1;