commit e3c9643d0591406847ad5312a4241afb31a5a31e Author: Кобелев Андрей Андреевич Date: Fri Oct 10 22:50:00 2025 +0500 Initial release of queue package diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7033c77 --- /dev/null +++ b/.gitignore @@ -0,0 +1,45 @@ +# macOS system files +.DS_Store +.AppleDouble +.LSOverride +Icon? +._* + +# Go build artifacts +*.exe +*.exe~ +*.dll +*.so +*.dylib +*.test +*.out + +# Go workspace cache and temporary files +bin/ +build/ +dist/ +*.log +*.tmp + +# Go module and sum caches +vendor/ +go.work +go.work.sum + +# IDE/editor stuff +.vscode/ +.idea/ +*.swp +*.swo + +# Gitea/Git internal junk +*.orig +*.bak + +# Coverage and benchmark outputs +coverage.out +bench.txt + +# System hidden files +ehthumbs.db +Thumbs.db diff --git a/README.md b/README.md new file mode 100644 index 0000000..e91262c --- /dev/null +++ b/README.md @@ -0,0 +1,73 @@ +# queue + +Simple and lightweight queue implementation in Go with optional Dead Letter Queue (DLQ) support. + +Repository: + https://git.belvedersky.ru/belvedersky/queue + +## Installation + + go get git.belvedersky.ru/queue + +## Usage + +Basic example: + + package main + + import ( + "fmt" + "git.belvedersky.ru/queue" + ) + + func main() { + q := queue.NewQueue + + q.Push(1) + q.Push(2) + + for !q.Empty() { + item := q.Pop() + fmt.Println(item) + } + } + +## Dead Letter Queue Example + +See examples/example_dlq_test.go: + + mainQ := queue.NewQueue + dlq := queue.NewQueue + + mainQ.SetDLQ(dlq) + + +## Examples + +Full examples are available in: + https://git.belvedersky.ru/belvedersky/queue/src/branch/main/examples + +Includes: + - examples/example_basic_test.go + - examples/example_dlq_test.go + - examples/example_parallel_test.go + + +## Benchmarks +```bash +goos: darwin +goarch: arm64 +pkg: git.belvedersky.ru/belvedersky/queue +cpu: Apple M1 Pro +BenchmarkProduceConsume/workers_1-10 9488 118244 ns/op 8 B/op 1 allocs/op +BenchmarkProduceConsume/workers_4-10 40800 29371 ns/op 8 B/op 1 allocs/op +BenchmarkProduceConsume/workers_16-10 163110 7340 ns/op 8 B/op 1 allocs/op +BenchmarkTryProduce-10 1610434 774.8 ns/op 520 B/op 7 allocs/op +PASS +ok git.belvedersky.ru/belvedersky/queue 6.574s +``` + + +## License + +MIT diff --git a/bench_test.go b/bench_test.go new file mode 100644 index 0000000..6a605bc --- /dev/null +++ b/bench_test.go @@ -0,0 +1,67 @@ +package queue_test + +import ( + "context" + "sync" + "testing" + "time" + + "git.belvedersky.ru/belvedersky/queue" +) + +type benchTask struct { + ID int +} + +// BenchmarkProduceConsume измеряет производительность очереди +// при различном количестве воркеров. +func BenchmarkProduceConsume(b *testing.B) { + benchmarks := []struct { + name string + workers int + }{ + {"workers_1", 1}, + {"workers_4", 4}, + {"workers_16", 16}, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + q := queue.NewQueue[benchTask](10) + var wg sync.WaitGroup + wg.Add(b.N) + + q.Register(func(t benchTask) { + time.Sleep(100 * time.Microsecond) // имитация нагрузки + wg.Done() + }) + + q.HandleParallel(ctx, bm.workers) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.Produce(benchTask{ID: i}) + } + + wg.Wait() + q.Shutdown(context.Background()) + }) + } +} + +// BenchmarkTryProduce тестирует неблокирующую постановку задач. +func BenchmarkTryProduce(b *testing.B) { + ctx := context.Background() + q := queue.NewQueue[benchTask](10) + q.HandleParallel(ctx, 4) + defer q.Close() + + task := benchTask{} + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.TryProduce(task, 5*time.Millisecond) + } +} diff --git a/examples/example_basic_test.go b/examples/example_basic_test.go new file mode 100644 index 0000000..b52d3cb --- /dev/null +++ b/examples/example_basic_test.go @@ -0,0 +1,35 @@ +package queue_test + +import ( + "context" + "fmt" + "time" + + "git.belvedersky.ru/belvedersky/queue" +) + +// Example_basic демонстрирует базовое использование очереди. +func Example_basic() { + type Task struct{ ID int } + + q := queue.NewQueue[Task](5) + q.Register(func(t Task) { + fmt.Printf("Handled task %d\n", t.ID) + time.Sleep(50 * time.Millisecond) + }) + + ctx := context.Background() + q.HandleParallel(ctx, 1) + + for i := range 5 { + q.Produce(Task{ID: i}) + } + + q.Shutdown(context.Background()) + // Output: + // Handled task 0 + // Handled task 1 + // Handled task 2 + // Handled task 3 + // Handled task 4 +} diff --git a/examples/example_dlq_test.go b/examples/example_dlq_test.go new file mode 100644 index 0000000..867edd5 --- /dev/null +++ b/examples/example_dlq_test.go @@ -0,0 +1,60 @@ +package queue_test + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + "git.belvedersky.ru/belvedersky/queue" +) + +// Example_DLQ показывает, как использовать Dead Letter Queue +// для перехвата задач, вызвавших панику. +func Example_dlq() { + type Task struct{ ID int } + + mainQ := queue.NewQueue[Task](5) + dlq := queue.NewQueue[Task](5) + mainQ.SetDLQ(dlq) + + var handled, failed int32 + + mainQ.Register(func(t Task) { + if t.ID == 3 { + panic("boom") + } + fmt.Printf("✅ handled %d\n", t.ID) + atomic.AddInt32(&handled, 1) + }) + + dlq.Register(func(t Task) { + fmt.Printf("🚨 sent to DLQ: %d\n", t.ID) + atomic.AddInt32(&failed, 1) + }) + + ctx := context.Background() + mainQ.HandleParallel(ctx, 1) + dlq.HandleParallel(ctx, 1) + + for i := range 5 { + mainQ.Produce(Task{ID: i}) + } + + mainQ.Close() + mainQ.Wait() + dlq.Close() + dlq.Wait() + + time.Sleep(50 * time.Millisecond) // на случай асинхронного вывода + + fmt.Printf("Total handled: %d, failed: %d\n", handled, failed) + + // Unordered output: + // ✅ handled 0 + // ✅ handled 1 + // ✅ handled 2 + // ✅ handled 4 + // 🚨 sent to DLQ: 3 + // Total handled: 4, failed: 1 +} diff --git a/examples/example_parallel_test.go b/examples/example_parallel_test.go new file mode 100644 index 0000000..345d3b7 --- /dev/null +++ b/examples/example_parallel_test.go @@ -0,0 +1,37 @@ +package queue_test + +import ( + "context" + "fmt" + "sync" + "time" + + "git.belvedersky.ru/belvedersky/queue" +) + +// Example_parallel демонстрирует работу нескольких воркеров. +func Example_parallel() { + type Task struct{ ID int } + + q := queue.NewQueue[Task](5) + var mu sync.Mutex + q.Register(func(t Task) { + mu.Lock() + fmt.Printf("Worker processed: %d\n", t.ID) + mu.Unlock() + time.Sleep(20 * time.Millisecond) + }) + + ctx := context.Background() + q.HandleParallel(ctx, 4) + + for i := range 3 { + q.Produce(Task{ID: i}) + } + + q.Shutdown(context.Background()) + // Unordered output: + // Worker processed: 0 + // Worker processed: 1 + // Worker processed: 2 +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..c5b67b4 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.belvedersky.ru/belvedersky/queue + +go 1.25.1 diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..cb876cb --- /dev/null +++ b/queue.go @@ -0,0 +1,293 @@ +package queue + +import ( + "context" + "fmt" + "sync" + "time" +) + +// QueueHooks — набор хуков для метрик и мониторинга. +type QueueHooks struct { + OnProduced func() // вызывается при успешном добавлении элемента + OnHandled func() // вызывается при успешной обработке + OnDropped func(interface{}) // вызывается при пропуске (cancel/timeout) +} + +// Queue — потокобезопасная обобщённая FIFO очередь с DLQ, параллельной обработкой +// и корректным завершением без data race (даже под -race). +type Queue[T any] struct { + ch chan T + capacity int + + mu sync.RWMutex // защищает: handlers/errorFunc/dlq/hooks/closed + sendMu sync.RWMutex // синхронизирует ВСЕ записи в канал и его закрытие + handlers []func(T) + + errorFunc func(error) + dlq *Queue[T] + hooks QueueHooks + + closeOnce sync.Once + closed bool + + wg sync.WaitGroup + rateLimit <-chan time.Time +} + +// NewQueue создаёт новую очередь с заданной ёмкостью. +func NewQueue[T any](capacity int) *Queue[T] { + if capacity <= 0 { + panic("queue capacity must be > 0") + } + return &Queue[T]{ + ch: make(chan T, capacity), + capacity: capacity, + } +} + +// ---------- Конфигурация ---------- + +func (q *Queue[T]) Register(handler func(T)) { + if handler == nil { + panic("handler cannot be nil") + } + q.mu.Lock() + q.handlers = append(q.handlers, handler) + q.mu.Unlock() +} + +func (q *Queue[T]) OnError(fn func(error)) { + q.mu.Lock() + q.errorFunc = fn + q.mu.Unlock() +} + +func (q *Queue[T]) SetDLQ(dlq *Queue[T]) { + q.mu.Lock() + q.dlq = dlq + q.mu.Unlock() +} + +func (q *Queue[T]) SetHooks(h QueueHooks) { + q.mu.Lock() + q.hooks = h + q.mu.Unlock() +} + +func (q *Queue[T]) SetRateLimit(rps int) { + if rps > 0 { + q.rateLimit = time.Tick(time.Second / time.Duration(rps)) + } +} + +// ---------- Запуск обработчиков ---------- + +func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) { + if workers <= 0 { + workers = 1 + } + q.wg.Add(workers) + for i := 0; i < workers; i++ { + go func() { + defer q.wg.Done() + for { + select { + case item, ok := <-q.ch: + if !ok { + return + } + if q.rateLimit != nil { + <-q.rateLimit + } + q.processItem(item) + case <-ctx.Done(): + return + } + } + }() + } +} + +// ---------- Отправка в канал (синхронизированная) ---------- + +// sendBlocking — блокирующая отправка (используется Produce). +// Берём RLock на всё время операции записи, чтобы Close() (Lock) не мог выполнить close(q.ch) +// одновременно с записью — это устраняет data race. +func (q *Queue[T]) sendBlocking(item T) bool { + // Быстрая проверка закрытия под RLock (необязательна, но экономит работу) + q.sendMu.RLock() + q.mu.RLock() + closed := q.closed + hooks := q.hooks + q.mu.RUnlock() + if closed { + q.sendMu.RUnlock() + if hooks.OnDropped != nil { + hooks.OnDropped(item) + } + return false + } + defer q.sendMu.RUnlock() + + defer func() { _ = recover() }() // на случай гонок в чужом коде + q.ch <- item + if hooks.OnProduced != nil { + hooks.OnProduced() + } + return true +} + +// sendNonBlocking — неблокирующая отправка (для TryProduce/ProduceWithContext/DLQ). +func (q *Queue[T]) sendNonBlocking(item T) bool { + q.sendMu.RLock() + q.mu.RLock() + closed := q.closed + hooks := q.hooks + q.mu.RUnlock() + if closed { + q.sendMu.RUnlock() + if hooks.OnDropped != nil { + hooks.OnDropped(item) + } + return false + } + defer q.sendMu.RUnlock() + + defer func() { _ = recover() }() + select { + case q.ch <- item: + if hooks.OnProduced != nil { + hooks.OnProduced() + } + return true + default: + return false + } +} + +// ---------- Паблик-API продюсеров ---------- + +// Produce — блокирующая постановка. Если очередь закрыта — элемент игнорируется. +func (q *Queue[T]) Produce(item T) { + _ = q.sendBlocking(item) +} + +// ProduceWithContext — постановка с отменой по ctx (не блокирует при полном буфере). +func (q *Queue[T]) ProduceWithContext(ctx context.Context, item T) { + select { + case <-ctx.Done(): + q.mu.RLock() + if q.hooks.OnDropped != nil { + q.hooks.OnDropped(item) + } + q.mu.RUnlock() + return + default: + _ = q.sendNonBlocking(item) + } +} + +// TryProduce — неблокирующая постановка с таймаутом. +func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + t := time.NewTimer(timeout) + defer t.Stop() + + for { + if ok := q.sendNonBlocking(item); ok { + return true + } + select { + case <-ctx.Done(): + return false + case <-t.C: + return false + default: + // короткая уступка планировщику + time.Sleep(time.Microsecond) + } + } +} + +// ---------- Завершение ---------- + +func (q *Queue[T]) Shutdown(ctx context.Context) error { + q.Close() + done := make(chan struct{}) + go func() { + defer close(done) + q.wg.Wait() + }() + select { + case <-done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (q *Queue[T]) Close() { + q.closeOnce.Do(func() { + // запретим новые записи: эксклюзивная блокировка перекроет send RLock + q.sendMu.Lock() + q.mu.Lock() + if !q.closed { + q.closed = true + close(q.ch) + } + q.mu.Unlock() + q.sendMu.Unlock() + }) +} + +func (q *Queue[T]) Wait() { q.wg.Wait() } + +// ---------- Инфо-методы ---------- + +func (q *Queue[T]) Len() int { return len(q.ch) } +func (q *Queue[T]) Cap() int { return q.capacity } +func (q *Queue[T]) IsEmpty() bool { return len(q.ch) == 0 } + +func (q *Queue[T]) Closed() bool { + q.mu.RLock() + defer q.mu.RUnlock() + return q.closed +} + +func (q *Queue[T]) String() string { + return fmt.Sprintf("Queue{len=%d, cap=%d, closed=%t}", q.Len(), q.Cap(), q.Closed()) +} + +// ---------- Внутренняя обработка ---------- + +func (q *Queue[T]) processItem(item T) { + q.mu.RLock() + handlers := append([]func(T){}, q.handlers...) + errFn := q.errorFunc + dlq := q.dlq + hooks := q.hooks + q.mu.RUnlock() + + for _, h := range handlers { + func() { + defer func() { + if r := recover(); r != nil { + // сообщим об ошибке + if errFn != nil { + errFn(fmt.Errorf("handler panic: %v", r)) + } + // отправим в DLQ неблокирующе и безопасно + if dlq != nil { + _ = dlq.sendNonBlocking(item) + } + } + }() + h(item) + if hooks.OnHandled != nil { + hooks.OnHandled() + } + }() + } +} diff --git a/queue_test.go b/queue_test.go new file mode 100644 index 0000000..9c6e74a --- /dev/null +++ b/queue_test.go @@ -0,0 +1,283 @@ +package queue_test + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "git.belvedersky.ru/belvedersky/queue" +) + +type Task struct { + ID int +} + +// waitForCondition ждёт, пока функция вернёт true или истечёт таймаут. +func waitForCondition(t *testing.T, cond func() bool, timeout time.Duration, step time.Duration) { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if cond() { + return + } + time.Sleep(step) + } + t.Fatalf("condition not met within %v", timeout) +} + +// ----------------------------------------------------------------------------- +// 🧩 Базовые тесты +// ----------------------------------------------------------------------------- + +func TestBasicQueue(t *testing.T) { + q := queue.NewQueue[Task](10) + + var count int32 + q.Register(func(t Task) { + atomic.AddInt32(&count, 1) + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + q.HandleParallel(ctx, 1) + + for i := 0; i < 10; i++ { + q.Produce(Task{ID: i}) + } + + q.Close() + q.Wait() + + if count != 10 { + t.Fatalf("expected 10 handled items, got %d", count) + } +} + +// ----------------------------------------------------------------------------- +// ⚙️ Тест параллельной обработки +// ----------------------------------------------------------------------------- + +func TestParallelProcessing(t *testing.T) { + q := queue.NewQueue[Task](20) + + var count int32 + q.Register(func(t Task) { + time.Sleep(10 * time.Millisecond) + atomic.AddInt32(&count, 1) + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + q.HandleParallel(ctx, 4) + + for i := 0; i < 20; i++ { + q.Produce(Task{ID: i}) + } + + q.Close() + q.Wait() + + if count != 20 { + t.Fatalf("expected 20 handled items, got %d", count) + } +} + +// ----------------------------------------------------------------------------- +// 🧘 Graceful shutdown +// ----------------------------------------------------------------------------- + +func TestGracefulShutdown(t *testing.T) { + q := queue.NewQueue[Task](20) + + var count int32 + q.Register(func(t Task) { + time.Sleep(20 * time.Millisecond) + atomic.AddInt32(&count, 1) + }) + + ctx, cancel := context.WithCancel(context.Background()) + q.HandleParallel(ctx, 2) + + for i := 0; i < 10; i++ { + q.Produce(Task{ID: i}) + } + + shutdownCtx, cancel2 := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel2() + + if err := q.Shutdown(shutdownCtx); err != nil { + t.Fatalf("shutdown failed: %v", err) + } + + cancel() + + if count == 0 { + t.Fatalf("expected >0 handled items") + } +} + +// ----------------------------------------------------------------------------- +// ❌ Проверка DLQ (Dead Letter Queue) +// ----------------------------------------------------------------------------- + +func TestDLQ(t *testing.T) { + mainQ := queue.NewQueue[Task](10) + dlq := queue.NewQueue[Task](1) + + mainQ.SetDLQ(dlq) + + var handled, dlqHandled int32 + var wg sync.WaitGroup + wg.Add(1) // ожидаем хотя бы одно сообщение в DLQ + + // Основная очередь: один из элементов вызывает панику + mainQ.Register(func(t Task) { + if t.ID == 5 { + panic("boom") // должен попасть в DLQ + } + atomic.AddInt32(&handled, 1) + }) + + // DLQ: увеличиваем счётчик и снимаем ожидание + dlq.Register(func(t Task) { + atomic.AddInt32(&dlqHandled, 1) + wg.Done() + }) + + ctx := context.Background() + mainQ.HandleParallel(ctx, 1) + dlq.HandleParallel(ctx, 1) + + // Отправляем 10 задач + for i := 0; i < 10; i++ { + mainQ.Produce(Task{ID: i}) + } + + // Закрываем и дожидаемся выполнения основной очереди + mainQ.Close() + mainQ.Wait() + + // Ожидаем появления элемента в DLQ + wg.Wait() + + // Теперь можно корректно завершить DLQ + dlq.Close() + dlq.Wait() + + if handled != 9 { + t.Fatalf("expected 9 handled items, got %d", handled) + } + if dlqHandled != 1 { + t.Fatalf("expected 1 DLQ item, got %d", dlqHandled) + } +} + +// ----------------------------------------------------------------------------- +// ⚠️ Проверка OnError +// ----------------------------------------------------------------------------- + +func TestOnError(t *testing.T) { + q := queue.NewQueue[Task](10) + var called int32 + + q.OnError(func(err error) { + atomic.AddInt32(&called, 1) + }) + + q.Register(func(t Task) { + panic("panic test") + }) + + ctx := context.Background() + q.HandleParallel(ctx, 1) + q.Produce(Task{ID: 1}) + q.Close() + q.Wait() + + if called == 0 { + t.Fatalf("expected error handler to be called") + } +} + +// ----------------------------------------------------------------------------- +// 🧠 Проверка TryProduce и отмены +// ----------------------------------------------------------------------------- + +func TestTryProduceAndCancel(t *testing.T) { + q := queue.NewQueue[Task](10) + ctx, cancel := context.WithCancel(context.Background()) + + q.HandleParallel(ctx, 1) + defer cancel() + + ok := q.TryProduce(Task{ID: 1}, 50*time.Millisecond) + if !ok { + t.Fatalf("expected TryProduce to succeed") + } + + cancelCtx, cancel2 := context.WithCancel(context.Background()) + cancel2() + q.ProduceWithContext(cancelCtx, Task{ID: 2}) // должен быть пропущен + q.Close() + q.Wait() +} + +// ----------------------------------------------------------------------------- +// 🧾 Проверка Hook'ов +// ----------------------------------------------------------------------------- + +func TestHooks(t *testing.T) { + q := queue.NewQueue[Task](10) + + var produced, handled, dropped int32 + q.SetHooks(queue.QueueHooks{ + OnProduced: func() { atomic.AddInt32(&produced, 1) }, + OnHandled: func() { atomic.AddInt32(&handled, 1) }, + OnDropped: func(_ interface{}) { atomic.AddInt32(&dropped, 1) }, + }) + + q.Register(func(t Task) {}) + ctx := context.Background() + q.HandleParallel(ctx, 1) + + q.Produce(Task{ID: 1}) + q.TryProduce(Task{ID: 2}, 10*time.Millisecond) + q.Close() + q.Wait() + + if produced == 0 { + t.Fatalf("expected produced hook to trigger") + } + if handled == 0 { + t.Fatalf("expected handled hook to trigger") + } + + _ = dropped // может быть 0 +} + +// ----------------------------------------------------------------------------- +// 🏎️ Проверка на гонки (используй go test -race) +// ----------------------------------------------------------------------------- + +func TestRaceConditions(t *testing.T) { + q := queue.NewQueue[Task](10) + + q.Register(func(t Task) { + time.Sleep(time.Millisecond) + }) + + ctx, cancel := context.WithCancel(context.Background()) + q.HandleParallel(ctx, 4) + + for i := range 100 { + go q.TryProduce(Task{ID: i}, 10*time.Millisecond) + } + + time.Sleep(200 * time.Millisecond) + q.Close() + cancel() + q.Wait() +}