Backend:
- Migration 002: product_recognition_jobs table with JSONB images column
and job_type CHECK ('receipt' | 'products')
- New Kafka topics: ai.products.paid / ai.products.free
- ProductJob model, ProductJobRepository (mirrors dish job pattern)
- itemEnricher extracted from Handler — shared by HTTP handler and worker
- ProductSSEBroker: PG LISTEN on product_job_update channel
- ProductWorkerPool: 5 workers, branches on job_type to call
RecognizeReceipt or RecognizeProducts per image in parallel
- Handler: RecognizeReceipt and RecognizeProducts now return 202 Accepted
instead of blocking; 4 new endpoints: GET /ai/product-jobs,
/product-jobs/history, /product-jobs/{id}, /product-jobs/{id}/stream
- cmd/worker: extended to run ProductWorkerPool alongside dish WorkerPool
- cmd/server: wires productJobRepository + productSSEBroker; both SSE
brokers started in App.Start()
Flutter client:
- ProductJobCreated, ProductJobResult, ProductJobSummary, ProductJobEvent
models + submitReceiptRecognition/submitProductsRecognition/stream methods
- Shared _openSseStream helper eliminates duplicate SSE parsing loop
- ScanScreen: replace blocking AI calls with async submit + navigate to
ProductJobWatchScreen
- ProductJobWatchScreen: watches SSE stream, navigates to /scan/confirm
when done, shows error on failure
- ProductsScreen: prepends _RecentScansSection (hidden when empty); compact
horizontal list of recent scans with "See all" → history
- ProductJobHistoryScreen: full list of all product recognition jobs
- New routes: /scan/product-job-watch, /products/job-history
- L10n: 7 new keys in all 12 ARB files
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
86 lines
2.8 KiB
Go
86 lines
2.8 KiB
Go
package main
|
|
|
|
import (
	"context"
	"fmt"

	"github.com/food-ai/backend/internal/adapters/kafka"
	"github.com/food-ai/backend/internal/adapters/openai"
	"github.com/food-ai/backend/internal/domain/dish"
	"github.com/food-ai/backend/internal/domain/product"
	"github.com/food-ai/backend/internal/domain/recognition"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/kelseyhightower/envconfig"
)
|
|
|
|
// workerConfig holds the environment-driven settings for the worker process,
// populated by envconfig.Process from the tags below.
type workerConfig struct {
	// DatabaseURL is the Postgres connection string (required).
	DatabaseURL string `envconfig:"DATABASE_URL" required:"true"`
	// OpenAIAPIKey authenticates the OpenAI recognition client (required).
	OpenAIAPIKey string `envconfig:"OPENAI_API_KEY" required:"true"`
	// KafkaBrokers lists broker addresses for the job-queue consumers.
	KafkaBrokers []string `envconfig:"KAFKA_BROKERS" default:"kafka:9092"`
	// WorkerPlan selects the topic tier this instance consumes: "paid" | "free".
	// Any value other than "paid" is treated as "free" (see initWorker).
	WorkerPlan string `envconfig:"WORKER_PLAN" default:"free"`
}
|
|
|
|
func loadConfig() (*workerConfig, error) {
|
|
var workerCfg workerConfig
|
|
if configError := envconfig.Process("", &workerCfg); configError != nil {
|
|
return nil, configError
|
|
}
|
|
return &workerCfg, nil
|
|
}
|
|
|
|
// WorkerApp bundles background services that need lifecycle management.
type WorkerApp struct {
	// workerPool processes dish-recognition jobs.
	workerPool *recognition.WorkerPool
	// productWorkerPool processes product-recognition jobs.
	productWorkerPool *recognition.ProductWorkerPool
}
|
|
|
|
// Start launches the dish and product worker pool goroutines.
|
|
func (workerApp *WorkerApp) Start(applicationContext context.Context) {
|
|
workerApp.workerPool.Start(applicationContext)
|
|
workerApp.productWorkerPool.Start(applicationContext)
|
|
}
|
|
|
|
func initWorker(workerCfg *workerConfig, pool *pgxpool.Pool) (*WorkerApp, error) {
|
|
openaiClient := openai.NewClient(workerCfg.OpenAIAPIKey)
|
|
|
|
// Dish recognition worker.
|
|
dishRepository := dish.NewRepository(pool)
|
|
jobRepository := recognition.NewJobRepository(pool)
|
|
|
|
dishTopic := recognition.TopicFree
|
|
dishGroupID := "dish-recognition-free"
|
|
if workerCfg.WorkerPlan == "paid" {
|
|
dishTopic = recognition.TopicPaid
|
|
dishGroupID = "dish-recognition-paid"
|
|
}
|
|
|
|
dishConsumer, dishConsumerError := kafka.NewConsumer(workerCfg.KafkaBrokers, dishGroupID, dishTopic)
|
|
if dishConsumerError != nil {
|
|
return nil, dishConsumerError
|
|
}
|
|
|
|
workerPool := recognition.NewWorkerPool(jobRepository, openaiClient, dishRepository, dishConsumer)
|
|
|
|
// Product recognition worker.
|
|
productRepository := product.NewRepository(pool)
|
|
productJobRepository := recognition.NewProductJobRepository(pool)
|
|
|
|
productTopic := recognition.ProductTopicFree
|
|
productGroupID := "product-recognition-free"
|
|
if workerCfg.WorkerPlan == "paid" {
|
|
productTopic = recognition.ProductTopicPaid
|
|
productGroupID = "product-recognition-paid"
|
|
}
|
|
|
|
productConsumer, productConsumerError := kafka.NewConsumer(workerCfg.KafkaBrokers, productGroupID, productTopic)
|
|
if productConsumerError != nil {
|
|
return nil, productConsumerError
|
|
}
|
|
|
|
productWorkerPool := recognition.NewProductWorkerPool(productJobRepository, openaiClient, productRepository, productConsumer)
|
|
|
|
return &WorkerApp{
|
|
workerPool: workerPool,
|
|
productWorkerPool: productWorkerPool,
|
|
}, nil
|
|
}
|