All checks were successful
CI / test (push) Successful in 5m21s
- Add godoc documentation.
284 lines
6.7 KiB
Go
284 lines
6.7 KiB
Go
package queue_test
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"git.belvedersky.ru/queue"
|
|
)
|
|
|
|
type Task struct {
|
|
ID int
|
|
}
|
|
|
|
// waitForCondition ждёт, пока функция вернёт true или истечёт таймаут.
|
|
func waitForCondition(t *testing.T, cond func() bool, timeout time.Duration, step time.Duration) {
|
|
deadline := time.Now().Add(timeout)
|
|
for time.Now().Before(deadline) {
|
|
if cond() {
|
|
return
|
|
}
|
|
time.Sleep(step)
|
|
}
|
|
t.Fatalf("condition not met within %v", timeout)
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// 🧩 Базовые тесты
|
|
// -----------------------------------------------------------------------------
|
|
|
|
func TestBasicQueue(t *testing.T) {
|
|
q := queue.NewQueue[Task](10)
|
|
|
|
var count int32
|
|
q.Register(func(t Task) {
|
|
atomic.AddInt32(&count, 1)
|
|
})
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
q.HandleParallel(ctx, 1)
|
|
|
|
for i := range 10 {
|
|
q.Produce(Task{ID: i})
|
|
}
|
|
|
|
q.Close()
|
|
q.Wait()
|
|
|
|
if count != 10 {
|
|
t.Fatalf("expected 10 handled items, got %d", count)
|
|
}
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// ⚙️ Тест параллельной обработки
|
|
// -----------------------------------------------------------------------------
|
|
|
|
func TestParallelProcessing(t *testing.T) {
|
|
q := queue.NewQueue[Task](20)
|
|
|
|
var count int32
|
|
q.Register(func(t Task) {
|
|
time.Sleep(10 * time.Millisecond)
|
|
atomic.AddInt32(&count, 1)
|
|
})
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
q.HandleParallel(ctx, 4)
|
|
|
|
for i := range 20 {
|
|
q.Produce(Task{ID: i})
|
|
}
|
|
|
|
q.Close()
|
|
q.Wait()
|
|
|
|
if count != 20 {
|
|
t.Fatalf("expected 20 handled items, got %d", count)
|
|
}
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// 🧘 Graceful shutdown
|
|
// -----------------------------------------------------------------------------
|
|
|
|
func TestGracefulShutdown(t *testing.T) {
|
|
q := queue.NewQueue[Task](20)
|
|
|
|
var count int32
|
|
q.Register(func(t Task) {
|
|
time.Sleep(20 * time.Millisecond)
|
|
atomic.AddInt32(&count, 1)
|
|
})
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
q.HandleParallel(ctx, 2)
|
|
|
|
for i := range 10 {
|
|
q.Produce(Task{ID: i})
|
|
}
|
|
|
|
shutdownCtx, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel2()
|
|
|
|
if err := q.Shutdown(shutdownCtx); err != nil {
|
|
t.Fatalf("shutdown failed: %v", err)
|
|
}
|
|
|
|
cancel()
|
|
|
|
if count == 0 {
|
|
t.Fatalf("expected >0 handled items")
|
|
}
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// ❌ Проверка DLQ (Dead Letter Queue)
|
|
// -----------------------------------------------------------------------------
|
|
|
|
func TestDLQ(t *testing.T) {
|
|
mainQ := queue.NewQueue[Task](10)
|
|
dlq := queue.NewQueue[Task](1)
|
|
|
|
mainQ.SetDLQ(dlq)
|
|
|
|
var handled, dlqHandled int32
|
|
var wg sync.WaitGroup
|
|
wg.Add(1) // ожидаем хотя бы одно сообщение в DLQ
|
|
|
|
// Основная очередь: один из элементов вызывает панику
|
|
mainQ.Register(func(t Task) {
|
|
if t.ID == 5 {
|
|
panic("boom") // должен попасть в DLQ
|
|
}
|
|
atomic.AddInt32(&handled, 1)
|
|
})
|
|
|
|
// DLQ: увеличиваем счётчик и снимаем ожидание
|
|
dlq.Register(func(t Task) {
|
|
atomic.AddInt32(&dlqHandled, 1)
|
|
wg.Done()
|
|
})
|
|
|
|
ctx := context.Background()
|
|
mainQ.HandleParallel(ctx, 1)
|
|
dlq.HandleParallel(ctx, 1)
|
|
|
|
// Отправляем 10 задач
|
|
for i := range 10 {
|
|
mainQ.Produce(Task{ID: i})
|
|
}
|
|
|
|
// Закрываем и дожидаемся выполнения основной очереди
|
|
mainQ.Close()
|
|
mainQ.Wait()
|
|
|
|
// Ожидаем появления элемента в DLQ
|
|
wg.Wait()
|
|
|
|
// Теперь можно корректно завершить DLQ
|
|
dlq.Close()
|
|
dlq.Wait()
|
|
|
|
if handled != 9 {
|
|
t.Fatalf("expected 9 handled items, got %d", handled)
|
|
}
|
|
if dlqHandled != 1 {
|
|
t.Fatalf("expected 1 DLQ item, got %d", dlqHandled)
|
|
}
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// ⚠️ Проверка OnError
|
|
// -----------------------------------------------------------------------------
|
|
|
|
func TestOnError(t *testing.T) {
|
|
q := queue.NewQueue[Task](10)
|
|
var called int32
|
|
|
|
q.OnError(func(err error) {
|
|
atomic.AddInt32(&called, 1)
|
|
})
|
|
|
|
q.Register(func(t Task) {
|
|
panic("panic test")
|
|
})
|
|
|
|
ctx := context.Background()
|
|
q.HandleParallel(ctx, 1)
|
|
q.Produce(Task{ID: 1})
|
|
q.Close()
|
|
q.Wait()
|
|
|
|
if called == 0 {
|
|
t.Fatalf("expected error handler to be called")
|
|
}
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// 🧠 Проверка TryProduce и отмены
|
|
// -----------------------------------------------------------------------------
|
|
|
|
func TestTryProduceAndCancel(t *testing.T) {
|
|
q := queue.NewQueue[Task](10)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
q.HandleParallel(ctx, 1)
|
|
defer cancel()
|
|
|
|
ok := q.TryProduce(Task{ID: 1}, 50*time.Millisecond)
|
|
if !ok {
|
|
t.Fatalf("expected TryProduce to succeed")
|
|
}
|
|
|
|
cancelCtx, cancel2 := context.WithCancel(context.Background())
|
|
cancel2()
|
|
q.ProduceWithContext(cancelCtx, Task{ID: 2}) // должен быть пропущен
|
|
q.Close()
|
|
q.Wait()
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// 🧾 Проверка Hook'ов
|
|
// -----------------------------------------------------------------------------
|
|
|
|
func TestHooks(t *testing.T) {
|
|
q := queue.NewQueue[Task](10)
|
|
|
|
var produced, handled, dropped int32
|
|
q.SetHooks(queue.QueueHooks[Task]{
|
|
OnProduced: func() { atomic.AddInt32(&produced, 1) },
|
|
OnHandled: func() { atomic.AddInt32(&handled, 1) },
|
|
OnDropped: func(_ Task) { atomic.AddInt32(&dropped, 1) },
|
|
})
|
|
|
|
q.Register(func(t Task) {})
|
|
ctx := context.Background()
|
|
q.HandleParallel(ctx, 1)
|
|
|
|
q.Produce(Task{ID: 1})
|
|
q.TryProduce(Task{ID: 2}, 10*time.Millisecond)
|
|
q.Close()
|
|
q.Wait()
|
|
|
|
if produced == 0 {
|
|
t.Fatalf("expected produced hook to trigger")
|
|
}
|
|
if handled == 0 {
|
|
t.Fatalf("expected handled hook to trigger")
|
|
}
|
|
|
|
_ = dropped // может быть 0
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// 🏎️ Проверка на гонки (используй go test -race)
|
|
// -----------------------------------------------------------------------------
|
|
|
|
func TestRaceConditions(t *testing.T) {
|
|
q := queue.NewQueue[Task](10)
|
|
|
|
q.Register(func(t Task) {
|
|
time.Sleep(time.Millisecond)
|
|
})
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
q.HandleParallel(ctx, 4)
|
|
|
|
for i := range 100 {
|
|
go q.TryProduce(Task{ID: i}, 10*time.Millisecond)
|
|
}
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
q.Close()
|
|
cancel()
|
|
q.Wait()
|
|
}
|