- Context fix without timer.
All checks were successful
CI / test (push) Successful in 5m21s

- Add godoc documentation.
This commit is contained in:
Кобелев Андрей Андреевич
2025-10-14 13:23:12 +05:00
parent 557fb40268
commit bac4f05437
4 changed files with 246 additions and 83 deletions

33
.gitea/workflows/ci.yml Normal file
View File

@@ -0,0 +1,33 @@
name: CI
on:
push:
branches:
- '**'
pull_request:
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
check-latest: true
- name: Go version
run: go version
- name: Vet
run: go vet ./...
- name: Test
run: go test -race ./...
- name: Bench (smoke)
run: go test -bench=. -run=^$ ./...

118
README.md
View File

@@ -2,62 +2,113 @@
[![Go Reference](https://pkg.go.dev/badge/git.belvedersky.ru/queue.svg)](https://pkg.go.dev/git.belvedersky.ru/queue) [![Go Reference](https://pkg.go.dev/badge/git.belvedersky.ru/queue.svg)](https://pkg.go.dev/git.belvedersky.ru/queue)
Simple, lightweight, concurrent FIFO queue for Go with optional Dead Letter Queue (DLQ), hooks, rate limiting, and graceful shutdown.
Simple and lightweight queue implementation in Go with optional Dead Letter Queue (DLQ) support. Repository: https://git.belvedersky.ru/belvedersky/queue
Repository:
https://git.belvedersky.ru/belvedersky/queue
## Installation ## Installation
go get git.belvedersky.ru/queue ```bash
go get git.belvedersky.ru/queue
```
## Usage ## Quick Start
Basic example: ```go
package main
package main import (
"context"
"fmt"
"time"
import ( "git.belvedersky.ru/queue"
"fmt" )
"git.belvedersky.ru/queue"
)
func main() { type Task struct{ ID int }
q := queue.NewQueue
q.Push(1) func main() {
q.Push(2) // Create queue with a buffer capacity
q := queue.NewQueue[Task](10)
for !q.Empty() { // Register handler(s). Handlers run in worker goroutines.
item := q.Pop() q.Register(func(t Task) {
fmt.Println(item) fmt.Printf("handled %d\n", t.ID)
} time.Sleep(50 * time.Millisecond) // simulate work
})
// Start workers
ctx := context.Background()
q.HandleParallel(ctx, 2)
// Produce tasks
for i := 0; i < 5; i++ {
q.Produce(Task{ID: i})
} }
## Dead Letter Queue Example // Graceful shutdown: stop intake and wait for handlers to finish
_ = q.Shutdown(context.Background())
}
```
See examples/example_dlq_test.go: ## Dead Letter Queue (DLQ)
mainQ := queue.NewQueue Send items that panic in handlers to a separate queue:
dlq := queue.NewQueue
mainQ.SetDLQ(dlq) ```go
type Task struct{ ID int }
mainQ := queue.NewQueue[Task](5)
dlq := queue.NewQueue[Task](5)
mainQ.SetDLQ(dlq)
mainQ.Register(func(t Task) {
if t.ID == 3 {
panic("boom") // goes to DLQ
}
})
dlq.Register(func(t Task) {
fmt.Printf("sent to DLQ: %d\n", t.ID)
})
ctx := context.Background()
mainQ.HandleParallel(ctx, 1)
dlq.HandleParallel(ctx, 1)
for i := 0; i < 5; i++ {
mainQ.Produce(Task{ID: i})
}
mainQ.Close(); mainQ.Wait()
dlq.Close(); dlq.Wait()
```
## Hooks and Rate Limiting
```go
q := queue.NewQueue[int](10)
q.SetHooks(queue.QueueHooks{
OnProduced: func() { fmt.Println("produced") },
OnHandled: func() { fmt.Println("handled") },
OnDropped: func(v any) { fmt.Println("dropped:", v) },
})
// Limit processing rate (RPS per worker)
q.SetRateLimit(100)
```
## Examples ## Examples
Full examples are available in: See runnable examples in the repository:
https://git.belvedersky.ru/belvedersky/queue/src/branch/main/examples - examples/example_basic_test.go
- examples/example_dlq_test.go
Includes: - examples/example_parallel_test.go
- examples/example_basic_test.go
- examples/example_dlq_test.go
- examples/example_parallel_test.go
Run with: `go test ./examples -run Example`.
## Benchmarks ## Benchmarks
```bash ```bash
goos: darwin goos: darwin
goarch: arm64 goarch: arm64
@@ -71,7 +122,6 @@ PASS
ok git.belvedersky.ru/belvedersky/queue 6.574s ok git.belvedersky.ru/belvedersky/queue 6.574s
``` ```
## License ## License
MIT MIT

166
queue.go
View File

@@ -1,3 +1,6 @@
// Package queue provides a simple, type-safe, concurrent FIFO queue
// with support for parallel handlers, graceful shutdown, optional
// Dead Letter Queue (DLQ), hooks, and rate limiting.
package queue package queue
import ( import (
@@ -7,35 +10,44 @@ import (
"time" "time"
) )
// QueueHooks — набор хуков для метрик и мониторинга. // QueueHooks defines optional callbacks that are invoked during queue
type QueueHooks struct { // operations. Hooks can be used for metrics, logging, and monitoring.
OnProduced func() // вызывается при успешном добавлении элемента //
OnHandled func() // вызывается при успешной обработке // All hooks are invoked best-effort and must be fast and panic-safe in user code.
OnDropped func(any) // вызывается при пропуске (cancel/timeout) //
// - OnProduced is called after an item is successfully enqueued.
// - OnHandled is called after a handler returns without panic.
// - OnDropped is called when an item is intentionally dropped due to
// a canceled context or a closed queue.
type QueueHooks[T any] struct {
OnProduced func()
OnHandled func()
OnDropped func(T)
} }
// Queue — потокобезопасная обобщённая FIFO очередь с DLQ, параллельной обработкой // Queue is a generic, concurrency-safe FIFO queue with optional DLQ, parallel
// и корректным завершением без data race // handling, rate limiting and graceful shutdown without data races.
type Queue[T any] struct { type Queue[T any] struct {
ch chan T ch chan T
capacity int capacity int
mu sync.RWMutex // защищает: handlers/errorFunc/dlq/hooks/closed mu sync.RWMutex // protects: handlers, errorFunc, dlq, hooks, closed, rateTicker
sendMu sync.RWMutex // синхронизирует записи в канал и его закрытие sendMu sync.RWMutex // synchronizes sends to the channel and its closing
handlers []func(T) handlers []func(T)
errorFunc func(error) errorFunc func(error)
dlq *Queue[T] dlq *Queue[T]
hooks QueueHooks hooks QueueHooks[T]
closeOnce sync.Once closeOnce sync.Once
closed bool closed bool
wg sync.WaitGroup wg sync.WaitGroup
rateLimit <-chan time.Time rateTicker *time.Ticker
} }
// NewQueue создаёт новую очередь с заданной ёмкостью. // NewQueue creates a new queue with the given buffer capacity.
// Capacity must be > 0.
func NewQueue[T any](capacity int) *Queue[T] { func NewQueue[T any](capacity int) *Queue[T] {
if capacity <= 0 { if capacity <= 0 {
panic("queue capacity must be > 0") panic("queue capacity must be > 0")
@@ -46,7 +58,9 @@ func NewQueue[T any](capacity int) *Queue[T] {
} }
} }
// ---------- Конфигурация ---------- // Register adds a handler function that will be invoked for each item.
// Multiple handlers may be registered; each item will be passed to each
// registered handler in registration order. Handlers must be concurrency-safe.
func (q *Queue[T]) Register(handler func(T)) { func (q *Queue[T]) Register(handler func(T)) {
if handler == nil { if handler == nil {
panic("handler cannot be nil") panic("handler cannot be nil")
@@ -56,35 +70,50 @@ func (q *Queue[T]) Register(handler func(T)) {
q.mu.Unlock() q.mu.Unlock()
} }
// ---------- При ошибке ---------- // OnError registers an error callback which is invoked when a handler panics.
// The error contains panic details. Set to nil to disable.
func (q *Queue[T]) OnError(fn func(error)) { func (q *Queue[T]) OnError(fn func(error)) {
q.mu.Lock() q.mu.Lock()
q.errorFunc = fn q.errorFunc = fn
q.mu.Unlock() q.mu.Unlock()
} }
// ---------- Установка DLQ ---------- // SetDLQ sets a Dead Letter Queue which receives items that caused a handler panic.
// Items are enqueued to the DLQ in a non-blocking best-effort manner.
func (q *Queue[T]) SetDLQ(dlq *Queue[T]) { func (q *Queue[T]) SetDLQ(dlq *Queue[T]) {
q.mu.Lock() q.mu.Lock()
q.dlq = dlq q.dlq = dlq
q.mu.Unlock() q.mu.Unlock()
} }
// ---------- Установка Хуков ---------- // SetHooks installs optional hooks for produced/handled/dropped events.
func (q *Queue[T]) SetHooks(h QueueHooks) { // Passing a zero-value struct clears previously set hooks.
func (q *Queue[T]) SetHooks(h QueueHooks[T]) {
q.mu.Lock() q.mu.Lock()
q.hooks = h q.hooks = h
q.mu.Unlock() q.mu.Unlock()
} }
// ---------- Установка RateLimit ---------- // SetRateLimit sets a per-queue processing rate limit in requests-per-second.
// If rps <= 0, rate limiting is disabled. Calling SetRateLimit replaces any
// previous ticker and will be applied by active workers.
func (q *Queue[T]) SetRateLimit(rps int) { func (q *Queue[T]) SetRateLimit(rps int) {
q.mu.Lock()
defer q.mu.Unlock()
// Stop previous ticker if exists
if q.rateTicker != nil {
q.rateTicker.Stop()
q.rateTicker = nil
}
if rps > 0 { if rps > 0 {
q.rateLimit = time.Tick(time.Second / time.Duration(rps)) q.rateTicker = time.NewTicker(time.Second / time.Duration(rps))
} }
} }
// ---------- Запуск обработчиков ---------- // HandleParallel starts worker goroutines that consume items from the queue
// and invoke registered handlers. Workers exit when the channel is closed or
// when the provided context is canceled. If workers <= 0, a single worker
// is started.
func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) { func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) {
if workers <= 0 { if workers <= 0 {
workers = 1 workers = 1
@@ -99,8 +128,12 @@ func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) {
if !ok { if !ok {
return return
} }
if q.rateLimit != nil { // copy ticker safely
<-q.rateLimit q.mu.RLock()
t := q.rateTicker
q.mu.RUnlock()
if t != nil {
<-t.C
} }
q.processItem(item) q.processItem(item)
case <-ctx.Done(): case <-ctx.Done():
@@ -111,9 +144,9 @@ func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) {
} }
} }
// ---------- Отправка в канал (синхронизированная) ---------- // sendBlocking performs a blocking send into the channel (used by Produce).
// sendBlocking — блокирующая отправка (используется Produce). // It holds a read lock across the send to prevent Close() from closing the
// Берём RLock на всё время операции записи, чтобы Close() (Lock) не мог выполнить close(q.ch) // channel concurrently.
func (q *Queue[T]) sendBlocking(item T) bool { func (q *Queue[T]) sendBlocking(item T) bool {
// Быстрая проверка закрытия под RLock // Быстрая проверка закрытия под RLock
q.sendMu.RLock() q.sendMu.RLock()
@@ -130,7 +163,7 @@ func (q *Queue[T]) sendBlocking(item T) bool {
} }
defer q.sendMu.RUnlock() defer q.sendMu.RUnlock()
defer func() { _ = recover() }() // на случай гонок в чужом коде defer func() { _ = recover() }() // guard against external races/panics
q.ch <- item q.ch <- item
if hooks.OnProduced != nil { if hooks.OnProduced != nil {
hooks.OnProduced() hooks.OnProduced()
@@ -138,7 +171,7 @@ func (q *Queue[T]) sendBlocking(item T) bool {
return true return true
} }
// sendNonBlocking — неблокирующая отправка (для TryProduce/ProduceWithContext/DLQ). // sendNonBlocking attempts a non-blocking send (used by TryProduce and DLQ).
func (q *Queue[T]) sendNonBlocking(item T) bool { func (q *Queue[T]) sendNonBlocking(item T) bool {
q.sendMu.RLock() q.sendMu.RLock()
q.mu.RLock() q.mu.RLock()
@@ -166,14 +199,48 @@ func (q *Queue[T]) sendNonBlocking(item T) bool {
} }
} }
// ---------- Паблик-API продюсеров ---------- // sendWithContext blocks until the item is sent or the context is canceled.
// Produce — блокирующая постановка. Если очередь закрыта — элемент игнорируется. func (q *Queue[T]) sendWithContext(ctx context.Context, item T) bool {
q.sendMu.RLock()
q.mu.RLock()
closed := q.closed
hooks := q.hooks
q.mu.RUnlock()
if closed {
q.sendMu.RUnlock()
if hooks.OnDropped != nil {
hooks.OnDropped(item)
}
return false
}
defer q.sendMu.RUnlock()
defer func() { _ = recover() }()
select {
case q.ch <- item:
if hooks.OnProduced != nil {
hooks.OnProduced()
}
return true
case <-ctx.Done():
if hooks.OnDropped != nil {
hooks.OnDropped(item)
}
return false
}
}
// Produce enqueues an item and blocks until the item is placed into the buffer.
// If the queue is closed, the item is dropped and the method returns immediately.
func (q *Queue[T]) Produce(item T) { func (q *Queue[T]) Produce(item T) {
_ = q.sendBlocking(item) _ = q.sendBlocking(item)
} }
// ProduceWithContext — постановка с отменой по ctx (не блокирует при полном буфере). // ProduceWithContext enqueues an item, blocking until the item is accepted or
// the provided context is canceled. If the context is already canceled or the
// queue is closed, the item is dropped and OnDropped is invoked (if set).
func (q *Queue[T]) ProduceWithContext(ctx context.Context, item T) { func (q *Queue[T]) ProduceWithContext(ctx context.Context, item T) {
// Если контекст уже отменён — не блокируемся и помечаем как dropped
select { select {
case <-ctx.Done(): case <-ctx.Done():
q.mu.RLock() q.mu.RLock()
@@ -183,16 +250,15 @@ func (q *Queue[T]) ProduceWithContext(ctx context.Context, item T) {
q.mu.RUnlock() q.mu.RUnlock()
return return
default: default:
_ = q.sendNonBlocking(item)
} }
_ = q.sendWithContext(ctx, item)
} }
// TryProduce — неблокирующая постановка с таймаутом. // TryProduce attempts to enqueue an item without blocking. It retries until
// either the item is accepted or the timeout elapses. Returns true on success.
func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool { func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool {
ctx, cancel := context.WithTimeout(context.Background(), timeout) ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() defer cancel()
t := time.NewTimer(timeout)
defer t.Stop()
for { for {
if ok := q.sendNonBlocking(item); ok { if ok := q.sendNonBlocking(item); ok {
@@ -201,8 +267,6 @@ func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return false return false
case <-t.C:
return false
default: default:
// короткая уступка планировщику // короткая уступка планировщику
time.Sleep(time.Microsecond) time.Sleep(time.Microsecond)
@@ -210,7 +274,9 @@ func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool {
} }
} }
// ---------- Завершение ---------- // Shutdown closes the queue for new items and waits for workers to finish
// processing or until the provided context is canceled. Returns ctx.Err() on
// timeout/cancel.
func (q *Queue[T]) Shutdown(ctx context.Context) error { func (q *Queue[T]) Shutdown(ctx context.Context) error {
q.Close() q.Close()
done := make(chan struct{}) done := make(chan struct{})
@@ -228,36 +294,50 @@ func (q *Queue[T]) Shutdown(ctx context.Context) error {
func (q *Queue[T]) Close() { func (q *Queue[T]) Close() {
q.closeOnce.Do(func() { q.closeOnce.Do(func() {
// запретим новые записи: эксклюзивная блокировка перекроет send RLock // forbid new sends: exclusive lock supersedes send RLock
q.sendMu.Lock() q.sendMu.Lock()
q.mu.Lock() q.mu.Lock()
if !q.closed { if !q.closed {
q.closed = true q.closed = true
close(q.ch) close(q.ch)
} }
// Остановим тикер, если включён
if q.rateTicker != nil {
q.rateTicker.Stop()
q.rateTicker = nil
}
q.mu.Unlock() q.mu.Unlock()
q.sendMu.Unlock() q.sendMu.Unlock()
}) })
} }
// Wait blocks until all worker goroutines started by HandleParallel exit.
func (q *Queue[T]) Wait() { q.wg.Wait() } func (q *Queue[T]) Wait() { q.wg.Wait() }
// ---------- Инфо-методы ---------- // Len returns the number of buffered items currently in the queue.
func (q *Queue[T]) Len() int { return len(q.ch) } func (q *Queue[T]) Len() int { return len(q.ch) }
func (q *Queue[T]) Cap() int { return q.capacity }
// Cap returns the configured buffer capacity of the queue.
func (q *Queue[T]) Cap() int { return q.capacity }
// IsEmpty reports whether the queue buffer is empty.
func (q *Queue[T]) IsEmpty() bool { return len(q.ch) == 0 } func (q *Queue[T]) IsEmpty() bool { return len(q.ch) == 0 }
// Closed reports whether the queue has been closed for sending.
func (q *Queue[T]) Closed() bool { func (q *Queue[T]) Closed() bool {
q.mu.RLock() q.mu.RLock()
defer q.mu.RUnlock() defer q.mu.RUnlock()
return q.closed return q.closed
} }
// String returns a human-readable summary of the queue state.
func (q *Queue[T]) String() string { func (q *Queue[T]) String() string {
return fmt.Sprintf("Queue{len=%d, cap=%d, closed=%t}", q.Len(), q.Cap(), q.Closed()) return fmt.Sprintf("Queue{len=%d, cap=%d, closed=%t}", q.Len(), q.Cap(), q.Closed())
} }
// ---------- Внутренняя обработка ---------- // processItem executes all registered handlers for the item, recovering from
// panics. On panic, it calls the error callback (if any) and forwards the
// item to the DLQ using a best-effort non-blocking send.
func (q *Queue[T]) processItem(item T) { func (q *Queue[T]) processItem(item T) {
q.mu.RLock() q.mu.RLock()
handlers := append([]func(T){}, q.handlers...) handlers := append([]func(T){}, q.handlers...)

View File

@@ -43,7 +43,7 @@ func TestBasicQueue(t *testing.T) {
q.HandleParallel(ctx, 1) q.HandleParallel(ctx, 1)
for i := 0; i < 10; i++ { for i := range 10 {
q.Produce(Task{ID: i}) q.Produce(Task{ID: i})
} }
@@ -73,7 +73,7 @@ func TestParallelProcessing(t *testing.T) {
q.HandleParallel(ctx, 4) q.HandleParallel(ctx, 4)
for i := 0; i < 20; i++ { for i := range 20 {
q.Produce(Task{ID: i}) q.Produce(Task{ID: i})
} }
@@ -101,7 +101,7 @@ func TestGracefulShutdown(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
q.HandleParallel(ctx, 2) q.HandleParallel(ctx, 2)
for i := 0; i < 10; i++ { for i := range 10 {
q.Produce(Task{ID: i}) q.Produce(Task{ID: i})
} }
@@ -152,7 +152,7 @@ func TestDLQ(t *testing.T) {
dlq.HandleParallel(ctx, 1) dlq.HandleParallel(ctx, 1)
// Отправляем 10 задач // Отправляем 10 задач
for i := 0; i < 10; i++ { for i := range 10 {
mainQ.Produce(Task{ID: i}) mainQ.Produce(Task{ID: i})
} }
@@ -233,10 +233,10 @@ func TestHooks(t *testing.T) {
q := queue.NewQueue[Task](10) q := queue.NewQueue[Task](10)
var produced, handled, dropped int32 var produced, handled, dropped int32
q.SetHooks(queue.QueueHooks{ q.SetHooks(queue.QueueHooks[Task]{
OnProduced: func() { atomic.AddInt32(&produced, 1) }, OnProduced: func() { atomic.AddInt32(&produced, 1) },
OnHandled: func() { atomic.AddInt32(&handled, 1) }, OnHandled: func() { atomic.AddInt32(&handled, 1) },
OnDropped: func(_ interface{}) { atomic.AddInt32(&dropped, 1) }, OnDropped: func(_ Task) { atomic.AddInt32(&dropped, 1) },
}) })
q.Register(func(t Task) {}) q.Register(func(t Task) {})