Files
2026-03-19 22:22:44 +02:00

191 lines
6.5 KiB
Go

package recognition
import (
"context"
"log/slog"
"sync"
"github.com/food-ai/backend/internal/adapters/kafka"
)
const defaultWorkerCount = 5
// WorkerPool processes dish recognition jobs from a single Kafka topic.
type WorkerPool struct {
jobRepo JobRepository
recognizer Recognizer
dishRepo DishRepository
consumer *kafka.Consumer
workerCount int
jobs chan string
}
// NewWorkerPool creates a WorkerPool with five workers consuming from a single consumer.
func NewWorkerPool(
jobRepo JobRepository,
recognizer Recognizer,
dishRepo DishRepository,
consumer *kafka.Consumer,
) *WorkerPool {
return &WorkerPool{
jobRepo: jobRepo,
recognizer: recognizer,
dishRepo: dishRepo,
consumer: consumer,
workerCount: defaultWorkerCount,
jobs: make(chan string, 100),
}
}
// Start launches the Kafka feeder goroutine and all worker goroutines.
func (pool *WorkerPool) Start(workerContext context.Context) {
go pool.consumer.Run(workerContext, pool.jobs)
for i := 0; i < pool.workerCount; i++ {
go pool.runWorker(workerContext)
}
}
func (pool *WorkerPool) runWorker(workerContext context.Context) {
for {
select {
case jobID := <-pool.jobs:
pool.processJob(workerContext, jobID)
case <-workerContext.Done():
return
}
}
}
func (pool *WorkerPool) processJob(workerContext context.Context, jobID string) {
job, fetchError := pool.jobRepo.GetJobByID(workerContext, jobID)
if fetchError != nil {
slog.Error("worker: fetch job", "job_id", jobID, "err", fetchError)
return
}
// Transition to processing.
if updateError := pool.jobRepo.UpdateJobStatus(workerContext, jobID, JobStatusProcessing, nil, nil); updateError != nil {
slog.Error("worker: set processing status", "job_id", jobID, "err", updateError)
}
if notifyError := pool.jobRepo.NotifyJobUpdate(workerContext, jobID); notifyError != nil {
slog.Warn("worker: notify processing", "job_id", jobID, "err", notifyError)
}
// Run AI recognition.
result, recognizeError := pool.recognizer.RecognizeDish(workerContext, job.ImageBase64, job.MimeType, job.Lang)
if recognizeError != nil {
slog.Error("worker: recognize dish", "job_id", jobID, "err", recognizeError)
errMsg := "recognition failed, please try again"
_ = pool.jobRepo.UpdateJobStatus(workerContext, jobID, JobStatusFailed, nil, &errMsg)
_ = pool.jobRepo.NotifyJobUpdate(workerContext, jobID)
return
}
// Resolve dish_id and recipe_id for each candidate in parallel.
var mu sync.Mutex
var wg sync.WaitGroup
for index := range result.Candidates {
wg.Add(1)
go func(candidateIndex int) {
defer wg.Done()
candidate := result.Candidates[candidateIndex]
englishName := candidate.DishName
dishID, created, findError := pool.dishRepo.FindOrCreate(workerContext, englishName)
if findError != nil {
slog.Warn("worker: find or create dish", "name", englishName, "err", findError)
return
}
localizedName := englishName
if job.Lang != "en" {
// Translate synchronously so the saved result JSON already has the right name.
// resolveLocalizedDishName saves all language translations as a side-effect,
// so enrichDishInBackground is not needed afterwards.
localizedName = pool.resolveLocalizedDishName(workerContext, dishID, englishName, job.Lang, created)
} else if created {
go enrichDishInBackground(pool.recognizer, pool.dishRepo, dishID, englishName)
}
mu.Lock()
result.Candidates[candidateIndex].DishID = &dishID
result.Candidates[candidateIndex].DishName = localizedName
mu.Unlock()
recipeID, _, recipeError := pool.dishRepo.FindOrCreateRecipe(
workerContext, dishID,
candidate.Calories, candidate.ProteinG, candidate.FatG, candidate.CarbsG,
)
if recipeError != nil {
slog.Warn("worker: find or create recipe", "dish_id", dishID, "err", recipeError)
return
}
mu.Lock()
result.Candidates[candidateIndex].RecipeID = &recipeID
mu.Unlock()
}(index)
}
wg.Wait()
// Transition to done.
if updateError := pool.jobRepo.UpdateJobStatus(workerContext, jobID, JobStatusDone, result, nil); updateError != nil {
slog.Error("worker: set done status", "job_id", jobID, "err", updateError)
}
if notifyError := pool.jobRepo.NotifyJobUpdate(workerContext, jobID); notifyError != nil {
slog.Warn("worker: notify done", "job_id", jobID, "err", notifyError)
}
}
// enrichDishInBackground translates a newly created dish name into all supported languages.
// Runs as a fire-and-forget goroutine so it never blocks recognition.
func enrichDishInBackground(recognizer Recognizer, dishRepo DishRepository, dishID, dishName string) {
enrichContext := context.Background()
translations, translateError := recognizer.TranslateDishName(enrichContext, dishName)
if translateError != nil {
slog.Warn("translate dish name", "name", dishName, "err", translateError)
return
}
for lang, translatedName := range translations {
if upsertError := dishRepo.UpsertTranslation(enrichContext, dishID, lang, translatedName); upsertError != nil {
slog.Warn("upsert dish translation", "dish_id", dishID, "lang", lang, "err", upsertError)
}
}
}
// resolveLocalizedDishName returns the dish name in the requested language.
// For a newly created dish it always translates via AI (synchronously) and saves
// all translations to dish_translations as a side-effect.
// For an existing dish it checks dish_translations first; if the row is missing
// (e.g. background enrichment is still in flight) it falls back to AI translation.
// Returns englishName unchanged on any unrecoverable error.
func (pool *WorkerPool) resolveLocalizedDishName(
workerContext context.Context,
dishID, englishName, lang string,
isNewDish bool,
) string {
if !isNewDish {
translatedName, found, getError := pool.dishRepo.GetTranslation(workerContext, dishID, lang)
if getError != nil {
slog.Warn("worker: get dish translation", "dish_id", dishID, "lang", lang, "err", getError)
} else if found {
return translatedName
}
// Fall through to AI translation.
}
translations, translateError := pool.recognizer.TranslateDishName(workerContext, englishName)
if translateError != nil {
slog.Warn("worker: translate dish name", "name", englishName, "err", translateError)
return englishName
}
for languageCode, nameInLang := range translations {
if upsertError := pool.dishRepo.UpsertTranslation(workerContext, dishID, languageCode, nameInLang); upsertError != nil {
slog.Warn("worker: upsert dish translation", "dish_id", dishID, "lang", languageCode, "err", upsertError)
}
}
if localizedName, ok := translations[lang]; ok {
return localizedName
}
return englishName
}