feat: rename ingredients→products, products→user_products; add barcode/OFF import
- Rename catalog: ingredient/* → product/* (canonical_name, barcode, nutrition per 100g)
- Rename pantry: product/* → userproduct/* (user-owned items with expiry)
- Squash migrations into single 001_initial_schema.sql (clean-db baseline)
- product_categories: add English canonical name column; fix COALESCE in queries
- Remove product_translations: product names are stored in their original language
- Add default_unit_name to product API responses via unit_translations JOIN
- Add cmd/importoff: bulk import from OpenFoodFacts JSONL dump (COPY + ON CONFLICT)
- Diary: support product_id entries alongside dish_id (CHECK num_nonnulls = 1)
- Home: getLoggedCalories joins both recipes and catalog products
- Flutter: rename models/providers/services to match backend rename
- Flutter: add barcode scan flow for diary (mobile_scanner, product_portion_sheet)
- Flutter: localise 6 new keys across 12 languages (barcode scan, portion weight)
- Routes: GET /products/search, GET /products/barcode/{barcode}, /user-products
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
359
backend/cmd/importoff/main.go
Normal file
359
backend/cmd/importoff/main.go
Normal file
@@ -0,0 +1,359 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
// offRecord is the JSON shape of one line in the OpenFoodFacts JSONL dump.
// Only the fields the importer needs are decoded; everything else on the
// line is silently ignored by encoding/json.
type offRecord struct {
	Code           string        `json:"code"`            // barcode; records with an empty Code are filtered out in run
	ProductName    string        `json:"product_name"`    // product name in its original language (fallback)
	ProductNameEN  string        `json:"product_name_en"` // English name, preferred when non-empty
	CategoriesTags []string      `json:"categories_tags"` // OFF tags such as "en:beverages"; mapped to slugs by resolveCategory
	Nutriments     offNutriments `json:"nutriments"`      // per-100g nutrition values
	UniqueScansN   int           `json:"unique_scans_n"`  // popularity signal; compared against the -min-scans flag
}
|
||||
|
||||
// offNutriments holds the per-100g nutrition values of an OFF record.
// Pointer fields distinguish "absent from the dump" (nil) from a literal 0,
// which lets flushBatch COALESCE missing values against existing catalog rows.
type offNutriments struct {
	EnergyKcal100g    *float64 `json:"energy-kcal_100g"` // required: run skips records where this is nil or <= 0
	Proteins100g      *float64 `json:"proteins_100g"`
	Fat100g           *float64 `json:"fat_100g"`
	Carbohydrates100g *float64 `json:"carbohydrates_100g"`
	Fiber100g         *float64 `json:"fiber_100g"`
}
|
||||
|
||||
// productImportRow is one accepted product staged for bulk upsert.
// Field order mirrors the products_import staging columns written by the
// COPY step in flushBatch.
type productImportRow struct {
	canonicalName string   // unique key targeted by ON CONFLICT (canonical_name)
	barcode       string   // from offRecord.Code; may be NULLed in staging if already taken
	category      *string  // product_categories slug; always non-nil (resolveCategory falls back to "other")
	calories      float64  // non-pointer: records without a positive kcal value never reach a batch
	protein       *float64 // nil when absent from the dump
	fat           *float64
	carbs         *float64
	fiber         *float64
}
|
||||
|
||||
// categoryPrefixes maps OpenFoodFacts category tag prefixes to our product_categories slugs.
// Entries are checked in order; the first match wins.
var categoryPrefixes = []struct {
	tagPrefix    string
	categorySlug string
}{
	{"en:dairies", "dairy"},
	{"en:dairy-products", "dairy"},
	{"en:meats", "meat"},
	{"en:poultry", "meat"},
	{"en:fish-and-seafood", "meat"},
	{"en:fruits", "produce"},
	{"en:vegetables", "produce"},
	{"en:plant-based-foods", "produce"},
	{"en:breads", "bakery"},
	{"en:pastries", "bakery"},
	{"en:baked-goods", "bakery"},
	{"en:frozen-foods", "frozen"},
	{"en:beverages", "beverages"},
	{"en:drinks", "beverages"},
	{"en:sodas", "beverages"},
	{"en:waters", "beverages"},
}

// resolveCategory picks a product_categories slug for a record by scanning its
// OFF category tags in order and returning the slug of the first tag that
// starts with a known prefix. Records with no recognised tag fall back to
// "other". The returned pointer is always non-nil.
func resolveCategory(categoriesTags []string) *string {
	// lookup reports the slug for a single tag, if any prefix matches.
	lookup := func(tag string) (string, bool) {
		for _, entry := range categoryPrefixes {
			if strings.HasPrefix(tag, entry.tagPrefix) {
				return entry.categorySlug, true
			}
		}
		return "", false
	}

	for _, tag := range categoriesTags {
		if slug, ok := lookup(tag); ok {
			return &slug
		}
	}

	fallback := "other"
	return &fallback
}
|
||||
|
||||
func main() {
|
||||
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
|
||||
slog.SetDefault(logger)
|
||||
|
||||
if runError := run(); runError != nil {
|
||||
slog.Error("import failed", "error", runError)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func run() error {
|
||||
filePathFlag := flag.String("file", "", "path to OFF JSONL or JSONL.GZ file (required)")
|
||||
dsnFlag := flag.String("dsn", os.Getenv("DATABASE_URL"), "postgres connection DSN")
|
||||
limitFlag := flag.Int("limit", 0, "stop after N accepted products (0 = no limit)")
|
||||
batchSizeFlag := flag.Int("batch", 500, "products per upsert batch")
|
||||
minScansFlag := flag.Int("min-scans", 1, "minimum unique_scans_n to include a product (0 = no filter)")
|
||||
flag.Parse()
|
||||
|
||||
if *filePathFlag == "" {
|
||||
return fmt.Errorf("-file is required")
|
||||
}
|
||||
if *dsnFlag == "" {
|
||||
return fmt.Errorf("-dsn or DATABASE_URL is required")
|
||||
}
|
||||
|
||||
importContext, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
pgxConn, connectError := pgx.Connect(importContext, *dsnFlag)
|
||||
if connectError != nil {
|
||||
return fmt.Errorf("connect to database: %w", connectError)
|
||||
}
|
||||
defer pgxConn.Close(importContext)
|
||||
slog.Info("connected to database")
|
||||
|
||||
// Create a temporary staging table for the COPY step.
|
||||
// It has no unique constraints so COPY never fails on duplicate data.
|
||||
if _, createError := pgxConn.Exec(importContext, `
|
||||
CREATE TEMP TABLE IF NOT EXISTS products_import (
|
||||
canonical_name TEXT,
|
||||
barcode TEXT,
|
||||
category TEXT,
|
||||
calories_per_100g DOUBLE PRECISION,
|
||||
protein_per_100g DOUBLE PRECISION,
|
||||
fat_per_100g DOUBLE PRECISION,
|
||||
carbs_per_100g DOUBLE PRECISION,
|
||||
fiber_per_100g DOUBLE PRECISION
|
||||
)`); createError != nil {
|
||||
return fmt.Errorf("create staging table: %w", createError)
|
||||
}
|
||||
|
||||
dataFile, openError := os.Open(*filePathFlag)
|
||||
if openError != nil {
|
||||
return fmt.Errorf("open file: %w", openError)
|
||||
}
|
||||
defer dataFile.Close()
|
||||
|
||||
var lineScanner *bufio.Scanner
|
||||
if strings.HasSuffix(*filePathFlag, ".gz") {
|
||||
gzipReader, gzipError := gzip.NewReader(dataFile)
|
||||
if gzipError != nil {
|
||||
return fmt.Errorf("open gzip reader: %w", gzipError)
|
||||
}
|
||||
defer gzipReader.Close()
|
||||
lineScanner = bufio.NewScanner(gzipReader)
|
||||
} else {
|
||||
lineScanner = bufio.NewScanner(dataFile)
|
||||
}
|
||||
// OFF lines can exceed the default 64 KB scanner buffer.
|
||||
lineScanner.Buffer(make([]byte, 16*1024*1024), 16*1024*1024)
|
||||
|
||||
var (
|
||||
productBatch []productImportRow
|
||||
seenInBatch = make(map[string]bool, *batchSizeFlag)
|
||||
totalAccepted int
|
||||
totalInserted int64
|
||||
totalSkipped int
|
||||
)
|
||||
|
||||
flushCurrent := func() error {
|
||||
inserted, flushError := flushBatch(importContext, pgxConn, productBatch)
|
||||
if flushError != nil {
|
||||
return flushError
|
||||
}
|
||||
totalInserted += inserted
|
||||
productBatch = productBatch[:0]
|
||||
for key := range seenInBatch {
|
||||
delete(seenInBatch, key)
|
||||
}
|
||||
slog.Info("progress",
|
||||
"accepted", totalAccepted,
|
||||
"inserted", totalInserted,
|
||||
"skipped", totalSkipped,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
for lineScanner.Scan() {
|
||||
if importContext.Err() != nil {
|
||||
break
|
||||
}
|
||||
|
||||
var record offRecord
|
||||
if decodeError := json.Unmarshal(lineScanner.Bytes(), &record); decodeError != nil {
|
||||
totalSkipped++
|
||||
continue
|
||||
}
|
||||
|
||||
// Resolve canonical name: prefer English, fall back to any language.
|
||||
canonicalName := record.ProductNameEN
|
||||
if canonicalName == "" {
|
||||
canonicalName = record.ProductName
|
||||
}
|
||||
|
||||
// Apply filter rules.
|
||||
if record.Code == "" || canonicalName == "" {
|
||||
totalSkipped++
|
||||
continue
|
||||
}
|
||||
if record.Nutriments.EnergyKcal100g == nil || *record.Nutriments.EnergyKcal100g <= 0 {
|
||||
totalSkipped++
|
||||
continue
|
||||
}
|
||||
if *minScansFlag > 0 && record.UniqueScansN < *minScansFlag {
|
||||
totalSkipped++
|
||||
continue
|
||||
}
|
||||
|
||||
// Deduplicate by canonical_name within the current batch.
|
||||
if seenInBatch[canonicalName] {
|
||||
totalSkipped++
|
||||
continue
|
||||
}
|
||||
seenInBatch[canonicalName] = true
|
||||
|
||||
totalAccepted++
|
||||
productBatch = append(productBatch, productImportRow{
|
||||
canonicalName: canonicalName,
|
||||
barcode: record.Code,
|
||||
category: resolveCategory(record.CategoriesTags),
|
||||
calories: *record.Nutriments.EnergyKcal100g,
|
||||
protein: record.Nutriments.Proteins100g,
|
||||
fat: record.Nutriments.Fat100g,
|
||||
carbs: record.Nutriments.Carbohydrates100g,
|
||||
fiber: record.Nutriments.Fiber100g,
|
||||
})
|
||||
|
||||
if len(productBatch) >= *batchSizeFlag {
|
||||
if flushError := flushCurrent(); flushError != nil {
|
||||
return flushError
|
||||
}
|
||||
}
|
||||
|
||||
if *limitFlag > 0 && totalAccepted >= *limitFlag {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if scanError := lineScanner.Err(); scanError != nil {
|
||||
return fmt.Errorf("read file: %w", scanError)
|
||||
}
|
||||
|
||||
// Flush any remaining rows.
|
||||
if len(productBatch) > 0 {
|
||||
if flushError := flushCurrent(); flushError != nil {
|
||||
return flushError
|
||||
}
|
||||
}
|
||||
|
||||
slog.Info("import complete",
|
||||
"accepted", totalAccepted,
|
||||
"inserted", totalInserted,
|
||||
"skipped", totalSkipped,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
// flushBatch upserts productBatch into the catalog.
|
||||
// It uses a staging table + COPY for fast bulk loading, then INSERT … SELECT with
|
||||
// ON CONFLICT to safely merge into products.
|
||||
func flushBatch(
|
||||
requestContext context.Context,
|
||||
pgxConn *pgx.Conn,
|
||||
productBatch []productImportRow,
|
||||
) (int64, error) {
|
||||
if len(productBatch) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Truncate staging table so it is empty before each batch.
|
||||
if _, truncateError := pgxConn.Exec(requestContext, `TRUNCATE products_import`); truncateError != nil {
|
||||
return 0, fmt.Errorf("truncate staging table: %w", truncateError)
|
||||
}
|
||||
|
||||
// COPY product rows into the staging table.
|
||||
// No unique constraints here, so the COPY never errors on duplicate data.
|
||||
_, copyError := pgxConn.CopyFrom(
|
||||
requestContext,
|
||||
pgx.Identifier{"products_import"},
|
||||
[]string{
|
||||
"canonical_name", "barcode", "category",
|
||||
"calories_per_100g", "protein_per_100g", "fat_per_100g",
|
||||
"carbs_per_100g", "fiber_per_100g",
|
||||
},
|
||||
pgx.CopyFromSlice(len(productBatch), func(rowIndex int) ([]any, error) {
|
||||
row := productBatch[rowIndex]
|
||||
return []any{
|
||||
row.canonicalName, row.barcode, row.category,
|
||||
row.calories, row.protein, row.fat,
|
||||
row.carbs, row.fiber,
|
||||
}, nil
|
||||
}),
|
||||
)
|
||||
if copyError != nil {
|
||||
return 0, fmt.Errorf("copy products to staging table: %w", copyError)
|
||||
}
|
||||
|
||||
// Nullify any barcode in the staging table that is already assigned to a
|
||||
// different product in the catalog. This prevents unique-constraint violations
|
||||
// on the barcode column during the INSERT below.
|
||||
if _, updateError := pgxConn.Exec(requestContext, `
|
||||
UPDATE products_import pi
|
||||
SET barcode = NULL
|
||||
WHERE pi.barcode IS NOT NULL
|
||||
AND EXISTS (
|
||||
SELECT 1 FROM products p
|
||||
WHERE p.barcode = pi.barcode
|
||||
)`); updateError != nil {
|
||||
return 0, fmt.Errorf("nullify conflicting barcodes: %w", updateError)
|
||||
}
|
||||
|
||||
// Insert from staging into the catalog.
|
||||
// ON CONFLICT (canonical_name): update nutritional columns but leave barcode
|
||||
// unchanged — we do not want to reassign a barcode that already belongs to this
|
||||
// canonical entry, nor steal one from another entry.
|
||||
upsertRows, upsertError := pgxConn.Query(requestContext, `
|
||||
INSERT INTO products (
|
||||
canonical_name, barcode, category,
|
||||
calories_per_100g, protein_per_100g, fat_per_100g,
|
||||
carbs_per_100g, fiber_per_100g
|
||||
)
|
||||
SELECT
|
||||
canonical_name, barcode, category,
|
||||
calories_per_100g, protein_per_100g, fat_per_100g,
|
||||
carbs_per_100g, fiber_per_100g
|
||||
FROM products_import
|
||||
ON CONFLICT (canonical_name) DO UPDATE SET
|
||||
category = COALESCE(EXCLUDED.category, products.category),
|
||||
calories_per_100g = COALESCE(EXCLUDED.calories_per_100g, products.calories_per_100g),
|
||||
protein_per_100g = COALESCE(EXCLUDED.protein_per_100g, products.protein_per_100g),
|
||||
fat_per_100g = COALESCE(EXCLUDED.fat_per_100g, products.fat_per_100g),
|
||||
carbs_per_100g = COALESCE(EXCLUDED.carbs_per_100g, products.carbs_per_100g),
|
||||
fiber_per_100g = COALESCE(EXCLUDED.fiber_per_100g, products.fiber_per_100g),
|
||||
updated_at = now()
|
||||
RETURNING id`)
|
||||
if upsertError != nil {
|
||||
return 0, fmt.Errorf("upsert products: %w", upsertError)
|
||||
}
|
||||
|
||||
var insertedCount int64
|
||||
for upsertRows.Next() {
|
||||
var productID string
|
||||
if scanError := upsertRows.Scan(&productID); scanError != nil {
|
||||
upsertRows.Close()
|
||||
return 0, fmt.Errorf("scan upserted product id: %w", scanError)
|
||||
}
|
||||
insertedCount++
|
||||
}
|
||||
upsertRows.Close()
|
||||
if rowsError := upsertRows.Err(); rowsError != nil {
|
||||
return 0, fmt.Errorf("iterate upsert results: %w", rowsError)
|
||||
}
|
||||
|
||||
return insertedCount, nil
|
||||
}
|
||||
Reference in New Issue
Block a user