package recognition import ( "context" "log/slog" "sync" "github.com/food-ai/backend/internal/adapters/kafka" ) const defaultWorkerCount = 5 // WorkerPool processes dish recognition jobs from a single Kafka topic. type WorkerPool struct { jobRepo JobRepository recognizer Recognizer dishRepo DishRepository consumer *kafka.Consumer workerCount int jobs chan string } // NewWorkerPool creates a WorkerPool with five workers consuming from a single consumer. func NewWorkerPool( jobRepo JobRepository, recognizer Recognizer, dishRepo DishRepository, consumer *kafka.Consumer, ) *WorkerPool { return &WorkerPool{ jobRepo: jobRepo, recognizer: recognizer, dishRepo: dishRepo, consumer: consumer, workerCount: defaultWorkerCount, jobs: make(chan string, 100), } } // Start launches the Kafka feeder goroutine and all worker goroutines. func (pool *WorkerPool) Start(workerContext context.Context) { go pool.consumer.Run(workerContext, pool.jobs) for i := 0; i < pool.workerCount; i++ { go pool.runWorker(workerContext) } } func (pool *WorkerPool) runWorker(workerContext context.Context) { for { select { case jobID := <-pool.jobs: pool.processJob(workerContext, jobID) case <-workerContext.Done(): return } } } func (pool *WorkerPool) processJob(workerContext context.Context, jobID string) { job, fetchError := pool.jobRepo.GetJobByID(workerContext, jobID) if fetchError != nil { slog.Error("worker: fetch job", "job_id", jobID, "err", fetchError) return } // Transition to processing. if updateError := pool.jobRepo.UpdateJobStatus(workerContext, jobID, JobStatusProcessing, nil, nil); updateError != nil { slog.Error("worker: set processing status", "job_id", jobID, "err", updateError) } if notifyError := pool.jobRepo.NotifyJobUpdate(workerContext, jobID); notifyError != nil { slog.Warn("worker: notify processing", "job_id", jobID, "err", notifyError) } // Run AI recognition. result, recognizeError := pool.recognizer.RecognizeDish(workerContext, job.ImageBase64, job.MimeType, job.Lang) if recognizeError != nil { slog.Error("worker: recognize dish", "job_id", jobID, "err", recognizeError) errMsg := "recognition failed, please try again" _ = pool.jobRepo.UpdateJobStatus(workerContext, jobID, JobStatusFailed, nil, &errMsg) _ = pool.jobRepo.NotifyJobUpdate(workerContext, jobID) return } // Resolve dish_id and recipe_id for each candidate in parallel. var mu sync.Mutex var wg sync.WaitGroup for index := range result.Candidates { wg.Add(1) go func(candidateIndex int) { defer wg.Done() candidate := result.Candidates[candidateIndex] englishName := candidate.DishName dishID, created, findError := pool.dishRepo.FindOrCreate(workerContext, englishName) if findError != nil { slog.Warn("worker: find or create dish", "name", englishName, "err", findError) return } localizedName := englishName if job.Lang != "en" { // Translate synchronously so the saved result JSON already has the right name. // resolveLocalizedDishName saves all language translations as a side-effect, // so enrichDishInBackground is not needed afterwards. localizedName = pool.resolveLocalizedDishName(workerContext, dishID, englishName, job.Lang, created) } else if created { go enrichDishInBackground(pool.recognizer, pool.dishRepo, dishID, englishName) } mu.Lock() result.Candidates[candidateIndex].DishID = &dishID result.Candidates[candidateIndex].DishName = localizedName mu.Unlock() recipeID, _, recipeError := pool.dishRepo.FindOrCreateRecipe( workerContext, dishID, candidate.Calories, candidate.ProteinG, candidate.FatG, candidate.CarbsG, ) if recipeError != nil { slog.Warn("worker: find or create recipe", "dish_id", dishID, "err", recipeError) return } mu.Lock() result.Candidates[candidateIndex].RecipeID = &recipeID mu.Unlock() }(index) } wg.Wait() // Transition to done. if updateError := pool.jobRepo.UpdateJobStatus(workerContext, jobID, JobStatusDone, result, nil); updateError != nil { slog.Error("worker: set done status", "job_id", jobID, "err", updateError) } if notifyError := pool.jobRepo.NotifyJobUpdate(workerContext, jobID); notifyError != nil { slog.Warn("worker: notify done", "job_id", jobID, "err", notifyError) } } // enrichDishInBackground translates a newly created dish name into all supported languages. // Runs as a fire-and-forget goroutine so it never blocks recognition. func enrichDishInBackground(recognizer Recognizer, dishRepo DishRepository, dishID, dishName string) { enrichContext := context.Background() translations, translateError := recognizer.TranslateDishName(enrichContext, dishName) if translateError != nil { slog.Warn("translate dish name", "name", dishName, "err", translateError) return } for lang, translatedName := range translations { if upsertError := dishRepo.UpsertTranslation(enrichContext, dishID, lang, translatedName); upsertError != nil { slog.Warn("upsert dish translation", "dish_id", dishID, "lang", lang, "err", upsertError) } } } // resolveLocalizedDishName returns the dish name in the requested language. // For a newly created dish it always translates via AI (synchronously) and saves // all translations to dish_translations as a side-effect. // For an existing dish it checks dish_translations first; if the row is missing // (e.g. background enrichment is still in flight) it falls back to AI translation. // Returns englishName unchanged on any unrecoverable error. func (pool *WorkerPool) resolveLocalizedDishName( workerContext context.Context, dishID, englishName, lang string, isNewDish bool, ) string { if !isNewDish { translatedName, found, getError := pool.dishRepo.GetTranslation(workerContext, dishID, lang) if getError != nil { slog.Warn("worker: get dish translation", "dish_id", dishID, "lang", lang, "err", getError) } else if found { return translatedName } // Fall through to AI translation. } translations, translateError := pool.recognizer.TranslateDishName(workerContext, englishName) if translateError != nil { slog.Warn("worker: translate dish name", "name", englishName, "err", translateError) return englishName } for languageCode, nameInLang := range translations { if upsertError := pool.dishRepo.UpsertTranslation(workerContext, dishID, languageCode, nameInLang); upsertError != nil { slog.Warn("worker: upsert dish translation", "dish_id", dishID, "lang", languageCode, "err", upsertError) } } if localizedName, ok := translations[lang]; ok { return localizedName } return englishName }