diff --git a/README.md b/README.md
index 53998c5..972d992 100644
--- a/README.md
+++ b/README.md
@@ -77,7 +77,7 @@ Click to view the [**test code**](https://github.com/zhufuyi/microservices_frame
- Automated API documentation [swagger](https://github.com/swaggo/swag), [protoc-gen-openapiv2](https://github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2)
- Authentication [jwt](https://github.com/golang-jwt/jwt)
- Websocket [gorilla/websocket](https://github.com/gorilla/websocket)
-- Message Queue [rabbitmq](https://github.com/rabbitmq/amqp091-go)
+- Message Queue [rabbitmq](https://github.com/rabbitmq/amqp091-go), [kafka](https://github.com/IBM/sarama)
- Distributed Transaction Manager [dtm](https://github.com/dtm-labs/dtm)
- Parameter validation [validator](https://github.com/go-playground/validator)
- Adaptive rate limiting [ratelimit](https://github.com/zhufuyi/sponge/tree/main/pkg/shield/ratelimit)
diff --git a/assets/readme-cn.md b/assets/readme-cn.md
index 0d4bb95..8c4a4a4 100644
--- a/assets/readme-cn.md
+++ b/assets/readme-cn.md
@@ -67,7 +67,7 @@ sponge包含丰富的组件(按需使用):
- 鉴权 [jwt](https://github.com/golang-jwt/jwt)
- 校验 [validator](https://github.com/go-playground/validator)
- Websocket [gorilla/websocket](https://github.com/gorilla/websocket)
-- 消息队列组件 [rabbitmq](https://github.com/rabbitmq/amqp091-go)
+- 消息队列组件 [rabbitmq](https://github.com/rabbitmq/amqp091-go), [kafka](https://github.com/IBM/sarama)
- 分布式事务管理器 [dtm](https://github.com/dtm-labs/dtm)
- 自适应限流 [ratelimit](https://github.com/zhufuyi/sponge/tree/main/pkg/shield/ratelimit)
- 自适应熔断 [circuitbreaker](https://github.com/zhufuyi/sponge/tree/main/pkg/shield/circuitbreaker)
@@ -163,7 +163,7 @@ sponge run
### 视频介绍
-> 视频教程演示使用sponge v1.3.12版本,后续的版本增加了一些自动化功能、调整了一些UI界面和菜单,建议结合[文档教程](https://go-sponge.com/zh-cn/)使用。
+> 视频教程演示使用sponge v1.3.12版本,新版本增加了一些自动化功能、调整了一些UI界面和菜单,建议结合[文档教程](https://go-sponge.com/zh-cn/)使用。
- [01 sponge的形成过程](https://www.bilibili.com/video/BV1s14y1F7Fz/)
- [02 sponge的框架介绍](https://www.bilibili.com/video/BV13u4y1F7EU/)
diff --git a/go.mod b/go.mod
index d9cd208..84f13ed 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.19
require (
github.com/DATA-DOG/go-sqlmock v1.5.0
+ github.com/IBM/sarama v1.43.2
github.com/alicebob/miniredis/v2 v2.23.0
github.com/blastrain/vitess-sqlparser v0.0.0-20201030050434-a139afbb1aba
github.com/bojand/ghz v0.117.0
@@ -17,7 +18,7 @@ require (
github.com/go-redis/redis/v8 v8.11.5
github.com/go-sql-driver/mysql v1.7.0
github.com/golang-jwt/jwt/v5 v5.0.0
- github.com/golang/snappy v0.0.3
+ github.com/golang/snappy v0.0.4
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
@@ -34,7 +35,7 @@ require (
github.com/shirou/gopsutil/v3 v3.23.8
github.com/spf13/cobra v1.4.0
github.com/spf13/viper v1.12.0
- github.com/stretchr/testify v1.8.4
+ github.com/stretchr/testify v1.9.0
github.com/swaggo/files v0.0.0-20220728132757-551d4a08d97a
github.com/swaggo/gin-swagger v1.5.2
github.com/swaggo/swag v1.8.12
@@ -50,8 +51,8 @@ require (
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
go.uber.org/zap v1.24.0
- golang.org/x/crypto v0.21.0
- golang.org/x/sync v0.5.0
+ golang.org/x/crypto v0.22.0
+ golang.org/x/sync v0.7.0
google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17
google.golang.org/grpc v1.61.0
google.golang.org/protobuf v1.32.0
@@ -92,6 +93,9 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
+ github.com/eapache/go-resiliency v1.6.0 // indirect
+ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
+ github.com/eapache/queue v1.1.0 // indirect
github.com/envoyproxy/go-control-plane v0.11.1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect
github.com/fatih/color v1.13.0 // indirect
@@ -114,10 +118,13 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect
github.com/google/uuid v1.4.0 // indirect
+ github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.2.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
+ github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
+ github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/serf v0.9.7 // indirect
@@ -126,6 +133,11 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.4.3 // indirect
+ github.com/jcmturner/aescts/v2 v2.0.0 // indirect
+ github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
+ github.com/jcmturner/gofork v1.7.6 // indirect
+ github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
+ github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jhump/protoreflect v1.15.1 // indirect
github.com/jinzhu/configor v1.2.1 // indirect
github.com/jinzhu/now v1.1.5 // indirect
@@ -133,7 +145,7 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/juju/errors v0.0.0-20170703010042-c7d06af17c68 // indirect
- github.com/klauspost/compress v1.13.6 // indirect
+ github.com/klauspost/compress v1.17.8 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
@@ -152,11 +164,13 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
+ github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
+ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/shopspring/decimal v1.2.0 // indirect
github.com/spf13/afero v1.9.2 // indirect
@@ -181,9 +195,9 @@ require (
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/arch v0.3.0 // indirect
- golang.org/x/net v0.21.0 // indirect
+ golang.org/x/net v0.24.0 // indirect
golang.org/x/oauth2 v0.14.0 // indirect
- golang.org/x/sys v0.18.0 // indirect
+ golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
golang.org/x/tools v0.10.0 // indirect
diff --git a/go.sum b/go.sum
index 03aee1b..dcf87c3 100644
--- a/go.sum
+++ b/go.sum
@@ -47,6 +47,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
+github.com/IBM/sarama v1.43.2 h1:HABeEqRUh32z8yzY2hGB/j8mHSzC/HA9zlEjqFNCzSw=
+github.com/IBM/sarama v1.43.2/go.mod h1:Kyo4WkF24Z+1nz7xeVUFWIuKVV8RS3wM8mkvPKMdXFQ=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI=
@@ -143,6 +145,12 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
+github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30=
+github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
+github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
+github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
+github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
+github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -161,6 +169,7 @@ github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g=
github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw=
+github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
@@ -275,8 +284,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
-github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
-github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
+github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@@ -318,6 +327,8 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
+github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
+github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
@@ -346,6 +357,7 @@ github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iP
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
+github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc=
github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8=
@@ -353,8 +365,10 @@ github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sL
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
-github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
+github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
@@ -385,6 +399,18 @@ github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY=
github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
+github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
+github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
+github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
+github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
+github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg=
+github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo=
+github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o=
+github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
+github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8=
+github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
+github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
+github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c=
github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo=
github.com/jinzhu/configor v1.2.1 h1:OKk9dsR8i6HPOCZR8BcMtcEImAFjIhbJFZNyn5GCZko=
@@ -422,8 +448,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
-github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
-github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
+github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
@@ -530,6 +556,8 @@ github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCko
github.com/pelletier/go-toml/v2 v2.0.1/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4=
+github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
+github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -575,6 +603,8 @@ github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
+github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
+github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
@@ -617,8 +647,8 @@ github.com/spf13/viper v1.12.0/go.mod h1:b6COn30jlNxbm/V2IqWiNWkJ+vZNiMNksliPCiu
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
-github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
+github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
@@ -630,8 +660,9 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
-github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
+github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.3.0 h1:mjC+YW8QpAdXibNi+vNWgzmgBH4+5l5dCXv8cNysBLI=
github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t6PwAXzs=
github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe/go.mod h1:lKJPbtWzJ9JhsTN1k1gZgleJWY/cqq0psdoMmaThG3w=
@@ -748,8 +779,9 @@ golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4=
-golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
-golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
+golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
+golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
+golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -835,8 +867,10 @@ golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
-golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
-golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
+golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -862,8 +896,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
-golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
+golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -943,14 +977,16 @@ golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
-golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
+golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.0.0-20180302201248-b7ef84aaf62a/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -963,6 +999,7 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
diff --git a/pkg/kafka/README.md b/pkg/kafka/README.md
new file mode 100644
index 0000000..1690339
--- /dev/null
+++ b/pkg/kafka/README.md
@@ -0,0 +1,177 @@
+## kafka
+
+`kafka` is a kafka client library that wraps [sarama](https://github.com/IBM/sarama). The producer supports producing messages synchronously and asynchronously, the consumer supports consuming messages by group or by partition, and the API remains fully compatible with sarama usage.
+
+
+
+## Example of use
+
+### Producer
+
+#### Synchronous Produce
+
+```go
+package main
+
+import (
+    "fmt"
+    "github.com/IBM/sarama"
+    "github.com/zhufuyi/sponge/pkg/kafka"
+)
+
+func main() {
+ testTopic := "my-topic"
+ addrs := []string{"localhost:9092"}
+ // default config are requiredAcks=WaitForAll, partitionerConstructor=NewHashPartitioner, returnSuccesses=true
+ p, err := kafka.InitSyncProducer(addrs, kafka.SyncProducerWithVersion(sarama.V3_6_0_0))
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ defer p.Close()
+
+ // Case 1: send sarama.ProducerMessage type message
+ msg := testData[0].(*sarama.ProducerMessage) // testData is https://github.com/zhufuyi/sponge/blob/main/pkg/kafka/producer_test.go#L18
+ partition, offset, err := p.SendMessage(msg)
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ fmt.Println("partition:", partition, "offset:", offset)
+
+ // Case 2: send multiple types message
+ for _, data := range testData {
+ partition, offset, err := p.SendData(testTopic, data)
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ fmt.Println("partition:", partition, "offset:", offset)
+ }
+}
+```
+
+
+
+#### Asynchronous Produce
+
+```go
+package main
+
+import (
+    "fmt"
+    "time"
+    "github.com/IBM/sarama"
+    "github.com/zhufuyi/sponge/pkg/kafka"
+)
+
+func main() {
+ testTopic := "my-topic"
+ addrs := []string{"localhost:9092"}
+
+ p, err := kafka.InitAsyncProducer(addrs,
+ kafka.AsyncProducerWithVersion(sarama.V3_6_0_0),
+ kafka.AsyncProducerWithRequiredAcks(sarama.WaitForLocal),
+ kafka.AsyncProducerWithFlushMessages(50),
+        kafka.AsyncProducerWithFlushFrequency(time.Millisecond*500),
+ )
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ defer p.Close()
+
+ // Case 1: send sarama.ProducerMessage type message, supports multiple messages
+ msg := testData[0].(*sarama.ProducerMessage) // testData is https://github.com/zhufuyi/sponge/blob/main/pkg/kafka/producer_test.go#L18
+ err = p.SendMessage(msg, msg)
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+
+ // Case 2: send multiple types message, supports multiple messages
+ err = p.SendData(testTopic, testData...)
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+
+ <-time.After(time.Second) // wait for all messages to be sent
+}
+```
+
+
+
+### Consumer
+
+#### Consume Group
+
+```go
+package main
+
+import (
+    "context"
+    "fmt"
+    "time"
+    "github.com/IBM/sarama"
+    "github.com/zhufuyi/sponge/pkg/kafka"
+)
+
+func main() {
+ testTopic := "my-topic"
+ groupID := "my-group"
+ addrs := []string{"localhost:9092"}
+
+ // default config are offsetsInitial=OffsetOldest, autoCommitEnable=true, autoCommitInterval=time.Second
+ cg, err := kafka.InitConsumerGroup(addrs, groupID, kafka.ConsumerWithVersion(sarama.V3_6_0_0))
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ defer cg.Close()
+
+ // Case 1: consume default handle message
+ go cg.Consume(context.Background(), []string{testTopic}, handleMsgFn) // handleMsgFn is https://github.com/zhufuyi/sponge/blob/main/pkg/kafka/consumer_test.go#L19
+
+ // Case 2: consume custom handle message
+ go cg.ConsumeCustom(context.Background(), []string{testTopic}, &myConsumerGroupHandler{ // myConsumerGroupHandler is https://github.com/zhufuyi/sponge/blob/main/pkg/kafka/consumer_test.go#L26
+        autoCommitEnable: true, // cg.autoCommitEnable is unexported; true matches the default config noted above
+ })
+
+ <-time.After(time.Minute) // wait exit
+}
+```
+
+
+
+#### Consume Partition
+
+```go
+package main
+
+import (
+    "context"
+    "fmt"
+    "time"
+    "github.com/IBM/sarama"
+    "github.com/zhufuyi/sponge/pkg/kafka"
+)
+
+func main() {
+ testTopic := "my-topic"
+ addrs := []string{"localhost:9092"}
+
+ c, err := kafka.InitConsumer(addrs, kafka.ConsumerWithVersion(sarama.V3_6_0_0))
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ defer c.Close()
+
+ // Case 1: consume one partition
+    go c.ConsumePartition(context.Background(), testTopic, 0, sarama.OffsetNewest, handleMsgFn) // handleMsgFn is https://github.com/zhufuyi/sponge/blob/main/pkg/kafka/consumer_test.go#L19
+
+ // Case 2: consume all partition
+ c.ConsumeAllPartition(context.Background(), testTopic, sarama.OffsetNewest, handleMsgFn)
+
+ <-time.After(time.Minute) // wait exit
+}
+```
diff --git a/pkg/kafka/consumer.go b/pkg/kafka/consumer.go
new file mode 100644
index 0000000..65c40dc
--- /dev/null
+++ b/pkg/kafka/consumer.go
@@ -0,0 +1,227 @@
+package kafka
+
+import (
+ "context"
+
+ "github.com/IBM/sarama"
+ "go.uber.org/zap"
+)
+
+// ---------------------------------- consume group---------------------------------------
+
+// ConsumerGroup wraps a sarama.ConsumerGroup together with the settings
+// the Consume helpers need at runtime.
+type ConsumerGroup struct {
+	Group            sarama.ConsumerGroup
+	groupID          string
+	zapLogger        *zap.Logger
+	autoCommitEnable bool // mirrors Consumer.Offsets.AutoCommit.Enable of the config used at init
+}
+
+// InitConsumerGroup init consumer group.
+//
+// Without options the defaults from defaultConsumerOptions apply:
+// offsetsInitial=OffsetOldest, autoCommitEnable=true, autoCommitInterval=1s.
+// If ConsumerWithConfig supplies a full sarama.Config, it overrides every
+// other option.
+func InitConsumerGroup(addrs []string, groupID string, opts ...ConsumerOption) (*ConsumerGroup, error) {
+	o := defaultConsumerOptions()
+	o.apply(opts...)
+
+	var config *sarama.Config
+	if o.config != nil {
+		// a caller-supplied config wins over all individual options
+		config = o.config
+	} else {
+		config = sarama.NewConfig()
+		config.Version = o.version
+		config.Consumer.Offsets.Initial = o.offsetsInitial
+		config.Consumer.Offsets.AutoCommit.Enable = o.offsetsAutoCommitEnable
+		config.Consumer.Offsets.AutoCommit.Interval = o.offsetsAutoCommitInterval
+		config.ClientID = o.clientID
+		if o.tlsConfig != nil {
+			config.Net.TLS.Config = o.tlsConfig
+			config.Net.TLS.Enable = true
+		}
+	}
+
+	consumer, err := sarama.NewConsumerGroup(addrs, groupID, config)
+	if err != nil {
+		return nil, err
+	}
+	return &ConsumerGroup{
+		Group:     consumer,
+		groupID:   groupID,
+		zapLogger: o.zapLogger,
+		// read back from the effective config so a custom config is honored too
+		autoCommitEnable: config.Consumer.Offsets.AutoCommit.Enable,
+	}, nil
+}
+
+// Consume joins the group and processes messages from the given topics,
+// invoking handleMessageFn for every message received. It blocks until the
+// session ends or an error occurs.
+func (c *ConsumerGroup) Consume(ctx context.Context, topics []string, handleMessageFn HandleMessageFn) error {
+	h := &defaultConsumerHandler{
+		ctx:              ctx,
+		handleMessageFn:  handleMessageFn,
+		zapLogger:        c.zapLogger,
+		autoCommitEnable: c.autoCommitEnable,
+	}
+
+	if err := c.Group.Consume(ctx, topics, h); err != nil {
+		c.zapLogger.Error("failed to consume messages", zap.String("group_id", c.groupID), zap.Strings("topics", topics), zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+// ConsumeCustom is like Consume but delegates message handling to a
+// caller-supplied implementation of the sarama.ConsumerGroupHandler interface.
+func (c *ConsumerGroup) ConsumeCustom(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error {
+	if err := c.Group.Consume(ctx, topics, handler); err != nil {
+		c.zapLogger.Error("failed to consume messages", zap.String("group_id", c.groupID), zap.Strings("topics", topics), zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+// Close closes the underlying consumer group. It is safe to call on a nil
+// receiver or an uninitialized group.
+func (c *ConsumerGroup) Close() error {
+	// fix: the original condition was inverted (c == nil || c.Group == nil),
+	// which dereferenced a nil Group and never closed a live one.
+	if c != nil && c.Group != nil {
+		return c.Group.Close()
+	}
+	return nil
+}
+
+// defaultConsumerHandler adapts a HandleMessageFn to the
+// sarama.ConsumerGroupHandler interface used by ConsumerGroup.Consume.
+type defaultConsumerHandler struct {
+	ctx              context.Context
+	handleMessageFn  HandleMessageFn
+	zapLogger        *zap.Logger
+	autoCommitEnable bool // when false, offsets are committed synchronously per message
+}
+
+// Setup is run at the beginning of a new session, before ConsumeClaim.
+// It only logs the partition claims assigned to this member.
+func (h *defaultConsumerHandler) Setup(sess sarama.ConsumerGroupSession) error {
+	h.zapLogger.Info("consumer group session [setup]", zap.Any("claims", sess.Claims()))
+	return nil
+}
+
+// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
+// It only logs the claims that are being released.
+func (h *defaultConsumerHandler) Cleanup(sess sarama.ConsumerGroupSession) error {
+	h.zapLogger.Info("consumer group session [cleanup]", zap.Any("claims", sess.Claims()))
+	return nil
+}
+
+// ConsumeClaim consumes messages from a single partition claim until the
+// claim's message channel closes (rebalance/shutdown) or ctx is canceled.
+func (h *defaultConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	// NOTE(review): on a panic in handleMessageFn the claim is re-entered
+	// recursively; a handler that always panics would retry forever — confirm
+	// this is the intended recovery policy.
+	defer func() {
+		if e := recover(); e != nil {
+			h.zapLogger.Error("panic occurred while consuming messages", zap.Any("error", e))
+			_ = h.ConsumeClaim(sess, claim)
+		}
+	}()
+
+	for {
+		select {
+		case <-h.ctx.Done():
+			return nil
+		case msg, ok := <-claim.Messages():
+			if !ok {
+				// channel closed by sarama: session is ending
+				return nil
+			}
+			err := h.handleMessageFn(msg)
+			if err != nil {
+				// the message is not marked, so it may be redelivered later
+				h.zapLogger.Error("failed to handle message", zap.Error(err))
+				continue
+			}
+			sess.MarkMessage(msg, "")
+			if !h.autoCommitEnable {
+				// commit synchronously when auto-commit is disabled
+				sess.Commit()
+			}
+		}
+	}
+}
+
+// ---------------------------------- consume partition------------------------------------
+
+// Consumer consumes individual partitions of a topic (no consumer group,
+// no offset management).
+type Consumer struct {
+	C         sarama.Consumer
+	zapLogger *zap.Logger
+}
+
+// InitConsumer init consumer.
+//
+// If ConsumerWithConfig supplies a full sarama.Config, it overrides every
+// other option; consumer-group-specific options (offsets, auto-commit) are
+// not used here.
+func InitConsumer(addrs []string, opts ...ConsumerOption) (*Consumer, error) {
+	o := defaultConsumerOptions()
+	o.apply(opts...)
+
+	var config *sarama.Config
+	if o.config != nil {
+		config = o.config
+	} else {
+		config = sarama.NewConfig()
+		config.Version = o.version
+		// surface partition-consumer errors on the Errors() channel
+		config.Consumer.Return.Errors = true
+		config.ClientID = o.clientID
+		if o.tlsConfig != nil {
+			config.Net.TLS.Config = o.tlsConfig
+			config.Net.TLS.Enable = true
+		}
+	}
+
+	consumer, err := sarama.NewConsumer(addrs, config)
+	if err != nil {
+		return nil, err
+	}
+
+	return &Consumer{
+		C:         consumer,
+		zapLogger: o.zapLogger,
+	}, nil
+}
+
+// ConsumePartition consumes one partition of a topic, blocking until ctx is
+// canceled. Handler errors are logged and the message is skipped.
+func (c *Consumer) ConsumePartition(ctx context.Context, topic string, partition int32, offset int64, handleFn HandleMessageFn) {
+	// NOTE(review): on a panic in handleFn the whole method is re-entered;
+	// a persistently panicking handler retries forever — confirm intended.
+	defer func() {
+		if e := recover(); e != nil {
+			c.zapLogger.Error("panic occurred while consuming messages", zap.Any("error", e))
+			c.ConsumePartition(ctx, topic, partition, offset, handleFn)
+		}
+	}()
+
+	pc, err := c.C.ConsumePartition(topic, partition, offset)
+	if err != nil {
+		c.zapLogger.Error("failed to create partition consumer", zap.Error(err), zap.String("topic", topic), zap.Int32("partition", partition))
+		return
+	}
+	// fix: the partition consumer was never closed, leaking its goroutines
+	// and buffers; sarama requires Close (or AsyncClose) on every
+	// PartitionConsumer before shutting down.
+	defer pc.Close()
+
+	c.zapLogger.Info("start consuming partition", zap.String("topic", topic), zap.Int32("partition", partition), zap.Int64("offset", offset))
+
+	for {
+		select {
+		case msg := <-pc.Messages():
+			err = handleFn(msg)
+			if err != nil {
+				c.zapLogger.Warn("failed to handle message", zap.Error(err), zap.String("topic", topic), zap.Int32("partition", partition), zap.Int64("offset", msg.Offset))
+			}
+		case err := <-pc.Errors():
+			c.zapLogger.Error("partition consumer error", zap.Any("err", err))
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// ConsumeAllPartition starts one goroutine per partition of the topic and
+// returns immediately (non-blocking). Each goroutine runs ConsumePartition
+// with the same starting offset until ctx is canceled.
+func (c *Consumer) ConsumeAllPartition(ctx context.Context, topic string, offset int64, handleFn HandleMessageFn) {
+	partitions, err := c.C.Partitions(topic)
+	if err != nil {
+		c.zapLogger.Error("failed to get partition", zap.Error(err))
+		return
+	}
+
+	for _, p := range partitions {
+		go func(p int32) {
+			c.ConsumePartition(ctx, topic, p, offset, handleFn)
+		}(p)
+	}
+}
+
+// Close closes the underlying consumer. It is safe to call on a nil receiver
+// or an uninitialized consumer.
+func (c *Consumer) Close() error {
+	// fix: the original condition was inverted (c == nil || c.C == nil),
+	// which dereferenced a nil consumer and never closed a live one.
+	if c != nil && c.C != nil {
+		return c.C.Close()
+	}
+	return nil
+}
diff --git a/pkg/kafka/consumer_option.go b/pkg/kafka/consumer_option.go
new file mode 100644
index 0000000..0bdc763
--- /dev/null
+++ b/pkg/kafka/consumer_option.go
@@ -0,0 +1,113 @@
+package kafka
+
+import (
+ "crypto/tls"
+ "fmt"
+ "time"
+
+ "github.com/IBM/sarama"
+ "go.uber.org/zap"
+)
+
+// HandleMessageFn is a function that handles a message from a partition
+// consumer or a consumer group. Returning an error prevents the message
+// from being marked/committed.
+type HandleMessageFn func(msg *sarama.ConsumerMessage) error
+
+// ConsumerOption set options for InitConsumer and InitConsumerGroup.
+type ConsumerOption func(*consumerOptions)
+
+// consumerOptions holds every tunable setting for consumers.
+type consumerOptions struct {
+	version   sarama.KafkaVersion // default V2_1_0_0
+	clientID  string              // default "sarama"
+	tlsConfig *tls.Config         // default nil
+
+	// consumer group options
+	offsetsInitial            int64         // default OffsetOldest
+	offsetsAutoCommitEnable   bool          // default true
+	offsetsAutoCommitInterval time.Duration // default 1s, when offsetsAutoCommitEnable is true
+
+	// custom config, if not nil, it will override the default config, the above parameters are invalid
+	config *sarama.Config // default nil
+
+	zapLogger *zap.Logger // default NewProduction
+}
+
+// apply runs each option mutator over o in order.
+func (o *consumerOptions) apply(opts ...ConsumerOption) {
+	for _, opt := range opts {
+		opt(o)
+	}
+}
+
+// defaultConsumerOptions returns the settings used when the caller supplies
+// no options: kafka V2_1_0_0, OffsetOldest, auto-commit every second,
+// clientID "sarama", zap production logger.
+func defaultConsumerOptions() *consumerOptions {
+	zapLogger, _ := zap.NewProduction() // error deliberately ignored: default production config is valid
+	return &consumerOptions{
+		version:                   sarama.V2_1_0_0,
+		offsetsInitial:            sarama.OffsetOldest,
+		offsetsAutoCommitEnable:   true,
+		offsetsAutoCommitInterval: time.Second,
+		clientID:                  "sarama",
+		zapLogger:                 zapLogger,
+	}
+}
+
+// ConsumerWithVersion sets the kafka broker version.
+func ConsumerWithVersion(version sarama.KafkaVersion) ConsumerOption {
+	return func(o *consumerOptions) {
+		o.version = version
+	}
+}
+
+// ConsumerWithOffsetsInitial sets the offset used when no committed offset
+// exists (e.g. sarama.OffsetOldest or sarama.OffsetNewest).
+func ConsumerWithOffsetsInitial(offsetsInitial int64) ConsumerOption {
+	return func(o *consumerOptions) {
+		o.offsetsInitial = offsetsInitial
+	}
+}
+
+// ConsumerWithOffsetsAutoCommitEnable enables or disables offset auto-commit
+// for consumer groups.
+func ConsumerWithOffsetsAutoCommitEnable(offsetsAutoCommitEnable bool) ConsumerOption {
+	return func(o *consumerOptions) {
+		o.offsetsAutoCommitEnable = offsetsAutoCommitEnable
+	}
+}
+
+// ConsumerWithOffsetsAutoCommitInterval sets the auto-commit interval
+// (only used when auto-commit is enabled).
+func ConsumerWithOffsetsAutoCommitInterval(offsetsAutoCommitInterval time.Duration) ConsumerOption {
+	return func(o *consumerOptions) {
+		o.offsetsAutoCommitInterval = offsetsAutoCommitInterval
+	}
+}
+
+// ConsumerWithClientID sets the kafka client ID.
+func ConsumerWithClientID(clientID string) ConsumerOption {
+	return func(o *consumerOptions) {
+		o.clientID = clientID
+	}
+}
+
+// ConsumerWithTLS set tlsConfig, if isSkipVerify is true, crypto/tls accepts any certificate presented by
+// the server and any host name in that certificate.
+// NOTE(review): on failure the error is only printed and tlsConfig stays nil,
+// so the client silently falls back to plaintext.
+func ConsumerWithTLS(certFile, keyFile, caFile string, isSkipVerify bool) ConsumerOption {
+	return func(o *consumerOptions) {
+		var err error
+		o.tlsConfig, err = getTLSConfig(certFile, keyFile, caFile, isSkipVerify)
+		if err != nil {
+			fmt.Println("ConsumerWithTLS error:", err)
+		}
+	}
+}
+
+// ConsumerWithZapLogger sets the logger; a nil logger is ignored and the
+// default is kept.
+func ConsumerWithZapLogger(zapLogger *zap.Logger) ConsumerOption {
+	return func(o *consumerOptions) {
+		if zapLogger != nil {
+			o.zapLogger = zapLogger
+		}
+	}
+}
+
+// ConsumerWithConfig sets a complete custom sarama.Config; when non-nil it
+// overrides all other consumer options.
+func ConsumerWithConfig(config *sarama.Config) ConsumerOption {
+	return func(o *consumerOptions) {
+		o.config = config
+	}
+}
diff --git a/pkg/kafka/consumer_test.go b/pkg/kafka/consumer_test.go
new file mode 100644
index 0000000..4dd2ca0
--- /dev/null
+++ b/pkg/kafka/consumer_test.go
@@ -0,0 +1,328 @@
+package kafka
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/IBM/sarama"
+ "go.uber.org/zap"
+
+ "github.com/zhufuyi/sponge/pkg/grpc/gtls/certfile"
+)
+
+// shared fixtures for the consumer tests
+var (
+	groupID  = "my-group"
+	waitTime = time.Second * 10
+
+	// handleMsgFn simply prints each received message.
+	handleMsgFn = func(msg *sarama.ConsumerMessage) error {
+		fmt.Printf("received msg: topic=%s, partition=%d, offset=%d, key=%s, val=%s\n",
+			msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
+		return nil
+	}
+)
+
+// myConsumerGroupHandler is a minimal custom sarama.ConsumerGroupHandler
+// used by the ConsumeCustom tests.
+type myConsumerGroupHandler struct {
+	autoCommitEnable bool
+}
+
+// Setup is a no-op.
+func (h *myConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+// Cleanup is a no-op.
+func (h *myConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+// ConsumeClaim prints and marks every message, committing synchronously
+// when auto-commit is disabled.
+func (h *myConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	for msg := range claim.Messages() {
+		fmt.Printf("received msg: topic=%s, partition=%d, offset=%d, key=%s, val=%s\n",
+			msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
+		session.MarkMessage(msg, "")
+		if !h.autoCommitEnable {
+			session.Commit()
+		}
+	}
+	return nil
+}
+
+// TestInitConsumerGroup exercises option wiring; a missing local broker is
+// tolerated (errors are only logged).
+func TestInitConsumerGroup(t *testing.T) {
+	// Test InitConsumerGroup default options
+	cg, err := InitConsumerGroup(addrs, groupID)
+	if err != nil {
+		t.Log(err)
+	}
+	if cg != nil {
+		_ = cg.Close() // fix: release the previous instance before re-initializing
+	}
+
+	// Test InitConsumerGroup with options
+	cg, err = InitConsumerGroup(addrs, groupID,
+		ConsumerWithVersion(sarama.V3_6_0_0),
+		ConsumerWithClientID("my-client-id"),
+		ConsumerWithOffsetsInitial(sarama.OffsetOldest),
+		ConsumerWithOffsetsAutoCommitEnable(true),
+		ConsumerWithOffsetsAutoCommitInterval(time.Second),
+		ConsumerWithTLS(certfile.Path("two-way/server/server.pem"), certfile.Path("two-way/server/server.key"), certfile.Path("two-way/ca.pem"), true),
+		ConsumerWithZapLogger(zap.NewNop()),
+	)
+	if err != nil {
+		t.Log(err)
+	}
+	if cg != nil {
+		_ = cg.Close()
+	}
+
+	// Test InitConsumerGroup custom options
+	config := sarama.NewConfig()
+	// fix: this is a consumer config; the original set the producer-only
+	// field config.Producer.Return.Successes (copy-paste from producer tests)
+	config.Consumer.Return.Errors = true
+	cg, err = InitConsumerGroup(addrs, groupID, ConsumerWithConfig(config))
+	if err != nil {
+		t.Log(err)
+		return
+	}
+
+	time.Sleep(time.Second)
+	_ = cg.Close()
+}
+
+// TestConsumerGroup_Consume consumes with the default handler against a
+// local broker for waitTime; failures to connect are only logged.
+func TestConsumerGroup_Consume(t *testing.T) {
+	cg, err := InitConsumerGroup(addrs, groupID,
+		ConsumerWithVersion(sarama.V3_6_0_0),
+		ConsumerWithOffsetsInitial(sarama.OffsetOldest),
+		ConsumerWithOffsetsAutoCommitEnable(true),
+		ConsumerWithOffsetsAutoCommitInterval(time.Second),
+	)
+	if err != nil {
+		t.Log(err)
+		return
+	}
+	defer cg.Close()
+
+	go cg.Consume(context.Background(), []string{testTopic}, handleMsgFn)
+
+	<-time.After(waitTime)
+}
+
+// TestConsumerGroup_ConsumeCustom consumes with a custom handler and
+// auto-commit disabled (manual sync commits).
+func TestConsumerGroup_ConsumeCustom(t *testing.T) {
+	cg, err := InitConsumerGroup(addrs, groupID,
+		ConsumerWithVersion(sarama.V3_6_0_0),
+		ConsumerWithOffsetsAutoCommitEnable(false),
+	)
+	if err != nil {
+		t.Log(err)
+		return
+	}
+	defer cg.Close()
+
+	cgh := &myConsumerGroupHandler{autoCommitEnable: cg.autoCommitEnable}
+	go cg.ConsumeCustom(context.Background(), []string{testTopic}, cgh)
+
+	<-time.After(waitTime)
+}
+
+// TestInitConsumer exercises option wiring for the partition consumer;
+// a missing local broker is tolerated (errors are only logged).
+func TestInitConsumer(t *testing.T) {
+	// Test InitConsumer default options
+	c, err := InitConsumer(addrs)
+	if err != nil {
+		t.Log(err)
+	}
+	if c != nil {
+		_ = c.Close() // fix: release the previous instance before re-initializing
+	}
+
+	// Test InitConsumer with options
+	c, err = InitConsumer(addrs,
+		ConsumerWithVersion(sarama.V3_6_0_0),
+		ConsumerWithClientID("my-client-id"),
+		ConsumerWithTLS(certfile.Path("two-way/server/server.pem"), certfile.Path("two-way/server/server.key"), certfile.Path("two-way/ca.pem"), true),
+		ConsumerWithZapLogger(zap.NewNop()),
+	)
+	if err != nil {
+		t.Log(err)
+	}
+	if c != nil {
+		_ = c.Close()
+	}
+
+	// Test InitConsumer custom options
+	config := sarama.NewConfig()
+	// fix: this is a consumer config; the original set the producer-only
+	// field config.Producer.Return.Successes (copy-paste from producer tests)
+	config.Consumer.Return.Errors = true
+	c, err = InitConsumer(addrs, ConsumerWithConfig(config))
+	if err != nil {
+		t.Log(err)
+		return
+	}
+
+	time.Sleep(time.Second)
+	_ = c.Close()
+}
+
+// TestConsumer_ConsumePartition consumes a single partition from a local
+// broker for waitTime; connection failures are only logged.
+func TestConsumer_ConsumePartition(t *testing.T) {
+	c, err := InitConsumer(addrs,
+		ConsumerWithVersion(sarama.V3_6_0_0),
+		ConsumerWithClientID("my-client-id"),
+	)
+	if err != nil {
+		t.Log(err)
+		return
+	}
+	defer c.Close()
+
+	go c.ConsumePartition(context.Background(), testTopic, 0, sarama.OffsetNewest, handleMsgFn)
+
+	<-time.After(waitTime)
+}
+
+// TestConsumer_ConsumeAllPartition consumes every partition of the topic
+// (ConsumeAllPartition itself is non-blocking).
+func TestConsumer_ConsumeAllPartition(t *testing.T) {
+	c, err := InitConsumer(addrs,
+		ConsumerWithVersion(sarama.V3_6_0_0),
+		ConsumerWithClientID("my-client-id"),
+	)
+	if err != nil {
+		t.Log(err)
+		return
+	}
+	defer c.Close()
+
+	c.ConsumeAllPartition(context.Background(), testTopic, sarama.OffsetNewest, handleMsgFn)
+
+	<-time.After(waitTime)
+}
+
+// TestConsumerGroup runs Consume and ConsumeCustom against a sarama mock
+// broker, so it needs no real kafka instance. The mock response sequences
+// are order-sensitive; do not reorder the map entries' internal sequences.
+func TestConsumerGroup(t *testing.T) {
+	var (
+		myTopic = "my-topic"
+		myGroup = "my_group"
+	)
+
+	broker0 := sarama.NewMockBroker(t, 0)
+	defer broker0.Close()
+
+	mockData := map[string]sarama.MockResponse{
+		"MetadataRequest": sarama.NewMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader(myTopic, 0, broker0.BrokerID()),
+		"OffsetRequest": sarama.NewMockOffsetResponse(t).
+			SetOffset(myTopic, 0, sarama.OffsetOldest, 0).
+			SetOffset(myTopic, 0, sarama.OffsetNewest, 1),
+		"FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t).
+			SetCoordinator(sarama.CoordinatorGroup, myGroup, broker0),
+		"HeartbeatRequest": sarama.NewMockHeartbeatResponse(t),
+		// first join/sync attempts fail to exercise the rebalance retry path
+		"JoinGroupRequest": sarama.NewMockSequence(
+			sarama.NewMockJoinGroupResponse(t).SetError(sarama.ErrOffsetsLoadInProgress),
+			sarama.NewMockJoinGroupResponse(t).SetGroupProtocol(sarama.RangeBalanceStrategyName),
+		),
+		"SyncGroupRequest": sarama.NewMockSequence(
+			sarama.NewMockSyncGroupResponse(t).SetError(sarama.ErrOffsetsLoadInProgress),
+			sarama.NewMockSyncGroupResponse(t).SetMemberAssignment(
+				&sarama.ConsumerGroupMemberAssignment{
+					Version: 0,
+					Topics: map[string][]int32{
+						myTopic: {0},
+					},
+				}),
+		),
+		"OffsetFetchRequest": sarama.NewMockOffsetFetchResponse(t).SetOffset(
+			myGroup, myTopic, 0, 0, "", sarama.ErrNoError,
+		).SetError(sarama.ErrNoError),
+		"FetchRequest": sarama.NewMockSequence(
+			sarama.NewMockFetchResponse(t, 1).
+				SetMessage(myTopic, 0, 0, sarama.StringEncoder("foo")).
+				SetMessage(myTopic, 0, 1, sarama.StringEncoder("bar")),
+			sarama.NewMockFetchResponse(t, 1),
+		),
+	}
+
+	broker0.SetHandlerByMap(mockData)
+
+	config := sarama.NewConfig()
+	config.ClientID = t.Name()
+	config.Version = sarama.V2_0_0_0
+	config.Consumer.Return.Errors = true
+	config.Consumer.Group.Rebalance.Retry.Max = 2
+	config.Consumer.Group.Rebalance.Retry.Backoff = 0
+	config.Consumer.Offsets.AutoCommit.Enable = false
+	group, err := sarama.NewConsumerGroup([]string{broker0.Addr()}, myGroup, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	topics := []string{myTopic}
+	g := &ConsumerGroup{
+		Group:            group,
+		groupID:          myGroup,
+		zapLogger:        zap.NewExample(),
+		autoCommitEnable: false,
+	}
+	defer g.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	go g.Consume(ctx, topics, handleMsgFn)
+
+	<-time.After(time.Second)
+
+	// rearm the mock sequences and swap in a fresh group for ConsumeCustom
+	broker0.SetHandlerByMap(mockData)
+	group, err = sarama.NewConsumerGroup([]string{broker0.Addr()}, myGroup, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	g.Group = group
+	go g.ConsumeCustom(ctx, topics, &defaultConsumerHandler{
+		ctx:              ctx,
+		handleMessageFn:  handleMsgFn,
+		zapLogger:        g.zapLogger,
+		autoCommitEnable: g.autoCommitEnable,
+	})
+
+	<-time.After(time.Second)
+	cancel()
+}
+
+// TestConsumerPartition runs ConsumePartition and ConsumeAllPartition against
+// a sarama mock broker, so it needs no real kafka instance.
+func TestConsumerPartition(t *testing.T) {
+	myTopic := "my-topic"
+	testMsg := sarama.StringEncoder("Foo")
+	broker0 := sarama.NewMockBroker(t, 0)
+
+	manualOffset := int64(1234)
+	offsetNewest := int64(2345)
+	offsetNewestAfterFetchRequest := int64(3456)
+
+	mockFetchResponse := sarama.NewMockFetchResponse(t, 1)
+
+	// one message just before the manual offset, then ten from it onward
+	mockFetchResponse.SetMessage(myTopic, 0, manualOffset-1, testMsg)
+
+	for i := int64(0); i < 10; i++ {
+		mockFetchResponse.SetMessage(myTopic, 0, i+manualOffset, testMsg)
+	}
+
+	mockFetchResponse.SetHighWaterMark(myTopic, 0, offsetNewestAfterFetchRequest)
+
+	mockData := map[string]sarama.MockResponse{
+		"MetadataRequest": sarama.NewMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader(myTopic, 0, broker0.BrokerID()),
+		"OffsetRequest": sarama.NewMockOffsetResponse(t).
+			SetOffset(myTopic, 0, sarama.OffsetOldest, 0).
+			SetOffset(myTopic, 0, sarama.OffsetNewest, offsetNewest),
+		"FetchRequest": mockFetchResponse,
+	}
+	broker0.SetHandlerByMap(mockData)
+
+	master, err := sarama.NewConsumer([]string{broker0.Addr()}, sarama.NewConfig())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	c := &Consumer{
+		C:         master,
+		zapLogger: zap.NewExample(),
+	}
+	defer c.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	go c.ConsumePartition(ctx, myTopic, 0, manualOffset, handleMsgFn)
+	<-time.After(time.Second)
+
+	// rearm the mock responses and swap in a fresh consumer for the
+	// all-partition variant
+	broker0.SetHandlerByMap(mockData)
+	master, err = sarama.NewConsumer([]string{broker0.Addr()}, sarama.NewConfig())
+	if err != nil {
+		t.Fatal(err)
+	}
+	c.C = master
+	go c.ConsumeAllPartition(ctx, myTopic, offsetNewest, handleMsgFn)
+	<-time.After(time.Second)
+
+	cancel()
+}
diff --git a/pkg/kafka/producer.go b/pkg/kafka/producer.go
new file mode 100644
index 0000000..5c440d3
--- /dev/null
+++ b/pkg/kafka/producer.go
@@ -0,0 +1,215 @@
+// Package kafka is a kafka client package.
+package kafka
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "github.com/IBM/sarama"
+ "go.uber.org/zap"
+)
+
+// ProducerMessage is an alias of sarama.ProducerMessage so callers need not
+// import sarama directly.
+type ProducerMessage = sarama.ProducerMessage
+
+// ---------------------------------- sync producer ---------------------------------------
+
+// SyncProducer is a sync producer: SendMessage/SendData block until the
+// broker acknowledges.
+type SyncProducer struct {
+	Producer sarama.SyncProducer
+}
+
+// InitSyncProducer init sync producer.
+//
+// If SyncProducerWithConfig supplies a full sarama.Config, it overrides
+// every other option.
+func InitSyncProducer(addrs []string, opts ...SyncProducerOption) (*SyncProducer, error) {
+	o := defaultSyncProducerOptions()
+	o.apply(opts...)
+
+	var config *sarama.Config
+	if o.config != nil {
+		config = o.config
+	} else {
+		config = sarama.NewConfig()
+		config.Version = o.version
+		config.Producer.RequiredAcks = o.requiredAcks
+		config.Producer.Partitioner = o.partitioner
+		// sarama requires Return.Successes=true for sync producers
+		config.Producer.Return.Successes = o.returnSuccesses
+		config.ClientID = o.clientID
+		if o.tlsConfig != nil {
+			config.Net.TLS.Config = o.tlsConfig
+			config.Net.TLS.Enable = true
+		}
+	}
+
+	producer, err := sarama.NewSyncProducer(addrs, config)
+	if err != nil {
+		return nil, err
+	}
+
+	return &SyncProducer{Producer: producer}, nil
+}
+
+// SendMessage sends a message to a topic and returns its partition and offset.
+func (p *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (int32, int64, error) {
+	return p.Producer.SendMessage(msg)
+}
+
+// SendData sends a message to a topic with multiple types of data:
+// *sarama.ProducerMessage and *Message are used as-is (their own Topic wins),
+// []byte and string are wrapped, anything else is json.Marshal-ed.
+func (p *SyncProducer) SendData(topic string, data interface{}) (int32, int64, error) {
+	var msg *sarama.ProducerMessage
+	switch val := data.(type) {
+	case *sarama.ProducerMessage:
+		msg = val
+	case []byte:
+		msg = &sarama.ProducerMessage{Topic: topic, Value: sarama.ByteEncoder(val)}
+	case string:
+		msg = &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(val)}
+	case *Message:
+		// note: the topic parameter is ignored for *Message; val.Topic is used
+		msg = &sarama.ProducerMessage{Topic: val.Topic, Value: sarama.ByteEncoder(val.Data), Key: sarama.ByteEncoder(val.Key)}
+	default:
+		buf, err := json.Marshal(data)
+		if err != nil {
+			return 0, 0, err
+		}
+		msg = &sarama.ProducerMessage{Topic: topic, Value: sarama.ByteEncoder(buf)}
+	}
+
+	return p.Producer.SendMessage(msg)
+}
+
+// Close closes the producer. Safe to call when the producer was never set.
+func (p *SyncProducer) Close() error {
+	if p.Producer != nil {
+		return p.Producer.Close()
+	}
+	return nil
+}
+
+// Message is a message to be sent to a topic, with an optional partitioning key.
+type Message struct {
+	Topic string `json:"topic"`
+	Data  []byte `json:"data"`
+	Key   []byte `json:"key"`
+}
+
+// ---------------------------------- async producer ---------------------------------------
+
+// AsyncProducer is an async producer: sends are buffered and acknowledged
+// via the background handleResponse goroutine.
+type AsyncProducer struct {
+	Producer  sarama.AsyncProducer
+	zapLogger *zap.Logger
+	exit      chan struct{} // closed by Close to stop SendMessage and handleResponse
+}
+
+// InitAsyncProducer init async producer.
+//
+// It also starts a background goroutine that drains Successes/Errors; that
+// goroutine stops when Close is called. If AsyncProducerWithConfig supplies
+// a full sarama.Config, it overrides every other option.
+func InitAsyncProducer(addrs []string, opts ...AsyncProducerOption) (*AsyncProducer, error) {
+	o := defaultAsyncProducerOptions()
+	o.apply(opts...)
+
+	var config *sarama.Config
+	if o.config != nil {
+		config = o.config
+	} else {
+		config = sarama.NewConfig()
+		config.Version = o.version
+		config.Producer.RequiredAcks = o.requiredAcks
+		config.Producer.Partitioner = o.partitioner
+		config.Producer.Return.Successes = o.returnSuccesses
+		config.ClientID = o.clientID
+		config.Producer.Flush.Messages = o.flushMessages
+		config.Producer.Flush.Frequency = o.flushFrequency
+		config.Producer.Flush.Bytes = o.flushBytes
+		if o.tlsConfig != nil {
+			config.Net.TLS.Config = o.tlsConfig
+			config.Net.TLS.Enable = true
+		}
+	}
+
+	producer, err := sarama.NewAsyncProducer(addrs, config)
+	if err != nil {
+		return nil, err
+	}
+
+	p := &AsyncProducer{
+		Producer:  producer,
+		zapLogger: o.zapLogger,
+		exit:      make(chan struct{}),
+	}
+
+	// drain acks/errors in the background; required when Return.Successes is on
+	go p.handleResponse(o.handleFailedFn)
+
+	return p, nil
+}
+
+// SendMessage enqueues messages for asynchronous delivery. It returns an
+// error only if the producer has already been closed.
+func (p *AsyncProducer) SendMessage(messages ...*sarama.ProducerMessage) error {
+	for _, msg := range messages {
+		select {
+		case p.Producer.Input() <- msg:
+		case <-p.exit:
+			return fmt.Errorf("async produce message had exited")
+		}
+	}
+
+	return nil
+}
+
+// SendData enqueues messages with multiple types of data:
+// *sarama.ProducerMessage and *Message are used as-is (their own Topic wins),
+// []byte and string are wrapped, anything else is json.Marshal-ed.
+func (p *AsyncProducer) SendData(topic string, multiData ...interface{}) error {
+	var messages []*sarama.ProducerMessage
+
+	for _, data := range multiData {
+		var msg *sarama.ProducerMessage
+		switch val := data.(type) {
+		case *sarama.ProducerMessage:
+			msg = val
+		case []byte:
+			msg = &sarama.ProducerMessage{Topic: topic, Value: sarama.ByteEncoder(val)}
+		case string:
+			msg = &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(val)}
+		case *Message:
+			// note: the topic parameter is ignored for *Message; val.Topic is used
+			msg = &sarama.ProducerMessage{Topic: val.Topic, Value: sarama.ByteEncoder(val.Data), Key: sarama.ByteEncoder(val.Key)}
+		default:
+			buf, err := json.Marshal(data)
+			if err != nil {
+				return err
+			}
+			msg = &sarama.ProducerMessage{Topic: topic, Value: sarama.ByteEncoder(buf)}
+		}
+		messages = append(messages, msg)
+	}
+
+	return p.SendMessage(messages...)
+}
+
+// handleResponse handles the response of async producer, if producer message failed, you can handle it, e.g. add to other queue to handle later.
+// It runs until Close closes the exit channel.
+func (p *AsyncProducer) handleResponse(handleFn AsyncSendFailedHandlerFn) {
+	for {
+		select {
+		case pm := <-p.Producer.Successes():
+			p.zapLogger.Info("async send successfully",
+				zap.String("topic", pm.Topic),
+				zap.Int32("partition", pm.Partition),
+				zap.Int64("offset", pm.Offset))
+		case err := <-p.Producer.Errors():
+			p.zapLogger.Error("async send failed", zap.Error(err.Err), zap.Any("msg", err.Msg))
+			if handleFn != nil {
+				// give the caller a chance to retry/queue the failed message
+				e := handleFn(err.Msg)
+				if e != nil {
+					p.zapLogger.Error("handle failed msg failed", zap.Error(e))
+				}
+			}
+		case <-p.exit:
+			return
+		}
+	}
+}
+
+// Close closes the producer and stops the background response goroutine.
+// The recover makes a second Close a no-op (closing an already-closed
+// channel panics).
+func (p *AsyncProducer) Close() error {
+	defer func() { _ = recover() }() // ignore error
+	close(p.exit)
+	if p.Producer != nil {
+		return p.Producer.Close()
+	}
+	return nil
+}
diff --git a/pkg/kafka/producer_option.go b/pkg/kafka/producer_option.go
new file mode 100644
index 0000000..9336f3a
--- /dev/null
+++ b/pkg/kafka/producer_option.go
@@ -0,0 +1,257 @@
+package kafka
+
+import (
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/IBM/sarama"
+ "go.uber.org/zap"
+)
+
+// -------------------------------------- sync producer ------------------------------------
+
+// SyncProducerOption set options for InitSyncProducer.
+type SyncProducerOption func(*syncProducerOptions)
+
+// syncProducerOptions holds every tunable setting for sync producers.
+type syncProducerOptions struct {
+	version         sarama.KafkaVersion           // default V2_1_0_0
+	requiredAcks    sarama.RequiredAcks           // default WaitForAll
+	partitioner     sarama.PartitionerConstructor // default NewHashPartitioner
+	returnSuccesses bool                          // default true
+	clientID        string                        // default "sarama"
+	tlsConfig       *tls.Config                   // default nil
+
+	// custom config, if not nil, it will override the default config, the above parameters are invalid
+	config *sarama.Config // default nil
+}
+
+// apply runs each option mutator over o in order.
+func (o *syncProducerOptions) apply(opts ...SyncProducerOption) {
+	for _, opt := range opts {
+		opt(o)
+	}
+}
+
+// defaultSyncProducerOptions returns the settings used when the caller
+// supplies no options.
+func defaultSyncProducerOptions() *syncProducerOptions {
+	return &syncProducerOptions{
+		version:         sarama.V2_1_0_0,
+		requiredAcks:    sarama.WaitForAll,
+		partitioner:     sarama.NewHashPartitioner,
+		returnSuccesses: true,
+		clientID:        "sarama",
+	}
+}
+
+// SyncProducerWithVersion sets the kafka broker version.
+func SyncProducerWithVersion(version sarama.KafkaVersion) SyncProducerOption {
+	return func(o *syncProducerOptions) {
+		o.version = version
+	}
+}
+
+// SyncProducerWithRequiredAcks sets the acknowledgement level
+// (e.g. sarama.WaitForAll, sarama.WaitForLocal).
+func SyncProducerWithRequiredAcks(requiredAcks sarama.RequiredAcks) SyncProducerOption {
+	return func(o *syncProducerOptions) {
+		o.requiredAcks = requiredAcks
+	}
+}
+
+// SyncProducerWithPartitioner sets the partitioner constructor.
+func SyncProducerWithPartitioner(partitioner sarama.PartitionerConstructor) SyncProducerOption {
+	return func(o *syncProducerOptions) {
+		o.partitioner = partitioner
+	}
+}
+
+// SyncProducerWithReturnSuccesses sets Producer.Return.Successes.
+func SyncProducerWithReturnSuccesses(returnSuccesses bool) SyncProducerOption {
+	return func(o *syncProducerOptions) {
+		o.returnSuccesses = returnSuccesses
+	}
+}
+
+// SyncProducerWithClientID sets the kafka client ID.
+func SyncProducerWithClientID(clientID string) SyncProducerOption {
+	return func(o *syncProducerOptions) {
+		o.clientID = clientID
+	}
+}
+
+// SyncProducerWithTLS set tlsConfig, if isSkipVerify is true, crypto/tls accepts any certificate presented by
+// the server and any host name in that certificate.
+// NOTE(review): on failure the error is only printed and tlsConfig stays nil,
+// so the client silently falls back to plaintext.
+func SyncProducerWithTLS(certFile, keyFile, caFile string, isSkipVerify bool) SyncProducerOption {
+	return func(o *syncProducerOptions) {
+		var err error
+		o.tlsConfig, err = getTLSConfig(certFile, keyFile, caFile, isSkipVerify)
+		if err != nil {
+			fmt.Println("SyncProducerWithTLS error:", err)
+		}
+	}
+}
+
+// SyncProducerWithConfig sets a complete custom sarama.Config; when non-nil
+// it overrides all other sync producer options.
+func SyncProducerWithConfig(config *sarama.Config) SyncProducerOption {
+	return func(o *syncProducerOptions) {
+		o.config = config
+	}
+}
+
+// -------------------------------------- async producer -----------------------------------
+
+// AsyncSendFailedHandlerFn is a function that handles failed messages,
+// e.g. re-queueing them for a later retry.
+type AsyncSendFailedHandlerFn func(msg *sarama.ProducerMessage) error
+
+// AsyncProducerOption set options for InitAsyncProducer.
+type AsyncProducerOption func(*asyncProducerOptions)
+
+// asyncProducerOptions holds every tunable setting for async producers.
+type asyncProducerOptions struct {
+	version         sarama.KafkaVersion           // default V2_1_0_0
+	requiredAcks    sarama.RequiredAcks           // default WaitForLocal
+	partitioner     sarama.PartitionerConstructor // default NewHashPartitioner
+	returnSuccesses bool                          // default true
+	clientID        string                        // default "sarama"
+	flushMessages   int                           // default 20
+	flushFrequency  time.Duration                 // default 2 second
+	flushBytes      int                           // default 0
+	tlsConfig       *tls.Config
+
+	// custom config, if not nil, it will override the default config, the above parameters are invalid
+	config *sarama.Config // default nil
+
+	zapLogger      *zap.Logger              // default NewProduction
+	handleFailedFn AsyncSendFailedHandlerFn // default nil
+}
+
+// apply runs each option mutator over o in order.
+func (o *asyncProducerOptions) apply(opts ...AsyncProducerOption) {
+	for _, opt := range opts {
+		opt(o)
+	}
+}
+
+// defaultAsyncProducerOptions returns the settings used when the caller
+// supplies no options.
+func defaultAsyncProducerOptions() *asyncProducerOptions {
+	zapLogger, _ := zap.NewProduction() // error deliberately ignored: default production config is valid
+	return &asyncProducerOptions{
+		version:         sarama.V2_1_0_0,
+		requiredAcks:    sarama.WaitForLocal,
+		partitioner:     sarama.NewHashPartitioner,
+		returnSuccesses: true,
+		clientID:        "sarama",
+		flushMessages:   20,
+		flushFrequency:  2 * time.Second,
+		zapLogger:       zapLogger,
+	}
+}
+
+// AsyncProducerWithVersion sets the kafka broker version.
+func AsyncProducerWithVersion(version sarama.KafkaVersion) AsyncProducerOption {
+	return func(o *asyncProducerOptions) {
+		o.version = version
+	}
+}
+
+// AsyncProducerWithRequiredAcks sets the acknowledgement level
+// (e.g. sarama.WaitForAll, sarama.WaitForLocal).
+func AsyncProducerWithRequiredAcks(requiredAcks sarama.RequiredAcks) AsyncProducerOption {
+	return func(o *asyncProducerOptions) {
+		o.requiredAcks = requiredAcks
+	}
+}
+
+// AsyncProducerWithPartitioner sets the partitioner constructor.
+func AsyncProducerWithPartitioner(partitioner sarama.PartitionerConstructor) AsyncProducerOption {
+	return func(o *asyncProducerOptions) {
+		o.partitioner = partitioner
+	}
+}
+
+// AsyncProducerWithReturnSuccesses sets Producer.Return.Successes.
+func AsyncProducerWithReturnSuccesses(returnSuccesses bool) AsyncProducerOption {
+	return func(o *asyncProducerOptions) {
+		o.returnSuccesses = returnSuccesses
+	}
+}
+
+// AsyncProducerWithClientID sets the kafka client ID.
+func AsyncProducerWithClientID(clientID string) AsyncProducerOption {
+	return func(o *asyncProducerOptions) {
+		o.clientID = clientID
+	}
+}
+
+// AsyncProducerWithFlushMessages sets Producer.Flush.Messages
+// (batch size in messages).
+func AsyncProducerWithFlushMessages(flushMessages int) AsyncProducerOption {
+	return func(o *asyncProducerOptions) {
+		o.flushMessages = flushMessages
+	}
+}
+
+// AsyncProducerWithFlushFrequency sets Producer.Flush.Frequency
+// (maximum time between flushes).
+func AsyncProducerWithFlushFrequency(flushFrequency time.Duration) AsyncProducerOption {
+	return func(o *asyncProducerOptions) {
+		o.flushFrequency = flushFrequency
+	}
+}
+
+// AsyncProducerWithFlushBytes sets Producer.Flush.Bytes
+// (batch size in bytes, 0 = unlimited).
+func AsyncProducerWithFlushBytes(flushBytes int) AsyncProducerOption {
+	return func(o *asyncProducerOptions) {
+		o.flushBytes = flushBytes
+	}
+}
+
+// AsyncProducerWithTLS set tlsConfig, if isSkipVerify is true, crypto/tls accepts any certificate presented by
+// the server and any host name in that certificate.
+// NOTE(review): on failure the error is only printed and tlsConfig stays nil,
+// so the client silently falls back to plaintext.
+func AsyncProducerWithTLS(certFile, keyFile, caFile string, isSkipVerify bool) AsyncProducerOption {
+	return func(o *asyncProducerOptions) {
+		var err error
+		o.tlsConfig, err = getTLSConfig(certFile, keyFile, caFile, isSkipVerify)
+		if err != nil {
+			fmt.Println("AsyncProducerWithTLS error:", err)
+		}
+	}
+}
+
+// AsyncProducerWithZapLogger sets the logger; a nil logger is ignored and
+// the default is kept.
+func AsyncProducerWithZapLogger(zapLogger *zap.Logger) AsyncProducerOption {
+	return func(o *asyncProducerOptions) {
+		if zapLogger != nil {
+			o.zapLogger = zapLogger
+		}
+	}
+}
+
+// AsyncProducerWithHandleFailed sets the callback invoked for messages that
+// failed to send asynchronously.
+func AsyncProducerWithHandleFailed(handleFailedFn AsyncSendFailedHandlerFn) AsyncProducerOption {
+	return func(o *asyncProducerOptions) {
+		o.handleFailedFn = handleFailedFn
+	}
+}
+
+// AsyncProducerWithConfig sets a complete custom sarama.Config; when non-nil
+// it overrides all other async producer options.
+func AsyncProducerWithConfig(config *sarama.Config) AsyncProducerOption {
+	return func(o *asyncProducerOptions) {
+		o.config = config
+	}
+}
+
+// getTLSConfig builds a *tls.Config from PEM-encoded client cert/key files
+// and a CA file. If isSkipVerify is true, crypto/tls accepts any certificate
+// presented by the server and any host name in that certificate.
+func getTLSConfig(certFile, keyFile, caFile string, isSkipVerify bool) (*tls.Config, error) {
+	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
+	if err != nil {
+		return nil, err
+	}
+
+	caCert, err := os.ReadFile(caFile)
+	if err != nil {
+		return nil, err
+	}
+
+	caCertPool := x509.NewCertPool()
+	// fix: AppendCertsFromPEM reports whether any certificate was parsed;
+	// the original ignored it, silently accepting a malformed CA file and
+	// failing later at handshake time instead.
+	if !caCertPool.AppendCertsFromPEM(caCert) {
+		return nil, fmt.Errorf("no valid CA certificate found in %s", caFile)
+	}
+
+	return &tls.Config{
+		Certificates:       []tls.Certificate{cert},
+		RootCAs:            caCertPool,
+		InsecureSkipVerify: isSkipVerify, // caller explicitly opts in
+	}, nil
+}
diff --git a/pkg/kafka/producer_test.go b/pkg/kafka/producer_test.go
new file mode 100644
index 0000000..c801491
--- /dev/null
+++ b/pkg/kafka/producer_test.go
@@ -0,0 +1,256 @@
+package kafka
+
+import (
+ "testing"
+ "time"
+
+ "github.com/IBM/sarama"
+ "github.com/IBM/sarama/mocks"
+ "go.uber.org/zap"
+
+ "github.com/zhufuyi/sponge/pkg/grpc/gtls/certfile"
+)
+
+// Shared fixtures for the producer tests below.
+var (
+	// addrs lists the Kafka broker(s) the integration tests dial; connection
+	// failures are only logged, so the tests degrade gracefully when no
+	// broker is reachable.
+	addrs = []string{"localhost:9092"}
+	//addrs = []string{"192.168.3.37:33001", "192.168.3.37:33002", "192.168.3.37:33003"}
+	testTopic = "test_topic_1"
+
+	// testData enumerates the payload kinds exercised through SendData.
+	testData = []interface{}{
+		// (1) sarama.ProducerMessage type
+		&sarama.ProducerMessage{
+			Topic: testTopic,
+			Value: sarama.StringEncoder("hello world " + time.Now().String()),
+		},
+
+		// (2) string type
+		"hello world " + time.Now().String(),
+
+		// (3) []byte type
+		[]byte("hello world " + time.Now().String()),
+
+		// (4) struct type, supports json.Marshal
+		&struct {
+			Name string `json:"name"`
+			Age  int    `json:"age"`
+		}{
+			Name: "Alice",
+			Age:  20,
+		},
+
+		// (5) Message type
+		&Message{
+			Topic: testTopic,
+			Data:  []byte("hello world " + time.Now().String()),
+			Key:   []byte("foobar"),
+		},
+	}
+)
+
+// TestInitSyncProducer covers default, per-option, and custom-config
+// initialization. Connection errors are only logged so the test can run
+// without a reachable broker.
+func TestInitSyncProducer(t *testing.T) {
+	// Test InitSyncProducer default options
+	p, err := InitSyncProducer(addrs)
+	if err != nil {
+		t.Log(err)
+	} else {
+		_ = p.Close() // close before re-initializing to avoid leaking the producer
+	}
+
+	// Test InitSyncProducer with options
+	p, err = InitSyncProducer(addrs,
+		SyncProducerWithVersion(sarama.V3_6_0_0),
+		SyncProducerWithClientID("my-client-id"),
+		SyncProducerWithRequiredAcks(sarama.WaitForLocal),
+		SyncProducerWithPartitioner(sarama.NewRandomPartitioner),
+		SyncProducerWithReturnSuccesses(true),
+		SyncProducerWithTLS(certfile.Path("two-way/server/server.pem"), certfile.Path("two-way/server/server.key"), certfile.Path("two-way/ca.pem"), true),
+	)
+	if err != nil {
+		t.Log(err)
+	} else {
+		_ = p.Close()
+	}
+
+	// Test InitSyncProducer custom options
+	config := sarama.NewConfig()
+	config.Producer.Return.Successes = true
+	p, err = InitSyncProducer(addrs, SyncProducerWithConfig(config))
+	if err != nil {
+		t.Log(err)
+		return
+	}
+
+	time.Sleep(time.Second)
+	_ = p.Close()
+}
+
+// TestSyncProducer_SendMessage sends one raw sarama message synchronously.
+func TestSyncProducer_SendMessage(t *testing.T) {
+	producer, err := InitSyncProducer(addrs)
+	if err != nil {
+		t.Log(err)
+		return
+	}
+	defer producer.Close()
+
+	msg := &sarama.ProducerMessage{
+		Topic: testTopic,
+		Value: sarama.StringEncoder("hello world " + time.Now().String()),
+	}
+	partition, offset, err := producer.SendMessage(msg)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	t.Log("partition:", partition, "offset:", offset)
+}
+
+// TestSyncProducer_SendData sends every fixture payload type synchronously.
+func TestSyncProducer_SendData(t *testing.T) {
+	producer, err := InitSyncProducer(addrs)
+	if err != nil {
+		t.Log(err)
+		return
+	}
+	defer producer.Close()
+
+	for _, payload := range testData {
+		partition, offset, sendErr := producer.SendData(testTopic, payload)
+		if sendErr != nil {
+			t.Log(sendErr)
+			continue
+		}
+		t.Log("partition:", partition, "offset:", offset)
+	}
+}
+
+// TestInitAsyncProducer covers default, per-option, and custom-config
+// initialization. Connection errors are only logged so the test can run
+// without a reachable broker.
+func TestInitAsyncProducer(t *testing.T) {
+	// Test InitAsyncProducer default options
+	p, err := InitAsyncProducer(addrs)
+	if err != nil {
+		t.Log(err)
+	} else {
+		_ = p.Close() // close before re-initializing to avoid leaking the producer
+	}
+
+	// Test InitAsyncProducer with options
+	p, err = InitAsyncProducer(addrs,
+		AsyncProducerWithVersion(sarama.V3_6_0_0),
+		AsyncProducerWithClientID("my-client-id"),
+		AsyncProducerWithRequiredAcks(sarama.WaitForLocal),
+		AsyncProducerWithPartitioner(sarama.NewRandomPartitioner),
+		AsyncProducerWithReturnSuccesses(true),
+		AsyncProducerWithFlushMessages(100),
+		AsyncProducerWithFlushFrequency(time.Second),
+		AsyncProducerWithFlushBytes(16*1024),
+		AsyncProducerWithTLS(certfile.Path("two-way/server/server.pem"), certfile.Path("two-way/server/server.key"), certfile.Path("two-way/ca.pem"), true),
+		AsyncProducerWithZapLogger(zap.NewExample()),
+		AsyncProducerWithHandleFailed(func(msg *sarama.ProducerMessage) error {
+			t.Logf("handle failed message: %v", msg)
+			return nil
+		}),
+	)
+	if err != nil {
+		t.Log(err)
+	} else {
+		_ = p.Close()
+	}
+
+	// Test InitAsyncProducer custom options
+	config := sarama.NewConfig()
+	config.Producer.Return.Successes = true
+	p, err = InitAsyncProducer(addrs, AsyncProducerWithConfig(config))
+	if err != nil {
+		t.Log(err)
+		return
+	}
+
+	time.Sleep(time.Second)
+	_ = p.Close()
+}
+
+// TestAsyncProducer_SendMessage queues two raw sarama messages asynchronously.
+func TestAsyncProducer_SendMessage(t *testing.T) {
+	producer, err := InitAsyncProducer(addrs, AsyncProducerWithFlushFrequency(time.Millisecond*100))
+	if err != nil {
+		t.Log(err)
+		return
+	}
+	defer producer.Close()
+
+	messages := []*sarama.ProducerMessage{
+		{
+			Topic: testTopic,
+			Value: sarama.StringEncoder("hello world " + time.Now().String()),
+		},
+		{
+			Topic: testTopic,
+			Value: sarama.StringEncoder("foo bar " + time.Now().String()),
+		},
+	}
+
+	if err = producer.SendMessage(messages...); err != nil {
+		t.Error(err)
+		return
+	}
+	time.Sleep(time.Millisecond * 200) // wait for messages to be sent, and flush them
+}
+
+// TestAsyncProducer_SendData queues every fixture payload type asynchronously.
+func TestAsyncProducer_SendData(t *testing.T) {
+	producer, err := InitAsyncProducer(addrs, AsyncProducerWithFlushFrequency(time.Millisecond*100))
+	if err != nil {
+		t.Log(err)
+		return
+	}
+	defer producer.Close()
+
+	if err = producer.SendData(testTopic, testData...); err != nil {
+		t.Error(err)
+		return
+	}
+	time.Sleep(time.Millisecond * 200) // wait for messages to be sent, and flush them
+}
+
+// TestSyncProducer exercises SyncProducer against sarama's mock producer, so
+// it runs without a real broker.
+func TestSyncProducer(t *testing.T) {
+	sp := mocks.NewSyncProducer(t, nil)
+	// One wrapper is enough; re-creating it per iteration was redundant
+	// because it always wrapped the same mock.
+	p := &SyncProducer{Producer: sp}
+	defer p.Close()
+
+	sp.ExpectSendMessageAndSucceed()
+	msg := testData[0].(*sarama.ProducerMessage)
+	partition, offset, err := p.SendMessage(msg)
+	if err != nil {
+		t.Log(err)
+	} else {
+		t.Log("partition:", partition, "offset:", offset)
+	}
+
+	for _, data := range testData {
+		sp.ExpectSendMessageAndSucceed()
+		partition, offset, err := p.SendData(testTopic, data)
+		if err != nil {
+			t.Log(err)
+			continue
+		}
+		t.Log("partition:", partition, "offset:", offset)
+	}
+}
+
+// TestAsyncProducer exercises AsyncProducer against sarama's mock producer,
+// so it runs without a real broker.
+func TestAsyncProducer(t *testing.T) {
+	ap := mocks.NewAsyncProducer(t, nil)
+	ap.ExpectInputAndSucceed()
+	p := &AsyncProducer{Producer: ap, exit: make(chan struct{}), zapLogger: zap.NewExample()}
+	defer p.Close()
+	go p.handleResponse(nil)
+
+	msg := testData[0].(*sarama.ProducerMessage)
+	if err := p.SendMessage(msg); err != nil {
+		t.Log(err)
+	} else {
+		t.Log("send message success")
+	}
+
+	for _, data := range testData {
+		// Re-assigning p.Producer each iteration was redundant: it is already
+		// the same mock. Only a new expectation is needed per send.
+		ap.ExpectInputAndSucceed()
+		if err := p.SendData(testTopic, data); err != nil {
+			t.Log(err)
+			continue
+		}
+		t.Log("send message success")
+	}
+}