package recognition

import (
	"context"
	"log/slog"
	"sync"

	"github.com/food-ai/backend/internal/adapters/ai"
	"github.com/food-ai/backend/internal/adapters/kafka"
)

// ProductWorkerPool processes product/receipt recognition jobs from a single Kafka topic.
type ProductWorkerPool struct {
	// productJobRepo loads jobs, persists status transitions and publishes update notifications.
	productJobRepo ProductJobRepository
	// enricher post-processes recognized items before they are stored as the job result.
	enricher *itemEnricher
	// recognizer performs the receipt and product image recognition calls.
	recognizer Recognizer
	// consumer feeds job IDs into the jobs channel.
	consumer *kafka.Consumer
	// workerCount is the number of worker goroutines launched by Start.
	workerCount int
	// jobs carries job IDs from the consumer to the workers.
	jobs chan string
}

// NewProductWorkerPool creates a ProductWorkerPool with defaultWorkerCount workers
// consuming from a single consumer. Job IDs are handed to the workers through a
// channel buffered to 100 entries.
func NewProductWorkerPool(
	productJobRepo ProductJobRepository,
	recognizer Recognizer,
	productRepo ProductRepository,
	consumer *kafka.Consumer,
) *ProductWorkerPool {
	return &ProductWorkerPool{
		productJobRepo: productJobRepo,
		enricher:       newItemEnricher(recognizer, productRepo),
		recognizer:     recognizer,
		consumer:       consumer,
		workerCount:    defaultWorkerCount,
		jobs:           make(chan string, 100),
	}
}

// Start launches the Kafka feeder goroutine and all worker goroutines.
// It returns immediately; the workers stop once workerContext is cancelled.
func (pool *ProductWorkerPool) Start(workerContext context.Context) {
	go pool.consumer.Run(workerContext, pool.jobs)
	for i := 0; i < pool.workerCount; i++ {
		go pool.runWorker(workerContext)
	}
}

// runWorker processes job IDs from the jobs channel until workerContext is
// cancelled or the channel is closed.
func (pool *ProductWorkerPool) runWorker(workerContext context.Context) {
	for {
		select {
		case jobID, ok := <-pool.jobs:
			if !ok {
				// Whether the feeder ever closes the channel is not visible here.
				// Without this guard a closed channel would yield empty job IDs in
				// a tight loop instead of letting the worker exit.
				return
			}
			pool.processJob(workerContext, jobID)
		case <-workerContext.Done():
			return
		}
	}
}

// failJob marks the job as failed with the given user-facing message and
// publishes an update notification. Both steps are best-effort: errors are
// logged rather than returned because the callers are already on a failure path.
func (pool *ProductWorkerPool) failJob(workerContext context.Context, jobID, message string) {
	if updateError := pool.productJobRepo.UpdateProductJobStatus(workerContext, jobID, JobStatusFailed, nil, &message); updateError != nil {
		slog.Error("product worker: set failed status", "job_id", jobID, "err", updateError)
	}
	if notifyError := pool.productJobRepo.NotifyProductJobUpdate(workerContext, jobID); notifyError != nil {
		slog.Warn("product worker: notify failed", "job_id", jobID, "err", notifyError)
	}
}

// processJob runs one recognition job end to end: it loads the job, marks it as
// processing, runs recognition according to the job type, enriches the
// recognized items and stores the result with the done status.
//
// The job is marked failed when it has no images, when its type is unknown,
// when receipt recognition returns an error, or when recognition fails for
// every image of a "products" job. A "products" job in which only some images
// fail still completes with the items from the images that succeeded.
func (pool *ProductWorkerPool) processJob(workerContext context.Context, jobID string) {
	job, fetchError := pool.productJobRepo.GetProductJobByID(workerContext, jobID)
	if fetchError != nil {
		slog.Error("product worker: fetch job", "job_id", jobID, "err", fetchError)
		return
	}

	// Failing to record or announce the processing state does not abort the job.
	if updateError := pool.productJobRepo.UpdateProductJobStatus(workerContext, jobID, JobStatusProcessing, nil, nil); updateError != nil {
		slog.Error("product worker: set processing status", "job_id", jobID, "err", updateError)
	}
	if notifyError := pool.productJobRepo.NotifyProductJobUpdate(workerContext, jobID); notifyError != nil {
		slog.Warn("product worker: notify processing", "job_id", jobID, "err", notifyError)
	}

	var recognizedItems []ai.RecognizedItem
	var unrecognized []ai.UnrecognizedItem
	var recognizeError error

	switch job.JobType {
	case "receipt":
		if len(job.Images) == 0 {
			pool.failJob(workerContext, jobID, "no images in job")
			return
		}
		// Only the first image of a receipt job is recognized.
		imagePayload := job.Images[0]
		var receiptResult *ai.ReceiptResult
		receiptResult, recognizeError = pool.recognizer.RecognizeReceipt(workerContext, imagePayload.ImageBase64, imagePayload.MimeType, job.Lang)
		if recognizeError == nil && receiptResult != nil {
			recognizedItems = receiptResult.Items
			unrecognized = receiptResult.Unrecognized
		}

	case "products":
		if len(job.Images) == 0 {
			pool.failJob(workerContext, jobID, "no images in job")
			return
		}
		// Each goroutine writes only its own index of these slices, so no
		// additional locking is required.
		allItems := make([][]ai.RecognizedItem, len(job.Images))
		imageErrors := make([]error, len(job.Images))
		var wg sync.WaitGroup
		for index, imagePayload := range job.Images {
			wg.Add(1)
			go func(workerIndex int, payload ProductImagePayload) {
				defer wg.Done()
				items, itemsError := pool.recognizer.RecognizeProducts(workerContext, payload.ImageBase64, payload.MimeType, job.Lang)
				if itemsError != nil {
					slog.WarnContext(workerContext, "product worker: recognize products from image", "job_id", jobID, "index", workerIndex, "err", itemsError)
					imageErrors[workerIndex] = itemsError
					return
				}
				allItems[workerIndex] = items
			}(index, imagePayload)
		}
		wg.Wait()

		// A partial failure still yields a usable result, but when every image
		// failed the job must not be reported as done with an empty item list.
		var firstImageError error
		failedCount := 0
		for _, imageError := range imageErrors {
			if imageError == nil {
				continue
			}
			failedCount++
			if firstImageError == nil {
				firstImageError = imageError
			}
		}
		if failedCount == len(imageErrors) {
			recognizeError = firstImageError
		}
		recognizedItems = MergeAndDeduplicate(allItems)

	default:
		slog.Error("product worker: unknown job type", "job_id", jobID, "job_type", job.JobType)
		pool.failJob(workerContext, jobID, "unknown job type: "+job.JobType)
		return
	}

	if recognizeError != nil {
		// The underlying error is logged; the stored message stays generic.
		slog.Error("product worker: recognize", "job_id", jobID, "err", recognizeError)
		pool.failJob(workerContext, jobID, "recognition failed, please try again")
		return
	}

	enriched := pool.enricher.enrich(workerContext, recognizedItems)
	resultItems := make([]ProductJobResultItem, len(enriched))
	for index, item := range enriched {
		resultItems[index] = ProductJobResultItem{
			Name:               item.Name,
			Quantity:           item.Quantity,
			Unit:               item.Unit,
			Category:           item.Category,
			Confidence:         item.Confidence,
			QuantityConfidence: item.QuantityConfidence,
			MappingID:          item.MappingID,
			StorageDays:        item.StorageDays,
		}
	}

	jobResult := &ProductJobResult{
		JobType:      job.JobType,
		Items:        resultItems,
		Unrecognized: unrecognized,
	}
	if updateError := pool.productJobRepo.UpdateProductJobStatus(workerContext, jobID, JobStatusDone, jobResult, nil); updateError != nil {
		slog.Error("product worker: set done status", "job_id", jobID, "err", updateError)
	}
	if notifyError := pool.productJobRepo.NotifyProductJobUpdate(workerContext, jobID); notifyError != nil {
		slog.Warn("product worker: notify done", "job_id", jobID, "err", notifyError)
	}
}