Files
food-ai/client/lib/features/scan/recognition_service.dart
dbastrikin c7317c4335 feat: async product/receipt recognition via Kafka
Backend:
- Migration 002: product_recognition_jobs table with JSONB images column
  and job_type CHECK ('receipt' | 'products')
- New Kafka topics: ai.products.paid / ai.products.free
- ProductJob model, ProductJobRepository (mirrors dish job pattern)
- itemEnricher extracted from Handler — shared by HTTP handler and worker
- ProductSSEBroker: PG LISTEN on product_job_update channel
- ProductWorkerPool: 5 workers, branches on job_type to call
  RecognizeReceipt or RecognizeProducts per image in parallel
- Handler: RecognizeReceipt and RecognizeProducts now return 202 Accepted
  instead of blocking; 4 new endpoints: GET /ai/product-jobs,
  /product-jobs/history, /product-jobs/{id}, /product-jobs/{id}/stream
- cmd/worker: extended to run ProductWorkerPool alongside dish WorkerPool
- cmd/server: wires productJobRepository + productSSEBroker; both SSE
  brokers started in App.Start()

Flutter client:
- ProductJobCreated, ProductJobResult, ProductJobSummary, ProductJobEvent
  models + submitReceiptRecognition/submitProductsRecognition/stream methods
- Shared _openSseStream helper eliminates duplicate SSE parsing loop
- ScanScreen: replace blocking AI calls with async submit + navigate to
  ProductJobWatchScreen
- ProductJobWatchScreen: watches SSE stream, navigates to /scan/confirm
  when done, shows error on failure
- ProductsScreen: prepends _RecentScansSection (hidden when empty); compact
  horizontal list of recent scans with "See all" → history
- ProductJobHistoryScreen: full list of all product recognition jobs
- New routes: /scan/product-job-watch, /products/job-history
- L10n: 7 new keys in all 12 ARB files

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 23:01:30 +02:00

550 lines
17 KiB
Dart
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import 'dart:async';
import 'dart:convert';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:http/http.dart' as http;
import 'package:image_picker/image_picker.dart';
import '../../core/api/api_client.dart';
import '../../core/auth/auth_provider.dart';
import '../../core/auth/secure_storage.dart';
import '../../core/config/app_config.dart';
import '../../core/locale/language_provider.dart';
// ---------------------------------------------------------------------------
// Models
// ---------------------------------------------------------------------------
class RecognizedItem {
final String name;
double quantity;
String unit;
final String category;
final double confidence;
final String? primaryProductId;
final int storageDays;
RecognizedItem({
required this.name,
required this.quantity,
required this.unit,
required this.category,
required this.confidence,
this.primaryProductId,
required this.storageDays,
});
factory RecognizedItem.fromJson(Map<String, dynamic> json) {
return RecognizedItem(
name: json['name'] as String? ?? '',
quantity: (json['quantity'] as num?)?.toDouble() ?? 1.0,
unit: json['unit'] as String? ?? 'pcs',
category: json['category'] as String? ?? 'other',
confidence: (json['confidence'] as num?)?.toDouble() ?? 0.0,
primaryProductId: json['mapping_id'] as String?,
storageDays: json['storage_days'] as int? ?? 7,
);
}
}
class UnrecognizedItem {
final String rawText;
final double? price;
const UnrecognizedItem({required this.rawText, this.price});
factory UnrecognizedItem.fromJson(Map<String, dynamic> json) {
return UnrecognizedItem(
rawText: json['raw_text'] as String? ?? '',
price: (json['price'] as num?)?.toDouble(),
);
}
}
class ReceiptResult {
final List<RecognizedItem> items;
final List<UnrecognizedItem> unrecognized;
const ReceiptResult({required this.items, required this.unrecognized});
}
/// A single dish recognition candidate with estimated nutrition for the portion in the photo.
class DishCandidate {
final String? dishId;
final String dishName;
final int weightGrams;
final double calories;
final double proteinG;
final double fatG;
final double carbsG;
final double confidence;
const DishCandidate({
this.dishId,
required this.dishName,
required this.weightGrams,
required this.calories,
required this.proteinG,
required this.fatG,
required this.carbsG,
required this.confidence,
});
factory DishCandidate.fromJson(Map<String, dynamic> json) {
return DishCandidate(
dishId: json['dish_id'] as String?,
dishName: json['dish_name'] as String? ?? '',
weightGrams: json['weight_grams'] as int? ?? 0,
calories: (json['calories'] as num?)?.toDouble() ?? 0,
proteinG: (json['protein_g'] as num?)?.toDouble() ?? 0,
fatG: (json['fat_g'] as num?)?.toDouble() ?? 0,
carbsG: (json['carbs_g'] as num?)?.toDouble() ?? 0,
confidence: (json['confidence'] as num?)?.toDouble() ?? 0,
);
}
}
/// Result of dish recognition: ordered list of candidates (best match first).
class DishResult {
final List<DishCandidate> candidates;
const DishResult({required this.candidates});
/// The best matching candidate.
DishCandidate get best => candidates.first;
// Convenience getters delegating to the best candidate.
String get dishName => best.dishName;
int get weightGrams => best.weightGrams;
double get calories => best.calories;
double get proteinG => best.proteinG;
double get fatG => best.fatG;
double get carbsG => best.carbsG;
double get confidence => best.confidence;
factory DishResult.fromJson(Map<String, dynamic> json) {
// New format: {"candidates": [...]}
if (json['candidates'] is List) {
final candidatesList = (json['candidates'] as List<dynamic>)
.map((element) => DishCandidate.fromJson(element as Map<String, dynamic>))
.toList();
return DishResult(candidates: candidatesList);
}
// Legacy flat format: {"dish_name": "...", "calories": ..., ...}
if (json['dish_name'] != null) {
return DishResult(candidates: [DishCandidate.fromJson(json)]);
}
return const DishResult(candidates: []);
}
}
// ---------------------------------------------------------------------------
// Product job models
// ---------------------------------------------------------------------------
/// Result of a completed product or receipt recognition job.
class ProductJobResult {
final String jobType;
final List<RecognizedItem> items;
final List<UnrecognizedItem> unrecognized;
const ProductJobResult({
required this.jobType,
required this.items,
required this.unrecognized,
});
factory ProductJobResult.fromJson(Map<String, dynamic> json) {
return ProductJobResult(
jobType: json['job_type'] as String? ?? '',
items: (json['items'] as List<dynamic>? ?? [])
.map((element) => RecognizedItem.fromJson(element as Map<String, dynamic>))
.toList(),
unrecognized: (json['unrecognized'] as List<dynamic>? ?? [])
.map((element) => UnrecognizedItem.fromJson(element as Map<String, dynamic>))
.toList(),
);
}
}
/// The 202 response from POST /ai/recognize-receipt or /ai/recognize-products.
class ProductJobCreated {
final String jobId;
final int queuePosition;
final int estimatedSeconds;
const ProductJobCreated({
required this.jobId,
required this.queuePosition,
required this.estimatedSeconds,
});
factory ProductJobCreated.fromJson(Map<String, dynamic> json) {
return ProductJobCreated(
jobId: json['job_id'] as String,
queuePosition: json['queue_position'] as int? ?? 0,
estimatedSeconds: json['estimated_seconds'] as int? ?? 0,
);
}
}
/// A lightweight summary of a product recognition job for list endpoints.
class ProductJobSummary {
final String id;
final String jobType;
final String status;
final ProductJobResult? result;
final String? error;
final DateTime createdAt;
const ProductJobSummary({
required this.id,
required this.jobType,
required this.status,
this.result,
this.error,
required this.createdAt,
});
factory ProductJobSummary.fromJson(Map<String, dynamic> json) {
return ProductJobSummary(
id: json['id'] as String,
jobType: json['job_type'] as String? ?? '',
status: json['status'] as String? ?? '',
result: json['result'] != null
? ProductJobResult.fromJson(json['result'] as Map<String, dynamic>)
: null,
error: json['error'] as String?,
createdAt: DateTime.parse(json['created_at'] as String),
);
}
}
/// Events emitted by the SSE stream for a product recognition job.
sealed class ProductJobEvent {}
class ProductJobQueued extends ProductJobEvent {
final int position;
final int estimatedSeconds;
ProductJobQueued({required this.position, required this.estimatedSeconds});
}
class ProductJobProcessing extends ProductJobEvent {}
class ProductJobDone extends ProductJobEvent {
final ProductJobResult result;
ProductJobDone(this.result);
}
class ProductJobFailed extends ProductJobEvent {
final String error;
ProductJobFailed(this.error);
}
// ---------------------------------------------------------------------------
// Dish job models
// ---------------------------------------------------------------------------
/// A lightweight summary of a dish recognition job (no image payload).
class DishJobSummary {
final String id;
final String status;
final String? targetDate;
final String? targetMealType;
final DishResult? result;
final String? error;
final DateTime createdAt;
const DishJobSummary({
required this.id,
required this.status,
this.targetDate,
this.targetMealType,
this.result,
this.error,
required this.createdAt,
});
factory DishJobSummary.fromJson(Map<String, dynamic> json) {
return DishJobSummary(
id: json['id'] as String,
status: json['status'] as String? ?? '',
targetDate: json['target_date'] as String?,
targetMealType: json['target_meal_type'] as String?,
result: json['result'] != null
? DishResult.fromJson(json['result'] as Map<String, dynamic>)
: null,
error: json['error'] as String?,
createdAt: DateTime.parse(json['created_at'] as String),
);
}
}
/// The 202 response from POST /ai/recognize-dish.
class DishJobCreated {
final String jobId;
final int queuePosition;
final int estimatedSeconds;
const DishJobCreated({
required this.jobId,
required this.queuePosition,
required this.estimatedSeconds,
});
factory DishJobCreated.fromJson(Map<String, dynamic> json) {
return DishJobCreated(
jobId: json['job_id'] as String,
queuePosition: json['queue_position'] as int? ?? 0,
estimatedSeconds: json['estimated_seconds'] as int? ?? 0,
);
}
}
/// Events emitted by the SSE stream for a dish recognition job.
sealed class DishJobEvent {}
class DishJobQueued extends DishJobEvent {
final int position;
final int estimatedSeconds;
DishJobQueued({required this.position, required this.estimatedSeconds});
}
class DishJobProcessing extends DishJobEvent {}
class DishJobDone extends DishJobEvent {
final DishResult result;
DishJobDone(this.result);
}
class DishJobFailed extends DishJobEvent {
final String error;
DishJobFailed(this.error);
}
// ---------------------------------------------------------------------------
// Service
// ---------------------------------------------------------------------------
class RecognitionService {
const RecognitionService(
this._client,
this._storage,
this._appConfig,
this._languageGetter,
);
final ApiClient _client;
final SecureStorageService _storage;
final AppConfig _appConfig;
final String Function() _languageGetter;
/// Submits a receipt image for async recognition.
/// Returns immediately with a [ProductJobCreated] containing the job ID.
Future<ProductJobCreated> submitReceiptRecognition(XFile image) async {
final payload = await _buildImagePayload(image);
final data = await _client.post('/ai/recognize-receipt', data: payload);
return ProductJobCreated.fromJson(data);
}
/// Submits 13 product images for async recognition.
/// Returns immediately with a [ProductJobCreated] containing the job ID.
Future<ProductJobCreated> submitProductsRecognition(List<XFile> images) async {
final imageList = await Future.wait(images.map(_buildImagePayload));
final data = await _client.post(
'/ai/recognize-products',
data: {'images': imageList},
);
return ProductJobCreated.fromJson(data);
}
/// Returns product recognition jobs from the last 7 days.
Future<List<ProductJobSummary>> listRecentProductJobs() async {
final data = await _client.getList('/ai/product-jobs');
return data
.map((element) =>
ProductJobSummary.fromJson(element as Map<String, dynamic>))
.toList();
}
/// Returns all product recognition jobs for the current user, newest first.
Future<List<ProductJobSummary>> listAllProductJobs() async {
final data = await _client.getList('/ai/product-jobs/history');
return data
.map((element) =>
ProductJobSummary.fromJson(element as Map<String, dynamic>))
.toList();
}
/// Opens an SSE stream for product job [jobId] and emits [ProductJobEvent]s
/// until the job reaches a terminal state or the stream is cancelled.
Stream<ProductJobEvent> streamProductJobEvents(String jobId) async* {
final streamUri = Uri.parse('${_appConfig.apiBaseUrl}/ai/product-jobs/$jobId/stream');
await for (final parsed in _openSseStream(streamUri)) {
final eventName = parsed.$1;
final json = parsed.$2;
ProductJobEvent? event;
switch (eventName) {
case 'queued':
event = ProductJobQueued(
position: json['position'] as int? ?? 0,
estimatedSeconds: json['estimated_seconds'] as int? ?? 0,
);
case 'processing':
event = ProductJobProcessing();
case 'done':
event = ProductJobDone(ProductJobResult.fromJson(json));
case 'failed':
event = ProductJobFailed(json['error'] as String? ?? 'Recognition failed');
}
if (event != null) {
yield event;
if (event is ProductJobDone || event is ProductJobFailed) return;
}
}
}
/// Submits a dish image for async recognition.
/// Returns a [DishJobCreated] with the job ID and queue position.
Future<DishJobCreated> submitDishRecognition(
XFile image, {
String? targetDate,
String? targetMealType,
}) async {
final imagePayload = await _buildImagePayload(image);
final payload = <String, dynamic>{...imagePayload};
if (targetDate != null) payload['target_date'] = targetDate;
if (targetMealType != null) payload['target_meal_type'] = targetMealType;
final data = await _client.post('/ai/recognize-dish', data: payload);
return DishJobCreated.fromJson(data);
}
/// Returns today's recognition jobs that have not yet been linked to a diary entry.
Future<List<DishJobSummary>> listTodayUnlinkedJobs() async {
final data = await _client.getList('/ai/jobs');
return data
.map((element) =>
DishJobSummary.fromJson(element as Map<String, dynamic>))
.toList();
}
/// Returns all recognition jobs for the current user, newest first.
Future<List<DishJobSummary>> listAllJobs() async {
final data = await _client.getList('/ai/jobs/history');
return data
.map((element) =>
DishJobSummary.fromJson(element as Map<String, dynamic>))
.toList();
}
/// Opens an SSE stream for dish job [jobId] and emits [DishJobEvent]s until
/// the job reaches a terminal state (done or failed) or the stream is cancelled.
Stream<DishJobEvent> streamJobEvents(String jobId) async* {
final streamUri = Uri.parse('${_appConfig.apiBaseUrl}/ai/jobs/$jobId/stream');
await for (final parsed in _openSseStream(streamUri)) {
final eventName = parsed.$1;
final json = parsed.$2;
DishJobEvent? event;
switch (eventName) {
case 'queued':
event = DishJobQueued(
position: json['position'] as int? ?? 0,
estimatedSeconds: json['estimated_seconds'] as int? ?? 0,
);
case 'processing':
event = DishJobProcessing();
case 'done':
event = DishJobDone(DishResult.fromJson(json));
case 'failed':
event = DishJobFailed(json['error'] as String? ?? 'Recognition failed');
}
if (event != null) {
yield event;
if (event is DishJobDone || event is DishJobFailed) return;
}
}
}
/// Opens a raw SSE connection and emits (eventName, jsonData) pairs.
///
/// Uses [http.Client] instead of Dio because on Flutter Web Dio relies on
/// XHR which does not support SSE streaming.
Stream<(String, Map<String, dynamic>)> _openSseStream(Uri streamUri) async* {
final token = await _storage.getAccessToken();
final language = _languageGetter();
final httpClient = http.Client();
try {
final request = http.Request('GET', streamUri)
..headers['Authorization'] = token != null ? 'Bearer $token' : ''
..headers['Accept'] = 'text/event-stream'
..headers['Accept-Language'] = language
..headers['Cache-Control'] = 'no-cache';
final response = await httpClient.send(request).timeout(
const Duration(seconds: 30),
);
final buffer = StringBuffer();
String? currentEventName;
await for (final chunk in response.stream.map(utf8.decode)) {
buffer.write(chunk);
final text = buffer.toString();
int doubleNewlineIndex;
var remaining = text;
while ((doubleNewlineIndex = remaining.indexOf('\n\n')) != -1) {
final message = remaining.substring(0, doubleNewlineIndex);
remaining = remaining.substring(doubleNewlineIndex + 2);
for (final line in message.split('\n')) {
if (line.startsWith('event:')) {
currentEventName = line.substring(6).trim();
} else if (line.startsWith('data:')) {
final dataPayload = line.substring(5).trim();
try {
final jsonData = jsonDecode(dataPayload) as Map<String, dynamic>;
if (currentEventName != null) {
yield (currentEventName, jsonData);
}
} catch (_) {
// Malformed JSON — skip this message.
}
currentEventName = null;
}
}
}
buffer
..clear()
..write(remaining);
}
} finally {
httpClient.close();
}
}
Future<Map<String, String>> _buildImagePayload(XFile image) async {
final bytes = await image.readAsBytes();
final base64Data = base64Encode(bytes);
// XFile.mimeType may be null on some platforms; fall back to path extension.
final mimeType = image.mimeType ??
(image.path.toLowerCase().endsWith('.png') ? 'image/png' : 'image/jpeg');
return {'image_base64': base64Data, 'mime_type': mimeType};
}
}
final recognitionServiceProvider = Provider<RecognitionService>((ref) {
final config = ref.read(appConfigProvider);
final storage = ref.read(secureStorageProvider);
return RecognitionService(
ref.read(apiClientProvider),
storage,
config,
() => ref.read(languageProvider),
);
});