package pool import ( "context" "log/slog" "runtime/debug" "sync" ) type WorkFunction func(workContext context.Context) type WorkerPool struct { concurrencyLimit int semaphoreChannel chan struct{} waitGroup sync.WaitGroup logger *slog.Logger } func NewWorkerPool(concurrencyLimit int, logger *slog.Logger) *WorkerPool { return &WorkerPool{ concurrencyLimit: concurrencyLimit, semaphoreChannel: make(chan struct{}, concurrencyLimit), logger: logger, } } func (workerPool *WorkerPool) Submit(workContext context.Context, workFunction WorkFunction) bool { select { case workerPool.semaphoreChannel <- struct{}{}: workerPool.waitGroup.Add(1) go func() { defer workerPool.waitGroup.Done() defer func() { <-workerPool.semaphoreChannel }() defer func() { recoveredPanic := recover() if recoveredPanic != nil { workerPool.logger.Error( "worker panic recovered", "panic_value", recoveredPanic, "stack_trace", string(debug.Stack()), ) } }() workFunction(workContext) }() return true case <-workContext.Done(): return false } } func (workerPool *WorkerPool) Wait() { workerPool.waitGroup.Wait() } func (workerPool *WorkerPool) ActiveWorkerCount() int { return len(workerPool.semaphoreChannel) }