3 Commits

Author SHA1 Message Date
Кобелев Андрей Андреевич
4310ceea11 Add CI info, run workflow only on version update.
All checks were successful
CI / test (push) Successful in 2m42s
CI / Runner Init (on release tag) (push) Has been skipped
2025-10-14 14:09:27 +05:00
Кобелев Андрей Андреевич
bac4f05437 - Context fix without timer.
All checks were successful
CI / test (push) Successful in 5m21s
- Add godoc documentation.
2025-10-14 13:23:12 +05:00
Кобелев Андрей Андреевич
557fb40268 Change package path 2025-10-14 12:53:46 +05:00
9 changed files with 274 additions and 91 deletions

53
.gitea/workflows/ci.yml Normal file
View File

@@ -0,0 +1,53 @@
name: CI
on:
push:
branches:
- '**'
tags:
- 'v*'
pull_request:
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
check-latest: true
- name: Go version
run: go version
- name: Vet
run: go vet ./...
- name: Test
run: go test -race ./...
- name: Bench (smoke)
run: go test -bench=. -run=^$ ./...
runner-init:
name: Runner Init (on release tag)
runs-on: ubuntu-latest
if: startsWith(github.ref, 'refs/tags/v')
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
check-latest: true
- name: Warm module cache
run: go mod download
- name: Verify build and tests
run: |
go vet ./...
go test -race ./...

119
README.md
View File

@@ -1,59 +1,115 @@
# queue
Simple and lightweight queue implementation in Go with optional Dead Letter Queue (DLQ) support.
[![Go Reference](https://pkg.go.dev/badge/git.belvedersky.ru/queue.svg)](https://pkg.go.dev/git.belvedersky.ru/queue)
[![CI](https://git.belvedersky.ru/belvedersky/queue/actions/workflows/ci.yml/badge.svg)](https://git.belvedersky.ru/belvedersky/queue/actions)
Repository:
https://git.belvedersky.ru/belvedersky/queue
Simple, lightweight, concurrent FIFO queue for Go with optional Dead Letter Queue (DLQ), hooks, rate limiting, and graceful shutdown.
Repository: https://git.belvedersky.ru/belvedersky/queue
## Installation
go get git.belvedersky.ru/queue
```bash
go get git.belvedersky.ru/queue
```
## Usage
## Quick Start
Basic example:
```go
package main
package main
import (
"context"
"fmt"
"time"
import (
"fmt"
"git.belvedersky.ru/queue"
)
"git.belvedersky.ru/queue"
)
func main() {
q := queue.NewQueue
type Task struct{ ID int }
q.Push(1)
q.Push(2)
func main() {
// Create queue with a buffer capacity
q := queue.NewQueue[Task](10)
for !q.Empty() {
item := q.Pop()
fmt.Println(item)
}
// Register handler(s). Handlers run in worker goroutines.
q.Register(func(t Task) {
fmt.Printf("handled %d\n", t.ID)
time.Sleep(50 * time.Millisecond) // simulate work
})
// Start workers
ctx := context.Background()
q.HandleParallel(ctx, 2)
// Produce tasks
for i := 0; i < 5; i++ {
q.Produce(Task{ID: i})
}
## Dead Letter Queue Example
// Graceful shutdown: stop intake and wait for handlers to finish
_ = q.Shutdown(context.Background())
}
```
See examples/example_dlq_test.go:
## Dead Letter Queue (DLQ)
mainQ := queue.NewQueue
dlq := queue.NewQueue
Send items that panic in handlers to a separate queue:
mainQ.SetDLQ(dlq)
```go
type Task struct{ ID int }
mainQ := queue.NewQueue[Task](5)
dlq := queue.NewQueue[Task](5)
mainQ.SetDLQ(dlq)
mainQ.Register(func(t Task) {
if t.ID == 3 {
panic("boom") // goes to DLQ
}
})
dlq.Register(func(t Task) {
fmt.Printf("sent to DLQ: %d\n", t.ID)
})
ctx := context.Background()
mainQ.HandleParallel(ctx, 1)
dlq.HandleParallel(ctx, 1)
for i := 0; i < 5; i++ {
mainQ.Produce(Task{ID: i})
}
mainQ.Close(); mainQ.Wait()
dlq.Close(); dlq.Wait()
```
## Hooks and Rate Limiting
```go
q := queue.NewQueue[int](10)
q.SetHooks(queue.QueueHooks{
OnProduced: func() { fmt.Println("produced") },
OnHandled: func() { fmt.Println("handled") },
OnDropped: func(v any) { fmt.Println("dropped:", v) },
})
// Limit processing rate (RPS per worker)
q.SetRateLimit(100)
```
## Examples
Full examples are available in:
https://git.belvedersky.ru/belvedersky/queue/src/branch/main/examples
Includes:
- examples/example_basic_test.go
- examples/example_dlq_test.go
- examples/example_parallel_test.go
See runnable examples in the repository:
- examples/example_basic_test.go
- examples/example_dlq_test.go
- examples/example_parallel_test.go
Run with: `go test ./examples -run Example`.
## Benchmarks
```bash
goos: darwin
goarch: arm64
@@ -67,7 +123,6 @@ PASS
ok git.belvedersky.ru/belvedersky/queue 6.574s
```
## License
MIT

View File

@@ -6,7 +6,7 @@ import (
"testing"
"time"
"git.belvedersky.ru/belvedersky/queue"
"git.belvedersky.ru/queue"
)
type benchTask struct {

View File

@@ -5,7 +5,7 @@ import (
"fmt"
"time"
"git.belvedersky.ru/belvedersky/queue"
"git.belvedersky.ru/queue"
)
// Example_basic демонстрирует базовое использование очереди.

View File

@@ -6,7 +6,7 @@ import (
"sync/atomic"
"time"
"git.belvedersky.ru/belvedersky/queue"
"git.belvedersky.ru/queue"
)
// Example_DLQ показывает, как использовать Dead Letter Queue

View File

@@ -6,7 +6,7 @@ import (
"sync"
"time"
"git.belvedersky.ru/belvedersky/queue"
"git.belvedersky.ru/queue"
)
// Example_parallel демонстрирует работу нескольких воркеров.

2
go.mod
View File

@@ -1,3 +1,3 @@
module git.belvedersky.ru/belvedersky/queue
module git.belvedersky.ru/queue
go 1.25.1

169
queue.go
View File

@@ -1,3 +1,6 @@
// Package queue provides a simple, type-safe, concurrent FIFO queue
// with support for parallel handlers, graceful shutdown, optional
// Dead Letter Queue (DLQ), hooks, and rate limiting.
package queue
import (
@@ -7,35 +10,44 @@ import (
"time"
)
// QueueHooks — набор хуков для метрик и мониторинга.
type QueueHooks struct {
OnProduced func() // вызывается при успешном добавлении элемента
OnHandled func() // вызывается при успешной обработке
OnDropped func(interface{}) // вызывается при пропуске (cancel/timeout)
// QueueHooks defines optional callbacks that are invoked during queue
// operations. Hooks can be used for metrics, logging, and monitoring.
//
// All hooks are invoked best-effort and must be fast and panic-safe in user code.
//
// - OnProduced is called after an item is successfully enqueued.
// - OnHandled is called after a handler returns without panic.
// - OnDropped is called when an item is intentionally dropped due to
// a canceled context or a closed queue.
type QueueHooks[T any] struct {
OnProduced func()
OnHandled func()
OnDropped func(T)
}
// Queue — потокобезопасная обобщённая FIFO очередь с DLQ, параллельной обработкой
// и корректным завершением без data race (даже под -race).
// Queue is a generic, concurrency-safe FIFO queue with optional DLQ, parallel
// handling, rate limiting and graceful shutdown without data races.
type Queue[T any] struct {
ch chan T
capacity int
mu sync.RWMutex // защищает: handlers/errorFunc/dlq/hooks/closed
sendMu sync.RWMutex // синхронизирует ВСЕ записи в канал и его закрытие
mu sync.RWMutex // protects: handlers, errorFunc, dlq, hooks, closed, rateTicker
sendMu sync.RWMutex // synchronizes sends to the channel and its closing
handlers []func(T)
errorFunc func(error)
dlq *Queue[T]
hooks QueueHooks
hooks QueueHooks[T]
closeOnce sync.Once
closed bool
wg sync.WaitGroup
rateLimit <-chan time.Time
wg sync.WaitGroup
rateTicker *time.Ticker
}
// NewQueue создаёт новую очередь с заданной ёмкостью.
// NewQueue creates a new queue with the given buffer capacity.
// Capacity must be > 0.
func NewQueue[T any](capacity int) *Queue[T] {
if capacity <= 0 {
panic("queue capacity must be > 0")
@@ -46,8 +58,9 @@ func NewQueue[T any](capacity int) *Queue[T] {
}
}
// ---------- Конфигурация ----------
// Register adds a handler function that will be invoked for each item.
// Multiple handlers may be registered; each item will be passed to each
// registered handler in registration order. Handlers must be concurrency-safe.
func (q *Queue[T]) Register(handler func(T)) {
if handler == nil {
panic("handler cannot be nil")
@@ -57,32 +70,50 @@ func (q *Queue[T]) Register(handler func(T)) {
q.mu.Unlock()
}
// OnError registers an error callback which is invoked when a handler panics.
// The error contains panic details. Set to nil to disable.
func (q *Queue[T]) OnError(fn func(error)) {
q.mu.Lock()
q.errorFunc = fn
q.mu.Unlock()
}
// SetDLQ sets a Dead Letter Queue which receives items that caused a handler panic.
// Items are enqueued to the DLQ in a non-blocking best-effort manner.
func (q *Queue[T]) SetDLQ(dlq *Queue[T]) {
q.mu.Lock()
q.dlq = dlq
q.mu.Unlock()
}
func (q *Queue[T]) SetHooks(h QueueHooks) {
// SetHooks installs optional hooks for produced/handled/dropped events.
// Passing a zero-value struct clears previously set hooks.
func (q *Queue[T]) SetHooks(h QueueHooks[T]) {
q.mu.Lock()
q.hooks = h
q.mu.Unlock()
}
// SetRateLimit sets a per-queue processing rate limit in requests-per-second.
// If rps <= 0, rate limiting is disabled. Calling SetRateLimit replaces any
// previous ticker and will be applied by active workers.
func (q *Queue[T]) SetRateLimit(rps int) {
q.mu.Lock()
defer q.mu.Unlock()
// Stop previous ticker if exists
if q.rateTicker != nil {
q.rateTicker.Stop()
q.rateTicker = nil
}
if rps > 0 {
q.rateLimit = time.Tick(time.Second / time.Duration(rps))
q.rateTicker = time.NewTicker(time.Second / time.Duration(rps))
}
}
// ---------- Запуск обработчиков ----------
// HandleParallel starts worker goroutines that consume items from the queue
// and invoke registered handlers. Workers exit when the channel is closed or
// when the provided context is canceled. If workers <= 0, a single worker
// is started.
func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) {
if workers <= 0 {
workers = 1
@@ -97,8 +128,12 @@ func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) {
if !ok {
return
}
if q.rateLimit != nil {
<-q.rateLimit
// copy ticker safely
q.mu.RLock()
t := q.rateTicker
q.mu.RUnlock()
if t != nil {
<-t.C
}
q.processItem(item)
case <-ctx.Done():
@@ -109,13 +144,11 @@ func (q *Queue[T]) HandleParallel(ctx context.Context, workers int) {
}
}
// ---------- Отправка в канал (синхронизированная) ----------
// sendBlocking — блокирующая отправка (используется Produce).
// Берём RLock на всё время операции записи, чтобы Close() (Lock) не мог выполнить close(q.ch)
// одновременно с записью — это устраняет data race.
// sendBlocking performs a blocking send into the channel (used by Produce).
// It holds a read lock across the send to prevent Close() from closing the
// channel concurrently.
func (q *Queue[T]) sendBlocking(item T) bool {
// Быстрая проверка закрытия под RLock (необязательна, но экономит работу)
// Быстрая проверка закрытия под RLock
q.sendMu.RLock()
q.mu.RLock()
closed := q.closed
@@ -130,7 +163,7 @@ func (q *Queue[T]) sendBlocking(item T) bool {
}
defer q.sendMu.RUnlock()
defer func() { _ = recover() }() // на случай гонок в чужом коде
defer func() { _ = recover() }() // guard against external races/panics
q.ch <- item
if hooks.OnProduced != nil {
hooks.OnProduced()
@@ -138,7 +171,7 @@ func (q *Queue[T]) sendBlocking(item T) bool {
return true
}
// sendNonBlocking — неблокирующая отправка (для TryProduce/ProduceWithContext/DLQ).
// sendNonBlocking attempts a non-blocking send (used by TryProduce and DLQ).
func (q *Queue[T]) sendNonBlocking(item T) bool {
q.sendMu.RLock()
q.mu.RLock()
@@ -166,15 +199,48 @@ func (q *Queue[T]) sendNonBlocking(item T) bool {
}
}
// ---------- Паблик-API продюсеров ----------
// sendWithContext blocks until the item is sent or the context is canceled.
func (q *Queue[T]) sendWithContext(ctx context.Context, item T) bool {
q.sendMu.RLock()
q.mu.RLock()
closed := q.closed
hooks := q.hooks
q.mu.RUnlock()
if closed {
q.sendMu.RUnlock()
if hooks.OnDropped != nil {
hooks.OnDropped(item)
}
return false
}
defer q.sendMu.RUnlock()
// Produce — блокирующая постановка. Если очередь закрыта — элемент игнорируется.
defer func() { _ = recover() }()
select {
case q.ch <- item:
if hooks.OnProduced != nil {
hooks.OnProduced()
}
return true
case <-ctx.Done():
if hooks.OnDropped != nil {
hooks.OnDropped(item)
}
return false
}
}
// Produce enqueues an item and blocks until the item is placed into the buffer.
// If the queue is closed, the item is dropped and the method returns immediately.
func (q *Queue[T]) Produce(item T) {
_ = q.sendBlocking(item)
}
// ProduceWithContext — постановка с отменой по ctx (не блокирует при полном буфере).
// ProduceWithContext enqueues an item, blocking until the item is accepted or
// the provided context is canceled. If the context is already canceled or the
// queue is closed, the item is dropped and OnDropped is invoked (if set).
func (q *Queue[T]) ProduceWithContext(ctx context.Context, item T) {
// Если контекст уже отменён — не блокируемся и помечаем как dropped
select {
case <-ctx.Done():
q.mu.RLock()
@@ -184,16 +250,15 @@ func (q *Queue[T]) ProduceWithContext(ctx context.Context, item T) {
q.mu.RUnlock()
return
default:
_ = q.sendNonBlocking(item)
}
_ = q.sendWithContext(ctx, item)
}
// TryProduce — неблокирующая постановка с таймаутом.
// TryProduce attempts to enqueue an item without blocking. It retries until
// either the item is accepted or the timeout elapses. Returns true on success.
func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
t := time.NewTimer(timeout)
defer t.Stop()
for {
if ok := q.sendNonBlocking(item); ok {
@@ -202,8 +267,6 @@ func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool {
select {
case <-ctx.Done():
return false
case <-t.C:
return false
default:
// короткая уступка планировщику
time.Sleep(time.Microsecond)
@@ -211,8 +274,9 @@ func (q *Queue[T]) TryProduce(item T, timeout time.Duration) bool {
}
}
// ---------- Завершение ----------
// Shutdown closes the queue for new items and waits for workers to finish
// processing or until the provided context is canceled. Returns ctx.Err() on
// timeout/cancel.
func (q *Queue[T]) Shutdown(ctx context.Context) error {
q.Close()
done := make(chan struct{})
@@ -230,38 +294,50 @@ func (q *Queue[T]) Shutdown(ctx context.Context) error {
func (q *Queue[T]) Close() {
q.closeOnce.Do(func() {
// запретим новые записи: эксклюзивная блокировка перекроет send RLock
// forbid new sends: exclusive lock supersedes send RLock
q.sendMu.Lock()
q.mu.Lock()
if !q.closed {
q.closed = true
close(q.ch)
}
// Остановим тикер, если включён
if q.rateTicker != nil {
q.rateTicker.Stop()
q.rateTicker = nil
}
q.mu.Unlock()
q.sendMu.Unlock()
})
}
// Wait blocks until all worker goroutines started by HandleParallel exit.
func (q *Queue[T]) Wait() { q.wg.Wait() }
// ---------- Инфо-методы ----------
// Len returns the number of buffered items currently in the queue.
func (q *Queue[T]) Len() int { return len(q.ch) }
func (q *Queue[T]) Len() int { return len(q.ch) }
func (q *Queue[T]) Cap() int { return q.capacity }
// Cap returns the configured buffer capacity of the queue.
func (q *Queue[T]) Cap() int { return q.capacity }
// IsEmpty reports whether the queue buffer is empty.
func (q *Queue[T]) IsEmpty() bool { return len(q.ch) == 0 }
// Closed reports whether the queue has been closed for sending.
func (q *Queue[T]) Closed() bool {
q.mu.RLock()
defer q.mu.RUnlock()
return q.closed
}
// String returns a human-readable summary of the queue state.
func (q *Queue[T]) String() string {
return fmt.Sprintf("Queue{len=%d, cap=%d, closed=%t}", q.Len(), q.Cap(), q.Closed())
}
// ---------- Внутренняя обработка ----------
// processItem executes all registered handlers for the item, recovering from
// panics. On panic, it calls the error callback (if any) and forwards the
// item to the DLQ using a best-effort non-blocking send.
func (q *Queue[T]) processItem(item T) {
q.mu.RLock()
handlers := append([]func(T){}, q.handlers...)
@@ -269,7 +345,6 @@ func (q *Queue[T]) processItem(item T) {
dlq := q.dlq
hooks := q.hooks
q.mu.RUnlock()
for _, h := range handlers {
func() {
defer func() {

View File

@@ -7,7 +7,7 @@ import (
"testing"
"time"
"git.belvedersky.ru/belvedersky/queue"
"git.belvedersky.ru/queue"
)
type Task struct {
@@ -43,7 +43,7 @@ func TestBasicQueue(t *testing.T) {
q.HandleParallel(ctx, 1)
for i := 0; i < 10; i++ {
for i := range 10 {
q.Produce(Task{ID: i})
}
@@ -73,7 +73,7 @@ func TestParallelProcessing(t *testing.T) {
q.HandleParallel(ctx, 4)
for i := 0; i < 20; i++ {
for i := range 20 {
q.Produce(Task{ID: i})
}
@@ -101,7 +101,7 @@ func TestGracefulShutdown(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
q.HandleParallel(ctx, 2)
for i := 0; i < 10; i++ {
for i := range 10 {
q.Produce(Task{ID: i})
}
@@ -152,7 +152,7 @@ func TestDLQ(t *testing.T) {
dlq.HandleParallel(ctx, 1)
// Отправляем 10 задач
for i := 0; i < 10; i++ {
for i := range 10 {
mainQ.Produce(Task{ID: i})
}
@@ -233,10 +233,10 @@ func TestHooks(t *testing.T) {
q := queue.NewQueue[Task](10)
var produced, handled, dropped int32
q.SetHooks(queue.QueueHooks{
q.SetHooks(queue.QueueHooks[Task]{
OnProduced: func() { atomic.AddInt32(&produced, 1) },
OnHandled: func() { atomic.AddInt32(&handled, 1) },
OnDropped: func(_ interface{}) { atomic.AddInt32(&dropped, 1) },
OnDropped: func(_ Task) { atomic.AddInt32(&dropped, 1) },
})
q.Register(func(t Task) {})