diff --git a/backend/Makefile b/backend/Makefile index 82b9326..1a95813 100644 --- a/backend/Makefile +++ b/backend/Makefile @@ -1,4 +1,4 @@ -.PHONY: run run-worker dev dev-infra-up dev-infra-down test test-integration lint migrate-up migrate-down migrate-create migrate-status docker-up docker-down docker-logs docker-logs-worker +.PHONY: run run-worker-paid run-worker-free dev dev-infra-up dev-infra-down test test-integration lint migrate-up migrate-down migrate-create migrate-status docker-up docker-down docker-logs docker-logs-worker ifneq (,$(wildcard .env)) include .env @@ -9,14 +9,18 @@ endif run: go run ./cmd/server -run-worker: - go run ./cmd/worker +run-worker-paid: + WORKER_PLAN=paid go run ./cmd/worker -# Start only infra (postgres, kafka) in Docker, run server + worker locally +run-worker-free: + WORKER_PLAN=free go run ./cmd/worker + +# Start only infra (postgres, kafka) in Docker, run server + both workers locally dev: dev-infra-up @trap 'kill 0' INT; \ go run ./cmd/server & \ - go run ./cmd/worker & \ + WORKER_PLAN=paid go run ./cmd/worker & \ + WORKER_PLAN=free go run ./cmd/worker & \ wait dev-infra-up: @@ -61,4 +65,4 @@ docker-logs: docker compose logs -f app docker-logs-worker: - docker compose logs -f worker + docker compose logs -f worker-paid worker-free diff --git a/backend/cmd/worker/init.go b/backend/cmd/worker/init.go index 075f080..2acff6d 100644 --- a/backend/cmd/worker/init.go +++ b/backend/cmd/worker/init.go @@ -15,6 +15,7 @@ type workerConfig struct { DatabaseURL string `envconfig:"DATABASE_URL" required:"true"` OpenAIAPIKey string `envconfig:"OPENAI_API_KEY" required:"true"` KafkaBrokers []string `envconfig:"KAFKA_BROKERS" default:"kafka:9092"` + WorkerPlan string `envconfig:"WORKER_PLAN" default:"free"` // "paid" | "free" } func loadConfig() (*workerConfig, error) { @@ -40,19 +41,18 @@ func initWorker(workerCfg *workerConfig, pool *pgxpool.Pool) (*WorkerApp, error) dishRepository := dish.NewRepository(pool) jobRepository := recognition.NewJobRepository(pool) - paidConsumer, paidConsumerError := kafka.NewConsumer( - workerCfg.KafkaBrokers, "dish-recognition-workers", recognition.TopicPaid, - ) - if paidConsumerError != nil { - return nil, paidConsumerError - } - freeConsumer, freeConsumerError := kafka.NewConsumer( - workerCfg.KafkaBrokers, "dish-recognition-workers", recognition.TopicFree, - ) - if freeConsumerError != nil { - return nil, freeConsumerError + topic := recognition.TopicFree + groupID := "dish-recognition-free" + if workerCfg.WorkerPlan == "paid" { + topic = recognition.TopicPaid + groupID = "dish-recognition-paid" } - workerPool := recognition.NewWorkerPool(jobRepository, openaiClient, dishRepository, paidConsumer, freeConsumer) + consumer, consumerError := kafka.NewConsumer(workerCfg.KafkaBrokers, groupID, topic) + if consumerError != nil { + return nil, consumerError + } + + workerPool := recognition.NewWorkerPool(jobRepository, openaiClient, dishRepository, consumer) return &WorkerApp{workerPool: workerPool}, nil } diff --git a/backend/docker-compose.yml b/backend/docker-compose.yml index 713e9d1..1f786a4 100644 --- a/backend/docker-compose.yml +++ b/backend/docker-compose.yml @@ -69,7 +69,7 @@ services: volumes: - ./firebase-credentials.json:/app/firebase-credentials.json:ro - worker: + worker-paid: build: context: . dockerfile: Dockerfile @@ -78,10 +78,25 @@ services: DATABASE_URL: postgres://food_ai:food_ai_local@postgres:5432/food_ai?sslmode=disable OPENAI_API_KEY: ${OPENAI_API_KEY} KAFKA_BROKERS: kafka:9092 + WORKER_PLAN: paid depends_on: postgres: condition: service_healthy - kafka: + kafka-init: + condition: service_completed_successfully + + worker-free: + build: + context: . + dockerfile: Dockerfile + command: ["./worker"] + environment: + DATABASE_URL: postgres://food_ai:food_ai_local@postgres:5432/food_ai?sslmode=disable + OPENAI_API_KEY: ${OPENAI_API_KEY} + KAFKA_BROKERS: kafka:9092 + WORKER_PLAN: free + depends_on: + postgres: condition: service_healthy kafka-init: condition: service_completed_successfully diff --git a/backend/internal/domain/recognition/worker.go b/backend/internal/domain/recognition/worker.go index 4f875d3..84f7700 100644 --- a/backend/internal/domain/recognition/worker.go +++ b/backend/internal/domain/recognition/worker.go @@ -4,50 +4,42 @@ import ( "context" "log/slog" "sync" - "time" "github.com/food-ai/backend/internal/adapters/kafka" ) const defaultWorkerCount = 5 -// WorkerPool processes dish recognition jobs from Kafka with priority queuing. -// Paid jobs are processed before free jobs. +// WorkerPool processes dish recognition jobs from a single Kafka topic. type WorkerPool struct { - jobRepo JobRepository - recognizer Recognizer - dishRepo DishRepository - paidConsumer *kafka.Consumer - freeConsumer *kafka.Consumer - workerCount int - paidJobs chan string - freeJobs chan string + jobRepo JobRepository + recognizer Recognizer + dishRepo DishRepository + consumer *kafka.Consumer + workerCount int + jobs chan string } -// NewWorkerPool creates a WorkerPool with five workers. +// NewWorkerPool creates a WorkerPool with five workers consuming from a single consumer. func NewWorkerPool( jobRepo JobRepository, recognizer Recognizer, dishRepo DishRepository, - paidConsumer *kafka.Consumer, - freeConsumer *kafka.Consumer, + consumer *kafka.Consumer, ) *WorkerPool { return &WorkerPool{ - jobRepo: jobRepo, - recognizer: recognizer, - dishRepo: dishRepo, - paidConsumer: paidConsumer, - freeConsumer: freeConsumer, - workerCount: defaultWorkerCount, - paidJobs: make(chan string, 100), - freeJobs: make(chan string, 100), + jobRepo: jobRepo, + recognizer: recognizer, + dishRepo: dishRepo, + consumer: consumer, + workerCount: defaultWorkerCount, + jobs: make(chan string, 100), } } -// Start launches the Kafka feeder goroutines and all worker goroutines. +// Start launches the Kafka feeder goroutine and all worker goroutines. func (pool *WorkerPool) Start(workerContext context.Context) { - go pool.paidConsumer.Run(workerContext, pool.paidJobs) - go pool.freeConsumer.Run(workerContext, pool.freeJobs) + go pool.consumer.Run(workerContext, pool.jobs) for i := 0; i < pool.workerCount; i++ { go pool.runWorker(workerContext) } @@ -55,26 +47,11 @@ func (pool *WorkerPool) Start(workerContext context.Context) { func (pool *WorkerPool) runWorker(workerContext context.Context) { for { - // Priority step: drain paid queue without blocking. select { - case jobID := <-pool.paidJobs: - pool.processJob(workerContext, jobID) - continue - case <-workerContext.Done(): - return - default: - } - - // Fall back to either queue with a 100ms timeout. - select { - case jobID := <-pool.paidJobs: - pool.processJob(workerContext, jobID) - case jobID := <-pool.freeJobs: + case jobID := <-pool.jobs: pool.processJob(workerContext, jobID) case <-workerContext.Done(): return - case <-time.After(100 * time.Millisecond): - // nothing available; loop again } } }