Files
queue/queue_test.go
Кобелев Андрей Андреевич bac4f05437
All checks were successful
CI / test (push) Successful in 5m21s
- Context fix without timer.
- Add godoc documentation.
2025-10-14 13:23:12 +05:00

284 lines
6.7 KiB
Go

package queue_test
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
"git.belvedersky.ru/queue"
)
type Task struct {
ID int
}
// waitForCondition ждёт, пока функция вернёт true или истечёт таймаут.
func waitForCondition(t *testing.T, cond func() bool, timeout time.Duration, step time.Duration) {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if cond() {
return
}
time.Sleep(step)
}
t.Fatalf("condition not met within %v", timeout)
}
// -----------------------------------------------------------------------------
// 🧩 Базовые тесты
// -----------------------------------------------------------------------------
func TestBasicQueue(t *testing.T) {
q := queue.NewQueue[Task](10)
var count int32
q.Register(func(t Task) {
atomic.AddInt32(&count, 1)
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q.HandleParallel(ctx, 1)
for i := range 10 {
q.Produce(Task{ID: i})
}
q.Close()
q.Wait()
if count != 10 {
t.Fatalf("expected 10 handled items, got %d", count)
}
}
// -----------------------------------------------------------------------------
// ⚙️ Тест параллельной обработки
// -----------------------------------------------------------------------------
func TestParallelProcessing(t *testing.T) {
q := queue.NewQueue[Task](20)
var count int32
q.Register(func(t Task) {
time.Sleep(10 * time.Millisecond)
atomic.AddInt32(&count, 1)
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q.HandleParallel(ctx, 4)
for i := range 20 {
q.Produce(Task{ID: i})
}
q.Close()
q.Wait()
if count != 20 {
t.Fatalf("expected 20 handled items, got %d", count)
}
}
// -----------------------------------------------------------------------------
// 🧘 Graceful shutdown
// -----------------------------------------------------------------------------
func TestGracefulShutdown(t *testing.T) {
q := queue.NewQueue[Task](20)
var count int32
q.Register(func(t Task) {
time.Sleep(20 * time.Millisecond)
atomic.AddInt32(&count, 1)
})
ctx, cancel := context.WithCancel(context.Background())
q.HandleParallel(ctx, 2)
for i := range 10 {
q.Produce(Task{ID: i})
}
shutdownCtx, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel2()
if err := q.Shutdown(shutdownCtx); err != nil {
t.Fatalf("shutdown failed: %v", err)
}
cancel()
if count == 0 {
t.Fatalf("expected >0 handled items")
}
}
// -----------------------------------------------------------------------------
// ❌ Проверка DLQ (Dead Letter Queue)
// -----------------------------------------------------------------------------
func TestDLQ(t *testing.T) {
mainQ := queue.NewQueue[Task](10)
dlq := queue.NewQueue[Task](1)
mainQ.SetDLQ(dlq)
var handled, dlqHandled int32
var wg sync.WaitGroup
wg.Add(1) // ожидаем хотя бы одно сообщение в DLQ
// Основная очередь: один из элементов вызывает панику
mainQ.Register(func(t Task) {
if t.ID == 5 {
panic("boom") // должен попасть в DLQ
}
atomic.AddInt32(&handled, 1)
})
// DLQ: увеличиваем счётчик и снимаем ожидание
dlq.Register(func(t Task) {
atomic.AddInt32(&dlqHandled, 1)
wg.Done()
})
ctx := context.Background()
mainQ.HandleParallel(ctx, 1)
dlq.HandleParallel(ctx, 1)
// Отправляем 10 задач
for i := range 10 {
mainQ.Produce(Task{ID: i})
}
// Закрываем и дожидаемся выполнения основной очереди
mainQ.Close()
mainQ.Wait()
// Ожидаем появления элемента в DLQ
wg.Wait()
// Теперь можно корректно завершить DLQ
dlq.Close()
dlq.Wait()
if handled != 9 {
t.Fatalf("expected 9 handled items, got %d", handled)
}
if dlqHandled != 1 {
t.Fatalf("expected 1 DLQ item, got %d", dlqHandled)
}
}
// -----------------------------------------------------------------------------
// ⚠️ Проверка OnError
// -----------------------------------------------------------------------------
func TestOnError(t *testing.T) {
q := queue.NewQueue[Task](10)
var called int32
q.OnError(func(err error) {
atomic.AddInt32(&called, 1)
})
q.Register(func(t Task) {
panic("panic test")
})
ctx := context.Background()
q.HandleParallel(ctx, 1)
q.Produce(Task{ID: 1})
q.Close()
q.Wait()
if called == 0 {
t.Fatalf("expected error handler to be called")
}
}
// -----------------------------------------------------------------------------
// 🧠 Проверка TryProduce и отмены
// -----------------------------------------------------------------------------
func TestTryProduceAndCancel(t *testing.T) {
q := queue.NewQueue[Task](10)
ctx, cancel := context.WithCancel(context.Background())
q.HandleParallel(ctx, 1)
defer cancel()
ok := q.TryProduce(Task{ID: 1}, 50*time.Millisecond)
if !ok {
t.Fatalf("expected TryProduce to succeed")
}
cancelCtx, cancel2 := context.WithCancel(context.Background())
cancel2()
q.ProduceWithContext(cancelCtx, Task{ID: 2}) // должен быть пропущен
q.Close()
q.Wait()
}
// -----------------------------------------------------------------------------
// 🧾 Проверка Hook'ов
// -----------------------------------------------------------------------------
func TestHooks(t *testing.T) {
q := queue.NewQueue[Task](10)
var produced, handled, dropped int32
q.SetHooks(queue.QueueHooks[Task]{
OnProduced: func() { atomic.AddInt32(&produced, 1) },
OnHandled: func() { atomic.AddInt32(&handled, 1) },
OnDropped: func(_ Task) { atomic.AddInt32(&dropped, 1) },
})
q.Register(func(t Task) {})
ctx := context.Background()
q.HandleParallel(ctx, 1)
q.Produce(Task{ID: 1})
q.TryProduce(Task{ID: 2}, 10*time.Millisecond)
q.Close()
q.Wait()
if produced == 0 {
t.Fatalf("expected produced hook to trigger")
}
if handled == 0 {
t.Fatalf("expected handled hook to trigger")
}
_ = dropped // может быть 0
}
// -----------------------------------------------------------------------------
// 🏎️ Проверка на гонки (используй go test -race)
// -----------------------------------------------------------------------------
func TestRaceConditions(t *testing.T) {
q := queue.NewQueue[Task](10)
q.Register(func(t Task) {
time.Sleep(time.Millisecond)
})
ctx, cancel := context.WithCancel(context.Background())
q.HandleParallel(ctx, 4)
for i := range 100 {
go q.TryProduce(Task{ID: i}, 10*time.Millisecond)
}
time.Sleep(200 * time.Millisecond)
q.Close()
cancel()
q.Wait()
}