294 lines
6.9 KiB
Go
294 lines
6.9 KiB
Go
package queue
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// QueueHooks — набор хуков для метрик и мониторинга.
|
||
type QueueHooks struct {
|
||
OnProduced func() // вызывается при успешном добавлении элемента
|
||
OnHandled func() // вызывается при успешной обработке
|
||
OnDropped func(interface{}) // вызывается при пропуске (cancel/timeout)
|
||
}
|
||
|
||
// Queue — потокобезопасная обобщённая FIFO очередь с DLQ, параллельной обработкой
|
||
// и корректным завершением без data race (даже под -race).
|
||
type Queue[T any] struct {
|
||
ch chan T
|
||
capacity int
|
||
|
||
mu sync.RWMutex // защищает: handlers/errorFunc/dlq/hooks/closed
|
||
sendMu sync.RWMutex // синхронизирует ВСЕ записи в канал и его закрытие
|
||
handlers []func(T)
|
||
|
||
errorFunc func(error)
|
||
dlq *Queue[T]
|
||
hooks QueueHooks
|
||
|
||
closeOnce sync.Once
|
||
closed bool
|
||
|
||
wg sync.WaitGroup
|
||
rateLimit <-chan time.Time
|
||
}
|
||
|
||
// NewQueue создаёт новую очередь с заданной ёмкостью.
|
||
func NewQueue[T any](capacity int) *Queue[T] {
|
||
if capacity <= 0 {
|
||
panic("queue capacity must be > 0")
|
||
}
|
||
return &Queue[T]{
|
||
ch: make(chan T, capacity),
|
||
capacity: capacity,
|
||
}
|
||
}
|
||
|
||
// ---------- Конфигурация ----------
|
||
|
||
func (q *Queue[T]) Register(handler func(T)) {
|
||
if handler == nil {
|
||
panic("handler cannot be nil")
|
||
}
|
||
q.mu.Lock()
|
||
q.handlers = append(q.handlers, handler)
|
||
q.mu.Unlock()
|
||
}
|
||
|
||
func (q *Queue[T]) OnError(fn func(error)) {
|
||
q.mu.Lock()
|
||
q.errorFunc = fn
|
||
q.mu.Unlock()
|
||
}
|
||
|
||
func (q *Queue[T]) SetDLQ(dlq *Queue[T]) {
|
||
q.mu.Lock()
|
||
q.dlq = dlq
|
||
q.mu.Unlock()
|
||
}
|
||
|
||
func (q *Queue[T]) SetHooks(h QueueHooks) {
|
||
q.mu.Lock()
|
||
q.hooks = h
|
||
q.mu.Unlock()
|
||
}
|
||
|
||
func (q *Queue[T]) SetRateLimit(rps int) {
|
||
if rps > 0 {
|
||
q.rateLimit = time.Tick(time.Second / time.Duration(rps))
|
||
}
|
||
}
|
||
|
||
// ---------- Запуск обработчиков ----------
|
||
|
||
func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) {
|
||
if workers <= 0 {
|
||
workers = 1
|
||
}
|
||
q.wg.Add(workers)
|
||
for i := 0; i < workers; i++ {
|
||
go func() {
|
||
defer q.wg.Done()
|
||
for {
|
||
select {
|
||
case item, ok := <-q.ch:
|
||
if !ok {
|
||
return
|
||
}
|
||
if q.rateLimit != nil {
|
||
<-q.rateLimit
|
||
}
|
||
q.processItem(item)
|
||
case <-ctx.Done():
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
}
|
||
|
||
// ---------- Отправка в канал (синхронизированная) ----------
|
||
|
||
// sendBlocking — блокирующая отправка (используется Produce).
|
||
// Берём RLock на всё время операции записи, чтобы Close() (Lock) не мог выполнить close(q.ch)
|
||
// одновременно с записью — это устраняет data race.
|
||
func (q *Queue[T]) sendBlocking(item T) bool {
|
||
// Быстрая проверка закрытия под RLock (необязательна, но экономит работу)
|
||
q.sendMu.RLock()
|
||
q.mu.RLock()
|
||
closed := q.closed
|
||
hooks := q.hooks
|
||
q.mu.RUnlock()
|
||
if closed {
|
||
q.sendMu.RUnlock()
|
||
if hooks.OnDropped != nil {
|
||
hooks.OnDropped(item)
|
||
}
|
||
return false
|
||
}
|
||
defer q.sendMu.RUnlock()
|
||
|
||
defer func() { _ = recover() }() // на случай гонок в чужом коде
|
||
q.ch <- item
|
||
if hooks.OnProduced != nil {
|
||
hooks.OnProduced()
|
||
}
|
||
return true
|
||
}
|
||
|
||
// sendNonBlocking — неблокирующая отправка (для TryProduce/ProduceWithContext/DLQ).
|
||
func (q *Queue[T]) sendNonBlocking(item T) bool {
|
||
q.sendMu.RLock()
|
||
q.mu.RLock()
|
||
closed := q.closed
|
||
hooks := q.hooks
|
||
q.mu.RUnlock()
|
||
if closed {
|
||
q.sendMu.RUnlock()
|
||
if hooks.OnDropped != nil {
|
||
hooks.OnDropped(item)
|
||
}
|
||
return false
|
||
}
|
||
defer q.sendMu.RUnlock()
|
||
|
||
defer func() { _ = recover() }()
|
||
select {
|
||
case q.ch <- item:
|
||
if hooks.OnProduced != nil {
|
||
hooks.OnProduced()
|
||
}
|
||
return true
|
||
default:
|
||
return false
|
||
}
|
||
}
|
||
|
||
// ---------- Паблик-API продюсеров ----------
|
||
|
||
// Produce — блокирующая постановка. Если очередь закрыта — элемент игнорируется.
|
||
func (q *Queue[T]) Produce(item T) {
|
||
_ = q.sendBlocking(item)
|
||
}
|
||
|
||
// ProduceWithContext — постановка с отменой по ctx (не блокирует при полном буфере).
|
||
func (q *Queue[T]) ProduceWithContext(ctx context.Context, item T) {
|
||
select {
|
||
case <-ctx.Done():
|
||
q.mu.RLock()
|
||
if q.hooks.OnDropped != nil {
|
||
q.hooks.OnDropped(item)
|
||
}
|
||
q.mu.RUnlock()
|
||
return
|
||
default:
|
||
_ = q.sendNonBlocking(item)
|
||
}
|
||
}
|
||
|
||
// TryProduce — неблокирующая постановка с таймаутом.
|
||
func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool {
|
||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||
defer cancel()
|
||
t := time.NewTimer(timeout)
|
||
defer t.Stop()
|
||
|
||
for {
|
||
if ok := q.sendNonBlocking(item); ok {
|
||
return true
|
||
}
|
||
select {
|
||
case <-ctx.Done():
|
||
return false
|
||
case <-t.C:
|
||
return false
|
||
default:
|
||
// короткая уступка планировщику
|
||
time.Sleep(time.Microsecond)
|
||
}
|
||
}
|
||
}
|
||
|
||
// ---------- Завершение ----------
|
||
|
||
func (q *Queue[T]) Shutdown(ctx context.Context) error {
|
||
q.Close()
|
||
done := make(chan struct{})
|
||
go func() {
|
||
defer close(done)
|
||
q.wg.Wait()
|
||
}()
|
||
select {
|
||
case <-done:
|
||
return nil
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
}
|
||
}
|
||
|
||
func (q *Queue[T]) Close() {
|
||
q.closeOnce.Do(func() {
|
||
// запретим новые записи: эксклюзивная блокировка перекроет send RLock
|
||
q.sendMu.Lock()
|
||
q.mu.Lock()
|
||
if !q.closed {
|
||
q.closed = true
|
||
close(q.ch)
|
||
}
|
||
q.mu.Unlock()
|
||
q.sendMu.Unlock()
|
||
})
|
||
}
|
||
|
||
func (q *Queue[T]) Wait() { q.wg.Wait() }
|
||
|
||
// ---------- Инфо-методы ----------
|
||
|
||
func (q *Queue[T]) Len() int { return len(q.ch) }
|
||
func (q *Queue[T]) Cap() int { return q.capacity }
|
||
func (q *Queue[T]) IsEmpty() bool { return len(q.ch) == 0 }
|
||
|
||
func (q *Queue[T]) Closed() bool {
|
||
q.mu.RLock()
|
||
defer q.mu.RUnlock()
|
||
return q.closed
|
||
}
|
||
|
||
func (q *Queue[T]) String() string {
|
||
return fmt.Sprintf("Queue{len=%d, cap=%d, closed=%t}", q.Len(), q.Cap(), q.Closed())
|
||
}
|
||
|
||
// ---------- Внутренняя обработка ----------
|
||
|
||
func (q *Queue[T]) processItem(item T) {
|
||
q.mu.RLock()
|
||
handlers := append([]func(T){}, q.handlers...)
|
||
errFn := q.errorFunc
|
||
dlq := q.dlq
|
||
hooks := q.hooks
|
||
q.mu.RUnlock()
|
||
|
||
for _, h := range handlers {
|
||
func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
// сообщим об ошибке
|
||
if errFn != nil {
|
||
errFn(fmt.Errorf("handler panic: %v", r))
|
||
}
|
||
// отправим в DLQ неблокирующе и безопасно
|
||
if dlq != nil {
|
||
_ = dlq.sendNonBlocking(item)
|
||
}
|
||
}
|
||
}()
|
||
h(item)
|
||
if hooks.OnHandled != nil {
|
||
hooks.OnHandled()
|
||
}
|
||
}()
|
||
}
|
||
}
|