Files
queue/queue.go
Кобелев Андрей Андреевич e3c9643d05 Initial release of queue package
2025-10-10 22:50:00 +05:00

294 lines
6.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package queue
import (
"context"
"fmt"
"sync"
"time"
)
// QueueHooks — набор хуков для метрик и мониторинга.
type QueueHooks struct {
OnProduced func() // вызывается при успешном добавлении элемента
OnHandled func() // вызывается при успешной обработке
OnDropped func(interface{}) // вызывается при пропуске (cancel/timeout)
}
// Queue — потокобезопасная обобщённая FIFO очередь с DLQ, параллельной обработкой
// и корректным завершением без data race (даже под -race).
type Queue[T any] struct {
ch chan T
capacity int
mu sync.RWMutex // защищает: handlers/errorFunc/dlq/hooks/closed
sendMu sync.RWMutex // синхронизирует ВСЕ записи в канал и его закрытие
handlers []func(T)
errorFunc func(error)
dlq *Queue[T]
hooks QueueHooks
closeOnce sync.Once
closed bool
wg sync.WaitGroup
rateLimit <-chan time.Time
}
// NewQueue создаёт новую очередь с заданной ёмкостью.
func NewQueue[T any](capacity int) *Queue[T] {
if capacity <= 0 {
panic("queue capacity must be > 0")
}
return &Queue[T]{
ch: make(chan T, capacity),
capacity: capacity,
}
}
// ---------- Конфигурация ----------
func (q *Queue[T]) Register(handler func(T)) {
if handler == nil {
panic("handler cannot be nil")
}
q.mu.Lock()
q.handlers = append(q.handlers, handler)
q.mu.Unlock()
}
func (q *Queue[T]) OnError(fn func(error)) {
q.mu.Lock()
q.errorFunc = fn
q.mu.Unlock()
}
func (q *Queue[T]) SetDLQ(dlq *Queue[T]) {
q.mu.Lock()
q.dlq = dlq
q.mu.Unlock()
}
func (q *Queue[T]) SetHooks(h QueueHooks) {
q.mu.Lock()
q.hooks = h
q.mu.Unlock()
}
func (q *Queue[T]) SetRateLimit(rps int) {
if rps > 0 {
q.rateLimit = time.Tick(time.Second / time.Duration(rps))
}
}
// ---------- Запуск обработчиков ----------
func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) {
if workers <= 0 {
workers = 1
}
q.wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer q.wg.Done()
for {
select {
case item, ok := <-q.ch:
if !ok {
return
}
if q.rateLimit != nil {
<-q.rateLimit
}
q.processItem(item)
case <-ctx.Done():
return
}
}
}()
}
}
// ---------- Отправка в канал (синхронизированная) ----------
// sendBlocking — блокирующая отправка (используется Produce).
// Берём RLock на всё время операции записи, чтобы Close() (Lock) не мог выполнить close(q.ch)
// одновременно с записью — это устраняет data race.
func (q *Queue[T]) sendBlocking(item T) bool {
// Быстрая проверка закрытия под RLock (необязательна, но экономит работу)
q.sendMu.RLock()
q.mu.RLock()
closed := q.closed
hooks := q.hooks
q.mu.RUnlock()
if closed {
q.sendMu.RUnlock()
if hooks.OnDropped != nil {
hooks.OnDropped(item)
}
return false
}
defer q.sendMu.RUnlock()
defer func() { _ = recover() }() // на случай гонок в чужом коде
q.ch <- item
if hooks.OnProduced != nil {
hooks.OnProduced()
}
return true
}
// sendNonBlocking — неблокирующая отправка (для TryProduce/ProduceWithContext/DLQ).
func (q *Queue[T]) sendNonBlocking(item T) bool {
q.sendMu.RLock()
q.mu.RLock()
closed := q.closed
hooks := q.hooks
q.mu.RUnlock()
if closed {
q.sendMu.RUnlock()
if hooks.OnDropped != nil {
hooks.OnDropped(item)
}
return false
}
defer q.sendMu.RUnlock()
defer func() { _ = recover() }()
select {
case q.ch <- item:
if hooks.OnProduced != nil {
hooks.OnProduced()
}
return true
default:
return false
}
}
// ---------- Паблик-API продюсеров ----------
// Produce — блокирующая постановка. Если очередь закрыта — элемент игнорируется.
func (q *Queue[T]) Produce(item T) {
_ = q.sendBlocking(item)
}
// ProduceWithContext — постановка с отменой по ctx (не блокирует при полном буфере).
func (q *Queue[T]) ProduceWithContext(ctx context.Context, item T) {
select {
case <-ctx.Done():
q.mu.RLock()
if q.hooks.OnDropped != nil {
q.hooks.OnDropped(item)
}
q.mu.RUnlock()
return
default:
_ = q.sendNonBlocking(item)
}
}
// TryProduce — неблокирующая постановка с таймаутом.
func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
t := time.NewTimer(timeout)
defer t.Stop()
for {
if ok := q.sendNonBlocking(item); ok {
return true
}
select {
case <-ctx.Done():
return false
case <-t.C:
return false
default:
// короткая уступка планировщику
time.Sleep(time.Microsecond)
}
}
}
// ---------- Завершение ----------
func (q *Queue[T]) Shutdown(ctx context.Context) error {
q.Close()
done := make(chan struct{})
go func() {
defer close(done)
q.wg.Wait()
}()
select {
case <-done:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (q *Queue[T]) Close() {
q.closeOnce.Do(func() {
// запретим новые записи: эксклюзивная блокировка перекроет send RLock
q.sendMu.Lock()
q.mu.Lock()
if !q.closed {
q.closed = true
close(q.ch)
}
q.mu.Unlock()
q.sendMu.Unlock()
})
}
func (q *Queue[T]) Wait() { q.wg.Wait() }
// ---------- Инфо-методы ----------
func (q *Queue[T]) Len() int { return len(q.ch) }
func (q *Queue[T]) Cap() int { return q.capacity }
func (q *Queue[T]) IsEmpty() bool { return len(q.ch) == 0 }
func (q *Queue[T]) Closed() bool {
q.mu.RLock()
defer q.mu.RUnlock()
return q.closed
}
func (q *Queue[T]) String() string {
return fmt.Sprintf("Queue{len=%d, cap=%d, closed=%t}", q.Len(), q.Cap(), q.Closed())
}
// ---------- Внутренняя обработка ----------
func (q *Queue[T]) processItem(item T) {
q.mu.RLock()
handlers := append([]func(T){}, q.handlers...)
errFn := q.errorFunc
dlq := q.dlq
hooks := q.hooks
q.mu.RUnlock()
for _, h := range handlers {
func() {
defer func() {
if r := recover(); r != nil {
// сообщим об ошибке
if errFn != nil {
errFn(fmt.Errorf("handler panic: %v", r))
}
// отправим в DLQ неблокирующе и безопасно
if dlq != nil {
_ = dlq.sendNonBlocking(item)
}
}
}()
h(item)
if hooks.OnHandled != nil {
hooks.OnHandled()
}
}()
}
}