Replace dual-consumer priority WorkerPool with a single consumer per worker process. WORKER_PLAN=paid|free selects the Kafka topic and consumer group ID (dish-recognition-paid / dish-recognition-free). docker-compose now runs worker-paid and worker-free as separate services for independent scaling. Makefile dev target launches both workers locally. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
143 lines
4.6 KiB
Go
143 lines
4.6 KiB
Go
package recognition
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"sync"
|
|
|
|
"github.com/food-ai/backend/internal/adapters/kafka"
|
|
)
|
|
|
|
// defaultWorkerCount is the number of worker goroutines assigned to every
// WorkerPool built by NewWorkerPool.
const defaultWorkerCount = 5
|
|
|
|
// WorkerPool processes dish recognition jobs from a single Kafka topic.
|
|
type WorkerPool struct {
|
|
jobRepo JobRepository
|
|
recognizer Recognizer
|
|
dishRepo DishRepository
|
|
consumer *kafka.Consumer
|
|
workerCount int
|
|
jobs chan string
|
|
}
|
|
|
|
// NewWorkerPool creates a WorkerPool with five workers consuming from a single consumer.
|
|
func NewWorkerPool(
|
|
jobRepo JobRepository,
|
|
recognizer Recognizer,
|
|
dishRepo DishRepository,
|
|
consumer *kafka.Consumer,
|
|
) *WorkerPool {
|
|
return &WorkerPool{
|
|
jobRepo: jobRepo,
|
|
recognizer: recognizer,
|
|
dishRepo: dishRepo,
|
|
consumer: consumer,
|
|
workerCount: defaultWorkerCount,
|
|
jobs: make(chan string, 100),
|
|
}
|
|
}
|
|
|
|
// Start launches the Kafka feeder goroutine and all worker goroutines.
|
|
func (pool *WorkerPool) Start(workerContext context.Context) {
|
|
go pool.consumer.Run(workerContext, pool.jobs)
|
|
for i := 0; i < pool.workerCount; i++ {
|
|
go pool.runWorker(workerContext)
|
|
}
|
|
}
|
|
|
|
func (pool *WorkerPool) runWorker(workerContext context.Context) {
|
|
for {
|
|
select {
|
|
case jobID := <-pool.jobs:
|
|
pool.processJob(workerContext, jobID)
|
|
case <-workerContext.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (pool *WorkerPool) processJob(workerContext context.Context, jobID string) {
|
|
job, fetchError := pool.jobRepo.GetJobByID(workerContext, jobID)
|
|
if fetchError != nil {
|
|
slog.Error("worker: fetch job", "job_id", jobID, "err", fetchError)
|
|
return
|
|
}
|
|
|
|
// Transition to processing.
|
|
if updateError := pool.jobRepo.UpdateJobStatus(workerContext, jobID, JobStatusProcessing, nil, nil); updateError != nil {
|
|
slog.Error("worker: set processing status", "job_id", jobID, "err", updateError)
|
|
}
|
|
if notifyError := pool.jobRepo.NotifyJobUpdate(workerContext, jobID); notifyError != nil {
|
|
slog.Warn("worker: notify processing", "job_id", jobID, "err", notifyError)
|
|
}
|
|
|
|
// Run AI recognition.
|
|
result, recognizeError := pool.recognizer.RecognizeDish(workerContext, job.ImageBase64, job.MimeType, job.Lang)
|
|
if recognizeError != nil {
|
|
slog.Error("worker: recognize dish", "job_id", jobID, "err", recognizeError)
|
|
errMsg := "recognition failed, please try again"
|
|
_ = pool.jobRepo.UpdateJobStatus(workerContext, jobID, JobStatusFailed, nil, &errMsg)
|
|
_ = pool.jobRepo.NotifyJobUpdate(workerContext, jobID)
|
|
return
|
|
}
|
|
|
|
// Resolve dish_id and recipe_id for each candidate in parallel.
|
|
var mu sync.Mutex
|
|
var wg sync.WaitGroup
|
|
for index := range result.Candidates {
|
|
wg.Add(1)
|
|
go func(candidateIndex int) {
|
|
defer wg.Done()
|
|
candidate := result.Candidates[candidateIndex]
|
|
dishID, created, findError := pool.dishRepo.FindOrCreate(workerContext, candidate.DishName)
|
|
if findError != nil {
|
|
slog.Warn("worker: find or create dish", "name", candidate.DishName, "err", findError)
|
|
return
|
|
}
|
|
mu.Lock()
|
|
result.Candidates[candidateIndex].DishID = &dishID
|
|
mu.Unlock()
|
|
if created {
|
|
go enrichDishInBackground(pool.recognizer, pool.dishRepo, dishID, candidate.DishName)
|
|
}
|
|
|
|
recipeID, _, recipeError := pool.dishRepo.FindOrCreateRecipe(
|
|
workerContext, dishID,
|
|
candidate.Calories, candidate.ProteinG, candidate.FatG, candidate.CarbsG,
|
|
)
|
|
if recipeError != nil {
|
|
slog.Warn("worker: find or create recipe", "dish_id", dishID, "err", recipeError)
|
|
return
|
|
}
|
|
mu.Lock()
|
|
result.Candidates[candidateIndex].RecipeID = &recipeID
|
|
mu.Unlock()
|
|
}(index)
|
|
}
|
|
wg.Wait()
|
|
|
|
// Transition to done.
|
|
if updateError := pool.jobRepo.UpdateJobStatus(workerContext, jobID, JobStatusDone, result, nil); updateError != nil {
|
|
slog.Error("worker: set done status", "job_id", jobID, "err", updateError)
|
|
}
|
|
if notifyError := pool.jobRepo.NotifyJobUpdate(workerContext, jobID); notifyError != nil {
|
|
slog.Warn("worker: notify done", "job_id", jobID, "err", notifyError)
|
|
}
|
|
}
|
|
|
|
// enrichDishInBackground translates a newly created dish name into all supported languages.
|
|
// Runs as a fire-and-forget goroutine so it never blocks recognition.
|
|
func enrichDishInBackground(recognizer Recognizer, dishRepo DishRepository, dishID, dishName string) {
|
|
enrichContext := context.Background()
|
|
translations, translateError := recognizer.TranslateDishName(enrichContext, dishName)
|
|
if translateError != nil {
|
|
slog.Warn("translate dish name", "name", dishName, "err", translateError)
|
|
return
|
|
}
|
|
for lang, translatedName := range translations {
|
|
if upsertError := dishRepo.UpsertTranslation(enrichContext, dishID, lang, translatedName); upsertError != nil {
|
|
slog.Warn("upsert dish translation", "dish_id", dishID, "lang", lang, "err", upsertError)
|
|
}
|
|
}
|
|
}
|