package main import ( "context" "github.com/food-ai/backend/internal/adapters/kafka" "github.com/food-ai/backend/internal/adapters/openai" "github.com/food-ai/backend/internal/domain/dish" "github.com/food-ai/backend/internal/domain/product" "github.com/food-ai/backend/internal/domain/recognition" "github.com/jackc/pgx/v5/pgxpool" "github.com/kelseyhightower/envconfig" ) type workerConfig struct { DatabaseURL string `envconfig:"DATABASE_URL" required:"true"` OpenAIAPIKey string `envconfig:"OPENAI_API_KEY" required:"true"` KafkaBrokers []string `envconfig:"KAFKA_BROKERS" default:"kafka:9092"` WorkerPlan string `envconfig:"WORKER_PLAN" default:"free"` // "paid" | "free" } func loadConfig() (*workerConfig, error) { var workerCfg workerConfig if configError := envconfig.Process("", &workerCfg); configError != nil { return nil, configError } return &workerCfg, nil } // WorkerApp bundles background services that need lifecycle management. type WorkerApp struct { workerPool *recognition.WorkerPool productWorkerPool *recognition.ProductWorkerPool } // Start launches the dish and product worker pool goroutines. func (workerApp *WorkerApp) Start(applicationContext context.Context) { workerApp.workerPool.Start(applicationContext) workerApp.productWorkerPool.Start(applicationContext) } func initWorker(workerCfg *workerConfig, pool *pgxpool.Pool) (*WorkerApp, error) { openaiClient := openai.NewClient(workerCfg.OpenAIAPIKey) // Dish recognition worker. dishRepository := dish.NewRepository(pool) jobRepository := recognition.NewJobRepository(pool) dishTopic := recognition.TopicFree dishGroupID := "dish-recognition-free" if workerCfg.WorkerPlan == "paid" { dishTopic = recognition.TopicPaid dishGroupID = "dish-recognition-paid" } dishConsumer, dishConsumerError := kafka.NewConsumer(workerCfg.KafkaBrokers, dishGroupID, dishTopic) if dishConsumerError != nil { return nil, dishConsumerError } workerPool := recognition.NewWorkerPool(jobRepository, openaiClient, dishRepository, dishConsumer) // Product recognition worker. productRepository := product.NewRepository(pool) productJobRepository := recognition.NewProductJobRepository(pool) productTopic := recognition.ProductTopicFree productGroupID := "product-recognition-free" if workerCfg.WorkerPlan == "paid" { productTopic = recognition.ProductTopicPaid productGroupID = "product-recognition-paid" } productConsumer, productConsumerError := kafka.NewConsumer(workerCfg.KafkaBrokers, productGroupID, productTopic) if productConsumerError != nil { return nil, productConsumerError } productWorkerPool := recognition.NewProductWorkerPool(productJobRepository, openaiClient, productRepository, productConsumer) return &WorkerApp{ workerPool: workerPool, productWorkerPool: productWorkerPool, }, nil }