From 557fb40268a0f3df638b0e7b4dcb16d0d123ab75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=BE=D0=B1=D0=B5=D0=BB=D0=B5=D0=B2=20=D0=90=D0=BD?= =?UTF-8?q?=D0=B4=D1=80=D0=B5=D0=B9=20=D0=90=D0=BD=D0=B4=D1=80=D0=B5=D0=B5?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Tue, 14 Oct 2025 12:53:46 +0500 Subject: [PATCH] Change package path --- README.md | 4 ++++ bench_test.go | 2 +- examples/example_basic_test.go | 2 +- examples/example_dlq_test.go | 2 +- examples/example_parallel_test.go | 2 +- go.mod | 2 +- queue.go | 25 ++++++++++--------------- queue_test.go | 2 +- 8 files changed, 20 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index e91262c..9c95ac4 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,11 @@ # queue +[![Go Reference](https://pkg.go.dev/badge/git.belvedersky.ru/queue.svg)](https://pkg.go.dev/git.belvedersky.ru/queue) + + Simple and lightweight queue implementation in Go with optional Dead Letter Queue (DLQ) support. + Repository: https://git.belvedersky.ru/belvedersky/queue diff --git a/bench_test.go b/bench_test.go index 6a605bc..c3d7915 100644 --- a/bench_test.go +++ b/bench_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - "git.belvedersky.ru/belvedersky/queue" + "git.belvedersky.ru/queue" ) type benchTask struct { diff --git a/examples/example_basic_test.go b/examples/example_basic_test.go index b52d3cb..e8f39a3 100644 --- a/examples/example_basic_test.go +++ b/examples/example_basic_test.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "git.belvedersky.ru/belvedersky/queue" + "git.belvedersky.ru/queue" ) // Example_basic демонстрирует базовое использование очереди. diff --git a/examples/example_dlq_test.go b/examples/example_dlq_test.go index 867edd5..a70b6f4 100644 --- a/examples/example_dlq_test.go +++ b/examples/example_dlq_test.go @@ -6,7 +6,7 @@ import ( "sync/atomic" "time" - "git.belvedersky.ru/belvedersky/queue" + "git.belvedersky.ru/queue" ) // Example_DLQ показывает, как использовать Dead Letter Queue diff --git a/examples/example_parallel_test.go b/examples/example_parallel_test.go index 345d3b7..1ea2843 100644 --- a/examples/example_parallel_test.go +++ b/examples/example_parallel_test.go @@ -6,7 +6,7 @@ import ( "sync" "time" - "git.belvedersky.ru/belvedersky/queue" + "git.belvedersky.ru/queue" ) // Example_parallel демонстрирует работу нескольких воркеров. diff --git a/go.mod b/go.mod index c5b67b4..fd1c13f 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ -module git.belvedersky.ru/belvedersky/queue +module git.belvedersky.ru/queue go 1.25.1 diff --git a/queue.go b/queue.go index cb876cb..bcbd214 100644 --- a/queue.go +++ b/queue.go @@ -9,19 +9,19 @@ import ( // QueueHooks — набор хуков для метрик и мониторинга. type QueueHooks struct { - OnProduced func() // вызывается при успешном добавлении элемента - OnHandled func() // вызывается при успешной обработке - OnDropped func(interface{}) // вызывается при пропуске (cancel/timeout) + OnProduced func() // вызывается при успешном добавлении элемента + OnHandled func() // вызывается при успешной обработке + OnDropped func(any) // вызывается при пропуске (cancel/timeout) } // Queue — потокобезопасная обобщённая FIFO очередь с DLQ, параллельной обработкой -// и корректным завершением без data race (даже под -race). +// и корректным завершением без data race type Queue[T any] struct { ch chan T capacity int mu sync.RWMutex // защищает: handlers/errorFunc/dlq/hooks/closed - sendMu sync.RWMutex // синхронизирует ВСЕ записи в канал и его закрытие + sendMu sync.RWMutex // синхронизирует записи в канал и его закрытие handlers []func(T) errorFunc func(error) @@ -47,7 +47,6 @@ func NewQueue[T any](capacity int) *Queue[T] { } // ---------- Конфигурация ---------- - func (q *Queue[T]) Register(handler func(T)) { if handler == nil { panic("handler cannot be nil") @@ -57,24 +56,28 @@ func (q *Queue[T]) Register(handler func(T)) { q.mu.Unlock() } +// ---------- При ошибке ---------- func (q *Queue[T]) OnError(fn func(error)) { q.mu.Lock() q.errorFunc = fn q.mu.Unlock() } +// ---------- Установка DLQ ---------- func (q *Queue[T]) SetDLQ(dlq *Queue[T]) { q.mu.Lock() q.dlq = dlq q.mu.Unlock() } +// ---------- Установка Хуков ---------- func (q *Queue[T]) SetHooks(h QueueHooks) { q.mu.Lock() q.hooks = h q.mu.Unlock() } +// ---------- Установка RateLimit ---------- func (q *Queue[T]) SetRateLimit(rps int) { if rps > 0 { q.rateLimit = time.Tick(time.Second / time.Duration(rps)) @@ -82,7 +85,6 @@ func (q *Queue[T]) SetRateLimit(rps int) { } // ---------- Запуск обработчиков ---------- - func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) { if workers <= 0 { workers = 1 @@ -110,12 +112,10 @@ func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) { } // ---------- Отправка в канал (синхронизированная) ---------- - // sendBlocking — блокирующая отправка (используется Produce). // Берём RLock на всё время операции записи, чтобы Close() (Lock) не мог выполнить close(q.ch) -// одновременно с записью — это устраняет data race. func (q *Queue[T]) sendBlocking(item T) bool { - // Быстрая проверка закрытия под RLock (необязательна, но экономит работу) + // Быстрая проверка закрытия под RLock q.sendMu.RLock() q.mu.RLock() closed := q.closed @@ -167,7 +167,6 @@ func (q *Queue[T]) sendNonBlocking(item T) bool { } // ---------- Паблик-API продюсеров ---------- - // Produce — блокирующая постановка. Если очередь закрыта — элемент игнорируется. func (q *Queue[T]) Produce(item T) { _ = q.sendBlocking(item) @@ -212,7 +211,6 @@ func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool { } // ---------- Завершение ---------- - func (q *Queue[T]) Shutdown(ctx context.Context) error { q.Close() done := make(chan struct{}) @@ -245,7 +243,6 @@ func (q *Queue[T]) Close() { func (q *Queue[T]) Wait() { q.wg.Wait() } // ---------- Инфо-методы ---------- - func (q *Queue[T]) Len() int { return len(q.ch) } func (q *Queue[T]) Cap() int { return q.capacity } func (q *Queue[T]) IsEmpty() bool { return len(q.ch) == 0 } @@ -261,7 +258,6 @@ func (q *Queue[T]) String() string { } // ---------- Внутренняя обработка ---------- - func (q *Queue[T]) processItem(item T) { q.mu.RLock() handlers := append([]func(T){}, q.handlers...) @@ -269,7 +265,6 @@ func (q *Queue[T]) processItem(item T) { dlq := q.dlq hooks := q.hooks q.mu.RUnlock() - for _, h := range handlers { func() { defer func() { diff --git a/queue_test.go b/queue_test.go index 9c6e74a..675247f 100644 --- a/queue_test.go +++ b/queue_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - "git.belvedersky.ru/belvedersky/queue" + "git.belvedersky.ru/queue" ) type Task struct {