2025-02-20 22:59:55 -05:00

137 lines
2.9 KiB
Go

// Handles job queues
package queues
import (
"context"
"fmt"
"sync"
"time"
"github.com/golang-collections/collections/queue"
"github.com/rayaman/simply-jobber/pkg/models"
)
type Simple struct {
queue *queue.Queue
processJob models.ProcessFunc
mutex sync.Mutex
mutexq sync.Mutex
jid int
responses []models.ResponseFunc
handles map[models.JobType]models.ProcessFunc
workers uint
context *context.Context
cancel context.CancelFunc
}
type pkg struct {
models.Job
jid int
}
func NewSimple(workers uint) *Simple {
sq := &Simple{workers: workers, handles: map[models.JobType]models.ProcessFunc{}, queue: queue.New(), mutex: sync.Mutex{}, mutexq: sync.Mutex{}, processJob: func(j models.Job) (models.JobResponse, error) { return models.JobResponse{}, nil }}
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered!")
}
}()
ctx, cancel := context.WithCancel(context.Background())
sq.context = &ctx
sq.cancel = cancel
sq.genWorkers(workers)
return sq
}
func (sq *Simple) Len() int {
sq.mutexq.Lock()
defer sq.mutexq.Unlock()
return sq.queue.Len()
}
func (sq *Simple) genWorkers(workers uint) {
for range workers {
go func() {
ctx := sq.context
for {
time.Sleep(time.Millisecond)
select {
case <-(*ctx).Done():
return
default:
if p, ok := sq.dequeue(); ok {
var err error
var res models.JobResponse
sq.mutex.Lock()
if handle, ok := sq.handles[p.Type]; ok {
sq.mutex.Unlock()
res, err = handle(p.Job)
} else {
sq.mutex.Unlock()
res, err = sq.processJob(p.Job)
}
if err == nil {
for _, jr := range sq.responses {
jr(res, p.jid)
}
} else {
// Todo log
}
}
}
}
}()
}
}
func (sq *Simple) Scale(workers uint) {
sq.mutex.Lock()
defer sq.mutex.Unlock()
sq.cancel()
ctx, cancel := context.WithCancel(context.Background())
sq.context = &ctx
sq.cancel = cancel
sq.genWorkers(workers)
}
func (sq *Simple) dequeue() (pkg, bool) {
sq.mutexq.Lock()
defer sq.mutexq.Unlock()
if sq.queue.Len() > 0 {
return sq.queue.Dequeue().(pkg), true
} else {
return pkg{}, false
}
}
func (sq *Simple) SetProcessor(fn models.ProcessFunc, args ...string) error {
sq.mutex.Lock()
defer sq.mutex.Unlock()
sq.processJob = fn
return nil
}
func (sq *Simple) OnJobResponse(jr models.ResponseFunc) {
sq.mutex.Lock()
defer sq.mutex.Unlock()
sq.responses = append(sq.responses, jr)
}
func (sq *Simple) AddHandle(t models.JobType, fn models.ProcessFunc) error {
sq.mutex.Lock()
defer sq.mutex.Unlock()
if _, ok := sq.handles[t]; ok {
return fmt.Errorf("job type [%v] already exists", t)
}
sq.handles[t] = fn
return nil
}
func (sq *Simple) Send(j models.Job) (int, error) {
sq.mutexq.Lock()
defer sq.mutexq.Unlock()
sq.queue.Enqueue(pkg{Job: j, jid: sq.jid})
sq.jid++
return sq.jid, nil
}