Files
food-ai/backend/cmd/importoff/main.go
dbastrikin 78f1c8bf76 feat: food search sheet with FTS+trgm, dish/recent endpoints, multilingual aliases
Backend:
- GET /dishes/search — hybrid FTS (english + simple) + trgm + ILIKE search
- GET /diary/recent — recently used dishes and products for the current user
- product search upgraded: FTS on canonical_name and product_aliases, ranked by GREATEST(ts_rank, similarity)
- importoff: collect product_name_ru/de/fr/... as product_aliases for multilingual search (e.g. "сникерс" → "Snickers")
- migrations: FTS + trgm indexes merged into 001_initial_schema.sql (002 removed)

Flutter:
- FoodSearchSheet: debounced search field, recently-used section, product/dish results, scan-photo and barcode chips
- DishPortionSheet: quick ½/1/1½/2 buttons + custom input
- + button in meal card now opens FoodSearchSheet instead of going directly to AI scan
- 7 new l10n keys across all 12 languages

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-21 15:28:29 +02:00

434 lines
13 KiB
Go

package main
import (
"bufio"
"compress/gzip"
"context"
"encoding/json"
"flag"
"fmt"
"log/slog"
"os"
"os/signal"
"strings"
"syscall"
"github.com/jackc/pgx/v5"
)
// offRecord is the JSON shape of one line in the OpenFoodFacts JSONL dump.
// Only the fields this importer consumes are declared; encoding/json ignores
// every other key in the record.
type offRecord struct {
	Code string `json:"code"` // product barcode; records with an empty code are skipped
	// Localised product names. ProductNameEN is preferred as the canonical
	// name, with ProductName (the record's primary-language name) as the
	// fallback; the remaining languages become product_aliases rows.
	ProductName   string `json:"product_name"`
	ProductNameEN string `json:"product_name_en"`
	ProductNameRu string `json:"product_name_ru"`
	ProductNameDe string `json:"product_name_de"`
	ProductNameFr string `json:"product_name_fr"`
	ProductNameEs string `json:"product_name_es"`
	ProductNameIt string `json:"product_name_it"`
	ProductNamePt string `json:"product_name_pt"`
	ProductNameZh string `json:"product_name_zh"`
	ProductNameJa string `json:"product_name_ja"`
	ProductNameKo string `json:"product_name_ko"`
	ProductNameAr string `json:"product_name_ar"`
	ProductNameHi string `json:"product_name_hi"`
	CategoriesTags []string      `json:"categories_tags"` // OFF category tags, e.g. "en:sodas"; see resolveCategory
	Nutriments     offNutriments `json:"nutriments"`
	UniqueScansN   int           `json:"unique_scans_n"` // popularity signal, compared against -min-scans
}
// offNutriments carries the per-100g nutrition values of an OFF record.
// All fields are pointers so that "absent in the JSON" (nil) can be
// distinguished from an explicit zero; nil values flow through to the
// COALESCE logic in the upsert so they never overwrite existing data.
type offNutriments struct {
	EnergyKcal100g    *float64 `json:"energy-kcal_100g"` // required: records without a positive value are skipped
	Proteins100g      *float64 `json:"proteins_100g"`
	Fat100g           *float64 `json:"fat_100g"`
	Carbohydrates100g *float64 `json:"carbohydrates_100g"`
	Fiber100g         *float64 `json:"fiber_100g"`
}
// aliasRow holds one multilingual alias for a product, destined for the
// product_aliases table (e.g. lang "ru", alias "сникерс" for "Snickers").
type aliasRow struct {
	lang  string // ISO 639-1 language code of the alias
	alias string // localised product name, trimmed of surrounding whitespace
}
// productImportRow is one accepted, normalised product ready for a batch
// upsert. Pointer fields mirror offNutriments: nil means "value not present
// in the dump", which the upsert's COALESCE preserves in the catalog.
type productImportRow struct {
	canonicalName string  // English (or fallback) name; unique key in the products table
	barcode       string  // OFF code; may be nulled at flush time if already taken
	category      *string // product_categories slug resolved from categories_tags
	calories      float64 // kcal per 100 g; always > 0 after filtering
	protein       *float64
	fat           *float64
	carbs         *float64
	fiber         *float64
	aliases       []aliasRow // non-English localised names for multilingual search
}
// categoryPrefixes maps OpenFoodFacts category tag prefixes to our
// product_categories slugs. For a given tag, prefixes are tried top to
// bottom and the first hit wins.
var categoryPrefixes = []struct {
	prefix string
	slug   string
}{
	{"en:dairies", "dairy"},
	{"en:dairy-products", "dairy"},
	{"en:meats", "meat"},
	{"en:poultry", "meat"},
	{"en:fish-and-seafood", "meat"},
	{"en:fruits", "produce"},
	{"en:vegetables", "produce"},
	{"en:plant-based-foods", "produce"},
	{"en:breads", "bakery"},
	{"en:pastries", "bakery"},
	{"en:baked-goods", "bakery"},
	{"en:frozen-foods", "frozen"},
	{"en:beverages", "beverages"},
	{"en:drinks", "beverages"},
	{"en:sodas", "beverages"},
	{"en:waters", "beverages"},
}

// resolveCategory picks a product_categories slug from a record's OFF
// categories_tags. Tags are scanned in their original order, so the first
// tag that matches any known prefix decides the slug; when nothing matches
// the product lands in "other". The result is never nil.
func resolveCategory(categoriesTags []string) *string {
	fallback := "other"
	for _, categoryTag := range categoriesTags {
		for _, entry := range categoryPrefixes {
			if strings.HasPrefix(categoryTag, entry.prefix) {
				matched := entry.slug
				return &matched
			}
		}
	}
	return &fallback
}
// main installs a text slog handler as the default logger and delegates all
// work to run, exiting with status 1 when the import fails.
func main() {
	handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})
	slog.SetDefault(slog.New(handler))
	if err := run(); err != nil {
		slog.Error("import failed", "error", err)
		os.Exit(1)
	}
}
// run parses CLI flags, connects to Postgres, streams the OFF JSONL dump
// line by line, filters and normalises each record, and upserts accepted
// products in batches of -batch via flushBatch. SIGINT/SIGTERM cancels the
// import context so the loop stops between lines. Returns the first fatal
// error (bad flags, connect/COPY/upsert failure, scanner error), or nil.
func run() error {
	filePathFlag := flag.String("file", "", "path to OFF JSONL or JSONL.GZ file (required)")
	dsnFlag := flag.String("dsn", os.Getenv("DATABASE_URL"), "postgres connection DSN")
	limitFlag := flag.Int("limit", 0, "stop after N accepted products (0 = no limit)")
	batchSizeFlag := flag.Int("batch", 500, "products per upsert batch")
	minScansFlag := flag.Int("min-scans", 1, "minimum unique_scans_n to include a product (0 = no filter)")
	flag.Parse()
	if *filePathFlag == "" {
		return fmt.Errorf("-file is required")
	}
	if *dsnFlag == "" {
		return fmt.Errorf("-dsn or DATABASE_URL is required")
	}
	// Cancel the import on Ctrl-C / SIGTERM; the loop below checks
	// importContext.Err() once per line.
	importContext, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	pgxConn, connectError := pgx.Connect(importContext, *dsnFlag)
	if connectError != nil {
		return fmt.Errorf("connect to database: %w", connectError)
	}
	defer pgxConn.Close(importContext)
	slog.Info("connected to database")
	// Create a temporary staging table for the COPY step.
	// It has no unique constraints so COPY never fails on duplicate data.
	// TEMP ties its lifetime to this connection/session.
	if _, createError := pgxConn.Exec(importContext, `
		CREATE TEMP TABLE IF NOT EXISTS products_import (
			canonical_name TEXT,
			barcode TEXT,
			category TEXT,
			calories_per_100g DOUBLE PRECISION,
			protein_per_100g DOUBLE PRECISION,
			fat_per_100g DOUBLE PRECISION,
			carbs_per_100g DOUBLE PRECISION,
			fiber_per_100g DOUBLE PRECISION
		)`); createError != nil {
		return fmt.Errorf("create staging table: %w", createError)
	}
	dataFile, openError := os.Open(*filePathFlag)
	if openError != nil {
		return fmt.Errorf("open file: %w", openError)
	}
	defer dataFile.Close()
	// Transparently support gzipped dumps based on the file extension.
	var lineScanner *bufio.Scanner
	if strings.HasSuffix(*filePathFlag, ".gz") {
		gzipReader, gzipError := gzip.NewReader(dataFile)
		if gzipError != nil {
			return fmt.Errorf("open gzip reader: %w", gzipError)
		}
		defer gzipReader.Close()
		lineScanner = bufio.NewScanner(gzipReader)
	} else {
		lineScanner = bufio.NewScanner(dataFile)
	}
	// OFF lines can exceed the default 64 KB scanner buffer.
	lineScanner.Buffer(make([]byte, 16*1024*1024), 16*1024*1024)
	var (
		productBatch  []productImportRow
		seenInBatch   = make(map[string]bool, *batchSizeFlag) // canonical names already queued in this batch
		totalAccepted int
		totalInserted int64 // sum of rows returned by flushBatch (inserted or updated)
		totalSkipped  int
	)
	// flushCurrent upserts the queued batch, resets the batch and its dedup
	// map in place (preserving capacity), and logs cumulative progress.
	flushCurrent := func() error {
		inserted, flushError := flushBatch(importContext, pgxConn, productBatch)
		if flushError != nil {
			return flushError
		}
		totalInserted += inserted
		productBatch = productBatch[:0]
		for key := range seenInBatch {
			delete(seenInBatch, key)
		}
		slog.Info("progress",
			"accepted", totalAccepted,
			"inserted", totalInserted,
			"skipped", totalSkipped,
		)
		return nil
	}
	for lineScanner.Scan() {
		// Stop promptly once a shutdown signal cancelled the context.
		if importContext.Err() != nil {
			break
		}
		var record offRecord
		// Malformed JSON lines are counted as skipped, not fatal.
		if decodeError := json.Unmarshal(lineScanner.Bytes(), &record); decodeError != nil {
			totalSkipped++
			continue
		}
		// Resolve canonical name: prefer English, fall back to any language.
		canonicalName := record.ProductNameEN
		if canonicalName == "" {
			canonicalName = record.ProductName
		}
		// Apply filter rules.
		if record.Code == "" || canonicalName == "" {
			totalSkipped++
			continue
		}
		// Calories are mandatory: a missing or non-positive energy value
		// makes the record useless for the diary.
		if record.Nutriments.EnergyKcal100g == nil || *record.Nutriments.EnergyKcal100g <= 0 {
			totalSkipped++
			continue
		}
		if *minScansFlag > 0 && record.UniqueScansN < *minScansFlag {
			totalSkipped++
			continue
		}
		// Deduplicate by canonical_name within the current batch.
		// (Cross-batch duplicates are handled by the ON CONFLICT upsert.)
		if seenInBatch[canonicalName] {
			totalSkipped++
			continue
		}
		seenInBatch[canonicalName] = true
		// Collect non-English localised names as aliases so that searches in
		// other languages (e.g. "сникерс" → "Snickers") can find the product.
		langNames := map[string]string{
			"ru": strings.TrimSpace(record.ProductNameRu),
			"de": strings.TrimSpace(record.ProductNameDe),
			"fr": strings.TrimSpace(record.ProductNameFr),
			"es": strings.TrimSpace(record.ProductNameEs),
			"it": strings.TrimSpace(record.ProductNameIt),
			"pt": strings.TrimSpace(record.ProductNamePt),
			"zh": strings.TrimSpace(record.ProductNameZh),
			"ja": strings.TrimSpace(record.ProductNameJa),
			"ko": strings.TrimSpace(record.ProductNameKo),
			"ar": strings.TrimSpace(record.ProductNameAr),
			"hi": strings.TrimSpace(record.ProductNameHi),
		}
		var productAliases []aliasRow
		for lang, name := range langNames {
			// Skip empty names and ones identical to the canonical name.
			if name != "" && name != canonicalName {
				productAliases = append(productAliases, aliasRow{lang: lang, alias: name})
			}
		}
		totalAccepted++
		productBatch = append(productBatch, productImportRow{
			canonicalName: canonicalName,
			barcode:       record.Code,
			category:      resolveCategory(record.CategoriesTags),
			calories:      *record.Nutriments.EnergyKcal100g,
			protein:       record.Nutriments.Proteins100g,
			fat:           record.Nutriments.Fat100g,
			carbs:         record.Nutriments.Carbohydrates100g,
			fiber:         record.Nutriments.Fiber100g,
			aliases:       productAliases,
		})
		if len(productBatch) >= *batchSizeFlag {
			if flushError := flushCurrent(); flushError != nil {
				return flushError
			}
		}
		// -limit counts accepted products; the partial batch below still
		// gets flushed after the loop.
		if *limitFlag > 0 && totalAccepted >= *limitFlag {
			break
		}
	}
	if scanError := lineScanner.Err(); scanError != nil {
		return fmt.Errorf("read file: %w", scanError)
	}
	// Flush any remaining rows.
	if len(productBatch) > 0 {
		if flushError := flushCurrent(); flushError != nil {
			return flushError
		}
	}
	slog.Info("import complete",
		"accepted", totalAccepted,
		"inserted", totalInserted,
		"skipped", totalSkipped,
	)
	return nil
}
// flushBatch upserts productBatch into the catalog.
// It uses a staging table + COPY for fast bulk loading, then INSERT … SELECT with
// ON CONFLICT to safely merge into products, and finally inserts the collected
// multilingual aliases for the upserted rows.
//
// The returned count is the number of rows the upsert RETURNING clause
// produced, i.e. rows inserted OR updated — not pure inserts. The staging
// table products_import must already exist on this connection (run creates
// it once as a TEMP table).
func flushBatch(
	requestContext context.Context,
	pgxConn *pgx.Conn,
	productBatch []productImportRow,
) (int64, error) {
	if len(productBatch) == 0 {
		return 0, nil
	}
	// Truncate staging table so it is empty before each batch.
	if _, truncateError := pgxConn.Exec(requestContext, `TRUNCATE products_import`); truncateError != nil {
		return 0, fmt.Errorf("truncate staging table: %w", truncateError)
	}
	// COPY product rows into the staging table.
	// No unique constraints here, so the COPY never errors on duplicate data.
	_, copyError := pgxConn.CopyFrom(
		requestContext,
		pgx.Identifier{"products_import"},
		[]string{
			"canonical_name", "barcode", "category",
			"calories_per_100g", "protein_per_100g", "fat_per_100g",
			"carbs_per_100g", "fiber_per_100g",
		},
		pgx.CopyFromSlice(len(productBatch), func(rowIndex int) ([]any, error) {
			row := productBatch[rowIndex]
			// Nil pointer fields become SQL NULLs here, which the COALESCEs
			// in the upsert below treat as "keep the existing value".
			return []any{
				row.canonicalName, row.barcode, row.category,
				row.calories, row.protein, row.fat,
				row.carbs, row.fiber,
			}, nil
		}),
	)
	if copyError != nil {
		return 0, fmt.Errorf("copy products to staging table: %w", copyError)
	}
	// Nullify any barcode in the staging table that is already assigned to a
	// different product in the catalog. This prevents unique-constraint violations
	// on the barcode column during the INSERT below.
	if _, updateError := pgxConn.Exec(requestContext, `
		UPDATE products_import pi
		SET barcode = NULL
		WHERE pi.barcode IS NOT NULL
		AND EXISTS (
			SELECT 1 FROM products p
			WHERE p.barcode = pi.barcode
		)`); updateError != nil {
		return 0, fmt.Errorf("nullify conflicting barcodes: %w", updateError)
	}
	// Insert from staging into the catalog.
	// ON CONFLICT (canonical_name): update nutritional columns but leave barcode
	// unchanged — we do not want to reassign a barcode that already belongs to this
	// canonical entry, nor steal one from another entry.
	upsertRows, upsertError := pgxConn.Query(requestContext, `
		INSERT INTO products (
			canonical_name, barcode, category,
			calories_per_100g, protein_per_100g, fat_per_100g,
			carbs_per_100g, fiber_per_100g
		)
		SELECT
			canonical_name, barcode, category,
			calories_per_100g, protein_per_100g, fat_per_100g,
			carbs_per_100g, fiber_per_100g
		FROM products_import
		ON CONFLICT (canonical_name) DO UPDATE SET
			category = COALESCE(EXCLUDED.category, products.category),
			calories_per_100g = COALESCE(EXCLUDED.calories_per_100g, products.calories_per_100g),
			protein_per_100g = COALESCE(EXCLUDED.protein_per_100g, products.protein_per_100g),
			fat_per_100g = COALESCE(EXCLUDED.fat_per_100g, products.fat_per_100g),
			carbs_per_100g = COALESCE(EXCLUDED.carbs_per_100g, products.carbs_per_100g),
			fiber_per_100g = COALESCE(EXCLUDED.fiber_per_100g, products.fiber_per_100g),
			updated_at = now()
		RETURNING id, canonical_name`)
	if upsertError != nil {
		return 0, fmt.Errorf("upsert products: %w", upsertError)
	}
	// Build a canonical_name → product_id map so we can insert aliases below.
	productIDByName := make(map[string]string, len(productBatch))
	var insertedCount int64
	for upsertRows.Next() {
		var productID, canonicalName string
		if scanError := upsertRows.Scan(&productID, &canonicalName); scanError != nil {
			upsertRows.Close()
			return 0, fmt.Errorf("scan upserted product id: %w", scanError)
		}
		productIDByName[canonicalName] = productID
		insertedCount++
	}
	upsertRows.Close()
	// Err must be checked after Close/Next to surface deferred query errors.
	if rowsError := upsertRows.Err(); rowsError != nil {
		return 0, fmt.Errorf("iterate upsert results: %w", rowsError)
	}
	// Insert multilingual aliases collected during parsing.
	// ON CONFLICT DO NOTHING so re-imports are safe.
	aliasBatch := &pgx.Batch{}
	for _, row := range productBatch {
		productID, exists := productIDByName[row.canonicalName]
		if !exists || len(row.aliases) == 0 {
			continue
		}
		for _, aliasEntry := range row.aliases {
			aliasBatch.Queue(
				`INSERT INTO product_aliases (product_id, lang, alias)
				 VALUES ($1, $2, $3)
				 ON CONFLICT DO NOTHING`,
				productID, aliasEntry.lang, aliasEntry.alias,
			)
		}
	}
	if aliasBatch.Len() > 0 {
		batchResults := pgxConn.SendBatch(requestContext, aliasBatch)
		// One Exec per queued statement; drain them all before Close.
		for range aliasBatch.Len() {
			if _, execError := batchResults.Exec(); execError != nil {
				batchResults.Close()
				return 0, fmt.Errorf("insert product alias: %w", execError)
			}
		}
		if closeError := batchResults.Close(); closeError != nil {
			return 0, fmt.Errorf("close alias batch: %w", closeError)
		}
	}
	return insertedCount, nil
}