备份还原同步数据时,支持取消令牌

This commit is contained in:
智能大石头 2025-06-23 19:11:49 +08:00
parent 15009b9b56
commit a189fb666e
2 changed files with 82 additions and 36 deletions

View File

@ -11,11 +11,12 @@ public partial class DAL
/// </remarks>
/// <param name="table">数据表</param>
/// <param name="stream">目标数据流</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
public Int32 Backup(IDataTable table, Stream stream)
public Int32 Backup(IDataTable table, Stream stream, CancellationToken cancellationToken)
{
var dpk = new DbPackage { Dal = this, Tracer = Tracer ?? GlobalTracer, Log = XTrace.Log };
return dpk.Backup(table, stream);
return dpk.Backup(table, stream, cancellationToken);
}
/// <summary>备份单表数据到文件</summary>
@ -33,11 +34,12 @@ public partial class DAL
/// <param name="file">zip压缩文件</param>
/// <param name="backupSchema">备份架构</param>
/// <param name="ignoreError">忽略错误,继续恢复下一张表</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
public Int32 BackupAll(IList<IDataTable> tables, String file, Boolean backupSchema = true, Boolean ignoreError = true)
public Int32 BackupAll(IList<IDataTable> tables, String file, Boolean backupSchema = true, Boolean ignoreError = true, CancellationToken cancellationToken = default)
{
var dpk = new DbPackage { Dal = this, IgnoreError = ignoreError, Tracer = Tracer ?? GlobalTracer, Log = XTrace.Log };
return dpk.BackupAll(tables, file, backupSchema);
return dpk.BackupAll(tables, file, backupSchema, cancellationToken);
}
#endregion
@ -45,11 +47,12 @@ public partial class DAL
/// <summary>从数据流恢复数据</summary>
/// <param name="stream">数据流</param>
/// <param name="table">数据表</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
public Int32 Restore(Stream stream, IDataTable table)
public Int32 Restore(Stream stream, IDataTable table, CancellationToken cancellationToken = default)
{
var dpk = new DbPackage { Dal = this, Tracer = Tracer ?? GlobalTracer, Log = XTrace.Log };
return dpk.Restore(stream, table);
return dpk.Restore(stream, table, cancellationToken);
}
/// <summary>从文件恢复数据</summary>
@ -68,11 +71,12 @@ public partial class DAL
/// <param name="tables">数据表。为空时从压缩包读取xml模型文件</param>
/// <param name="setSchema">是否设置数据表模型,自动建表</param>
/// <param name="ignoreError">忽略错误,继续下一张表</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
public IDataTable[]? RestoreAll(String file, IDataTable[]? tables = null, Boolean setSchema = true, Boolean ignoreError = true)
public IDataTable[]? RestoreAll(String file, IDataTable[]? tables = null, Boolean setSchema = true, Boolean ignoreError = true, CancellationToken cancellationToken = default)
{
var dpk = new DbPackage { Dal = this, IgnoreError = ignoreError, Tracer = Tracer ?? GlobalTracer, Log = XTrace.Log };
return dpk.RestoreAll(file, tables, setSchema);
return dpk.RestoreAll(file, tables, setSchema, cancellationToken);
}
#endregion
@ -84,11 +88,12 @@ public partial class DAL
/// <param name="table">数据表</param>
/// <param name="connName">目标连接名</param>
/// <param name="syncSchema">同步架构</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
public Int32 Sync(IDataTable table, String connName, Boolean syncSchema = true)
public Int32 Sync(IDataTable table, String connName, Boolean syncSchema = true, CancellationToken cancellationToken = default)
{
var dpk = new DbPackage { Dal = this, Tracer = Tracer ?? GlobalTracer, Log = XTrace.Log };
return dpk.Sync(table, connName, syncSchema);
return dpk.Sync(table, connName, syncSchema, cancellationToken);
}
/// <summary>备份一批表到另一个库</summary>
@ -96,11 +101,12 @@ public partial class DAL
/// <param name="connName">目标连接名</param>
/// <param name="syncSchema">同步架构</param>
/// <param name="ignoreError">忽略错误,继续下一张表</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
public IDictionary<String, Int32> SyncAll(IDataTable[] tables, String connName, Boolean syncSchema = true, Boolean ignoreError = true)
public IDictionary<String, Int32> SyncAll(IDataTable[] tables, String connName, Boolean syncSchema = true, Boolean ignoreError = true, CancellationToken cancellationToken = default)
{
var dpk = new DbPackage { Dal = this, IgnoreError = ignoreError, Tracer = Tracer ?? GlobalTracer, Log = XTrace.Log };
return dpk.SyncAll(tables, connName, syncSchema);
return dpk.SyncAll(tables, connName, syncSchema, cancellationToken);
}
#endregion
}

View File

@ -1,7 +1,7 @@
using System.Diagnostics;
using System.IO.Compression;
using System.Text;
using NewLife;
using System.Threading;
using NewLife.Data;
using NewLife.Log;
using NewLife.Model;
@ -11,9 +11,7 @@ using XCode.Transform;
namespace XCode.DataAccessLayer;
/// <summary>
/// 数据包。数据的备份与恢复
/// </summary>
/// <summary>数据包。数据的备份与恢复</summary>
public class DbPackage
{
#region
@ -69,12 +67,22 @@ public class DbPackage
/// <param name="table">数据表</param>
/// <param name="stream">目标数据流</param>
/// <returns></returns>
public virtual Int32 Backup(IDataTable table, Stream stream)
public virtual Int32 Backup(IDataTable table, Stream stream) => Backup(table, stream, default);
/// <summary>备份单表数据,抽取数据和写入文件双线程</summary>
/// <remarks>
/// 最大支持21亿行
/// </remarks>
/// <param name="table">数据表</param>
/// <param name="stream">目标数据流</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public virtual Int32 Backup(IDataTable table, Stream stream, CancellationToken cancellationToken = default)
{
using var span = Tracer?.NewSpan($"db:{Dal.ConnName}:Backup", table.Name);
// 并行写入文件,提升吞吐
var writeFile = WriteFileCallback?.Invoke() ?? new WriteFileActor { BoundedCapacity = 4, };
using var writeFile = WriteFileCallback?.Invoke() ?? new WriteFileActor { BoundedCapacity = 4, };
writeFile.Stream = stream;
writeFile.TracerParent = span;
@ -90,6 +98,7 @@ public class DbPackage
// 总行数
writeFile.Total = Dal.SelectCount(sb);
WriteLog("备份[{0}/{1}]开始,共[{2:n0}]行,抽取器{3}", table, connName, writeFile.Total, extracer);
writeFile.Start(cancellationToken);
// 临时关闭日志
var old = Dal.Db.ShowSQL;
@ -102,6 +111,8 @@ public class DbPackage
{
foreach (var dt in extracer.Fetch())
{
if (cancellationToken.IsCancellationRequested) break;
var row = extracer.TotalCount;
var count = dt.Rows.Count;
WriteLog("备份[{0}/{1}]数据 {2:n0} + {3:n0}", table, connName, row, count);
@ -229,11 +240,11 @@ public class DbPackage
if (file.EndsWithIgnoreCase(".gz"))
{
using var gs = new GZipStream(fs, CompressionLevel.Optimal, true);
rs = Backup(table, gs);
rs = Backup(table, gs, default);
}
else
{
rs = Backup(table, fs);
rs = Backup(table, fs, default);
}
// 截断文件
@ -246,8 +257,9 @@ public class DbPackage
/// <param name="tables">数据表集合</param>
/// <param name="file">zip压缩文件</param>
/// <param name="backupSchema">备份架构</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
public Int32 BackupAll(IList<IDataTable> tables, String file, Boolean backupSchema = true)
public Int32 BackupAll(IList<IDataTable> tables, String file, Boolean backupSchema = true, CancellationToken cancellationToken = default)
{
if (tables == null) throw new ArgumentNullException(nameof(tables));
@ -283,11 +295,13 @@ public class DbPackage
foreach (var item in tables)
{
if (cancellationToken.IsCancellationRequested) break;
try
{
var entry = zip.CreateEntry(item.Name + ".table");
using var ms = entry.Open();
Backup(item, ms);
Backup(item, ms, cancellationToken);
count++;
if (span != null) span.Value = count;
@ -315,18 +329,26 @@ public class DbPackage
/// <param name="stream">数据流</param>
/// <param name="table">数据表</param>
/// <returns></returns>
public virtual Int32 Restore(Stream stream, IDataTable table)
public virtual Int32 Restore(Stream stream, IDataTable table) => Restore(stream, table, default);
/// <summary>从数据流恢复数据</summary>
/// <param name="stream">数据流</param>
/// <param name="table">数据表</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
public virtual Int32 Restore(Stream stream, IDataTable table, CancellationToken cancellationToken = default)
{
if (stream == null) throw new ArgumentNullException(nameof(stream));
if (table == null) throw new ArgumentNullException(nameof(table));
using var span = Tracer?.NewSpan($"db:{Dal.ConnName}:Restore", table.Name);
var writeDb = WriteDbCallback?.Invoke() ?? new WriteDbActor { BoundedCapacity = 4 };
using var writeDb = WriteDbCallback?.Invoke() ?? new WriteDbActor { BoundedCapacity = 4 };
writeDb.Host = this;
writeDb.Dal = Dal;
writeDb.Table = table;
writeDb.TracerParent = span;
writeDb.Start(cancellationToken);
var connName = Dal.ConnName;
@ -367,7 +389,7 @@ public class DbPackage
var row = 0;
var pageSize = BatchSize;
if (pageSize <= 0) pageSize = Dal.GetBatchSize();
while (true)
while (!cancellationToken.IsCancellationRequested)
{
//修复总行数是pageSize的倍数无法退出循环的情况
if (dt.Total == row) break;
@ -442,8 +464,9 @@ public class DbPackage
/// <param name="file">zip压缩文件</param>
/// <param name="tables">数据表。为空时从压缩包读取xml模型文件</param>
/// <param name="setSchema">是否设置数据表模型,自动建表</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
public IDataTable[]? RestoreAll(String file, IDataTable[]? tables = null, Boolean setSchema = true)
public IDataTable[]? RestoreAll(String file, IDataTable[]? tables = null, Boolean setSchema = true, CancellationToken cancellationToken = default)
{
if (file.IsNullOrEmpty()) throw new ArgumentNullException(nameof(file));
//if (tables == null) throw new ArgumentNullException(nameof(tables));
@ -477,6 +500,8 @@ public class DbPackage
var count = 0;
foreach (var item in tables)
{
if (cancellationToken.IsCancellationRequested) break;
var entry = zip.GetEntry(item.Name + ".table");
if (entry != null && entry.Length > 0)
{
@ -484,7 +509,7 @@ public class DbPackage
{
using var ms = entry.Open();
using var bs = new BufferedStream(ms);
Restore(bs, item);
Restore(bs, item, cancellationToken);
count++;
if (span != null) span.Value = count;
@ -516,7 +541,18 @@ public class DbPackage
/// <param name="connName">目标连接名</param>
/// <param name="syncSchema">同步架构</param>
/// <returns></returns>
public virtual Int32 Sync(IDataTable table, String connName, Boolean syncSchema = true)
public virtual Int32 Sync(IDataTable table, String connName, Boolean syncSchema = true) => Sync(table, connName, syncSchema, default);
/// <summary>同步单表数据</summary>
/// <remarks>
/// 把数据同一张表同步到另一个库
/// </remarks>
/// <param name="table">数据表</param>
/// <param name="connName">目标连接名</param>
/// <param name="syncSchema">同步架构</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
public virtual Int32 Sync(IDataTable table, String connName, Boolean syncSchema = true, CancellationToken cancellationToken = default)
{
if (connName.IsNullOrEmpty()) throw new ArgumentNullException(nameof(connName));
if (table == null) throw new ArgumentNullException(nameof(table));
@ -525,11 +561,12 @@ public class DbPackage
var dal = DAL.Create(connName);
var writeDb = WriteDbCallback?.Invoke() ?? new WriteDbActor { BoundedCapacity = 4 };
using var writeDb = WriteDbCallback?.Invoke() ?? new WriteDbActor { BoundedCapacity = 4 };
writeDb.Table = table;
writeDb.Host = this;
writeDb.Dal = dal;
writeDb.TracerParent = span;
writeDb.Start(cancellationToken);
var extracer = CreateExtracterCallback?.Invoke(table) ?? GetExtracter(table);
@ -546,6 +583,8 @@ public class DbPackage
foreach (var dt in extracer.Fetch())
{
if (cancellationToken.IsCancellationRequested) break;
var row = extracer.TotalCount;
var count = dt.Rows.Count;
WriteLog("同步[{0}/{1}]数据 {2:n0} + {3:n0}", table.Name, Dal.ConnName, row, count);
@ -589,8 +628,9 @@ public class DbPackage
/// <param name="tables">表名集合</param>
/// <param name="connName">目标连接名</param>
/// <param name="syncSchema">同步架构</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
public IDictionary<String, Int32> SyncAll(IDataTable[] tables, String connName, Boolean syncSchema = true)
public IDictionary<String, Int32> SyncAll(IDataTable[] tables, String connName, Boolean syncSchema = true, CancellationToken cancellationToken = default)
{
if (connName.IsNullOrEmpty()) throw new ArgumentNullException(nameof(connName));
if (tables == null) throw new ArgumentNullException(nameof(tables));
@ -609,9 +649,11 @@ public class DbPackage
var count = 0;
foreach (var item in tables)
{
if (cancellationToken.IsCancellationRequested) break;
try
{
dic[item.Name] = Sync(item, connName, false);
dic[item.Name] = Sync(item, connName, false, cancellationToken);
count++;
if (span != null) span.Value = count;
@ -658,11 +700,9 @@ public class DbPackage
private Boolean _writeHeader;
private String[] _columns = null!;
/// <summary>
/// 开始
/// </summary>
/// <summary>开始</summary>
/// <returns></returns>
public override Task? Start()
protected override Task OnStart(CancellationToken cancellationToken)
{
// 二进制读写器
_Binary = new Binary
@ -672,7 +712,7 @@ public class DbPackage
Stream = Stream,
};
return base.Start();
return base.OnStart(cancellationToken);
}
/// <summary>