Кобелев Андрей Андреевич 4310ceea11
2025-10-14 14:09:27 +05:00

queue

Simple, lightweight, concurrent FIFO queue for Go with optional Dead Letter Queue (DLQ), hooks, rate limiting, and graceful shutdown.

Repository: https://git.belvedersky.ru/belvedersky/queue

Installation

go get git.belvedersky.ru/queue

Quick Start

package main

import (
    "context"
    "fmt"
    "time"

    "git.belvedersky.ru/queue"
)

type Task struct{ ID int }

func main() {
    // Create a queue with a buffer capacity of 10
    q := queue.NewQueue[Task](10)

    // Register handler(s). Handlers run in worker goroutines.
    q.Register(func(t Task) {
        fmt.Printf("handled %d\n", t.ID)
        time.Sleep(50 * time.Millisecond) // simulate work
    })

    // Start workers
    ctx := context.Background()
    q.HandleParallel(ctx, 2)

    // Produce tasks
    for i := 0; i < 5; i++ {
        q.Produce(Task{ID: i})
    }

    // Graceful shutdown: stop intake and wait for handlers to finish
    _ = q.Shutdown(context.Background())
}
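
The shutdown step above (stop intake, then wait for handlers) can be illustrated with a minimal standard-library sketch. Everything here, including `miniQueue`, `start`, `shutdown`, and `drainAll`, is a hypothetical stand-in for illustration and is not the package's actual implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// miniQueue is a hypothetical stand-in that only illustrates the
// close-then-wait pattern; it is not the library's implementation.
type miniQueue struct {
	ch chan int
	wg sync.WaitGroup
}

func newMiniQueue(capacity int) *miniQueue {
	return &miniQueue{ch: make(chan int, capacity)}
}

// start launches n workers that drain the channel until it is closed.
func (q *miniQueue) start(n int, handle func(int)) {
	for i := 0; i < n; i++ {
		q.wg.Add(1)
		go func() {
			defer q.wg.Done()
			for v := range q.ch {
				handle(v)
			}
		}()
	}
}

// shutdown stops intake, then waits for the workers or for ctx to expire.
func (q *miniQueue) shutdown(ctx context.Context) error {
	close(q.ch) // no new items; already buffered items are still drained
	done := make(chan struct{})
	go func() {
		q.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// drainAll pushes n items through the queue and reports how many were handled.
func drainAll(n, workers int) (int, error) {
	var mu sync.Mutex
	handled := 0
	q := newMiniQueue(n)
	q.start(workers, func(int) {
		mu.Lock()
		handled++
		mu.Unlock()
	})
	for i := 0; i < n; i++ {
		q.ch <- i
	}
	err := q.shutdown(context.Background())
	return handled, err
}

func main() {
	handled, err := drainAll(5, 2)
	fmt.Println(handled, err) // prints "5 <nil>"
}
```

Passing a context with a deadline to such a shutdown function bounds how long the caller waits; in this sketch the workers keep draining in the background if the deadline expires first.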

Dead Letter Queue (DLQ)

Items whose handler panics can be routed to a separate queue instead of being lost:

type Task struct{ ID int }

mainQ := queue.NewQueue[Task](5)
dlq := queue.NewQueue[Task](5)
mainQ.SetDLQ(dlq)

mainQ.Register(func(t Task) {
    if t.ID == 3 {
        panic("boom") // goes to DLQ
    }
})

dlq.Register(func(t Task) {
    fmt.Printf("sent to DLQ: %d\n", t.ID)
})

ctx := context.Background()
mainQ.HandleParallel(ctx, 1)
dlq.HandleParallel(ctx, 1)

for i := 0; i < 5; i++ {
    mainQ.Produce(Task{ID: i})
}

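// Close and drain the main queue first so that every failed item
// has a chance to reach the DLQ before the DLQ itself is closed.
mainQ.Close()
mainQ.Wait()
dlq.Close()
dlq.Wait()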
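
The routing of panicking items can be sketched with `recover` from the standard library. This is only an illustration of the general technique, assuming a panic is caught per item. The helper names `safeHandle` and `process` are hypothetical and do not come from the package.

```go
package main

import "fmt"

// safeHandle runs handle(item) and reports whether the handler panicked.
func safeHandle[T any](item T, handle func(T)) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	handle(item)
	return false
}

// process runs every item through handle and returns the "dead letters":
// the items whose handler panicked, in their original order.
func process[T any](items []T, handle func(T)) (dead []T) {
	for _, it := range items {
		if safeHandle(it, handle) {
			dead = append(dead, it)
		}
	}
	return dead
}

func main() {
	dead := process([]int{0, 1, 2, 3, 4}, func(id int) {
		if id == 3 {
			panic("boom")
		}
	})
	fmt.Println(dead) // prints "[3]"
}
```

Recovering inside a per-item helper keeps the worker loop alive after a failure, which is what allows the remaining items to be processed.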

Hooks and Rate Limiting

Hooks are optional callbacks that fire when an item is produced, handled, or dropped. The rate limit caps how fast each worker processes items.

q := queue.NewQueue[int](10)
q.SetHooks(queue.QueueHooks{
    OnProduced: func() { fmt.Println("produced") },
    OnHandled:  func() { fmt.Println("handled") },
    OnDropped:  func(v any) { fmt.Println("dropped:", v) },
})

// Limit the processing rate to 100 items per second per worker
q.SetRateLimit(100)

Examples

See runnable examples in the repository:

  • examples/example_basic_test.go
  • examples/example_dlq_test.go
  • examples/example_parallel_test.go

Run them with: go test ./examples -run Example

Benchmarks

goos: darwin
goarch: arm64
pkg: git.belvedersky.ru/belvedersky/queue
cpu: Apple M1 Pro
BenchmarkProduceConsume/workers_1-10         	    9488	    118244 ns/op	       8 B/op	       1 allocs/op
BenchmarkProduceConsume/workers_4-10         	   40800	     29371 ns/op	       8 B/op	       1 allocs/op
BenchmarkProduceConsume/workers_16-10        	  163110	      7340 ns/op	       8 B/op	       1 allocs/op
BenchmarkTryProduce-10                       	 1610434	       774.8 ns/op	     520 B/op	       7 allocs/op
PASS
ok  	git.belvedersky.ru/belvedersky/queue	6.574s

License

MIT