// Package queue provides a simple, type-safe, concurrent FIFO queue // with support for parallel handlers, graceful shutdown, optional // Dead Letter Queue (DLQ), hooks, and rate limiting. package queue import ( "context" "fmt" "sync" "time" ) // QueueHooks defines optional callbacks that are invoked during queue // operations. Hooks can be used for metrics, logging, and monitoring. // // All hooks are invoked best-effort and must be fast and panic-safe in user code. // // - OnProduced is called after an item is successfully enqueued. // - OnHandled is called after a handler returns without panic. // - OnDropped is called when an item is intentionally dropped due to // a canceled context or a closed queue. type QueueHooks[T any] struct { OnProduced func() OnHandled func() OnDropped func(T) } // Queue is a generic, concurrency-safe FIFO queue with optional DLQ, parallel // handling, rate limiting and graceful shutdown without data races. type Queue[T any] struct { ch chan T capacity int mu sync.RWMutex // protects: handlers, errorFunc, dlq, hooks, closed, rateTicker sendMu sync.RWMutex // synchronizes sends to the channel and its closing handlers []func(T) errorFunc func(error) dlq *Queue[T] hooks QueueHooks[T] closeOnce sync.Once closed bool wg sync.WaitGroup rateTicker *time.Ticker } // NewQueue creates a new queue with the given buffer capacity. // Capacity must be > 0. func NewQueue[T any](capacity int) *Queue[T] { if capacity <= 0 { panic("queue capacity must be > 0") } return &Queue[T]{ ch: make(chan T, capacity), capacity: capacity, } } // Register adds a handler function that will be invoked for each item. // Multiple handlers may be registered; each item will be passed to each // registered handler in registration order. Handlers must be concurrency-safe. func (q *Queue[T]) Register(handler func(T)) { if handler == nil { panic("handler cannot be nil") } q.mu.Lock() q.handlers = append(q.handlers, handler) q.mu.Unlock() } // OnError registers an error callback which is invoked when a handler panics. // The error contains panic details. Set to nil to disable. func (q *Queue[T]) OnError(fn func(error)) { q.mu.Lock() q.errorFunc = fn q.mu.Unlock() } // SetDLQ sets a Dead Letter Queue which receives items that caused a handler panic. // Items are enqueued to the DLQ in a non-blocking best-effort manner. func (q *Queue[T]) SetDLQ(dlq *Queue[T]) { q.mu.Lock() q.dlq = dlq q.mu.Unlock() } // SetHooks installs optional hooks for produced/handled/dropped events. // Passing a zero-value struct clears previously set hooks. func (q *Queue[T]) SetHooks(h QueueHooks[T]) { q.mu.Lock() q.hooks = h q.mu.Unlock() } // SetRateLimit sets a per-queue processing rate limit in requests-per-second. // If rps <= 0, rate limiting is disabled. Calling SetRateLimit replaces any // previous ticker and will be applied by active workers. func (q *Queue[T]) SetRateLimit(rps int) { q.mu.Lock() defer q.mu.Unlock() // Stop previous ticker if exists if q.rateTicker != nil { q.rateTicker.Stop() q.rateTicker = nil } if rps > 0 { q.rateTicker = time.NewTicker(time.Second / time.Duration(rps)) } } // HandleParallel starts worker goroutines that consume items from the queue // and invoke registered handlers. Workers exit when the channel is closed or // when the provided context is canceled. If workers <= 0, a single worker // is started. func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) { if workers <= 0 { workers = 1 } q.wg.Add(workers) for i := 0; i < workers; i++ { go func() { defer q.wg.Done() for { select { case item, ok := <-q.ch: if !ok { return } // copy ticker safely q.mu.RLock() t := q.rateTicker q.mu.RUnlock() if t != nil { <-t.C } q.processItem(item) case <-ctx.Done(): return } } }() } } // sendBlocking performs a blocking send into the channel (used by Produce). // It holds a read lock across the send to prevent Close() from closing the // channel concurrently. func (q *Queue[T]) sendBlocking(item T) bool { // Быстрая проверка закрытия под RLock q.sendMu.RLock() q.mu.RLock() closed := q.closed hooks := q.hooks q.mu.RUnlock() if closed { q.sendMu.RUnlock() if hooks.OnDropped != nil { hooks.OnDropped(item) } return false } defer q.sendMu.RUnlock() defer func() { _ = recover() }() // guard against external races/panics q.ch <- item if hooks.OnProduced != nil { hooks.OnProduced() } return true } // sendNonBlocking attempts a non-blocking send (used by TryProduce and DLQ). func (q *Queue[T]) sendNonBlocking(item T) bool { q.sendMu.RLock() q.mu.RLock() closed := q.closed hooks := q.hooks q.mu.RUnlock() if closed { q.sendMu.RUnlock() if hooks.OnDropped != nil { hooks.OnDropped(item) } return false } defer q.sendMu.RUnlock() defer func() { _ = recover() }() select { case q.ch <- item: if hooks.OnProduced != nil { hooks.OnProduced() } return true default: return false } } // sendWithContext blocks until the item is sent or the context is canceled. func (q *Queue[T]) sendWithContext(ctx context.Context, item T) bool { q.sendMu.RLock() q.mu.RLock() closed := q.closed hooks := q.hooks q.mu.RUnlock() if closed { q.sendMu.RUnlock() if hooks.OnDropped != nil { hooks.OnDropped(item) } return false } defer q.sendMu.RUnlock() defer func() { _ = recover() }() select { case q.ch <- item: if hooks.OnProduced != nil { hooks.OnProduced() } return true case <-ctx.Done(): if hooks.OnDropped != nil { hooks.OnDropped(item) } return false } } // Produce enqueues an item and blocks until the item is placed into the buffer. // If the queue is closed, the item is dropped and the method returns immediately. func (q *Queue[T]) Produce(item T) { _ = q.sendBlocking(item) } // ProduceWithContext enqueues an item, blocking until the item is accepted or // the provided context is canceled. If the context is already canceled or the // queue is closed, the item is dropped and OnDropped is invoked (if set). func (q *Queue[T]) ProduceWithContext(ctx context.Context, item T) { // Если контекст уже отменён — не блокируемся и помечаем как dropped select { case <-ctx.Done(): q.mu.RLock() if q.hooks.OnDropped != nil { q.hooks.OnDropped(item) } q.mu.RUnlock() return default: } _ = q.sendWithContext(ctx, item) } // TryProduce attempts to enqueue an item without blocking. It retries until // either the item is accepted or the timeout elapses. Returns true on success. func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() for { if ok := q.sendNonBlocking(item); ok { return true } select { case <-ctx.Done(): return false default: // короткая уступка планировщику time.Sleep(time.Microsecond) } } } // Shutdown closes the queue for new items and waits for workers to finish // processing or until the provided context is canceled. Returns ctx.Err() on // timeout/cancel. func (q *Queue[T]) Shutdown(ctx context.Context) error { q.Close() done := make(chan struct{}) go func() { defer close(done) q.wg.Wait() }() select { case <-done: return nil case <-ctx.Done(): return ctx.Err() } } func (q *Queue[T]) Close() { q.closeOnce.Do(func() { // forbid new sends: exclusive lock supersedes send RLock q.sendMu.Lock() q.mu.Lock() if !q.closed { q.closed = true close(q.ch) } // Остановим тикер, если включён if q.rateTicker != nil { q.rateTicker.Stop() q.rateTicker = nil } q.mu.Unlock() q.sendMu.Unlock() }) } // Wait blocks until all worker goroutines started by HandleParallel exit. func (q *Queue[T]) Wait() { q.wg.Wait() } // Len returns the number of buffered items currently in the queue. func (q *Queue[T]) Len() int { return len(q.ch) } // Cap returns the configured buffer capacity of the queue. func (q *Queue[T]) Cap() int { return q.capacity } // IsEmpty reports whether the queue buffer is empty. func (q *Queue[T]) IsEmpty() bool { return len(q.ch) == 0 } // Closed reports whether the queue has been closed for sending. func (q *Queue[T]) Closed() bool { q.mu.RLock() defer q.mu.RUnlock() return q.closed } // String returns a human-readable summary of the queue state. func (q *Queue[T]) String() string { return fmt.Sprintf("Queue{len=%d, cap=%d, closed=%t}", q.Len(), q.Cap(), q.Closed()) } // processItem executes all registered handlers for the item, recovering from // panics. On panic, it calls the error callback (if any) and forwards the // item to the DLQ using a best-effort non-blocking send. func (q *Queue[T]) processItem(item T) { q.mu.RLock() handlers := append([]func(T){}, q.handlers...) errFn := q.errorFunc dlq := q.dlq hooks := q.hooks q.mu.RUnlock() for _, h := range handlers { func() { defer func() { if r := recover(); r != nil { // сообщим об ошибке if errFn != nil { errFn(fmt.Errorf("handler panic: %v", r)) } // отправим в DLQ неблокирующе и безопасно if dlq != nil { _ = dlq.sendNonBlocking(item) } } }() h(item) if hooks.OnHandled != nil { hooks.OnHandled() } }() } }