diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml new file mode 100644 index 0000000..a379645 --- /dev/null +++ b/.gitea/workflows/ci.yml @@ -0,0 +1,33 @@ +name: CI + +on: + push: + branches: + - '**' + pull_request: + +jobs: + test: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version-file: go.mod + check-latest: true + + - name: Go version + run: go version + + - name: Vet + run: go vet ./... + + - name: Test + run: go test -race ./... + + - name: Bench (smoke) + run: go test -bench=. -run=^$ ./... + diff --git a/README.md b/README.md index 9c95ac4..574bb7b 100644 --- a/README.md +++ b/README.md @@ -2,62 +2,113 @@ [![Go Reference](https://pkg.go.dev/badge/git.belvedersky.ru/queue.svg)](https://pkg.go.dev/git.belvedersky.ru/queue) +Simple, lightweight, concurrent FIFO queue for Go with optional Dead Letter Queue (DLQ), hooks, rate limiting, and graceful shutdown. -Simple and lightweight queue implementation in Go with optional Dead Letter Queue (DLQ) support. - - -Repository: - https://git.belvedersky.ru/belvedersky/queue +Repository: https://git.belvedersky.ru/belvedersky/queue ## Installation - go get git.belvedersky.ru/queue +```bash +go get git.belvedersky.ru/queue +``` -## Usage +## Quick Start -Basic example: +```go +package main - package main +import ( + "context" + "fmt" + "time" - import ( - "fmt" - "git.belvedersky.ru/queue" - ) + "git.belvedersky.ru/queue" +) - func main() { - q := queue.NewQueue +type Task struct{ ID int } - q.Push(1) - q.Push(2) +func main() { + // Create queue with a buffer capacity + q := queue.NewQueue[Task](10) - for !q.Empty() { - item := q.Pop() - fmt.Println(item) - } + // Register handler(s). Handlers run in worker goroutines. + q.Register(func(t Task) { + fmt.Printf("handled %d\n", t.ID) + time.Sleep(50 * time.Millisecond) // simulate work + }) + + // Start workers + ctx := context.Background() + q.HandleParallel(ctx, 2) + + // Produce tasks + for i := 0; i < 5; i++ { + q.Produce(Task{ID: i}) } -## Dead Letter Queue Example + // Graceful shutdown: stop intake and wait for handlers to finish + _ = q.Shutdown(context.Background()) +} +``` -See examples/example_dlq_test.go: +## Dead Letter Queue (DLQ) - mainQ := queue.NewQueue - dlq := queue.NewQueue +Send items that panic in handlers to a separate queue: - mainQ.SetDLQ(dlq) +```go +type Task struct{ ID int } +mainQ := queue.NewQueue[Task](5) +dlq := queue.NewQueue[Task](5) +mainQ.SetDLQ(dlq) + +mainQ.Register(func(t Task) { + if t.ID == 3 { + panic("boom") // goes to DLQ + } +}) + +dlq.Register(func(t Task) { + fmt.Printf("sent to DLQ: %d\n", t.ID) +}) + +ctx := context.Background() +mainQ.HandleParallel(ctx, 1) +dlq.HandleParallel(ctx, 1) + +for i := 0; i < 5; i++ { + mainQ.Produce(Task{ID: i}) +} + +mainQ.Close(); mainQ.Wait() +dlq.Close(); dlq.Wait() +``` + +## Hooks and Rate Limiting + +```go +q := queue.NewQueue[int](10) +q.SetHooks(queue.QueueHooks{ + OnProduced: func() { fmt.Println("produced") }, + OnHandled: func() { fmt.Println("handled") }, + OnDropped: func(v any) { fmt.Println("dropped:", v) }, +}) + +// Limit processing rate (RPS per worker) +q.SetRateLimit(100) +``` ## Examples -Full examples are available in: - https://git.belvedersky.ru/belvedersky/queue/src/branch/main/examples - -Includes: - - examples/example_basic_test.go - - examples/example_dlq_test.go - - examples/example_parallel_test.go +See runnable examples in the repository: +- examples/example_basic_test.go +- examples/example_dlq_test.go +- examples/example_parallel_test.go +Run with: `go test ./examples -run Example`. ## Benchmarks + ```bash goos: darwin goarch: arm64 @@ -71,7 +122,6 @@ PASS ok git.belvedersky.ru/belvedersky/queue 6.574s ``` - ## License MIT diff --git a/queue.go b/queue.go index bcbd214..b1d354e 100644 --- a/queue.go +++ b/queue.go @@ -1,3 +1,6 @@ +// Package queue provides a simple, type-safe, concurrent FIFO queue +// with support for parallel handlers, graceful shutdown, optional +// Dead Letter Queue (DLQ), hooks, and rate limiting. package queue import ( @@ -7,35 +10,44 @@ import ( "time" ) -// QueueHooks — набор хуков для метрик и мониторинга. -type QueueHooks struct { - OnProduced func() // вызывается при успешном добавлении элемента - OnHandled func() // вызывается при успешной обработке - OnDropped func(any) // вызывается при пропуске (cancel/timeout) +// QueueHooks defines optional callbacks that are invoked during queue +// operations. Hooks can be used for metrics, logging, and monitoring. +// +// All hooks are invoked best-effort and must be fast and panic-safe in user code. +// +// - OnProduced is called after an item is successfully enqueued. +// - OnHandled is called after a handler returns without panic. +// - OnDropped is called when an item is intentionally dropped due to +// a canceled context or a closed queue. +type QueueHooks[T any] struct { + OnProduced func() + OnHandled func() + OnDropped func(T) } -// Queue — потокобезопасная обобщённая FIFO очередь с DLQ, параллельной обработкой -// и корректным завершением без data race +// Queue is a generic, concurrency-safe FIFO queue with optional DLQ, parallel +// handling, rate limiting and graceful shutdown without data races. type Queue[T any] struct { ch chan T capacity int - mu sync.RWMutex // защищает: handlers/errorFunc/dlq/hooks/closed - sendMu sync.RWMutex // синхронизирует записи в канал и его закрытие + mu sync.RWMutex // protects: handlers, errorFunc, dlq, hooks, closed, rateTicker + sendMu sync.RWMutex // synchronizes sends to the channel and its closing handlers []func(T) errorFunc func(error) dlq *Queue[T] - hooks QueueHooks + hooks QueueHooks[T] closeOnce sync.Once closed bool - wg sync.WaitGroup - rateLimit <-chan time.Time + wg sync.WaitGroup + rateTicker *time.Ticker } -// NewQueue создаёт новую очередь с заданной ёмкостью. +// NewQueue creates a new queue with the given buffer capacity. +// Capacity must be > 0. func NewQueue[T any](capacity int) *Queue[T] { if capacity <= 0 { panic("queue capacity must be > 0") @@ -46,7 +58,9 @@ func NewQueue[T any](capacity int) *Queue[T] { } } -// ---------- Конфигурация ---------- +// Register adds a handler function that will be invoked for each item. +// Multiple handlers may be registered; each item will be passed to each +// registered handler in registration order. Handlers must be concurrency-safe. func (q *Queue[T]) Register(handler func(T)) { if handler == nil { panic("handler cannot be nil") @@ -56,35 +70,50 @@ func (q *Queue[T]) Register(handler func(T)) { q.mu.Unlock() } -// ---------- При ошибке ---------- +// OnError registers an error callback which is invoked when a handler panics. +// The error contains panic details. Set to nil to disable. func (q *Queue[T]) OnError(fn func(error)) { q.mu.Lock() q.errorFunc = fn q.mu.Unlock() } -// ---------- Установка DLQ ---------- +// SetDLQ sets a Dead Letter Queue which receives items that caused a handler panic. +// Items are enqueued to the DLQ in a non-blocking best-effort manner. func (q *Queue[T]) SetDLQ(dlq *Queue[T]) { q.mu.Lock() q.dlq = dlq q.mu.Unlock() } -// ---------- Установка Хуков ---------- -func (q *Queue[T]) SetHooks(h QueueHooks) { +// SetHooks installs optional hooks for produced/handled/dropped events. +// Passing a zero-value struct clears previously set hooks. +func (q *Queue[T]) SetHooks(h QueueHooks[T]) { q.mu.Lock() q.hooks = h q.mu.Unlock() } -// ---------- Установка RateLimit ---------- +// SetRateLimit sets a per-queue processing rate limit in requests-per-second. +// If rps <= 0, rate limiting is disabled. Calling SetRateLimit replaces any +// previous ticker and will be applied by active workers. func (q *Queue[T]) SetRateLimit(rps int) { + q.mu.Lock() + defer q.mu.Unlock() + // Stop previous ticker if exists + if q.rateTicker != nil { + q.rateTicker.Stop() + q.rateTicker = nil + } if rps > 0 { - q.rateLimit = time.Tick(time.Second / time.Duration(rps)) + q.rateTicker = time.NewTicker(time.Second / time.Duration(rps)) } } -// ---------- Запуск обработчиков ---------- +// HandleParallel starts worker goroutines that consume items from the queue +// and invoke registered handlers. Workers exit when the channel is closed or +// when the provided context is canceled. If workers <= 0, a single worker +// is started. func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) { if workers <= 0 { workers = 1 @@ -99,8 +128,12 @@ func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) { if !ok { return } - if q.rateLimit != nil { - <-q.rateLimit + // copy ticker safely + q.mu.RLock() + t := q.rateTicker + q.mu.RUnlock() + if t != nil { + <-t.C } q.processItem(item) case <-ctx.Done(): @@ -111,9 +144,9 @@ func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) { } } -// ---------- Отправка в канал (синхронизированная) ---------- -// sendBlocking — блокирующая отправка (используется Produce). -// Берём RLock на всё время операции записи, чтобы Close() (Lock) не мог выполнить close(q.ch) +// sendBlocking performs a blocking send into the channel (used by Produce). +// It holds a read lock across the send to prevent Close() from closing the +// channel concurrently. func (q *Queue[T]) sendBlocking(item T) bool { // Быстрая проверка закрытия под RLock q.sendMu.RLock() @@ -130,7 +163,7 @@ func (q *Queue[T]) sendBlocking(item T) bool { } defer q.sendMu.RUnlock() - defer func() { _ = recover() }() // на случай гонок в чужом коде + defer func() { _ = recover() }() // guard against external races/panics q.ch <- item if hooks.OnProduced != nil { hooks.OnProduced() @@ -138,7 +171,7 @@ func (q *Queue[T]) sendBlocking(item T) bool { return true } -// sendNonBlocking — неблокирующая отправка (для TryProduce/ProduceWithContext/DLQ). +// sendNonBlocking attempts a non-blocking send (used by TryProduce and DLQ). func (q *Queue[T]) sendNonBlocking(item T) bool { q.sendMu.RLock() q.mu.RLock() @@ -166,14 +199,48 @@ func (q *Queue[T]) sendNonBlocking(item T) bool { } } -// ---------- Паблик-API продюсеров ---------- -// Produce — блокирующая постановка. Если очередь закрыта — элемент игнорируется. +// sendWithContext blocks until the item is sent or the context is canceled. +func (q *Queue[T]) sendWithContext(ctx context.Context, item T) bool { + q.sendMu.RLock() + q.mu.RLock() + closed := q.closed + hooks := q.hooks + q.mu.RUnlock() + if closed { + q.sendMu.RUnlock() + if hooks.OnDropped != nil { + hooks.OnDropped(item) + } + return false + } + defer q.sendMu.RUnlock() + + defer func() { _ = recover() }() + select { + case q.ch <- item: + if hooks.OnProduced != nil { + hooks.OnProduced() + } + return true + case <-ctx.Done(): + if hooks.OnDropped != nil { + hooks.OnDropped(item) + } + return false + } +} + +// Produce enqueues an item and blocks until the item is placed into the buffer. +// If the queue is closed, the item is dropped and the method returns immediately. func (q *Queue[T]) Produce(item T) { _ = q.sendBlocking(item) } -// ProduceWithContext — постановка с отменой по ctx (не блокирует при полном буфере). +// ProduceWithContext enqueues an item, blocking until the item is accepted or +// the provided context is canceled. If the context is already canceled or the +// queue is closed, the item is dropped and OnDropped is invoked (if set). func (q *Queue[T]) ProduceWithContext(ctx context.Context, item T) { + // Если контекст уже отменён — не блокируемся и помечаем как dropped select { case <-ctx.Done(): q.mu.RLock() @@ -183,16 +250,15 @@ func (q *Queue[T]) ProduceWithContext(ctx context.Context, item T) { q.mu.RUnlock() return default: - _ = q.sendNonBlocking(item) } + _ = q.sendWithContext(ctx, item) } -// TryProduce — неблокирующая постановка с таймаутом. +// TryProduce attempts to enqueue an item without blocking. It retries until +// either the item is accepted or the timeout elapses. Returns true on success. func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - t := time.NewTimer(timeout) - defer t.Stop() for { if ok := q.sendNonBlocking(item); ok { @@ -201,8 +267,6 @@ func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool { select { case <-ctx.Done(): return false - case <-t.C: - return false default: // короткая уступка планировщику time.Sleep(time.Microsecond) @@ -210,7 +274,9 @@ func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool { } } -// ---------- Завершение ---------- +// Shutdown closes the queue for new items and waits for workers to finish +// processing or until the provided context is canceled. Returns ctx.Err() on +// timeout/cancel. func (q *Queue[T]) Shutdown(ctx context.Context) error { q.Close() done := make(chan struct{}) @@ -228,36 +294,50 @@ func (q *Queue[T]) Shutdown(ctx context.Context) error { func (q *Queue[T]) Close() { q.closeOnce.Do(func() { - // запретим новые записи: эксклюзивная блокировка перекроет send RLock + // forbid new sends: exclusive lock supersedes send RLock q.sendMu.Lock() q.mu.Lock() if !q.closed { q.closed = true close(q.ch) } + // Остановим тикер, если включён + if q.rateTicker != nil { + q.rateTicker.Stop() + q.rateTicker = nil + } q.mu.Unlock() q.sendMu.Unlock() }) } +// Wait blocks until all worker goroutines started by HandleParallel exit. func (q *Queue[T]) Wait() { q.wg.Wait() } -// ---------- Инфо-методы ---------- -func (q *Queue[T]) Len() int { return len(q.ch) } -func (q *Queue[T]) Cap() int { return q.capacity } +// Len returns the number of buffered items currently in the queue. +func (q *Queue[T]) Len() int { return len(q.ch) } + +// Cap returns the configured buffer capacity of the queue. +func (q *Queue[T]) Cap() int { return q.capacity } + +// IsEmpty reports whether the queue buffer is empty. func (q *Queue[T]) IsEmpty() bool { return len(q.ch) == 0 } +// Closed reports whether the queue has been closed for sending. func (q *Queue[T]) Closed() bool { q.mu.RLock() defer q.mu.RUnlock() return q.closed } +// String returns a human-readable summary of the queue state. func (q *Queue[T]) String() string { return fmt.Sprintf("Queue{len=%d, cap=%d, closed=%t}", q.Len(), q.Cap(), q.Closed()) } -// ---------- Внутренняя обработка ---------- +// processItem executes all registered handlers for the item, recovering from +// panics. On panic, it calls the error callback (if any) and forwards the +// item to the DLQ using a best-effort non-blocking send. func (q *Queue[T]) processItem(item T) { q.mu.RLock() handlers := append([]func(T){}, q.handlers...) diff --git a/queue_test.go b/queue_test.go index 675247f..0e0cf02 100644 --- a/queue_test.go +++ b/queue_test.go @@ -43,7 +43,7 @@ func TestBasicQueue(t *testing.T) { q.HandleParallel(ctx, 1) - for i := 0; i < 10; i++ { + for i := range 10 { q.Produce(Task{ID: i}) } @@ -73,7 +73,7 @@ func TestParallelProcessing(t *testing.T) { q.HandleParallel(ctx, 4) - for i := 0; i < 20; i++ { + for i := range 20 { q.Produce(Task{ID: i}) } @@ -101,7 +101,7 @@ func TestGracefulShutdown(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) q.HandleParallel(ctx, 2) - for i := 0; i < 10; i++ { + for i := range 10 { q.Produce(Task{ID: i}) } @@ -152,7 +152,7 @@ func TestDLQ(t *testing.T) { dlq.HandleParallel(ctx, 1) // Отправляем 10 задач - for i := 0; i < 10; i++ { + for i := range 10 { mainQ.Produce(Task{ID: i}) } @@ -233,10 +233,10 @@ func TestHooks(t *testing.T) { q := queue.NewQueue[Task](10) var produced, handled, dropped int32 - q.SetHooks(queue.QueueHooks{ + q.SetHooks(queue.QueueHooks[Task]{ OnProduced: func() { atomic.AddInt32(&produced, 1) }, OnHandled: func() { atomic.AddInt32(&handled, 1) }, - OnDropped: func(_ interface{}) { atomic.AddInt32(&dropped, 1) }, + OnDropped: func(_ Task) { atomic.AddInt32(&dropped, 1) }, }) q.Register(func(t Task) {})