From 7ff2e9bc5981e4b568df691fc44cc12b30ef713d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E7=9F=B3=E5=A4=B4?= Date: Fri, 3 Jul 2020 10:28:06 +0800 Subject: [PATCH] =?UTF-8?q?=E7=94=9F=E4=BA=A7=E6=B6=88=E6=81=AF=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E9=80=9A=E8=BF=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + NewLife.Redis/RedisStream.cs | 53 +++++++++++++++------ XUnitTest/QueueTests.cs | 1 - XUnitTest/StreamTests.cs | 92 ++++++++++++++++++++++++++++++++++++ XUnitTest/XUnitTest.csproj | 1 + 5 files changed, 133 insertions(+), 15 deletions(-) create mode 100644 XUnitTest/StreamTests.cs diff --git a/.gitignore b/.gitignore index e027049..4b908dd 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ bld/ *.nuspec *.nupkg /BinTest +/BinUnitTest diff --git a/NewLife.Redis/RedisStream.cs b/NewLife.Redis/RedisStream.cs index 7658bd2..7afa674 100644 --- a/NewLife.Redis/RedisStream.cs +++ b/NewLife.Redis/RedisStream.cs @@ -2,15 +2,12 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using System.Text; -using System.Threading.Tasks; -using NewLife.Caching; using NewLife.Collections; namespace NewLife.Caching { /// Redis5.0的Stream数据结果,完整态消息队列,支持多消费组 - public class RedisStream : RedisBase, IProducerConsumer + public class RedisStream : RedisBase, IProducerConsumer { #region 属性 /// 个数 @@ -18,9 +15,6 @@ namespace NewLife.Caching /// 是否为空 public Boolean IsEmpty => Count == 0; - - /// 最小管道阈值,达到该值时使用管道,默认3 - public Int32 MinPipeline { get; set; } = 3; #endregion #region 构造 @@ -34,20 +28,51 @@ namespace NewLife.Caching /// /// /// - public String Add(String value, Int32 maxlen = -1) + public String Add(Object value, Int32 maxlen = -1) { - var cmd = maxlen > 0 ? - $"{Key} maxlen {maxlen} * {value}" : - $"{Key} * {value}"; + if (value == null) throw new ArgumentNullException(nameof(value)); - return Execute(rc => rc.Execute("XADD", $"{Key} * {value}"), true); + var args = new List { Key }; + if (maxlen > 0) + { + args.Add("maxlen"); + args.Add(maxlen); + } + args.Add("*"); + + // 数组和复杂对象字典,分开处理 + if (Type.GetTypeCode(value.GetType()) != TypeCode.Object) + { + //throw new ArgumentOutOfRangeException(nameof(value), "消息体必须是复杂对象!"); + args.Add("__data"); + args.Add(value); + } + else if (value.GetType().IsArray) + { + foreach (var item in (value as Array)) + { + args.Add(item); + } + } + else + { + foreach (var item in value.ToDictionary()) + { + args.Add(item.Key); + args.Add(item.Value); + } + } + + return Execute(rc => rc.Execute("XADD", args.ToArray()), true); } /// 批量生产添加 /// /// - public Int32 Add(IEnumerable values) + Int32 IProducerConsumer.Add(IEnumerable values) { + if (values == null) throw new ArgumentNullException(nameof(values)); + var count = 0; foreach (var item in values) { @@ -111,7 +136,7 @@ namespace NewLife.Caching /// 批量消费获取 /// /// - public IEnumerable Take(Int32 count = 1) => Read(null, count); + public IEnumerable Take(Int32 count = 1) => Read(null, count); /// 创建消费组 /// diff --git a/XUnitTest/QueueTests.cs b/XUnitTest/QueueTests.cs index f1b15f1..483e322 100644 --- a/XUnitTest/QueueTests.cs +++ b/XUnitTest/QueueTests.cs @@ -5,7 +5,6 @@ using System.Threading; using System.Threading.Tasks; using NewLife.Caching; using NewLife.Security; -using NewLife.Threading; using Xunit; namespace XUnitTest diff --git a/XUnitTest/StreamTests.cs b/XUnitTest/StreamTests.cs new file mode 100644 index 0000000..9335740 --- /dev/null +++ b/XUnitTest/StreamTests.cs @@ -0,0 +1,92 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using NewLife.Caching; +using NewLife.Log; +using NewLife.Serialization; +using Xunit; + +namespace XUnitTest +{ + public class StreamTests + { + private FullRedis _redis; + + public StreamTests() + { + //_redis = new FullRedis("127.0.0.1:6379", null, 2); + + var config = ""; + var file = @"config\redis.config"; + if (File.Exists(file)) config = File.ReadAllText(file.GetFullPath())?.Trim(); + if (config.IsNullOrEmpty()) config = "server=127.0.0.1;port=6379;db=3"; + + _redis = new FullRedis(); + _redis.Init(config); +#if DEBUG + _redis.Log = XTrace.Log; +#endif + } + + [Fact] + public void Stream_Normal() + { + var key = "stream_key"; + + // 删除已有 + _redis.Remove(key); + var s = _redis.GetStream(key); + _redis.SetExpire(key, TimeSpan.FromMinutes(60)); + + // 取出个数 + var count = s.Count; + Assert.True(s.IsEmpty); + Assert.Equal(0, count); + + // 添加 + Assert.Throws(() => s.Add(null)); + //Assert.Throws(() => s.Add("name stone age 24")); + //Assert.Throws(() => s.Add(1234)); + + // 基础类型、数组、复杂对象 + s.Add(1234); + s.Add(new Object[] { "name", "bigStone", "age", 24 }); + s.Add(new { name = "smartStone", age = 36 }); + + var queue = s as IProducerConsumer; + var vs = new Object[] { + new { aaa = "1234" }, + new { bbb = "abcd" }, + new { ccc = "新生命团队" }, + new { ddd = "ABEF" } + }; + queue.Add(vs); + + // 对比个数 + var count2 = s.Count; + Assert.False(s.IsEmpty); + Assert.Equal(count + 1 + 1 + 1 + vs.Length, count2); + + // 独立消费 + var vs1 = s.Read(null, 3); + Assert.Equal(3, vs1.Length); + + // 取出来 + var vs2 = s.Take(2).ToArray(); + Assert.Equal(2, vs2.Length); + Assert.Equal(vs[3], vs2[0]); + Assert.Equal(vs[2], vs2[1]); + + var vs3 = s.Take(2).ToArray(); + Assert.Equal(2, vs3.Length); + Assert.Equal(vs[1], vs3[0]); + Assert.Equal(vs[0], vs3[1]); + + // 对比个数 + var count3 = s.Count; + Assert.True(s.IsEmpty); + Assert.Equal(count, count3); + } + } +} \ No newline at end of file diff --git a/XUnitTest/XUnitTest.csproj b/XUnitTest/XUnitTest.csproj index 8a7ee41..ae0efec 100644 --- a/XUnitTest/XUnitTest.csproj +++ b/XUnitTest/XUnitTest.csproj @@ -2,6 +2,7 @@ netcoreapp3.1 + ..\BinUnitTest false