package queue import ( "context" "fmt" "sync" "time" ) // QueueHooks — набор хуков для метрик и мониторинга. type QueueHooks struct { OnProduced func() // вызывается при успешном добавлении элемента OnHandled func() // вызывается при успешной обработке OnDropped func(any) // вызывается при пропуске (cancel/timeout) } // Queue — потокобезопасная обобщённая FIFO очередь с DLQ, параллельной обработкой // и корректным завершением без data race type Queue[T any] struct { ch chan T capacity int mu sync.RWMutex // защищает: handlers/errorFunc/dlq/hooks/closed sendMu sync.RWMutex // синхронизирует записи в канал и его закрытие handlers []func(T) errorFunc func(error) dlq *Queue[T] hooks QueueHooks closeOnce sync.Once closed bool wg sync.WaitGroup rateLimit <-chan time.Time } // NewQueue создаёт новую очередь с заданной ёмкостью. func NewQueue[T any](capacity int) *Queue[T] { if capacity <= 0 { panic("queue capacity must be > 0") } return &Queue[T]{ ch: make(chan T, capacity), capacity: capacity, } } // ---------- Конфигурация ---------- func (q *Queue[T]) Register(handler func(T)) { if handler == nil { panic("handler cannot be nil") } q.mu.Lock() q.handlers = append(q.handlers, handler) q.mu.Unlock() } // ---------- При ошибке ---------- func (q *Queue[T]) OnError(fn func(error)) { q.mu.Lock() q.errorFunc = fn q.mu.Unlock() } // ---------- Установка DLQ ---------- func (q *Queue[T]) SetDLQ(dlq *Queue[T]) { q.mu.Lock() q.dlq = dlq q.mu.Unlock() } // ---------- Установка Хуков ---------- func (q *Queue[T]) SetHooks(h QueueHooks) { q.mu.Lock() q.hooks = h q.mu.Unlock() } // ---------- Установка RateLimit ---------- func (q *Queue[T]) SetRateLimit(rps int) { if rps > 0 { q.rateLimit = time.Tick(time.Second / time.Duration(rps)) } } // ---------- Запуск обработчиков ---------- func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) { if workers <= 0 { workers = 1 } q.wg.Add(workers) for i := 0; i < workers; i++ { go func() { defer q.wg.Done() for { select { case item, ok := <-q.ch: if !ok { return } if q.rateLimit != nil { <-q.rateLimit } q.processItem(item) case <-ctx.Done(): return } } }() } } // ---------- Отправка в канал (синхронизированная) ---------- // sendBlocking — блокирующая отправка (используется Produce). // Берём RLock на всё время операции записи, чтобы Close() (Lock) не мог выполнить close(q.ch) func (q *Queue[T]) sendBlocking(item T) bool { // Быстрая проверка закрытия под RLock q.sendMu.RLock() q.mu.RLock() closed := q.closed hooks := q.hooks q.mu.RUnlock() if closed { q.sendMu.RUnlock() if hooks.OnDropped != nil { hooks.OnDropped(item) } return false } defer q.sendMu.RUnlock() defer func() { _ = recover() }() // на случай гонок в чужом коде q.ch <- item if hooks.OnProduced != nil { hooks.OnProduced() } return true } // sendNonBlocking — неблокирующая отправка (для TryProduce/ProduceWithContext/DLQ). func (q *Queue[T]) sendNonBlocking(item T) bool { q.sendMu.RLock() q.mu.RLock() closed := q.closed hooks := q.hooks q.mu.RUnlock() if closed { q.sendMu.RUnlock() if hooks.OnDropped != nil { hooks.OnDropped(item) } return false } defer q.sendMu.RUnlock() defer func() { _ = recover() }() select { case q.ch <- item: if hooks.OnProduced != nil { hooks.OnProduced() } return true default: return false } } // ---------- Паблик-API продюсеров ---------- // Produce — блокирующая постановка. Если очередь закрыта — элемент игнорируется. func (q *Queue[T]) Produce(item T) { _ = q.sendBlocking(item) } // ProduceWithContext — постановка с отменой по ctx (не блокирует при полном буфере). func (q *Queue[T]) ProduceWithContext(ctx context.Context, item T) { select { case <-ctx.Done(): q.mu.RLock() if q.hooks.OnDropped != nil { q.hooks.OnDropped(item) } q.mu.RUnlock() return default: _ = q.sendNonBlocking(item) } } // TryProduce — неблокирующая постановка с таймаутом. func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() t := time.NewTimer(timeout) defer t.Stop() for { if ok := q.sendNonBlocking(item); ok { return true } select { case <-ctx.Done(): return false case <-t.C: return false default: // короткая уступка планировщику time.Sleep(time.Microsecond) } } } // ---------- Завершение ---------- func (q *Queue[T]) Shutdown(ctx context.Context) error { q.Close() done := make(chan struct{}) go func() { defer close(done) q.wg.Wait() }() select { case <-done: return nil case <-ctx.Done(): return ctx.Err() } } func (q *Queue[T]) Close() { q.closeOnce.Do(func() { // запретим новые записи: эксклюзивная блокировка перекроет send RLock q.sendMu.Lock() q.mu.Lock() if !q.closed { q.closed = true close(q.ch) } q.mu.Unlock() q.sendMu.Unlock() }) } func (q *Queue[T]) Wait() { q.wg.Wait() } // ---------- Инфо-методы ---------- func (q *Queue[T]) Len() int { return len(q.ch) } func (q *Queue[T]) Cap() int { return q.capacity } func (q *Queue[T]) IsEmpty() bool { return len(q.ch) == 0 } func (q *Queue[T]) Closed() bool { q.mu.RLock() defer q.mu.RUnlock() return q.closed } func (q *Queue[T]) String() string { return fmt.Sprintf("Queue{len=%d, cap=%d, closed=%t}", q.Len(), q.Cap(), q.Closed()) } // ---------- Внутренняя обработка ---------- func (q *Queue[T]) processItem(item T) { q.mu.RLock() handlers := append([]func(T){}, q.handlers...) errFn := q.errorFunc dlq := q.dlq hooks := q.hooks q.mu.RUnlock() for _, h := range handlers { func() { defer func() { if r := recover(); r != nil { // сообщим об ошибке if errFn != nil { errFn(fmt.Errorf("handler panic: %v", r)) } // отправим в DLQ неблокирующе и безопасно if dlq != nil { _ = dlq.sendNonBlocking(item) } } }() h(item) if hooks.OnHandled != nil { hooks.OnHandled() } }() } }