// Package queue provides a simple, type-safe, concurrent FIFO queue
// with support for parallel handlers, graceful shutdown, optional
// Dead Letter Queue (DLQ), hooks, and rate limiting.
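//
// A minimal usage sketch (illustrative only; the string item type and the
// handler below are placeholders, not part of this package):
//
//	q := queue.NewQueue[string](64)
//	q.Register(func(s string) { fmt.Println("handled:", s) })
//
//	q.HandleParallel(context.Background(), 4) // start 4 workers
//	q.Produce("hello")
//
//	// Stop accepting new items and wait for the workers to drain the buffer.
//	_ = q.Shutdown(context.Background())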
package queue

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// QueueHooks defines optional callbacks that are invoked during queue
// operations. Hooks can be used for metrics, logging, and monitoring.
//
// All hooks are invoked best-effort; user-supplied hook functions must be
// fast and must not panic.
//
//   - OnProduced is called after an item is successfully enqueued.
//   - OnHandled is called after a handler returns without panic.
//   - OnDropped is called when an item is intentionally dropped due to
//     a canceled context or a closed queue.
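//
// A sketch of wiring hooks to simple counters (illustrative; assumes Go 1.19+
// sync/atomic and a *Queue[string] named q, neither of which is part of this type):
//
//	var produced, handled, dropped atomic.Int64
//	q.SetHooks(queue.QueueHooks[string]{
//		OnProduced: func() { produced.Add(1) },
//		OnHandled:  func() { handled.Add(1) },
//		OnDropped:  func(_ string) { dropped.Add(1) },
//	})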
type QueueHooks[T any] struct {
	OnProduced func()
	OnHandled  func()
	OnDropped  func(T)
}

// Queue is a generic, concurrency-safe FIFO queue with optional DLQ, parallel
// handling, rate limiting and graceful shutdown without data races.
type Queue[T any] struct {
	ch       chan T
	capacity int

	mu       sync.RWMutex // protects: handlers, errorFunc, dlq, hooks, closed, rateTicker
	sendMu   sync.RWMutex // synchronizes sends to the channel and its closing
	handlers []func(T)

	errorFunc func(error)
	dlq       *Queue[T]
	hooks     QueueHooks[T]

	closeOnce sync.Once
	closed    bool

	wg         sync.WaitGroup
	rateTicker *time.Ticker
}

// NewQueue creates a new queue with the given buffer capacity.
// Capacity must be > 0, otherwise NewQueue panics.
func NewQueue[T any](capacity int) *Queue[T] {
	if capacity <= 0 {
		panic("queue capacity must be > 0")
	}
	return &Queue[T]{
		ch:       make(chan T, capacity),
		capacity: capacity,
	}
}

// Register adds a handler function that will be invoked for each item.
// Multiple handlers may be registered; each item will be passed to each
// registered handler in registration order. Handlers must be concurrency-safe.
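//
// For example, two handlers that share state must synchronize it themselves
// (illustrative sketch; the counter and logger are placeholders):
//
//	var mu sync.Mutex
//	seen := 0
//	q.Register(func(job string) {
//		mu.Lock()
//		seen++
//		mu.Unlock()
//	})
//	q.Register(func(job string) { log.Printf("processing %q", job) })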
func (q *Queue[T]) Register(handler func(T)) {
	if handler == nil {
		panic("handler cannot be nil")
	}
	q.mu.Lock()
	q.handlers = append(q.handlers, handler)
	q.mu.Unlock()
}

// OnError registers an error callback which is invoked when a handler panics.
// The error contains the panic details. Pass nil to disable the callback.
func (q *Queue[T]) OnError(fn func(error)) {
	q.mu.Lock()
	q.errorFunc = fn
	q.mu.Unlock()
}

// SetDLQ sets a Dead Letter Queue which receives items that caused a handler panic.
// Items are enqueued to the DLQ in a non-blocking, best-effort manner.
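//
// A sketch of pairing a DLQ with an error callback (illustrative; the logging
// is a placeholder):
//
//	dlq := queue.NewQueue[string](32)
//	q.SetDLQ(dlq)
//	q.OnError(func(err error) { log.Println("handler failed:", err) })
//	// Items whose handlers panicked can later be re-processed by registering
//	// handlers on dlq and starting workers on it as well.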
func (q *Queue[T]) SetDLQ(dlq *Queue[T]) {
	q.mu.Lock()
	q.dlq = dlq
	q.mu.Unlock()
}

// SetHooks installs optional hooks for produced/handled/dropped events.
// Passing a zero-value struct clears previously set hooks.
func (q *Queue[T]) SetHooks(h QueueHooks[T]) {
	q.mu.Lock()
	q.hooks = h
	q.mu.Unlock()
}

// SetRateLimit sets a per-queue processing rate limit in requests per second.
// If rps <= 0, rate limiting is disabled. Calling SetRateLimit replaces any
// previous ticker; the new limit is picked up by already running workers.
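//
// For example, to process roughly 100 items per second across all workers
// (illustrative sketch):
//
//	q.SetRateLimit(100) // one shared ticker, so the limit applies per queue
//	// ...
//	q.SetRateLimit(0) // disable rate limiting again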
func (q *Queue[T]) SetRateLimit(rps int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	// Stop the previous ticker, if any.
	if q.rateTicker != nil {
		q.rateTicker.Stop()
		q.rateTicker = nil
	}
	if rps > 0 {
		q.rateTicker = time.NewTicker(time.Second / time.Duration(rps))
	}
}

// HandleParallel starts worker goroutines that consume items from the queue
// and invoke registered handlers. Workers exit when the channel is closed or
// when the provided context is canceled. If workers <= 0, a single worker
// is started.
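//
// A typical pattern is to tie worker lifetime to a cancellable context and
// wait for the workers during shutdown (illustrative sketch):
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//
//	q.HandleParallel(ctx, 8)
//	// ... produce items ...
//	cancel() // ask the workers to stop
//	q.Wait() // block until every worker goroutine has returned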
func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) {
	if workers <= 0 {
		workers = 1
	}
	q.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer q.wg.Done()
			for {
				select {
				case item, ok := <-q.ch:
					if !ok {
						return
					}
					// copy ticker safely
					q.mu.RLock()
					t := q.rateTicker
					q.mu.RUnlock()
					if t != nil {
						<-t.C
					}
					q.processItem(item)
				case <-ctx.Done():
					return
				}
			}
		}()
	}
}

// sendBlocking performs a blocking send into the channel (used by Produce).
// It holds a read lock across the send to prevent Close() from closing the
// channel concurrently.
func (q *Queue[T]) sendBlocking(item T) bool {
	// Fast-path closed check under the read locks.
	q.sendMu.RLock()
	q.mu.RLock()
	closed := q.closed
	hooks := q.hooks
	q.mu.RUnlock()
	if closed {
		q.sendMu.RUnlock()
		if hooks.OnDropped != nil {
			hooks.OnDropped(item)
		}
		return false
	}
	defer q.sendMu.RUnlock()

	defer func() { _ = recover() }() // guard against external races/panics
	q.ch <- item
	if hooks.OnProduced != nil {
		hooks.OnProduced()
	}
	return true
}

// sendNonBlocking attempts a non-blocking send (used by TryProduce and DLQ).
func (q *Queue[T]) sendNonBlocking(item T) bool {
	q.sendMu.RLock()
	q.mu.RLock()
	closed := q.closed
	hooks := q.hooks
	q.mu.RUnlock()
	if closed {
		q.sendMu.RUnlock()
		if hooks.OnDropped != nil {
			hooks.OnDropped(item)
		}
		return false
	}
	defer q.sendMu.RUnlock()

	defer func() { _ = recover() }()
	select {
	case q.ch <- item:
		if hooks.OnProduced != nil {
			hooks.OnProduced()
		}
		return true
	default:
		return false
	}
}

// sendWithContext blocks until the item is sent or the context is canceled.
func (q *Queue[T]) sendWithContext(ctx context.Context, item T) bool {
	q.sendMu.RLock()
	q.mu.RLock()
	closed := q.closed
	hooks := q.hooks
	q.mu.RUnlock()
	if closed {
		q.sendMu.RUnlock()
		if hooks.OnDropped != nil {
			hooks.OnDropped(item)
		}
		return false
	}
	defer q.sendMu.RUnlock()

	defer func() { _ = recover() }()
	select {
	case q.ch <- item:
		if hooks.OnProduced != nil {
			hooks.OnProduced()
		}
		return true
	case <-ctx.Done():
		if hooks.OnDropped != nil {
			hooks.OnDropped(item)
		}
		return false
	}
}

// Produce enqueues an item and blocks until the item is placed into the buffer.
// If the queue is closed, the item is dropped and the method returns immediately.
func (q *Queue[T]) Produce(item T) {
	_ = q.sendBlocking(item)
}

// ProduceWithContext enqueues an item, blocking until the item is accepted or
// the provided context is canceled. If the context is already canceled or the
// queue is closed, the item is dropped and OnDropped is invoked (if set).
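//
// For example, bounding how long a producer may wait for buffer space
// (illustrative sketch; job is a placeholder value):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
//	defer cancel()
//	q.ProduceWithContext(ctx, job) // dropped if no buffer space frees up in time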
func (q *Queue[T]) ProduceWithContext(ctx context.Context, item T) {
	// If the context is already canceled, do not block; mark the item as dropped.
	select {
	case <-ctx.Done():
		q.mu.RLock()
		if q.hooks.OnDropped != nil {
			q.hooks.OnDropped(item)
		}
		q.mu.RUnlock()
		return
	default:
	}
	_ = q.sendWithContext(ctx, item)
}

// TryProduce attempts to enqueue an item without blocking. It retries until
// either the item is accepted or the timeout elapses. Returns true on success.
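//
// For example (illustrative sketch; job is a placeholder value):
//
//	if !q.TryProduce(job, 50*time.Millisecond) {
//		// The buffer stayed full (or the queue was closed) for the whole timeout.
//	}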
func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	for {
		if ok := q.sendNonBlocking(item); ok {
			return true
		}
		select {
		case <-ctx.Done():
			return false
		default:
			// Briefly yield to the scheduler before retrying.
			time.Sleep(time.Microsecond)
		}
	}
}

// Shutdown closes the queue for new items and waits for workers to finish
// processing or until the provided context is canceled. It returns ctx.Err()
// on timeout or cancellation, and nil otherwise.
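//
// A sketch of a bounded shutdown (illustrative):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//	defer cancel()
//	if err := q.Shutdown(ctx); err != nil {
//		// Workers did not drain the queue within 5 seconds.
//	}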
func (q *Queue[T]) Shutdown(ctx context.Context) error {
	q.Close()
	done := make(chan struct{})
	go func() {
		defer close(done)
		q.wg.Wait()
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Close closes the queue for sending and stops the rate-limit ticker, if any.
// It is safe to call multiple times; buffered items remain available to workers.
func (q *Queue[T]) Close() {
	q.closeOnce.Do(func() {
		// forbid new sends: exclusive lock supersedes send RLock
		q.sendMu.Lock()
		q.mu.Lock()
		if !q.closed {
			q.closed = true
			close(q.ch)
		}
		// Stop the ticker if it is running.
		if q.rateTicker != nil {
			q.rateTicker.Stop()
			q.rateTicker = nil
		}
		q.mu.Unlock()
		q.sendMu.Unlock()
	})
}

// Wait blocks until all worker goroutines started by HandleParallel exit.
func (q *Queue[T]) Wait() { q.wg.Wait() }

// Len returns the number of buffered items currently in the queue.
func (q *Queue[T]) Len() int { return len(q.ch) }

// Cap returns the configured buffer capacity of the queue.
func (q *Queue[T]) Cap() int { return q.capacity }

// IsEmpty reports whether the queue buffer is empty.
func (q *Queue[T]) IsEmpty() bool { return len(q.ch) == 0 }

// Closed reports whether the queue has been closed for sending.
func (q *Queue[T]) Closed() bool {
	q.mu.RLock()
	defer q.mu.RUnlock()
	return q.closed
}

// String returns a human-readable summary of the queue state.
func (q *Queue[T]) String() string {
	return fmt.Sprintf("Queue{len=%d, cap=%d, closed=%t}", q.Len(), q.Cap(), q.Closed())
}

// processItem executes all registered handlers for the item, recovering from
// panics. On panic, it calls the error callback (if any) and forwards the
// item to the DLQ using a best-effort non-blocking send.
func (q *Queue[T]) processItem(item T) {
	q.mu.RLock()
	handlers := append([]func(T){}, q.handlers...)
	errFn := q.errorFunc
	dlq := q.dlq
	hooks := q.hooks
	q.mu.RUnlock()
	for _, h := range handlers {
		func() {
			defer func() {
				if r := recover(); r != nil {
					// Report the panic via the error callback.
					if errFn != nil {
						errFn(fmt.Errorf("handler panic: %v", r))
					}
					// Forward the item to the DLQ, non-blocking and best-effort.
					if dlq != nil {
						_ = dlq.sendNonBlocking(item)
					}
				}
			}()
			h(item)
			if hooks.OnHandled != nil {
				hooks.OnHandled()
			}
		}()
	}
}