feat: split worker into separate binary (cmd/worker)

Kafka consumers and WorkerPool are moved out of the server process into
a dedicated worker binary. Server now handles HTTP + SSE only; worker
handles Kafka consumption and AI processing.

- cmd/worker/main.go + init.go: new binary with minimal config
  (DATABASE_URL, OPENAI_API_KEY, KAFKA_BROKERS)
- cmd/server: remove WorkerPool, paidConsumer, freeConsumer
- Dockerfile: builds both server and worker binaries
- docker-compose.yml: add worker service
- Makefile: add run-worker and docker-logs-worker targets
- README.md: document worker startup and env vars

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
dbastrikin
2026-03-18 20:09:33 +02:00
parent 0f533ccaeb
commit 48fd2baa8c
9 changed files with 186 additions and 28 deletions

View File

@@ -4,13 +4,15 @@ WORKDIR /build
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 go build -o /app/server ./cmd/server
RUN CGO_ENABLED=0 go build -o /app/server ./cmd/server && \
CGO_ENABLED=0 go build -o /app/worker ./cmd/worker
# Run
FROM alpine:3.19
RUN apk add --no-cache ca-certificates
WORKDIR /app
COPY --from=builder /app/server .
COPY --from=builder /app/worker .
COPY migrations ./migrations
EXPOSE 8080
CMD ["./server"]

View File

@@ -1,4 +1,4 @@
.PHONY: run test test-integration lint migrate-up migrate-down migrate-create migrate-status docker-up docker-down docker-logs
.PHONY: run run-worker test test-integration lint migrate-up migrate-down migrate-create migrate-status docker-up docker-down docker-logs docker-logs-worker
ifneq (,$(wildcard .env))
include .env
@@ -9,6 +9,9 @@ endif
run:
go run ./cmd/server
run-worker:
go run ./cmd/worker
# Tests
test:
go test ./... -v -race -count=1
@@ -43,3 +46,6 @@ docker-down:
docker-logs:
docker compose logs -f app
docker-logs-worker:
docker compose logs -f worker

View File

@@ -63,6 +63,43 @@ make migrate-up
make run
```
### 4. Запуск воркера
Сервер (`cmd/server`) и воркер (`cmd/worker`) — два отдельных процесса:
- **Сервер** — обслуживает HTTP API и SSE-стримы; публикует задания распознавания в Kafka.
- **Воркер** — потребляет задания из Kafka, вызывает AI, сохраняет результаты в БД и оповещает сервер через `pg_notify`.
**Локально** (инфраструктура в Docker, процессы локально):
```bash
# Поднять Kafka + PostgreSQL
docker compose up -d postgres kafka kafka-init
# В отдельном терминале — сервер
make run
# В другом терминале — воркер
go run ./cmd/worker
```
**Переменные окружения воркера:**
| Переменная | Описание |
|---|---|
| `DATABASE_URL` | DSN подключения к PostgreSQL |
| `OPENAI_API_KEY` | Ключ OpenAI API (для распознавания блюд) |
| `KAFKA_BROKERS` | Адреса Kafka-брокеров (по умолчанию `kafka:9092`) |
**Через Docker Compose** (сервер + воркер + инфраструктура):
```bash
OPENAI_API_KEY=your-key docker compose up app worker
```
Воркеры можно масштабировать горизонтально — запустите несколько контейнеров `worker`,
они разделят нагрузку через consumer group `dish-recognition-workers`.
## Команды
### Сервер и тесты

View File

@@ -10,7 +10,6 @@ import (
// App bundles the HTTP handler with background services that need lifecycle management.
type App struct {
handler http.Handler
workerPool *recognition.WorkerPool
sseBroker *recognition.SSEBroker
}
@@ -19,9 +18,8 @@ func (application *App) ServeHTTP(responseWriter http.ResponseWriter, request *h
application.handler.ServeHTTP(responseWriter, request)
}
// Start launches the SSE broker's LISTEN loop and the worker pool goroutines.
// Start launches the SSE broker's LISTEN loop.
// Call this once before the HTTP server begins accepting connections.
func (application *App) Start(applicationContext context.Context) {
application.sseBroker.Start(applicationContext)
application.workerPool.Start(applicationContext)
}

View File

@@ -46,24 +46,15 @@ func initApp(appConfig *config.Config, pool *pgxpool.Pool) (*App, error) {
ingredientHandler := ingredient.NewHandler(ingredientRepository)
productHandler := product.NewHandler(productRepository)
// Kafka producer and consumers
// Kafka producer
kafkaProducer, kafkaProducerError := newKafkaProducer(appConfig)
if kafkaProducerError != nil {
return nil, kafkaProducerError
}
paidConsumer, paidConsumerError := newPaidKafkaConsumer(appConfig)
if paidConsumerError != nil {
return nil, paidConsumerError
}
freeConsumer, freeConsumerError := newFreeKafkaConsumer(appConfig)
if freeConsumerError != nil {
return nil, freeConsumerError
}
// Recognition pipeline
jobRepository := recognition.NewJobRepository(pool)
sseBroker := recognition.NewSSEBroker(pool, jobRepository)
workerPool := recognition.NewWorkerPool(jobRepository, openaiClient, dishRepository, paidConsumer, freeConsumer)
recognitionHandler := recognition.NewHandler(openaiClient, ingredientRepository, jobRepository, kafkaProducer, sseBroker)
menuRepository := menu.NewRepository(pool)
@@ -102,7 +93,6 @@ func initApp(appConfig *config.Config, pool *pgxpool.Pool) (*App, error) {
)
return &App{
handler: httpHandler,
workerPool: workerPool,
sseBroker: sseBroker,
}, nil
}

View File

@@ -197,14 +197,6 @@ func newKafkaProducer(appConfig *config.Config) (*kafka.Producer, error) {
return kafka.NewProducer(appConfig.KafkaBrokers)
}
func newPaidKafkaConsumer(appConfig *config.Config) (*kafka.Consumer, error) {
return kafka.NewConsumer(appConfig.KafkaBrokers, "dish-recognition-workers", recognition.TopicPaid)
}
func newFreeKafkaConsumer(appConfig *config.Config) (*kafka.Consumer, error) {
return kafka.NewConsumer(appConfig.KafkaBrokers, "dish-recognition-workers", recognition.TopicFree)
}
// ---------------------------------------------------------------------------
// Interface assertions (compile-time checks)
// ---------------------------------------------------------------------------

View File

@@ -0,0 +1,58 @@
package main
import (
"context"
"github.com/food-ai/backend/internal/adapters/kafka"
"github.com/food-ai/backend/internal/adapters/openai"
"github.com/food-ai/backend/internal/domain/dish"
"github.com/food-ai/backend/internal/domain/recognition"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/kelseyhightower/envconfig"
)
// workerConfig holds the minimal environment-driven settings the worker
// binary needs: a database DSN, an AI API key, and Kafka broker addresses.
type workerConfig struct {
	// DatabaseURL is the PostgreSQL connection DSN (required).
	DatabaseURL string `envconfig:"DATABASE_URL" required:"true"`
	// OpenAIAPIKey authenticates the AI client used for dish recognition (required).
	OpenAIAPIKey string `envconfig:"OPENAI_API_KEY" required:"true"`
	// KafkaBrokers lists Kafka broker addresses; defaults to the
	// docker-compose service address when unset.
	KafkaBrokers []string `envconfig:"KAFKA_BROKERS" default:"kafka:9092"`
}
// loadConfig populates a workerConfig from process environment variables.
// It returns an error when a required variable (DATABASE_URL,
// OPENAI_API_KEY) is missing.
func loadConfig() (*workerConfig, error) {
	cfg := new(workerConfig)
	if err := envconfig.Process("", cfg); err != nil {
		return nil, err
	}
	return cfg, nil
}
// WorkerApp bundles background services that need lifecycle management.
type WorkerApp struct {
	workerPool *recognition.WorkerPool
}

// Start spins up the recognition worker-pool goroutines; they run until
// applicationContext is cancelled.
func (app *WorkerApp) Start(applicationContext context.Context) {
	app.workerPool.Start(applicationContext)
}
// workerConsumerGroup is the Kafka consumer group shared by every worker
// process. Running multiple workers in this group splits the topic
// partitions between them (horizontal scaling).
const workerConsumerGroup = "dish-recognition-workers"

// initWorker wires the recognition pipeline: an AI client, the dish and
// job repositories, and two Kafka consumers (paid and free topics) feeding
// a WorkerPool. It returns an error if either consumer cannot be created.
func initWorker(workerCfg *workerConfig, pool *pgxpool.Pool) (*WorkerApp, error) {
	openaiClient := openai.NewClient(workerCfg.OpenAIAPIKey)
	dishRepository := dish.NewRepository(pool)
	jobRepository := recognition.NewJobRepository(pool)

	// Both consumers join the same group so concurrent worker processes
	// share the load; they differ only in the topic they consume.
	paidConsumer, paidConsumerError := kafka.NewConsumer(
		workerCfg.KafkaBrokers, workerConsumerGroup, recognition.TopicPaid,
	)
	if paidConsumerError != nil {
		return nil, paidConsumerError
	}
	freeConsumer, freeConsumerError := kafka.NewConsumer(
		workerCfg.KafkaBrokers, workerConsumerGroup, recognition.TopicFree,
	)
	if freeConsumerError != nil {
		return nil, freeConsumerError
	}

	workerPool := recognition.NewWorkerPool(jobRepository, openaiClient, dishRepository, paidConsumer, freeConsumer)
	return &WorkerApp{workerPool: workerPool}, nil
}

View File

@@ -0,0 +1,58 @@
package main
import (
"context"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"github.com/food-ai/backend/internal/infra/database"
"github.com/food-ai/backend/internal/infra/locale"
)
// main installs a JSON structured logger as the process default and
// delegates all work to run, exiting non-zero on a fatal error.
func main() {
	handlerOptions := &slog.HandlerOptions{Level: slog.LevelInfo}
	slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, handlerOptions)))

	if err := run(); err != nil {
		slog.Error("fatal error", "err", err)
		os.Exit(1)
	}
}
// run is the worker's real entry point: it loads configuration, opens the
// database pool, loads locale data, starts the worker pool, and then blocks
// until SIGINT/SIGTERM triggers a graceful shutdown. Any setup failure is
// returned wrapped with the step that failed.
func run() error {
	// Note: local is named cfg to avoid shadowing the workerConfig type.
	cfg, err := loadConfig()
	if err != nil {
		return fmt.Errorf("load config: %w", err)
	}

	// Cancelled on SIGINT/SIGTERM; everything downstream observes this context.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	pool, err := database.NewPool(ctx, cfg.DatabaseURL)
	if err != nil {
		return fmt.Errorf("connect to database: %w", err)
	}
	defer pool.Close()
	slog.Info("connected to database")

	if err := locale.LoadFromDB(ctx, pool); err != nil {
		return fmt.Errorf("load languages: %w", err)
	}

	app, err := initWorker(cfg, pool)
	if err != nil {
		return fmt.Errorf("init worker: %w", err)
	}
	app.Start(ctx)
	slog.Info("worker started")

	// Block until the signal context is cancelled, then shut down.
	<-ctx.Done()
	slog.Info("worker shutting down...")
	return nil
}

View File

@@ -67,5 +67,22 @@ services:
volumes:
- ./firebase-credentials.json:/app/firebase-credentials.json:ro
worker:
build:
context: .
dockerfile: Dockerfile
command: ["./worker"]
environment:
DATABASE_URL: postgres://food_ai:food_ai_local@postgres:5432/food_ai?sslmode=disable
OPENAI_API_KEY: ${OPENAI_API_KEY}
KAFKA_BROKERS: kafka:9092
depends_on:
postgres:
condition: service_healthy
kafka:
condition: service_healthy
kafka-init:
condition: service_completed_successfully
volumes:
pgdata: