rabbitmq supports dead letter queues

This commit is contained in:
zhuyasen 2024-06-10 11:53:52 +08:00
parent 41a3cff4e5
commit 8128806972
9 changed files with 731 additions and 68 deletions

1
go.mod
View File

@ -113,6 +113,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.2.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect

2
go.sum
View File

@ -318,6 +318,8 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=

View File

@ -1,12 +1,14 @@
## rabbitmq
rabbitmq library wrapped in [github.com/rabbitmq/amqp091-go](github.com/rabbitmq/amqp091-go), supports automatic reconnection and customized setting parameters, includes `direct`, `topic`, `fanout`, `headers`, `delayed message`, `publisher subscriber` a total of six message types.
rabbitmq library wrapped in [github.com/rabbitmq/amqp091-go](github.com/rabbitmq/amqp091-go), supports automatic reconnection and customized setting parameters, includes `direct`, `topic`, `fanout`, `headers`, `delayed message`, `publisher subscriber` a total of six message types, and dead letter is supported.
### Example of use
#### Code Example
The code example includes `direct`, `topic`, `fanout`, `headers`, `delayed message`, `publisher subscriber` a total of six message types.
The following code example is including `direct`, `topic`, `fanout`, `headers`, `delayed message`, `publisher subscriber` six message types.
> Tip: the wrapped `Consume` function uses manual acknowledgement mode by default and does not need to call the ack function again.
```go
package main
@ -14,26 +16,33 @@ package main
import (
"context"
"fmt"
"strconv"
"sync/atomic"
"time"
"github.com/zhufuyi/sponge/pkg/logger"
"github.com/zhufuyi/sponge/pkg/rabbitmq"
)
var (
producerCount int32
consumerCount int32
)
func main() {
url := "amqp://guest:guest@127.0.0.1:5672/"
directExample(url)
topicExample(url)
//topicExample(url)
fanoutExample(url)
//fanoutExample(url)
headersExample(url)
//headersExample(url)
delayedMessageExample(url)
//delayedMessageExample(url)
publisherSubscriberExample(url)
//publisherSubscriberExample(url)
}
func directExample(url string) {
@ -41,10 +50,11 @@ func directExample(url string) {
queueName := "direct-queue-1"
routeKey := "direct-key-1"
exchange := rabbitmq.NewDirectExchange(exchangeName, routeKey)
var queueArgs map[string]interface{}
fmt.Printf("\n\n-------------------- direct --------------------\n")
// producer-side direct message
func() {
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
@ -52,17 +62,24 @@ func directExample(url string) {
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
err = p.PublishDirect(context.Background(), []byte("[direct] say hello"))
checkErr(err)
}()
for i := 1; i <= 100; i++ {
err = p.PublishDirect(context.Background(), []byte("[direct] message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side direct message
func() {
runConsume(url, exchange, queueName)
}()
{
c := runConsume(url, exchange, queueName, queueArgs)
<-time.After(time.Second)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c.Count()))
}
printStat()
}
func topicExample(url string) {
@ -70,10 +87,11 @@ func topicExample(url string) {
queueName := "topic-queue-1"
routingKey := "key1.key2.*"
exchange := rabbitmq.NewTopicExchange(exchangeName, routingKey)
var queueArgs map[string]interface{}
fmt.Printf("\n\n-------------------- topic --------------------\n")
// producer-side topic message
func() {
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
@ -81,28 +99,36 @@ func topicExample(url string) {
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
key := "key1.key2.key3"
err = p.PublishTopic(context.Background(), key, []byte("[topic] "+key+" say hello"))
checkErr(err)
}()
for i := 1; i <= 100; i++ {
key := "key1.key2.key" + strconv.Itoa(i)
err = p.PublishTopic(context.Background(), key, []byte("[topic] "+key+" message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side topic message
func() {
runConsume(url, exchange, queueName)
}()
{
c := runConsume(url, exchange, queueName, queueArgs)
<-time.After(time.Second)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c.Count()))
}
printStat()
}
func fanoutExample(url string) {
exchangeName := "fanout-exchange-demo"
queueName := "fanout-queue-1"
exchange := rabbitmq.NewFanoutExchange(exchangeName)
var queueArgs map[string]interface{}
fmt.Printf("\n\n-------------------- fanout --------------------\n")
// producer-side fanout message
func() {
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
@ -110,19 +136,28 @@ func fanoutExample(url string) {
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
err = p.PublishFanout(context.Background(), []byte("[fanout] say hello"))
checkErr(err)
}()
for i := 1; i <= 100; i++ {
err = p.PublishFanout(context.Background(), []byte("[fanout] message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side fanout message
func() {
runConsume(url, exchange, queueName)
{
queueName = "fanout-queue-1"
c1 := runConsume(url, exchange, queueName, queueArgs)
queueName = "fanout-queue-2"
runConsume(url, exchange, queueName)
}()
c2 := runConsume(url, exchange, queueName, queueArgs)
<-time.After(time.Second)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c1.Count()))
fmt.Println("\n\nconsumer 2 count:", c2.Count())
}
printStat()
}
func headersExample(url string) {
@ -130,10 +165,11 @@ func headersExample(url string) {
queueName := "headers-queue-1"
headersKeys := map[string]interface{}{"hello": "world", "foo": "bar"}
exchange := rabbitmq.NewHeadersExchange(exchangeName, rabbitmq.HeadersTypeAll, headersKeys) // all, you can set HeadersTypeAny type
var queueArgs map[string]interface{}
fmt.Printf("\n\n-------------------- headers --------------------\n")
// producer-side headers message
func() {
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
@ -141,22 +177,31 @@ func headersExample(url string) {
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
ctx := context.Background()
headersKeys1 := headersKeys
err = p.PublishHeaders(ctx, headersKeys1, []byte("[headers] say hello 1"))
checkErr(err)
headersKeys2 := map[string]interface{}{"foo": "bar"}
err = p.PublishHeaders(ctx, headersKeys2, []byte("[headers] say hello 2"))
checkErr(err)
}()
for i := 1; i <= 100; i++ {
headersKeys1 := headersKeys
err = p.PublishHeaders(ctx, headersKeys1, []byte("[headers] key1 message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
// because of x-match: all, headersKeys2 will not match the same queue, so drop it
headersKeys2 := map[string]interface{}{"foo": "bar"}
err = p.PublishHeaders(ctx, headersKeys2, []byte("[headers] key2 message "+strconv.Itoa(i)))
checkErr(err)
}
}
// consumer-side headers message
func() {
runConsume(url, exchange, queueName)
}()
{
c := runConsume(url, exchange, queueName, queueArgs)
<-time.After(time.Second)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c.Count()))
}
printStat()
}
func delayedMessageExample(url string) {
@ -164,10 +209,11 @@ func delayedMessageExample(url string) {
queueName := "delayed-message-queue"
routingKey := "delayed-key"
exchange := rabbitmq.NewDelayedMessageExchange(exchangeName, rabbitmq.NewDirectExchange("", routingKey))
var queueArgs map[string]interface{}
fmt.Printf("\n\n-------------------- delayed message --------------------\n")
// producer-side delayed message
func() {
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
@ -175,19 +221,26 @@ func delayedMessageExample(url string) {
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
ctx := context.Background()
datetimeLayout := "2006-01-02 15:04:05.000"
err = p.PublishDelayedMessage(ctx, time.Second*3, []byte("[delayed message] say hello "+time.Now().Format(datetimeLayout)))
checkErr(err)
}()
for i := 1; i <= 100; i++ {
err = p.PublishDelayedMessage(ctx, time.Second*3, []byte("[delayed] message "+strconv.Itoa(i)+" at "+time.Now().Format(datetimeLayout)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side delayed message
func() {
runConsume(url, exchange, queueName)
}()
{
c := runConsume(url, exchange, queueName, queueArgs)
<-time.After(time.Second * 4)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c.Count()))
}
printStat()
}
func publisherSubscriberExample(url string) {
@ -195,7 +248,7 @@ func publisherSubscriberExample(url string) {
fmt.Printf("\n\n-------------------- publisher subscriber --------------------\n")
// publisher-side message
func() {
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
@ -204,32 +257,45 @@ func publisherSubscriberExample(url string) {
checkErr(err)
defer p.Close()
err = p.Publish(context.Background(), []byte("[pub-sub] say hello"))
checkErr(err)
}()
for i := 1; i <= 100; i++ {
err = p.Publish(context.Background(), []byte("[pub-sub] message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// subscriber-side message
func() {
{
identifier := "pub-sub-queue-1"
runSubscriber(url, channelName, identifier)
s1 := runSubscriber(url, channelName, identifier)
identifier = "pub-sub-queue-2"
runSubscriber(url, channelName, identifier)
}()
s2 := runSubscriber(url, channelName, identifier)
<-time.After(time.Second)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(s1.Count()))
fmt.Println("\n\nsubscriber 2 count:", s2.Count())
}
printStat()
}
func runConsume(url string, exchange *rabbitmq.Exchange, queueName string) {
func runConsume(url string, exchange *rabbitmq.Exchange, queueName string, queueArgs map[string]interface{}) *rabbitmq.Consumer {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
c, err := rabbitmq.NewConsumer(exchange, queueName, connection, rabbitmq.WithConsumerAutoAck(false))
c, err := rabbitmq.NewConsumer(exchange, queueName, connection,
rabbitmq.WithConsumerAutoAck(false),
rabbitmq.WithConsumerQueueDeclareOptions(
rabbitmq.WithQueueDeclareArgs(queueArgs),
),
)
checkErr(err)
c.Consume(context.Background(), handler)
return c
}
func runSubscriber(url string, channelName string, identifier string) {
func runSubscriber(url string, channelName string, identifier string) *rabbitmq.Subscriber {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
@ -237,6 +303,8 @@ func runSubscriber(url string, channelName string, identifier string) {
checkErr(err)
s.Subscribe(context.Background(), handler)
return s
}
var handler = func(ctx context.Context, data []byte, tagID string) error {
@ -249,6 +317,400 @@ func checkErr(err error) {
panic(err)
}
}
func printStat() {
fmt.Println("\n\n-------------------- stat --------------------")
fmt.Println("producer count:", atomic.LoadInt32(&producerCount))
fmt.Println("consumer count:", atomic.LoadInt32(&consumerCount))
fmt.Println("----------------------------------------------\n")
atomic.StoreInt32(&producerCount, 0)
atomic.StoreInt32(&consumerCount, 0)
}
```
<br>
#### Example of Dead Letter
The following example code is in the `direct`, `topic`, `fanout`, `headers`, `delayed message` five message types to add a queue of dead letters, dead letter queue is fixed to `direct` type.
> Tip: the wrapped `Consume` function uses manual acknowledgement mode by default.
```go
package main
import (
"context"
"fmt"
"strconv"
"sync/atomic"
"time"
"github.com/zhufuyi/sponge/pkg/logger"
"github.com/zhufuyi/sponge/pkg/rabbitmq"
)
var (
producerCount int32
consumerCount int32
deadLetterConsumerCount int32
)
func main() {
url := "amqp://guest:guest@127.0.0.1:5672/"
directExample(url)
//topicExample(url)
//fanoutExample(url)
//headersExample(url)
//delayedMessageExample(url)
}
func directExample(url string) {
exchangeName := "direct-exchange-demo-2"
queueName := "direct-queue-2"
routingKey := "direct-key-2"
exchange := rabbitmq.NewDirectExchange(exchangeName, routingKey)
queueArgs := map[string]interface{}{
"x-max-length": 60,
"x-message-ttl": 3000, // milliseconds
}
deadLetterQueueName := "dl-" + queueName
deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-"+routingKey)
fmt.Printf("\n\n-------------------- direct --------------------\n")
// producer-side direct message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection,
// set queue args
rabbitmq.WithProducerQueueDeclareOptions(
rabbitmq.WithQueueDeclareArgs(queueArgs),
),
// add dead letter
rabbitmq.WithDeadLetterOptions(
rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
),
)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs() // get producer queue args
for i := 1; i <= 100; i++ {
err = p.PublishDirect(context.Background(), []byte("[direct] say hello"+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side direct message
{
c1 := runConsume(url, exchange, queueName, queueArgs)
c2 := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c1.Count()))
atomic.AddInt32(&deadLetterConsumerCount, int32(c2.Count()))
}
printStat()
}
func topicExample(url string) {
exchangeName := "topic-exchange-demo-2"
queueName := "topic-queue-2"
routingKey := "dl-key1.key2.*"
exchange := rabbitmq.NewTopicExchange(exchangeName, routingKey)
queueArgs := map[string]interface{}{
"x-max-length": 60,
"x-message-ttl": 3000, // milliseconds
}
deadLetterQueueName := "dl-" + queueName
deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-"+routingKey)
fmt.Printf("\n\n-------------------- topic --------------------\n")
// producer-side topic message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection,
// set queue args
rabbitmq.WithProducerQueueDeclareOptions(
rabbitmq.WithQueueDeclareArgs(queueArgs),
),
// add dead letter
rabbitmq.WithDeadLetterOptions(
rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
),
)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
for i := 1; i <= 100; i++ {
key := "dl-key1.key2.key" + strconv.Itoa(i)
err = p.PublishTopic(context.Background(), key, []byte("[topic] "+key+" message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side topic message
{
c1 := runConsume(url, exchange, queueName, queueArgs)
c2 := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c1.Count()))
atomic.AddInt32(&deadLetterConsumerCount, int32(c2.Count()))
}
printStat()
}
func fanoutExample(url string) {
exchangeName := "fanout-exchange-demo-2"
queueName := "fanout-queue-3"
exchange := rabbitmq.NewFanoutExchange(exchangeName)
queueArgs := map[string]interface{}{
"x-max-length": 60,
"x-message-ttl": 3000, // milliseconds
}
deadLetterQueueName := "dl-" + queueName
deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-direct-key")
fmt.Printf("\n\n-------------------- fanout --------------------\n")
// producer-side fanout message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection,
// set queue args
rabbitmq.WithProducerQueueDeclareOptions(
rabbitmq.WithQueueDeclareArgs(queueArgs),
),
// add dead letter
rabbitmq.WithDeadLetterOptions(
rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
),
)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
for i := 1; i <= 100; i++ {
err = p.PublishFanout(context.Background(), []byte("[fanout] message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side fanout message
{
queueName = "fanout-queue-3"
c1 := runConsume(url, exchange, queueName, queueArgs)
queueName = "fanout-queue-4"
c2 := runConsume(url, exchange, queueName, queueArgs)
c3 := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c1.Count()))
atomic.AddInt32(&consumerCount, int32(c2.Count()))
atomic.AddInt32(&deadLetterConsumerCount, int32(c3.Count()))
}
printStat()
}
func headersExample(url string) {
exchangeName := "headers-exchange-demo-2"
queueName := "headers-queue-2"
headersKeys := map[string]interface{}{"hello": "world", "foo": "bar"}
exchange := rabbitmq.NewHeadersExchange(exchangeName, rabbitmq.HeadersTypeAll, headersKeys) // all, you can set HeadersTypeAny type
queueArgs := map[string]interface{}{
"x-max-length": 60,
"x-message-ttl": 3000, // milliseconds
}
deadLetterQueueName := "dl-" + queueName
deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-headers-key")
fmt.Printf("\n\n-------------------- headers --------------------\n")
// producer-side headers message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection,
// set queue args
rabbitmq.WithProducerQueueDeclareOptions(
rabbitmq.WithQueueDeclareArgs(queueArgs),
),
// add dead letter
rabbitmq.WithDeadLetterOptions(
rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
),
)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
ctx := context.Background()
for i := 1; i <= 100; i++ {
headersKeys1 := headersKeys
err = p.PublishHeaders(ctx, headersKeys1, []byte("[headers] message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
// because of x-match: all, headersKeys2 will not match the same queue, so drop it
headersKeys2 := map[string]interface{}{"foo": "bar"}
err = p.PublishHeaders(ctx, headersKeys2, []byte("[headers] key2 message"))
checkErr(err)
}
}
// consumer-side headers message
{
c1 := runConsume(url, exchange, queueName, queueArgs)
c2 := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c1.Count()))
atomic.AddInt32(&deadLetterConsumerCount, int32(c2.Count()))
}
printStat()
}
func delayedMessageExample(url string) {
exchangeName := "delayed-message-exchange-demo-2"
queueName := "delayed-message-queue-2"
routingKey := "delayed-key-2"
exchange := rabbitmq.NewDelayedMessageExchange(exchangeName, rabbitmq.NewDirectExchange("", routingKey))
queueArgs := map[string]interface{}{
"x-max-length": 60,
"x-message-ttl": 3000, // milliseconds
}
deadLetterQueueName := "dl-" + queueName
deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-"+routingKey)
fmt.Printf("\n\n-------------------- delayed message --------------------\n")
// producer-side delayed message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection,
// set queue args
rabbitmq.WithProducerQueueDeclareOptions(
rabbitmq.WithQueueDeclareArgs(queueArgs),
),
// add dead letter
rabbitmq.WithDeadLetterOptions(
rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
),
)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
ctx := context.Background()
datetimeLayout := "2006-01-02 15:04:05.000"
for i := 1; i <= 200; i++ {
delayTime := time.Second
if i > 100 {
delayTime = time.Second * 2
}
err = p.PublishDelayedMessage(ctx, delayTime, []byte("[delayed] message "+strconv.Itoa(i)+" at "+time.Now().Format(datetimeLayout)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side delayed message
{
time.Sleep(time.Second * 3) // wait for all messages to be sent
c1 := runConsume(url, exchange, queueName, queueArgs)
c2 := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)
<-time.After(time.Second * 10)
atomic.AddInt32(&consumerCount, int32(c1.Count()))
atomic.AddInt32(&deadLetterConsumerCount, int32(c2.Count()))
}
printStat()
}
func runConsume(url string, exchange *rabbitmq.Exchange, queueName string, queueArgs map[string]interface{}) *rabbitmq.Consumer {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
c, err := rabbitmq.NewConsumer(exchange, queueName, connection,
rabbitmq.WithConsumerAutoAck(false),
rabbitmq.WithConsumerQueueDeclareOptions(
rabbitmq.WithQueueDeclareArgs(queueArgs),
),
)
checkErr(err)
c.Consume(context.Background(), handler)
return c
}
func runConsumeForDeadLetter(url string, exchange *rabbitmq.Exchange, queueName string) *rabbitmq.Consumer {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
c, err := rabbitmq.NewConsumer(exchange, queueName, connection, rabbitmq.WithConsumerAutoAck(false))
checkErr(err)
c.Consume(context.Background(), handler)
return c
}
var handler = func(ctx context.Context, data []byte, tagID string) error {
logger.Info("received message", logger.String("tagID", tagID), logger.String("data", string(data)))
return nil
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
func printStat() {
fmt.Println("\n\n-------------------- stat --------------------")
fmt.Println("producer count:", producerCount)
fmt.Println("consumer count:", consumerCount)
fmt.Println("dead letter consumer count:", deadLetterConsumerCount)
fmt.Println("----------------------------------------------\n")
atomic.StoreInt32(&producerCount, 0)
atomic.StoreInt32(&consumerCount, 0)
atomic.StoreInt32(&deadLetterConsumerCount, 0)
}
```
<br>

View File

@ -305,3 +305,69 @@ func WithDelayedMessagePublishHeadersKeys(headersKeys map[string]interface{}) De
o.headersKeys = headersKeys
}
}
// -------------------------------------------------------------------------------------------
// DeadLetterOption declare dead letter option.
type DeadLetterOption func(*deadLetterOptions)
type deadLetterOptions struct {
exchangeName string
queueName string
routingKey string
exchangeDeclare *exchangeDeclareOptions
queueDeclare *queueDeclareOptions
queueBind *queueBindOptions
}
func (o *deadLetterOptions) apply(opts ...DeadLetterOption) {
for _, opt := range opts {
opt(o)
}
}
func (o *deadLetterOptions) isEnabled() bool {
if o.exchangeName != "" && o.queueName != "" {
return true
}
return false
}
func defaultDeadLetterOptions() *deadLetterOptions {
return &deadLetterOptions{
exchangeDeclare: defaultExchangeDeclareOptions(),
queueDeclare: defaultQueueDeclareOptions(),
queueBind: defaultQueueBindOptions(),
}
}
// WithDeadLetterExchangeDeclareOptions set dead letter exchange declare option.
func WithDeadLetterExchangeDeclareOptions(opts ...ExchangeDeclareOption) DeadLetterOption {
return func(o *deadLetterOptions) {
o.exchangeDeclare.apply(opts...)
}
}
// WithDeadLetterQueueDeclareOptions set dead letter queue declare option.
func WithDeadLetterQueueDeclareOptions(opts ...QueueDeclareOption) DeadLetterOption {
return func(o *deadLetterOptions) {
o.queueDeclare.apply(opts...)
}
}
// WithDeadLetterQueueBindOptions set dead letter queue bind option.
func WithDeadLetterQueueBindOptions(opts ...QueueBindOption) DeadLetterOption {
return func(o *deadLetterOptions) {
o.queueBind.apply(opts...)
}
}
// WithDeadLetter set dead letter exchange, queue, routing key.
func WithDeadLetter(exchangeName string, queueName string, routingKey string) DeadLetterOption {
return func(o *deadLetterOptions) {
o.exchangeName = exchangeName
o.queueName = queueName
o.routingKey = routingKey
}
}

View File

@ -89,3 +89,22 @@ func TestDelayedMessagePublishOptions(t *testing.T) {
assert.Equal(t, "key1.key2", o.topicKey)
assert.Equal(t, "bar", o.headersKeys["foo"])
}
func TestDelayedMessageConsumeOptions(t *testing.T) {
opts := []DeadLetterOption{
WithDeadLetter("dl-exchange", "dl-queue", "dl-routing-key"),
WithDeadLetterExchangeDeclareOptions(WithExchangeDeclareAutoDelete(false)),
WithDeadLetterQueueDeclareOptions(WithQueueDeclareAutoDelete(false)),
WithDeadLetterQueueBindOptions(WithQueueBindArgs(map[string]interface{}{"foo": "bar"})),
}
o := defaultDeadLetterOptions()
o.apply(opts...)
assert.Equal(t, "dl-exchange", o.exchangeName)
assert.Equal(t, "dl-queue", o.queueName)
assert.Equal(t, "dl-routing-key", o.routingKey)
assert.Equal(t, true, o.isEnabled())
o = defaultDeadLetterOptions()
o.apply()
assert.Equal(t, false, o.isEnabled())
}

View File

@ -4,6 +4,7 @@ import (
"context"
"strconv"
"strings"
"sync/atomic"
"time"
amqp "github.com/rabbitmq/amqp091-go"
@ -79,7 +80,7 @@ func WithConsumerConsumeOptions(opts ...ConsumeOption) ConsumerOption {
}
}
// WithConsumerAutoAck set consumer auto ack option.
// WithConsumerAutoAck set consumer auto ack option, if false, manual ACK required.
func WithConsumerAutoAck(enable bool) ConsumerOption {
return func(o *consumerOptions) {
o.isAutoAck = enable
@ -233,6 +234,8 @@ type Consumer struct {
isAutoAck bool // auto ack or not
zapLog *zap.Logger
count int64 // consumer success message number
}
// Handler message
@ -424,6 +427,7 @@ func (c *Consumer) Consume(ctx context.Context, handler Handler) {
}
c.zapLog.Info("[rabbitmq consumer] manual ack done", zap.String("tagID", tagID))
}
atomic.AddInt64(&c.count, 1)
}
if isContinueConsume {
@ -440,3 +444,8 @@ func (c *Consumer) Close() {
_ = c.ch.Close()
}
}
// Count consumer success message number
func (c *Consumer) Count() int64 {
return atomic.LoadInt64(&c.count)
}

View File

@ -353,6 +353,7 @@ func TestConsumerErr(t *testing.T) {
t.Log(err)
return
}
t.Log(c.Count())
c.ch = &amqp.Channel{}
utils.SafeRunWithTimeout(time.Second, func(cancel context.CancelFunc) {

View File

@ -16,6 +16,7 @@ type producerOptions struct {
exchangeDeclare *exchangeDeclareOptions
queueDeclare *queueDeclareOptions
queueBind *queueBindOptions
deadLetter *deadLetterOptions
isPersistent bool // is it persistent
@ -36,6 +37,7 @@ func defaultProducerOptions() *producerOptions {
exchangeDeclare: defaultExchangeDeclareOptions(),
queueDeclare: defaultQueueDeclareOptions(),
queueBind: defaultQueueBindOptions(),
deadLetter: defaultDeadLetterOptions(),
isPersistent: true,
mandatory: true,
@ -63,6 +65,13 @@ func WithProducerQueueBindOptions(opts ...QueueBindOption) ProducerOption {
}
}
// WithDeadLetterOptions set dead letter options.
func WithDeadLetterOptions(opts ...DeadLetterOption) ProducerOption {
return func(o *producerOptions) {
o.deadLetter.apply(opts...)
}
}
// WithProducerPersistent set producer persistent option.
func WithProducerPersistent(enable bool) ProducerOption {
return func(o *producerOptions) {
@ -95,6 +104,10 @@ type Producer struct {
mandatory bool
zapLog *zap.Logger
exchangeArgs amqp.Table
queueArgs amqp.Table
queueBindArgs amqp.Table
}
// NewProducer create a producer
@ -133,6 +146,17 @@ func NewProducer(exchange *Exchange, queueName string, connection *Connection, o
}
// declare a queue and create it automatically if it doesn't exist, or skip creation if it does.
if o.deadLetter.isEnabled() {
if o.queueDeclare.args == nil {
o.queueDeclare.args = amqp.Table{
"x-dead-letter-exchange": o.deadLetter.exchangeName,
"x-dead-letter-routing-key": o.deadLetter.routingKey,
}
} else {
o.queueDeclare.args["x-dead-letter-exchange"] = o.deadLetter.exchangeName
o.queueDeclare.args["x-dead-letter-routing-key"] = o.deadLetter.routingKey
}
}
q, err := ch.QueueDeclare(
queueName,
o.isPersistent,
@ -163,13 +187,29 @@ func NewProducer(exchange *Exchange, queueName string, connection *Connection, o
return nil, err
}
fields := logFields(queueName, exchange)
fields = append(fields, zap.Bool("isPersistent", o.isPersistent))
// create dead letter exchange and queue if enabled
if o.deadLetter.isEnabled() {
err = createDeadLetter(ch, o.deadLetter)
if err != nil {
_ = ch.Close()
return nil, err
}
fields = append(fields, zap.Any("deadLetter", map[string]string{
"exchange": o.deadLetter.exchangeName,
"queue": o.deadLetter.queueName,
"routingKey": o.deadLetter.routingKey,
"type": exchangeTypeDirect,
}))
}
deliveryMode := amqp.Persistent
if !o.isPersistent {
deliveryMode = amqp.Transient
}
fields := logFields(queueName, exchange)
fields = append(fields, zap.Bool("isPersistent", o.isPersistent))
connection.zapLog.Info("[rabbit producer] initialized", fields...)
return &Producer{
@ -181,6 +221,10 @@ func NewProducer(exchange *Exchange, queueName string, connection *Connection, o
deliveryMode: deliveryMode,
mandatory: o.mandatory,
zapLog: connection.zapLog,
exchangeArgs: o.exchangeDeclare.args,
queueArgs: o.queueDeclare.args,
queueBindArgs: o.queueBind.args,
}, nil
}
@ -307,6 +351,21 @@ func (p *Producer) Close() {
}
}
// ExchangeArgs returns the exchange declare args.
func (p *Producer) ExchangeArgs() amqp.Table {
return p.exchangeArgs
}
// QueueArgs returns the queue declare args.
func (p *Producer) QueueArgs() amqp.Table {
return p.queueArgs
}
// QueueBindArgs returns the queue bind args.
func (p *Producer) QueueBindArgs() amqp.Table {
return p.queueBindArgs
}
func logFields(queueName string, exchange *Exchange) []zap.Field {
fields := []zap.Field{
zap.String("queue", queueName),
@ -329,3 +388,45 @@ func logFields(queueName string, exchange *Exchange) []zap.Field {
}
return fields
}
// -------------------------------------------------------------------------------------------
func createDeadLetter(ch *amqp.Channel, o *deadLetterOptions) error {
// declare the exchange type
err := ch.ExchangeDeclare(
o.exchangeName,
exchangeTypeDirect,
true,
o.exchangeDeclare.autoDelete,
o.exchangeDeclare.internal,
o.exchangeDeclare.noWait,
o.exchangeDeclare.args,
)
if err != nil {
return err
}
// declare a queue and create it automatically if it doesn't exist, or skip creation if it does.
q, err := ch.QueueDeclare(
o.queueName,
true,
o.queueDeclare.autoDelete,
o.queueDeclare.exclusive,
o.queueDeclare.noWait,
o.queueDeclare.args,
)
if err != nil {
return err
}
// binding queue and exchange
err = ch.QueueBind(
q.Name,
o.routingKey,
o.exchangeName,
o.queueBind.noWait,
o.queueBind.args,
)
return err
}

View File

@ -32,6 +32,8 @@ func TestProducerOptions(t *testing.T) {
),
WithProducerPersistent(true),
WithProducerMandatory(true),
WithDeadLetterOptions(WithDeadLetter("dl-exchange", "dl-queue", "dl-routing-key")),
}
o := defaultProducerOptions()