GoQueue - Universal Go Message Queue Library
One library to rule them all - A powerful, extensible, and developer-friendly Go wrapper that simplifies message queue operations across multiple platforms. Build robust, scalable applications with consistent queue operations, regardless of your underlying message broker.
Why GoQueue?
- Universal Interface - Write once, run anywhere. Switch between queue providers without changing your code
- Production Ready - Built-in retry mechanisms, dead letter queues, and error handling
- Type Safe - Strongly typed interfaces with comprehensive error handling
- Extensible - Plugin architecture for custom middleware and queue providers
- Observable - Built-in logging and middleware support for monitoring
- Developer Experience - Intuitive API design with sensible defaults
Table of Contents
- Quick Start
- Features
- Installation
- Basic Usage
- Advanced Features
- Examples
- Architecture
- Documentation
- Contributing
- License
Quick Start
Get up and running in less than 5 minutes:
go get -u github.com/bxcodec/goqueue

package main

import (
	"context"
	"log"

	"github.com/bxcodec/goqueue"
	"github.com/bxcodec/goqueue/interfaces"
	"github.com/bxcodec/goqueue/options" // import path assumed from the options.With* calls below
)

func main() {
	// myConsumer and myPublisher are placeholders; build them with the
	// consumer and publisher packages as shown under Basic Usage.

	// Create queue service
	queueSvc := goqueue.NewQueueService(
		options.WithConsumer(myConsumer),
		options.WithPublisher(myPublisher),
		options.WithMessageHandler(handleMessage),
	)

	// Publish a message
	queueSvc.Publish(context.Background(), interfaces.Message{
		Data:   map[string]interface{}{"hello": "world"},
		Action: "user.created",
		Topic:  "users",
	})

	// Start consuming
	queueSvc.Start(context.Background())
}

func handleMessage(ctx context.Context, m interfaces.InboundMessage) error {
	log.Printf("Received: %v", m.Data)
	return m.Ack(ctx) // Acknowledge successful processing
}
Features
Core Features
- Multi-Provider Support: Currently supports RabbitMQ (more coming soon!)
- Unified API: Consistent interface across all queue providers
- Type Safety: Strongly typed message structures
- Context Support: Full Go context integration for cancellation and timeouts
Reliability & Resilience
- Automatic Retries: Configurable retry mechanisms with exponential backoff
- Dead Letter Queues: Handle failed messages gracefully
- Circuit Breaker: Built-in protection against cascading failures
- Graceful Shutdown: Clean resource cleanup on application termination
Advanced Capabilities
- Middleware System: Extensible pipeline for message processing
- Custom Serialization: Support for JSON, Protocol Buffers, and custom formats
- Message Routing: Flexible topic and routing key patterns
- Batching: Efficient batch message processing
- Connection Pooling: Optimized connection management
Observability
- Structured Logging: Built-in zerolog integration
- Metrics Ready: Hooks for Prometheus, StatsD, and custom metrics
- Tracing Support: OpenTelemetry compatible
- Health Checks: Built-in health check endpoints
Installation
# Install the core library
go get -u github.com/bxcodec/goqueue
Requirements
- Go 1.21 or higher
- Message broker (RabbitMQ supported, more coming soon)
Basic Usage
Publisher Example
package main

import (
	"context"
	"log"

	"github.com/bxcodec/goqueue/interfaces"
	publisherOpts "github.com/bxcodec/goqueue/options/publisher"
	"github.com/bxcodec/goqueue/publisher"
	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// Connect to RabbitMQ
	conn, err := amqp.Dial("amqp://localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}

	// Create publisher
	pub := publisher.NewPublisher(
		publisherOpts.PublisherPlatformRabbitMQ,
		publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
			Conn:                     conn,
			PublisherChannelPoolSize: 5,
		}),
		publisherOpts.WithPublisherID("my-service"),
	)

	// Publish message
	err = pub.Publish(context.Background(), interfaces.Message{
		Data:   map[string]interface{}{"user_id": 123, "action": "signup"},
		Action: "user.created",
		Topic:  "users",
	})
	if err != nil {
		log.Fatal(err)
	}
}
Consumer Example
package main

import (
	"context"

	"github.com/bxcodec/goqueue/consumer"
	"github.com/bxcodec/goqueue/interfaces"
	consumerOpts "github.com/bxcodec/goqueue/options/consumer"
)

func main() {
	// Create consumer
	cons := consumer.NewConsumer(
		consumerOpts.ConsumerPlatformRabbitMQ,
		consumerOpts.WithQueueName("user-events"),
		consumerOpts.WithMaxRetryFailedMessage(3),
		consumerOpts.WithBatchMessageSize(10),
	)

	// Start consuming. metadata is a placeholder that this snippet does not
	// define; supply the value your setup requires.
	cons.Consume(context.Background(), messageHandler, metadata)
}

func messageHandler(ctx context.Context, msg interfaces.InboundMessage) error {
	// Process your message. Check the type assertion so that an unexpected
	// payload does not panic the consumer.
	userData, ok := msg.Data.(map[string]interface{})
	if !ok {
		return msg.MoveToDeadLetterQueue(ctx)
	}

	// Business logic here. processUser is a placeholder for your own function.
	if err := processUser(userData); err != nil {
		// Retry with exponential backoff
		return msg.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn)
	}

	// Acknowledge successful processing
	return msg.Ack(ctx)
}
Advanced Features
Retry Mechanisms
GoQueue provides sophisticated retry mechanisms with multiple strategies:
// Exponential backoff retry
return msg.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn)

// Custom retry logic
return msg.RetryWithDelayFn(ctx, func(retryCount int64) int64 {
	return retryCount * 2 // Custom delay calculation
})

// Move to dead letter queue after max retries
return msg.MoveToDeadLetterQueue(ctx)
Middleware System
Extend functionality with custom middleware:
// Custom logging middleware
func LoggingMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc {
	return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc {
		return func(ctx context.Context, m interfaces.InboundMessage) error {
			start := time.Now()
			err := next(ctx, m)
			log.Printf("Message processed in %v", time.Since(start))
			return err
		}
	}
}

// Apply middleware
cons := consumer.NewConsumer(
	consumerOpts.ConsumerPlatformRabbitMQ,
	consumerOpts.WithMiddlewares(
		LoggingMiddleware(),
		MetricsMiddleware(),
		AuthMiddleware(),
	),
)
Configuration Options
Fine-tune your queue behavior:
cons := consumer.NewConsumer(
	consumerOpts.ConsumerPlatformRabbitMQ,
	consumerOpts.WithQueueName("high-priority-queue"),
	consumerOpts.WithMaxRetryFailedMessage(5),
	consumerOpts.WithBatchMessageSize(50),
	consumerOpts.WithConsumerID("worker-01"),
	consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{
		ConsumerChannel: channel,
		ReQueueChannel:  requeueChannel,
		QueueDeclareConfig: &consumerOpts.RabbitMQQueueDeclareConfig{
			Durable:    true,
			AutoDelete: false,
			Exclusive:  false,
		},
	}),
)
Examples
Complete Examples
Explore our comprehensive examples:
- Basic Usage - Simple publish/consume example
- With Retries - Advanced retry mechanisms
RabbitMQ Quick Setup
Start RabbitMQ with Docker:
# Clone the repository
git clone https://github.com/bxcodec/goqueue.git
cd goqueue/examples/rabbitmq/basic
# Start RabbitMQ
docker-compose up -d
# Run the example
go run main.go
Retry Architecture
Automatic retry mechanism with exponential backoff and dead letter queue
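The flow described by that caption can be sketched as a small decision function: after each failure, either schedule a retry with a doubling delay or give up and dead-letter the message. The code below is a self-contained illustration under stated assumptions, not GoQueue's implementation. The delay function uses the same `func(retryCount int64) int64` shape as the custom retry example above, but the base delay, the cap, the delay unit, and the names `exponentialDelay` and `decide` are all hypothetical.

```go
package main

import "fmt"

const (
	maxRetries = 3  // mirrors WithMaxRetryFailedMessage(3) from the consumer example
	baseDelay  = 1  // assumed base delay; the unit is an assumption
	maxDelay   = 60 // assumed cap so delays do not grow without bound
)

// exponentialDelay returns baseDelay doubled once per previous retry,
// capped at maxDelay: 1, 2, 4, 8, ...
func exponentialDelay(retryCount int64) int64 {
	if retryCount < 1 {
		return baseDelay
	}
	delay := int64(baseDelay)
	for i := int64(1); i < retryCount; i++ {
		delay *= 2
		if delay >= maxDelay {
			return maxDelay
		}
	}
	return delay
}

// decide reports what to do after a failed attempt, given how many retries
// have already been made: retry after a delay, or dead-letter the message.
func decide(retriesSoFar int64) (action string, delay int64) {
	if retriesSoFar >= maxRetries {
		return "dead-letter", 0
	}
	return "retry", exponentialDelay(retriesSoFar + 1)
}

func main() {
	for retries := int64(0); retries <= maxRetries; retries++ {
		action, delay := decide(retries)
		fmt.Printf("retries so far=%d -> %s (delay %d)\n", retries, action, delay)
	}
	// retries so far=0 -> retry (delay 1)
	// retries so far=1 -> retry (delay 2)
	// retries so far=2 -> retry (delay 4)
	// retries so far=3 -> dead-letter (delay 0)
}
```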
Architecture
Design Principles
- Interface Segregation: Clean, focused interfaces for different responsibilities
- Dependency Injection: Easy testing and swappable implementations
- Error Handling: Comprehensive error types and recovery mechanisms
- Performance: Optimized for high-throughput scenarios
- Extensibility: Plugin architecture for custom providers and middleware
Core Components
Provider Support
| Provider | Status | Features |
|---|---|---|
| RabbitMQ | Beta Version | Full feature support |
| Google Pub/Sub | Planned | Coming soon |
| AWS SQS + SNS | Planned | Coming soon |
Configuration
Logging Setup
GoQueue uses structured logging with zerolog:
import "github.com/bxcodec/goqueue"
// Setup basic logging (automatic when importing consumer/publisher)
// OR setup with custom configuration:
goqueue.SetupLoggingWithDefaults() // Pretty console output for development
Testing
Run the test suite:
# Unit tests
make test
# Integration tests with RabbitMQ
make integration-test
Documentation
Component Documentation
Explore our comprehensive guides for each system component:
| Component | Description | Documentation |
|---|---|---|
| Middleware | Extend functionality with custom logic | Middleware Guide |
| Consumer | Reliable message consumption and processing | Consumer Guide |
| Publisher | High-performance message publishing | Publisher Guide |
| RabbitMQ Retry | Advanced retry mechanisms for RabbitMQ | Retry Architecture |
Quick Links
- Full Documentation Index - Complete documentation overview
- API Reference - Go package documentation
- Examples - Working code examples
- Troubleshooting - Common issues and solutions
Contributing
We welcome contributions! Here's how to get started:
Quick Contribution Guide
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
Development Setup
# Clone your fork
git clone https://github.com/yourusername/goqueue.git
cd goqueue
# Install dependencies
go mod download
# Run tests
make test
# Run linting
make lint
Contribution Areas
- New Queue Providers (Google Pub/Sub, SQS+SNS)
- Middleware Components (Metrics, Tracing, Auth)
- Documentation & Examples
- Testing & Benchmarks
- Bug Fixes & Improvements
Support & Community
- Documentation: pkg.go.dev/github.com/bxcodec/goqueue
- Issues: GitHub Issues
- Email: iman@tumorang.com
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments
- Thanks to all contributors
- Inspired by the Go community's best practices
- Built with ❤️ for the Go ecosystem

