diff --git a/backend/Dockerfile b/backend/Dockerfile
index 4524908..bd69c50 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -4,13 +4,15 @@ WORKDIR /build
 COPY go.mod go.sum ./
 RUN go mod download
 COPY . .
-RUN CGO_ENABLED=0 go build -o /app/server ./cmd/server
+RUN CGO_ENABLED=0 go build -o /app/server ./cmd/server && \
+    CGO_ENABLED=0 go build -o /app/worker ./cmd/worker

 # Run
 FROM alpine:3.19
 RUN apk add --no-cache ca-certificates
 WORKDIR /app
 COPY --from=builder /app/server .
+COPY --from=builder /app/worker .
 COPY migrations ./migrations
 EXPOSE 8080
 CMD ["./server"]
diff --git a/backend/Makefile b/backend/Makefile
index 30ad910..f8471ef 100644
--- a/backend/Makefile
+++ b/backend/Makefile
@@ -1,4 +1,4 @@
-.PHONY: run test test-integration lint migrate-up migrate-down migrate-create migrate-status docker-up docker-down docker-logs
+.PHONY: run run-worker test test-integration lint migrate-up migrate-down migrate-create migrate-status docker-up docker-down docker-logs docker-logs-worker

 ifneq (,$(wildcard .env))
 include .env
@@ -9,6 +9,9 @@ endif
 run:
     go run ./cmd/server

+run-worker:
+    go run ./cmd/worker
+
 # Tests
 test:
     go test ./... -v -race -count=1
@@ -43,3 +46,6 @@ docker-down:

 docker-logs:
     docker compose logs -f app
+
+docker-logs-worker:
+    docker compose logs -f worker
diff --git a/backend/README.md b/backend/README.md
index d8b9e13..79c8f75 100644
--- a/backend/README.md
+++ b/backend/README.md
@@ -63,6 +63,43 @@ make migrate-up
 make run
 ```

+### 4. Running the worker
+
+The server (`cmd/server`) and the worker (`cmd/worker`) are two separate processes:
+
+- **Server**: serves the HTTP API and SSE streams; publishes recognition jobs to Kafka.
+- **Worker**: consumes jobs from Kafka, calls the AI, stores results in the database, and notifies the server via `pg_notify`.
+
+**Local run** (infrastructure in Docker, processes run locally):
+
+```bash
+# Start Kafka + PostgreSQL
+docker compose up -d postgres kafka kafka-init
+
+# In one terminal: the server
+make run
+
+# In another terminal: the worker
+go run ./cmd/worker
+```
+
+**Worker environment variables:**
+
+| Variable | Description |
+|---|---|
+| `DATABASE_URL` | PostgreSQL connection DSN |
+| `OPENAI_API_KEY` | Gemini API key (used for dish recognition) |
+| `KAFKA_BROKERS` | Kafka broker addresses (default `kafka:9092`) |
+
+**Via Docker Compose** (server + worker + infrastructure):
+
+```bash
+OPENAI_API_KEY=your-key docker compose up app worker
+```
+
+Workers can be scaled horizontally: start several `worker` containers and
+they will share the load through the `dish-recognition-workers` consumer group.
+
 ## Commands

 ### Server and tests
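The README above describes the worker's handoff back to the server via `pg_notify`, but the notification code itself is not part of this diff. Below is a minimal sketch of what that call can look like with pgx, assuming an illustrative channel name `recognition_jobs` and a hypothetical helper `notifyJobDone`; the real channel, payload shape, and function live in the `recognition` package and may differ.

```go
// Hypothetical sketch: channel name, payload shape, and helper name are
// illustrative assumptions, not the actual recognition package API.
package sketch

import (
    "context"
    "encoding/json"

    "github.com/jackc/pgx/v5/pgxpool"
)

// notifyJobDone publishes a NOTIFY event after a recognition result has been
// saved, so the server's SSE broker (LISTENing on the same channel) can push
// an update to connected clients.
func notifyJobDone(ctx context.Context, pool *pgxpool.Pool, jobID, status string) error {
    payload, err := json.Marshal(map[string]string{"job_id": jobID, "status": status})
    if err != nil {
        return err
    }
    // pg_notify(channel, payload) is the function form of NOTIFY; it lets both
    // arguments be passed as query parameters.
    _, err = pool.Exec(ctx, "SELECT pg_notify($1, $2)", "recognition_jobs", string(payload))
    return err
}
```

The server-side counterpart is a `LISTEN` on the same channel from a dedicated connection, which is presumably what the `SSEBroker` "LISTEN loop" referenced in `cmd/server/app.go` below does before fanning updates out to SSE subscribers.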
diff --git a/backend/cmd/server/app.go b/backend/cmd/server/app.go
index 20f5a7e..df6918f 100644
--- a/backend/cmd/server/app.go
+++ b/backend/cmd/server/app.go
@@ -9,9 +9,8 @@ import (

 // App bundles the HTTP handler with background services that need lifecycle management.
 type App struct {
-    handler    http.Handler
-    workerPool *recognition.WorkerPool
-    sseBroker  *recognition.SSEBroker
+    handler   http.Handler
+    sseBroker *recognition.SSEBroker
 }

 // ServeHTTP implements http.Handler.
@@ -19,9 +18,8 @@ func (application *App) ServeHTTP(responseWriter http.ResponseWriter, request *h
     application.handler.ServeHTTP(responseWriter, request)
 }

-// Start launches the SSE broker's LISTEN loop and the worker pool goroutines.
+// Start launches the SSE broker's LISTEN loop.
 // Call this once before the HTTP server begins accepting connections.
 func (application *App) Start(applicationContext context.Context) {
     application.sseBroker.Start(applicationContext)
-    application.workerPool.Start(applicationContext)
 }
diff --git a/backend/cmd/server/init.go b/backend/cmd/server/init.go
index fe01c2b..098f997 100644
--- a/backend/cmd/server/init.go
+++ b/backend/cmd/server/init.go
@@ -46,24 +46,15 @@ func initApp(appConfig *config.Config, pool *pgxpool.Pool) (*App, error) {
     ingredientHandler := ingredient.NewHandler(ingredientRepository)
     productHandler := product.NewHandler(productRepository)

-    // Kafka producer and consumers
+    // Kafka producer
     kafkaProducer, kafkaProducerError := newKafkaProducer(appConfig)
     if kafkaProducerError != nil {
         return nil, kafkaProducerError
     }
-    paidConsumer, paidConsumerError := newPaidKafkaConsumer(appConfig)
-    if paidConsumerError != nil {
-        return nil, paidConsumerError
-    }
-    freeConsumer, freeConsumerError := newFreeKafkaConsumer(appConfig)
-    if freeConsumerError != nil {
-        return nil, freeConsumerError
-    }

     // Recognition pipeline
     jobRepository := recognition.NewJobRepository(pool)
     sseBroker := recognition.NewSSEBroker(pool, jobRepository)
-    workerPool := recognition.NewWorkerPool(jobRepository, openaiClient, dishRepository, paidConsumer, freeConsumer)
     recognitionHandler := recognition.NewHandler(openaiClient, ingredientRepository, jobRepository, kafkaProducer, sseBroker)

     menuRepository := menu.NewRepository(pool)
@@ -101,8 +92,7 @@ func initApp(appConfig *config.Config, pool *pgxpool.Pool) (*App, error) {
         mainTagListHandler,
     )
     return &App{
-        handler:    httpHandler,
-        workerPool: workerPool,
-        sseBroker:  sseBroker,
+        handler:   httpHandler,
+        sseBroker: sseBroker,
     }, nil
 }
diff --git a/backend/cmd/server/providers.go b/backend/cmd/server/providers.go
index 7ed8434..bb95e8d 100644
--- a/backend/cmd/server/providers.go
+++ b/backend/cmd/server/providers.go
@@ -197,14 +197,6 @@ func newKafkaProducer(appConfig *config.Config) (*kafka.Producer, error) {
     return kafka.NewProducer(appConfig.KafkaBrokers)
 }

-func newPaidKafkaConsumer(appConfig *config.Config) (*kafka.Consumer, error) {
-    return kafka.NewConsumer(appConfig.KafkaBrokers, "dish-recognition-workers", recognition.TopicPaid)
-}
-
-func newFreeKafkaConsumer(appConfig *config.Config) (*kafka.Consumer, error) {
-    return kafka.NewConsumer(appConfig.KafkaBrokers, "dish-recognition-workers", recognition.TopicFree)
-}
-
 // ---------------------------------------------------------------------------
 // Interface assertions (compile-time checks)
 // ---------------------------------------------------------------------------
diff --git a/backend/cmd/worker/init.go b/backend/cmd/worker/init.go
new file mode 100644
index 0000000..075f080
--- /dev/null
+++ b/backend/cmd/worker/init.go
@@ -0,0 +1,58 @@
+package main
+
+import (
+    "context"
+
+    "github.com/food-ai/backend/internal/adapters/kafka"
+    "github.com/food-ai/backend/internal/adapters/openai"
+    "github.com/food-ai/backend/internal/domain/dish"
+    "github.com/food-ai/backend/internal/domain/recognition"
+    "github.com/jackc/pgx/v5/pgxpool"
+    "github.com/kelseyhightower/envconfig"
+)
+
+type workerConfig struct {
+    DatabaseURL  string   `envconfig:"DATABASE_URL" required:"true"`
+    OpenAIAPIKey string   `envconfig:"OPENAI_API_KEY" required:"true"`
+    KafkaBrokers []string `envconfig:"KAFKA_BROKERS" default:"kafka:9092"`
+}
+
+func loadConfig() (*workerConfig, error) {
+    var workerCfg workerConfig
+    if configError := envconfig.Process("", &workerCfg); configError != nil {
+        return nil, configError
+    }
+    return &workerCfg, nil
+}
+
+// WorkerApp bundles background services that need lifecycle management.
+type WorkerApp struct {
+    workerPool *recognition.WorkerPool
+}
+
+// Start launches the worker pool goroutines.
+func (workerApp *WorkerApp) Start(applicationContext context.Context) {
+    workerApp.workerPool.Start(applicationContext)
+}
+
+func initWorker(workerCfg *workerConfig, pool *pgxpool.Pool) (*WorkerApp, error) {
+    openaiClient := openai.NewClient(workerCfg.OpenAIAPIKey)
+    dishRepository := dish.NewRepository(pool)
+    jobRepository := recognition.NewJobRepository(pool)
+
+    paidConsumer, paidConsumerError := kafka.NewConsumer(
+        workerCfg.KafkaBrokers, "dish-recognition-workers", recognition.TopicPaid,
+    )
+    if paidConsumerError != nil {
+        return nil, paidConsumerError
+    }
+    freeConsumer, freeConsumerError := kafka.NewConsumer(
+        workerCfg.KafkaBrokers, "dish-recognition-workers", recognition.TopicFree,
+    )
+    if freeConsumerError != nil {
+        return nil, freeConsumerError
+    }
+
+    workerPool := recognition.NewWorkerPool(jobRepository, openaiClient, dishRepository, paidConsumer, freeConsumer)
+    return &WorkerApp{workerPool: workerPool}, nil
+}
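`initWorker` wires the paid and free topic consumers into `recognition.NewWorkerPool`, but the pool's consume loop itself is not part of this diff. As rough orientation only, a pool of this shape typically runs one loop per consumer along the lines of the sketch below; the `consumer` interface and `message` type are illustrative stand-ins, not the project's actual `kafka.Consumer` adapter API.

```go
// Illustrative stand-in only: the real kafka.Consumer adapter and
// recognition.WorkerPool are not part of this diff.
package sketch

import "context"

type message struct {
    Key   []byte
    Value []byte
}

// consumer approximates a Kafka consumer that participates in a consumer group.
type consumer interface {
    Fetch(ctx context.Context) (message, error)
    Commit(ctx context.Context, msg message) error
}

// consumeLoop drains one topic until the context is cancelled. A worker pool
// would start one such goroutine for the paid topic and one for the free topic.
func consumeLoop(ctx context.Context, c consumer, handle func(context.Context, []byte) error) {
    for {
        msg, err := c.Fetch(ctx)
        if err != nil {
            if ctx.Err() != nil {
                return // shutdown: context cancelled by signal.NotifyContext in main.go
            }
            continue // transient fetch error; try again
        }
        if err := handle(ctx, msg.Value); err != nil {
            continue // leave the offset uncommitted so the job is re-delivered later
        }
        _ = c.Commit(ctx, msg)
    }
}
```

Because both consumers join the same `dish-recognition-workers` group, additional worker containers (for example `docker compose up -d --scale worker=3`) are assigned disjoint partitions and split the load, which is the horizontal scaling the README section above refers to.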
diff --git a/backend/cmd/worker/main.go b/backend/cmd/worker/main.go
new file mode 100644
index 0000000..dda6bc2
--- /dev/null
+++ b/backend/cmd/worker/main.go
@@ -0,0 +1,58 @@
+package main
+
+import (
+    "context"
+    "fmt"
+    "log/slog"
+    "os"
+    "os/signal"
+    "syscall"
+
+    "github.com/food-ai/backend/internal/infra/database"
+    "github.com/food-ai/backend/internal/infra/locale"
+)
+
+func main() {
+    logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
+        Level: slog.LevelInfo,
+    }))
+    slog.SetDefault(logger)
+
+    if runError := run(); runError != nil {
+        slog.Error("fatal error", "err", runError)
+        os.Exit(1)
+    }
+}
+
+func run() error {
+    workerConfig, configError := loadConfig()
+    if configError != nil {
+        return fmt.Errorf("load config: %w", configError)
+    }
+
+    applicationContext, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+    defer stop()
+
+    pool, poolError := database.NewPool(applicationContext, workerConfig.DatabaseURL)
+    if poolError != nil {
+        return fmt.Errorf("connect to database: %w", poolError)
+    }
+    defer pool.Close()
+    slog.Info("connected to database")
+
+    if loadError := locale.LoadFromDB(applicationContext, pool); loadError != nil {
+        return fmt.Errorf("load languages: %w", loadError)
+    }
+
+    workerApp, initError := initWorker(workerConfig, pool)
+    if initError != nil {
+        return fmt.Errorf("init worker: %w", initError)
+    }
+
+    workerApp.Start(applicationContext)
+    slog.Info("worker started")
+
+    <-applicationContext.Done()
+    slog.Info("worker shutting down...")
+    return nil
+}
diff --git a/backend/docker-compose.yml b/backend/docker-compose.yml
index bb1f805..e925828 100644
--- a/backend/docker-compose.yml
+++ b/backend/docker-compose.yml
@@ -67,5 +67,22 @@ services:
     volumes:
       - ./firebase-credentials.json:/app/firebase-credentials.json:ro

+  worker:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    command: ["./worker"]
+    environment:
+      DATABASE_URL: postgres://food_ai:food_ai_local@postgres:5432/food_ai?sslmode=disable
+      OPENAI_API_KEY: ${OPENAI_API_KEY}
+      KAFKA_BROKERS: kafka:9092
+    depends_on:
+      postgres:
+        condition: service_healthy
+      kafka:
+        condition: service_healthy
+      kafka-init:
+        condition: service_completed_successfully
+
 volumes:
   pgdata: