package main

import (
	"bufio"
	"compress/gzip"
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log/slog"
	"os"
	"os/signal"
	"strings"
	"syscall"

	"github.com/jackc/pgx/v5"
)

// offRecord is the JSON shape of one line in the OpenFoodFacts JSONL dump.
// Only the fields the importer reads are declared; everything else on the
// line is ignored by encoding/json.
type offRecord struct {
	// Code is the product barcode; records without one are skipped.
	Code string `json:"code"`
	// product_name plus per-language variants. English is preferred as the
	// canonical name; the other languages become searchable aliases.
	ProductName   string `json:"product_name"`
	ProductNameEN string `json:"product_name_en"`
	ProductNameRu string `json:"product_name_ru"`
	ProductNameDe string `json:"product_name_de"`
	ProductNameFr string `json:"product_name_fr"`
	ProductNameEs string `json:"product_name_es"`
	ProductNameIt string `json:"product_name_it"`
	ProductNamePt string `json:"product_name_pt"`
	ProductNameZh string `json:"product_name_zh"`
	ProductNameJa string `json:"product_name_ja"`
	ProductNameKo string `json:"product_name_ko"`
	ProductNameAr string `json:"product_name_ar"`
	ProductNameHi string `json:"product_name_hi"`
	// CategoriesTags holds OFF tags such as "en:dairies"; they are mapped
	// to a category slug via categoryPrefixes.
	CategoriesTags []string      `json:"categories_tags"`
	Nutriments     offNutriments `json:"nutriments"`
	// UniqueScansN is a popularity signal; the -min-scans flag filters on it.
	UniqueScansN int `json:"unique_scans_n"`
}

// offNutriments holds per-100g nutrition values from the dump. Pointer types
// distinguish "absent in the record" (nil) from a genuine zero value.
type offNutriments struct {
	EnergyKcal100g    *float64 `json:"energy-kcal_100g"`
	Proteins100g      *float64 `json:"proteins_100g"`
	Fat100g           *float64 `json:"fat_100g"`
	Carbohydrates100g *float64 `json:"carbohydrates_100g"`
	Fiber100g         *float64 `json:"fiber_100g"`
}

// aliasRow holds one multilingual alias for a product.
type aliasRow struct {
	lang  string // language code, e.g. "ru"
	alias string // localised product name in that language
}

// productImportRow is one accepted product ready for the staging-table COPY,
// plus the aliases inserted after the upsert resolves its product id.
type productImportRow struct {
	canonicalName string
	barcode       string
	category      *string
	calories      float64
	protein       *float64
	fat           *float64
	carbs         *float64
	fiber         *float64
	aliases       []aliasRow
}

// categoryPrefixes maps OpenFoodFacts category tag prefixes to our product_categories slugs.
// Entries are checked in order; the first match wins.
var categoryPrefixes = []struct { prefix string slug string }{ {"en:dairies", "dairy"}, {"en:dairy-products", "dairy"}, {"en:meats", "meat"}, {"en:poultry", "meat"}, {"en:fish-and-seafood", "meat"}, {"en:fruits", "produce"}, {"en:vegetables", "produce"}, {"en:plant-based-foods", "produce"}, {"en:breads", "bakery"}, {"en:pastries", "bakery"}, {"en:baked-goods", "bakery"}, {"en:frozen-foods", "frozen"}, {"en:beverages", "beverages"}, {"en:drinks", "beverages"}, {"en:sodas", "beverages"}, {"en:waters", "beverages"}, } func resolveCategory(categoriesTags []string) *string { for _, tag := range categoriesTags { for _, mapping := range categoryPrefixes { if strings.HasPrefix(tag, mapping.prefix) { slug := mapping.slug return &slug } } } other := "other" return &other } func main() { logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) slog.SetDefault(logger) if runError := run(); runError != nil { slog.Error("import failed", "error", runError) os.Exit(1) } } func run() error { filePathFlag := flag.String("file", "", "path to OFF JSONL or JSONL.GZ file (required)") dsnFlag := flag.String("dsn", os.Getenv("DATABASE_URL"), "postgres connection DSN") limitFlag := flag.Int("limit", 0, "stop after N accepted products (0 = no limit)") batchSizeFlag := flag.Int("batch", 500, "products per upsert batch") minScansFlag := flag.Int("min-scans", 1, "minimum unique_scans_n to include a product (0 = no filter)") flag.Parse() if *filePathFlag == "" { return fmt.Errorf("-file is required") } if *dsnFlag == "" { return fmt.Errorf("-dsn or DATABASE_URL is required") } importContext, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() pgxConn, connectError := pgx.Connect(importContext, *dsnFlag) if connectError != nil { return fmt.Errorf("connect to database: %w", connectError) } defer pgxConn.Close(importContext) slog.Info("connected to database") // Create a temporary staging table for the COPY step. 
// It has no unique constraints so COPY never fails on duplicate data. if _, createError := pgxConn.Exec(importContext, ` CREATE TEMP TABLE IF NOT EXISTS products_import ( canonical_name TEXT, barcode TEXT, category TEXT, calories_per_100g DOUBLE PRECISION, protein_per_100g DOUBLE PRECISION, fat_per_100g DOUBLE PRECISION, carbs_per_100g DOUBLE PRECISION, fiber_per_100g DOUBLE PRECISION )`); createError != nil { return fmt.Errorf("create staging table: %w", createError) } dataFile, openError := os.Open(*filePathFlag) if openError != nil { return fmt.Errorf("open file: %w", openError) } defer dataFile.Close() var lineScanner *bufio.Scanner if strings.HasSuffix(*filePathFlag, ".gz") { gzipReader, gzipError := gzip.NewReader(dataFile) if gzipError != nil { return fmt.Errorf("open gzip reader: %w", gzipError) } defer gzipReader.Close() lineScanner = bufio.NewScanner(gzipReader) } else { lineScanner = bufio.NewScanner(dataFile) } // OFF lines can exceed the default 64 KB scanner buffer. lineScanner.Buffer(make([]byte, 16*1024*1024), 16*1024*1024) var ( productBatch []productImportRow seenInBatch = make(map[string]bool, *batchSizeFlag) totalAccepted int totalInserted int64 totalSkipped int ) flushCurrent := func() error { inserted, flushError := flushBatch(importContext, pgxConn, productBatch) if flushError != nil { return flushError } totalInserted += inserted productBatch = productBatch[:0] for key := range seenInBatch { delete(seenInBatch, key) } slog.Info("progress", "accepted", totalAccepted, "inserted", totalInserted, "skipped", totalSkipped, ) return nil } for lineScanner.Scan() { if importContext.Err() != nil { break } var record offRecord if decodeError := json.Unmarshal(lineScanner.Bytes(), &record); decodeError != nil { totalSkipped++ continue } // Resolve canonical name: prefer English, fall back to any language. 
canonicalName := strings.TrimSpace(record.ProductNameEN) if canonicalName == "" { canonicalName = strings.TrimSpace(record.ProductName) } // Apply filter rules. if record.Code == "" || canonicalName == "" { totalSkipped++ continue } if record.Nutriments.EnergyKcal100g == nil || *record.Nutriments.EnergyKcal100g <= 0 { totalSkipped++ continue } if *minScansFlag > 0 && record.UniqueScansN < *minScansFlag { totalSkipped++ continue } // Deduplicate by canonical_name within the current batch. if seenInBatch[canonicalName] { totalSkipped++ continue } seenInBatch[canonicalName] = true // Collect non-English localised names as aliases so that searches in // other languages (e.g. "сникерс" → "Snickers") can find the product. langNames := map[string]string{ "ru": strings.TrimSpace(record.ProductNameRu), "de": strings.TrimSpace(record.ProductNameDe), "fr": strings.TrimSpace(record.ProductNameFr), "es": strings.TrimSpace(record.ProductNameEs), "it": strings.TrimSpace(record.ProductNameIt), "pt": strings.TrimSpace(record.ProductNamePt), "zh": strings.TrimSpace(record.ProductNameZh), "ja": strings.TrimSpace(record.ProductNameJa), "ko": strings.TrimSpace(record.ProductNameKo), "ar": strings.TrimSpace(record.ProductNameAr), "hi": strings.TrimSpace(record.ProductNameHi), } var productAliases []aliasRow for lang, name := range langNames { if name != "" && name != canonicalName { productAliases = append(productAliases, aliasRow{lang: lang, alias: name}) } } totalAccepted++ productBatch = append(productBatch, productImportRow{ canonicalName: canonicalName, barcode: record.Code, category: resolveCategory(record.CategoriesTags), calories: *record.Nutriments.EnergyKcal100g, protein: record.Nutriments.Proteins100g, fat: record.Nutriments.Fat100g, carbs: record.Nutriments.Carbohydrates100g, fiber: record.Nutriments.Fiber100g, aliases: productAliases, }) if len(productBatch) >= *batchSizeFlag { if flushError := flushCurrent(); flushError != nil { return flushError } } if *limitFlag > 0 && 
totalAccepted >= *limitFlag { break } } if scanError := lineScanner.Err(); scanError != nil { return fmt.Errorf("read file: %w", scanError) } // Flush any remaining rows. if len(productBatch) > 0 { if flushError := flushCurrent(); flushError != nil { return flushError } } slog.Info("import complete", "accepted", totalAccepted, "inserted", totalInserted, "skipped", totalSkipped, ) return nil } // flushBatch upserts productBatch into the catalog. // It uses a staging table + COPY for fast bulk loading, then INSERT … SELECT with // ON CONFLICT to safely merge into products. func flushBatch( requestContext context.Context, pgxConn *pgx.Conn, productBatch []productImportRow, ) (int64, error) { if len(productBatch) == 0 { return 0, nil } // Truncate staging table so it is empty before each batch. if _, truncateError := pgxConn.Exec(requestContext, `TRUNCATE products_import`); truncateError != nil { return 0, fmt.Errorf("truncate staging table: %w", truncateError) } // COPY product rows into the staging table. // No unique constraints here, so the COPY never errors on duplicate data. _, copyError := pgxConn.CopyFrom( requestContext, pgx.Identifier{"products_import"}, []string{ "canonical_name", "barcode", "category", "calories_per_100g", "protein_per_100g", "fat_per_100g", "carbs_per_100g", "fiber_per_100g", }, pgx.CopyFromSlice(len(productBatch), func(rowIndex int) ([]any, error) { row := productBatch[rowIndex] return []any{ row.canonicalName, row.barcode, row.category, row.calories, row.protein, row.fat, row.carbs, row.fiber, }, nil }), ) if copyError != nil { return 0, fmt.Errorf("copy products to staging table: %w", copyError) } // Nullify any barcode in the staging table that is already assigned to a // different product in the catalog. This prevents unique-constraint violations // on the barcode column during the INSERT below. 
if _, updateError := pgxConn.Exec(requestContext, ` UPDATE products_import pi SET barcode = NULL WHERE pi.barcode IS NOT NULL AND EXISTS ( SELECT 1 FROM products p WHERE p.barcode = pi.barcode )`); updateError != nil { return 0, fmt.Errorf("nullify conflicting barcodes: %w", updateError) } // Insert from staging into the catalog. // ON CONFLICT (canonical_name): update nutritional columns but leave barcode // unchanged — we do not want to reassign a barcode that already belongs to this // canonical entry, nor steal one from another entry. upsertRows, upsertError := pgxConn.Query(requestContext, ` INSERT INTO products ( canonical_name, barcode, category, calories_per_100g, protein_per_100g, fat_per_100g, carbs_per_100g, fiber_per_100g ) SELECT canonical_name, barcode, category, calories_per_100g, protein_per_100g, fat_per_100g, carbs_per_100g, fiber_per_100g FROM products_import ON CONFLICT (canonical_name) DO UPDATE SET category = COALESCE(EXCLUDED.category, products.category), calories_per_100g = COALESCE(EXCLUDED.calories_per_100g, products.calories_per_100g), protein_per_100g = COALESCE(EXCLUDED.protein_per_100g, products.protein_per_100g), fat_per_100g = COALESCE(EXCLUDED.fat_per_100g, products.fat_per_100g), carbs_per_100g = COALESCE(EXCLUDED.carbs_per_100g, products.carbs_per_100g), fiber_per_100g = COALESCE(EXCLUDED.fiber_per_100g, products.fiber_per_100g), updated_at = now() RETURNING id, canonical_name`) if upsertError != nil { return 0, fmt.Errorf("upsert products: %w", upsertError) } // Build a canonical_name → product_id map so we can insert aliases below. 
productIDByName := make(map[string]string, len(productBatch)) var insertedCount int64 for upsertRows.Next() { var productID, canonicalName string if scanError := upsertRows.Scan(&productID, &canonicalName); scanError != nil { upsertRows.Close() return 0, fmt.Errorf("scan upserted product id: %w", scanError) } productIDByName[canonicalName] = productID insertedCount++ } upsertRows.Close() if rowsError := upsertRows.Err(); rowsError != nil { return 0, fmt.Errorf("iterate upsert results: %w", rowsError) } // Insert multilingual aliases collected during parsing. // ON CONFLICT DO NOTHING so re-imports are safe. aliasBatch := &pgx.Batch{} for _, row := range productBatch { productID, exists := productIDByName[row.canonicalName] if !exists || len(row.aliases) == 0 { continue } for _, aliasEntry := range row.aliases { aliasBatch.Queue( `INSERT INTO product_aliases (product_id, lang, alias) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING`, productID, aliasEntry.lang, aliasEntry.alias, ) } } if aliasBatch.Len() > 0 { batchResults := pgxConn.SendBatch(requestContext, aliasBatch) for range aliasBatch.Len() { if _, execError := batchResults.Exec(); execError != nil { batchResults.Close() return 0, fmt.Errorf("insert product alias: %w", execError) } } if closeError := batchResults.Close(); closeError != nil { return 0, fmt.Errorf("close alias batch: %w", closeError) } } return insertedCount, nil }