Files
queue/queue.go
Кобелев Андрей Андреевич bac4f05437
All checks were successful
CI / test (push) Successful in 5m21s
- Context fix without timer.
- Add godoc documentation.
2025-10-14 13:23:12 +05:00

369 lines
9.3 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package queue provides a simple, type-safe, concurrent FIFO queue
// with support for parallel handlers, graceful shutdown, optional
// Dead Letter Queue (DLQ), hooks, and rate limiting.
package queue
import (
"context"
"fmt"
"sync"
"time"
)
// QueueHooks defines optional callbacks that are invoked during queue
// operations. Hooks can be used for metrics, logging, and monitoring.
//
// A nil field disables the corresponding hook. Hooks are invoked
// synchronously on the goroutine that triggered the event (the producer for
// OnProduced/OnDropped, a worker for OnHandled), so they must be fast and
// must not panic.
type QueueHooks[T any] struct {
// OnProduced is called after an item has been placed into the queue buffer.
OnProduced func()
// OnHandled is called after a handler returns without panicking. It fires
// once per handler, so an item passed to N handlers can trigger it N times.
OnHandled func()
// OnDropped is called with the item when it is rejected because the queue
// is closed or the producer's context was canceled.
OnDropped func(T)
}
// Queue is a generic, concurrency-safe FIFO queue with optional DLQ, parallel
// handling, rate limiting and graceful shutdown without data races.
//
// A Queue must be created with NewQueue and must not be copied after first
// use, because it embeds mutexes, a sync.Once and a sync.WaitGroup.
type Queue[T any] struct {
// ch is the buffered channel holding queued items; it is allocated by
// NewQueue and closed by Close.
ch chan T
// capacity is the buffer size passed to NewQueue, reported by Cap.
capacity int
mu sync.RWMutex // protects: handlers, errorFunc, dlq, hooks, closed, rateTicker
sendMu sync.RWMutex // synchronizes sends to the channel and its closing
// handlers are invoked for every consumed item, in registration order.
handlers []func(T)
// errorFunc, when non-nil, receives an error describing a handler panic.
errorFunc func(error)
// dlq, when non-nil, receives items whose handler panicked.
dlq *Queue[T]
// hooks holds the optional observability callbacks.
hooks QueueHooks[T]
// closeOnce makes Close idempotent.
closeOnce sync.Once
// closed is set by Close; the send paths check it before sending.
closed bool
// wg counts the worker goroutines started by HandleParallel.
wg sync.WaitGroup
// rateTicker paces workers when rate limiting is enabled; nil means no limit.
rateTicker *time.Ticker
}
// NewQueue allocates a queue whose internal buffer can hold up to capacity
// items before producers start to block.
//
// It panics if capacity is zero or negative.
func NewQueue[T any](capacity int) *Queue[T] {
	if capacity < 1 {
		panic("queue capacity must be > 0")
	}

	q := new(Queue[T])
	q.capacity = capacity
	q.ch = make(chan T, capacity)
	return q
}
// Register appends handler to the list of functions invoked for every
// consumed item. Handlers run in the order they were registered, and every
// item is passed to every handler. Because several workers may call the same
// handler at once, handlers must be safe for concurrent use.
//
// Register panics if handler is nil.
func (q *Queue[T]) Register(handler func(T)) {
	if handler == nil {
		panic("handler cannot be nil")
	}

	q.mu.Lock()
	defer q.mu.Unlock()
	q.handlers = append(q.handlers, handler)
}
// OnError installs fn as the callback that receives an error describing a
// handler panic. It replaces any previously installed callback; passing nil
// disables error reporting.
func (q *Queue[T]) OnError(fn func(error)) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.errorFunc = fn
}
// SetDLQ configures the Dead Letter Queue that receives items whose handler
// panicked. Forwarding is non-blocking and best-effort: if the DLQ is full or
// closed the item is not retried. Passing nil removes the DLQ.
func (q *Queue[T]) SetDLQ(dlq *Queue[T]) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.dlq = dlq
}
// SetHooks replaces the queue's produced/handled/dropped callbacks with h.
// Fields left nil are simply not called, so passing the zero value removes
// all hooks.
func (q *Queue[T]) SetHooks(h QueueHooks[T]) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.hooks = h
}
// SetRateLimit sets a per-queue processing rate limit in requests-per-second.
// The limit is shared by all workers of this queue.
//
// If rps <= 0, rate limiting is disabled. Calling SetRateLimit stops and
// replaces any previously installed ticker; workers pick up the new ticker
// the next time they read it.
//
// Rates above one item per nanosecond cannot be represented by a ticker and
// are clamped to a 1ns interval.
func (q *Queue[T]) SetRateLimit(rps int) {
	q.mu.Lock()
	defer q.mu.Unlock()

	// Stop the previous ticker, if any, so its runtime timer is released.
	if q.rateTicker != nil {
		q.rateTicker.Stop()
		q.rateTicker = nil
	}
	if rps <= 0 {
		return
	}

	// For rps > 1e9 the integer division truncates to zero, and
	// time.NewTicker panics on a non-positive interval; clamp to the
	// smallest representable positive duration instead.
	interval := time.Second / time.Duration(rps)
	if interval <= 0 {
		interval = time.Nanosecond
	}
	q.rateTicker = time.NewTicker(interval)
}
// HandleParallel starts worker goroutines that consume items from the queue
// and invoke registered handlers. Workers exit when the channel is closed and
// drained, or when the provided context is canceled. If workers <= 0, a
// single worker is started.
//
// When a rate limit is configured, each worker waits for a tick before
// processing the item it has taken. If ctx is canceled during that wait the
// item already taken is still processed (so it is not lost) and the worker
// then exits.
func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) {
	if workers <= 0 {
		workers = 1
	}
	q.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer q.wg.Done()
			for {
				select {
				case item, ok := <-q.ch:
					if !ok {
						return
					}
					canceled := !q.awaitRateSlot(ctx)
					q.processItem(item)
					if canceled {
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}
}

// awaitRateSlot blocks until the current rate ticker fires and reports true,
// or reports false if ctx is canceled first. It returns true immediately when
// no rate limit is configured.
//
// A stopped time.Ticker never delivers another tick and its channel is never
// closed, so a plain receive on a ticker that SetRateLimit or Close has just
// stopped would block forever. To avoid that, the wait periodically re-reads
// q.rateTicker and switches to the replacement (or stops waiting if the limit
// was removed).
func (q *Queue[T]) awaitRateSlot(ctx context.Context) bool {
	const recheckEvery = 50 * time.Millisecond

	var recheck *time.Timer
	for {
		q.mu.RLock()
		ticker := q.rateTicker
		q.mu.RUnlock()
		if ticker == nil {
			return true
		}

		// The timer is only re-armed after its channel has been received
		// from, so Reset is safe here.
		if recheck == nil {
			recheck = time.NewTimer(recheckEvery)
			defer recheck.Stop()
		} else {
			recheck.Reset(recheckEvery)
		}

		select {
		case <-ticker.C:
			return true
		case <-ctx.Done():
			return false
		case <-recheck.C:
			// Loop to pick up a replaced or removed ticker.
		}
	}
}
// sendBlocking performs a blocking send into the channel (used by Produce)
// and reports whether the item was enqueued.
//
// The send happens under the sendMu read lock so that Close, which takes the
// write lock, cannot close the channel while a send is in flight. The closed
// flag is checked after acquiring that lock, so the channel cannot be closed
// between the check and the send.
//
// Hooks are invoked only after the lock has been released, so a hook may call
// back into the queue (for example Close) without deadlocking on sendMu. A
// panic raised by the send or by OnProduced is swallowed; the named result
// keeps the return value accurate in that case.
//
// NOTE(review): while this call is blocked on a full buffer it keeps the read
// lock, so Close waits until a consumer makes room.
func (q *Queue[T]) sendBlocking(item T) (sent bool) {
	q.sendMu.RLock()
	q.mu.RLock()
	closed := q.closed
	hooks := q.hooks
	q.mu.RUnlock()

	if closed {
		q.sendMu.RUnlock()
		if hooks.OnDropped != nil {
			hooks.OnDropped(item)
		}
		return false
	}

	// Best-effort guard: never let a panic escape to the producer.
	defer func() { _ = recover() }()

	func() {
		defer q.sendMu.RUnlock()
		q.ch <- item
	}()
	sent = true

	if hooks.OnProduced != nil {
		hooks.OnProduced()
	}
	return sent
}
// sendNonBlocking attempts a single non-blocking send (used by TryProduce and
// for DLQ forwarding) and reports whether the item was enqueued.
//
// If the queue is closed the item is dropped and OnDropped is invoked. If the
// buffer is full the method returns false without invoking any hook.
//
// The send happens under the sendMu read lock so that Close cannot close the
// channel concurrently. OnProduced is invoked after the lock is released, so
// the hook may call back into the queue. A panic from the hook is swallowed,
// and the named result still reports true for an item that was enqueued, so
// callers that retry on false do not enqueue it twice.
func (q *Queue[T]) sendNonBlocking(item T) (sent bool) {
	q.sendMu.RLock()
	q.mu.RLock()
	closed := q.closed
	hooks := q.hooks
	q.mu.RUnlock()

	if closed {
		q.sendMu.RUnlock()
		if hooks.OnDropped != nil {
			hooks.OnDropped(item)
		}
		return false
	}

	// Best-effort guard: never let a panic escape to the producer.
	defer func() { _ = recover() }()

	func() {
		defer q.sendMu.RUnlock()
		select {
		case q.ch <- item:
			sent = true
		default:
			// Buffer full: give up immediately.
		}
	}()

	if sent && hooks.OnProduced != nil {
		hooks.OnProduced()
	}
	return sent
}
// sendWithContext blocks until the item is sent or the context is canceled,
// and reports whether the item was enqueued.
//
// If the queue is closed, or ctx is canceled before buffer space becomes
// available, the item is dropped and OnDropped is invoked.
//
// The send happens under the sendMu read lock so that Close cannot close the
// channel concurrently. Hooks are invoked after the lock is released, so they
// may call back into the queue. A panic from a hook is swallowed, and the
// named result still reflects whether the item was enqueued.
//
// NOTE(review): acquiring sendMu itself is not context-aware; if Close is
// already waiting for the write lock, this call waits for it regardless of
// ctx.
func (q *Queue[T]) sendWithContext(ctx context.Context, item T) (sent bool) {
	q.sendMu.RLock()
	q.mu.RLock()
	closed := q.closed
	hooks := q.hooks
	q.mu.RUnlock()

	if closed {
		q.sendMu.RUnlock()
		if hooks.OnDropped != nil {
			hooks.OnDropped(item)
		}
		return false
	}

	// Best-effort guard: never let a panic escape to the producer.
	defer func() { _ = recover() }()

	func() {
		defer q.sendMu.RUnlock()
		select {
		case q.ch <- item:
			sent = true
		case <-ctx.Done():
			// Canceled before space became available.
		}
	}()

	if sent {
		if hooks.OnProduced != nil {
			hooks.OnProduced()
		}
		return sent
	}
	if hooks.OnDropped != nil {
		hooks.OnDropped(item)
	}
	return sent
}
// Produce adds item to the queue, waiting for buffer space if the queue is
// full. When the queue has already been closed the item is discarded and the
// call returns right away. The outcome is not reported to the caller.
func (q *Queue[T]) Produce(item T) {
	q.sendBlocking(item)
}
// ProduceWithContext enqueues an item, blocking until the item is accepted or
// the provided context is canceled. If the context is already canceled or the
// queue is closed, the item is dropped and OnDropped is invoked (if set).
func (q *Queue[T]) ProduceWithContext(ctx context.Context, item T) {
	// If the context is already canceled, do not block: report the item as
	// dropped and return.
	if ctx.Err() != nil {
		// Copy the hook under the lock but call it after unlocking, as the
		// other send paths do. Calling user code while holding q.mu would
		// deadlock if the hook calls SetHooks, Register, Close or any other
		// method that takes q.mu.
		q.mu.RLock()
		onDropped := q.hooks.OnDropped
		q.mu.RUnlock()
		if onDropped != nil {
			onDropped(item)
		}
		return
	}
	_ = q.sendWithContext(ctx, item)
}
// TryProduce attempts to enqueue an item, retrying with non-blocking sends
// until the item is accepted, the queue is found closed, or the timeout
// elapses. It returns true only if the item was enqueued.
//
// At least one attempt is always made, even when timeout is zero or negative.
// If the queue is closed, OnDropped is invoked (if set) by the failed attempt
// and TryProduce returns false immediately instead of waiting out the
// timeout.
func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	for {
		if q.sendNonBlocking(item) {
			return true
		}
		// A closed queue never reopens, so retrying can never succeed; it
		// would only spin until the deadline and fire OnDropped on every
		// iteration.
		if q.Closed() {
			return false
		}
		select {
		case <-ctx.Done():
			return false
		default:
			// Yield briefly to the scheduler before the next attempt.
			time.Sleep(time.Microsecond)
		}
	}
}
// Shutdown stops the queue from accepting new items and then waits for the
// worker goroutines to exit. It returns nil once all workers are done, or
// ctx.Err() if ctx is canceled or times out first.
//
// The wait runs in a helper goroutine; if ctx expires first that goroutine
// stays alive until the workers eventually finish.
func (q *Queue[T]) Shutdown(ctx context.Context) error {
	q.Close()

	workersDone := make(chan struct{})
	go func() {
		q.wg.Wait()
		close(workersDone)
	}()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-workersDone:
		return nil
	}
}
// Close marks the queue as closed, closes the underlying channel and stops
// the rate-limit ticker if one is active. Items already buffered remain
// available to workers. Close is idempotent: only the first call has any
// effect.
//
// Close takes the send lock exclusively, so it waits for sends that are
// currently in flight before closing the channel.
func (q *Queue[T]) Close() {
	q.closeOnce.Do(func() {
		// The exclusive send lock keeps producers out while the channel is
		// being closed.
		q.sendMu.Lock()
		defer q.sendMu.Unlock()
		q.mu.Lock()
		defer q.mu.Unlock()

		if !q.closed {
			q.closed = true
			close(q.ch)
		}

		// Stop the ticker if rate limiting is enabled.
		if ticker := q.rateTicker; ticker != nil {
			ticker.Stop()
			q.rateTicker = nil
		}
	})
}
// Wait returns once every worker goroutine started by HandleParallel has
// exited. It does not close the queue itself.
func (q *Queue[T]) Wait() {
	q.wg.Wait()
}
// Len reports how many items are sitting in the queue buffer at the moment
// of the call. With concurrent producers or workers the value may be stale
// by the time it is used.
func (q *Queue[T]) Len() int {
	buffered := len(q.ch)
	return buffered
}
// Cap reports the buffer capacity the queue was created with.
func (q *Queue[T]) Cap() int {
	return q.capacity
}
// IsEmpty reports whether the queue buffer holds no items at the moment of
// the call.
func (q *Queue[T]) IsEmpty() bool {
	buffered := len(q.ch)
	return buffered == 0
}
// Closed reports whether Close has been called, i.e. whether the queue has
// stopped accepting new items.
func (q *Queue[T]) Closed() bool {
	q.mu.RLock()
	isClosed := q.closed
	q.mu.RUnlock()
	return isClosed
}
// String formats the queue's current length, capacity and closed flag as a
// short human-readable summary, which makes Queue usable with %v and %s.
func (q *Queue[T]) String() string {
	length, capacity, closed := q.Len(), q.Cap(), q.Closed()
	return fmt.Sprintf("Queue{len=%d, cap=%d, closed=%t}", length, capacity, closed)
}
// processItem executes all registered handlers for the item, in registration
// order, recovering from panics so that one failing handler neither stops the
// remaining handlers nor kills the worker.
//
// Handlers, the error callback, the DLQ and the hooks are snapshotted under
// the read lock and then used without it, so user code never runs while q.mu
// is held.
//
// For each handler that panics (a panic raised by the OnHandled hook is
// treated the same way):
//   - the error callback, if set, receives an error describing the panic;
//     when the panic value is itself an error it is wrapped with %w so
//     callers can inspect it with errors.Is / errors.As;
//   - the item is forwarded to the DLQ with a best-effort non-blocking send.
//     It is forwarded at most once per item, even if several handlers panic,
//     so the DLQ does not receive duplicates.
func (q *Queue[T]) processItem(item T) {
	q.mu.RLock()
	handlers := append([]func(T){}, q.handlers...)
	errFn := q.errorFunc
	dlq := q.dlq
	hooks := q.hooks
	q.mu.RUnlock()

	forwarded := false // whether item has already reached the DLQ
	for _, h := range handlers {
		func() {
			defer func() {
				r := recover()
				if r == nil {
					return
				}
				// Report the failure.
				if errFn != nil {
					if cause, ok := r.(error); ok {
						errFn(fmt.Errorf("handler panic: %w", cause))
					} else {
						errFn(fmt.Errorf("handler panic: %v", r))
					}
				}
				// Forward to the DLQ without blocking; retry on a later
				// panic only if an earlier attempt did not succeed.
				if dlq != nil && !forwarded && dlq.sendNonBlocking(item) {
					forwarded = true
				}
			}()
			h(item)
			if hooks.OnHandled != nil {
				hooks.OnHandled()
			}
		}()
	}
}