From 72f0e52a78324a4c54df9dbe6f291dd1a26e360b Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Thu, 20 Feb 2025 22:59:55 -0500 Subject: [PATCH] simple queues working --- go.mod | 5 ++ go.sum | 2 + main.go | 124 +++++++++++++++++++++++++++++++++++ pkg/api/queues/simple.go | 136 +++++++++++++++++++++++++++++++++++++++ pkg/models/job.go | 50 ++++++++++++++ pkg/models/queue.go | 18 ++++++ 6 files changed, 335 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 pkg/api/queues/simple.go create mode 100644 pkg/models/job.go create mode 100644 pkg/models/queue.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..377dc12 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module github.com/rayaman/simply-jobber + +go 1.23.0 + +require github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..3a6eee6 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 h1:zN2lZNZRflqFyxVaTIU61KNKQ9C0055u9CAfpmqUvo4= +github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3/go.mod h1:nPpo7qLxd6XL3hWJG/O60sR8ZKfMCiIoNap5GvD12KU= diff --git a/main.go b/main.go new file mode 100644 index 0000000..b4b93b3 --- /dev/null +++ b/main.go @@ -0,0 +1,124 @@ +package main + +import ( + "fmt" + "os" + "time" + + "github.com/rayaman/simply-jobber/pkg/api/queues" + "github.com/rayaman/simply-jobber/pkg/models" +) + +type Data struct { + Message string `json:"message"` +} + +type Data2 struct { + Msg string `json:"message"` +} + +var ( + Job1 models.JobType = "Test 1" + Job2 models.JobType = "Test 2" +) + +func main() { + var queue models.Queue = queues.NewSimple(0) + + myJob := models.Job{ + Type: Job1, + Data: Data{ + Message: "Job message :D", + }, + } + + myJob2 := models.Job{ + Type: Job2, + Data: Data2{ + Msg: "Job message :D", + }, + } + + err := queue.AddHandle(Job1, func(j models.Job) (models.JobResponse, error) { + data, err := models.GetDataFromStruct(j, Data{}) + + if err != nil { + return models.JobResponse{}, fmt.Errorf("cannot get data from job") + } + + fmt.Printf("Got data 1 %v\n", data.Message) + time.Sleep(time.Second) + + return models.JobResponse{}, nil + }) + + if err != nil { + panic(err) + } + + err = queue.AddHandle(Job2, func(j models.Job) (models.JobResponse, error) { + data, err := models.GetDataFromStruct(j, Data2{}) + + if err != nil { + return models.JobResponse{}, fmt.Errorf("cannot get data from job") + } + + fmt.Printf("Got data 2 %v\n", data.Msg) + time.Sleep(time.Second * 2) + + return models.JobResponse{}, nil + }) + + if err != nil { + panic(err) + } + + // queue.SetProcessor(func(j models.Job) (models.JobResponse, error) { + // switch j.Type { + // case "test": + // data, err := models.GetDataFromStruct(j, Data{}) + // if err != nil { + // return models.JobResponse{}, fmt.Errorf("cannot get data from job") + // } + // include := "N/A" + // if data.Message == "Job message :D" { + // include = "Special+" + // } + // return models.JobResponse{ + // Type: "test", + // Data: Data{ + // Message: "Completed job :D " + include, + // }, + // }, nil + // default: + // return models.JobResponse{}, fmt.Errorf("cannot process job") + // } + // }) + + // queue.OnJobResponse(func(jr models.JobResponse, i int) { + // data, err := models.GetDataFromStruct(models.Job(jr), Data{}) + // if err != nil { + // return + // } + // fmt.Printf("JobID: %v Message: %v\n", i, data.Message) + // }) + go func() { + time.Sleep(time.Second * 10) + queue.Scale(1) + time.Sleep(time.Second * 5) + queue.Scale(5) + time.Sleep(time.Second * 5) + queue.Scale(0) + time.Sleep(time.Second * 5) + queue.Scale(5) + time.Sleep(time.Second * 5) + os.Exit(0) + }() + + for { + time.Sleep(time.Second * 1) + queue.Send(myJob) + time.Sleep(time.Second * 1) + queue.Send(myJob2) + } +} diff --git a/pkg/api/queues/simple.go b/pkg/api/queues/simple.go new file mode 100644 index 0000000..1294c41 --- /dev/null +++ b/pkg/api/queues/simple.go @@ -0,0 +1,136 @@ +// Handles job queues +package queues + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/golang-collections/collections/queue" + "github.com/rayaman/simply-jobber/pkg/models" +) + +type Simple struct { + queue *queue.Queue + processJob models.ProcessFunc + mutex sync.Mutex + mutexq sync.Mutex + jid int + responses []models.ResponseFunc + handles map[models.JobType]models.ProcessFunc + workers uint + context *context.Context + cancel context.CancelFunc +} + +type pkg struct { + models.Job + jid int +} + +func NewSimple(workers uint) *Simple { + sq := &Simple{workers: workers, handles: map[models.JobType]models.ProcessFunc{}, queue: queue.New(), mutex: sync.Mutex{}, mutexq: sync.Mutex{}, processJob: func(j models.Job) (models.JobResponse, error) { return models.JobResponse{}, nil }} + defer func() { + if r := recover(); r != nil { + fmt.Println("Recovered!") + } + }() + ctx, cancel := context.WithCancel(context.Background()) + sq.context = &ctx + sq.cancel = cancel + sq.genWorkers(workers) + return sq +} + +func (sq *Simple) Len() int { + sq.mutexq.Lock() + defer sq.mutexq.Unlock() + return sq.queue.Len() +} + +func (sq *Simple) genWorkers(workers uint) { + for range workers { + go func() { + ctx := sq.context + for { + time.Sleep(time.Millisecond) + select { + case <-(*ctx).Done(): + return + default: + if p, ok := sq.dequeue(); ok { + var err error + var res models.JobResponse + sq.mutex.Lock() + if handle, ok := sq.handles[p.Type]; ok { + sq.mutex.Unlock() + res, err = handle(p.Job) + } else { + sq.mutex.Unlock() + res, err = sq.processJob(p.Job) + } + if err == nil { + for _, jr := range sq.responses { + jr(res, p.jid) + } + } else { + // Todo log + } + } + } + } + }() + } +} + +func (sq *Simple) Scale(workers uint) { + sq.mutex.Lock() + defer sq.mutex.Unlock() + sq.cancel() + ctx, cancel := context.WithCancel(context.Background()) + sq.context = &ctx + sq.cancel = cancel + sq.genWorkers(workers) +} + +func (sq *Simple) dequeue() (pkg, bool) { + sq.mutexq.Lock() + defer sq.mutexq.Unlock() + if sq.queue.Len() > 0 { + return sq.queue.Dequeue().(pkg), true + } else { + return pkg{}, false + } +} + +func (sq *Simple) SetProcessor(fn models.ProcessFunc, args ...string) error { + sq.mutex.Lock() + defer sq.mutex.Unlock() + sq.processJob = fn + return nil +} + +func (sq *Simple) OnJobResponse(jr models.ResponseFunc) { + sq.mutex.Lock() + defer sq.mutex.Unlock() + sq.responses = append(sq.responses, jr) +} + +func (sq *Simple) AddHandle(t models.JobType, fn models.ProcessFunc) error { + sq.mutex.Lock() + defer sq.mutex.Unlock() + if _, ok := sq.handles[t]; ok { + return fmt.Errorf("job type [%v] already exists", t) + } + sq.handles[t] = fn + return nil +} + +func (sq *Simple) Send(j models.Job) (int, error) { + sq.mutexq.Lock() + defer sq.mutexq.Unlock() + sq.queue.Enqueue(pkg{Job: j, jid: sq.jid}) + sq.jid++ + return sq.jid, nil +} diff --git a/pkg/models/job.go b/pkg/models/job.go new file mode 100644 index 0000000..837430d --- /dev/null +++ b/pkg/models/job.go @@ -0,0 +1,50 @@ +package models + +import "encoding/json" + +type JobName string +type JobType string +type JobResponse Job + +type Job struct { + // Type of job + Type JobType `json:"type"` + // The data you are packaging with the job + Data any `json:"data"` +} + +// Takes in raw job data and a reference struct for it's data +func GetJobFromData[T any](j []byte, data T) (*Job, T, error) { + job := &Job{} + err := json.Unmarshal(j, job) + + if err != nil { + return nil, data, err + } + + dat, err := json.Marshal(job.Data) + + if err != nil { + return nil, data, err + } + + err = json.Unmarshal(dat, &data) + + job.Data = data + + return job, data, err +} + +func GetDataFromStruct[T any](job Job, data T) (T, error) { + dat, err := json.Marshal(job.Data) + + if err != nil { + return data, err + } + + err = json.Unmarshal(dat, &data) + + job.Data = data + + return data, err +} diff --git a/pkg/models/queue.go b/pkg/models/queue.go new file mode 100644 index 0000000..b3e48e7 --- /dev/null +++ b/pkg/models/queue.go @@ -0,0 +1,18 @@ +package models + +type Queue interface { + Send(Job) (int, error) + // Callback function that is triggered when a job has been processed. Can view the job response + OnJobResponse(ResponseFunc) + // Sets a function that will be triggered by all jobs not processed by a handle + SetProcessor(ProcessFunc, ...string) error + // Adds a handle for a certain job type. this func is only triggered by a piticular job type + AddHandle(JobType, ProcessFunc) error + // Allows you to scale the queue + Scale(uint) + // Number of queued jobs + Len() int +} + +type ProcessFunc func(Job) (JobResponse, error) +type ResponseFunc func(JobResponse, int)