package recognition

import (
	"context"
	"encoding/json"
	"time"

	"github.com/food-ai/backend/internal/adapters/ai"
	"github.com/jackc/pgx/v5/pgxpool"
)

// JobRepository provides all DB operations on dish_recognition_jobs.
type JobRepository interface {
	// InsertJob stores a new job and fills in job.ID and job.CreatedAt from
	// the values the database returns.
	InsertJob(ctx context.Context, job *Job) error

	// GetJobByID loads the job whose primary key equals jobID.
	GetJobByID(ctx context.Context, jobID string) (*Job, error)

	// UpdateJobStatus moves a job to status, optionally recording a
	// recognition result or an error message.
	UpdateJobStatus(ctx context.Context, jobID, status string, result *ai.DishResult, errMsg *string) error

	// QueuePosition reports how many jobs of the same plan are queued ahead of
	// a job created at createdAt.
	QueuePosition(ctx context.Context, userPlan string, createdAt time.Time) (int, error)

	// NotifyJobUpdate announces that the job identified by jobID has changed.
	NotifyJobUpdate(ctx context.Context, jobID string) error

	// ListTodayUnlinked lists the user's jobs created today that are not
	// referenced by any meal_diary row.
	ListTodayUnlinked(ctx context.Context, userID string) ([]*JobSummary, error)

	// ListAll lists every job belonging to the user, newest first.
	ListAll(ctx context.Context, userID string) ([]*JobSummary, error)
}

// PostgresJobRepository implements JobRepository using a pgxpool.
type PostgresJobRepository struct {
	pool *pgxpool.Pool
}

// Compile-time guarantee that *PostgresJobRepository keeps satisfying
// JobRepository. NewJobRepository returns the concrete type, so without this
// assertion a drifting method set would only be reported at a distant call
// site that happens to use the interface.
var _ JobRepository = (*PostgresJobRepository)(nil)

// NewJobRepository creates a new PostgresJobRepository that runs all of its
// statements through pool.
func NewJobRepository(pool *pgxpool.Pool) *PostgresJobRepository {
	return &PostgresJobRepository{pool: pool}
}

// InsertJob inserts a new recognition job and populates the ID and CreatedAt fields.
//
// The user, plan, image, MIME type, language and target date/meal type are
// taken from job. The id and created_at values returned by the INSERT are
// scanned back into job.ID and job.CreatedAt. The scan error, if any, is
// returned unchanged.
func (repository *PostgresJobRepository) InsertJob(queryContext context.Context, job *Job) error {
	const query = `INSERT INTO dish_recognition_jobs (user_id, user_plan, image_base64, mime_type, lang, target_date, target_meal_type) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id, created_at`

	row := repository.pool.QueryRow(queryContext, query,
		job.UserID,
		job.UserPlan,
		job.ImageBase64,
		job.MimeType,
		job.Lang,
		job.TargetDate,
		job.TargetMealType,
	)
	return row.Scan(&job.ID, &job.CreatedAt)
}

// GetJobByID fetches a single job by primary key.
func (repository *PostgresJobRepository) GetJobByID(queryContext context.Context, jobID string) (*Job, error) { var job Job var resultJSON []byte queryError := repository.pool.QueryRow(queryContext, `SELECT id, user_id, user_plan, image_base64, mime_type, lang, target_date::text, target_meal_type, status, result, error, created_at, started_at, completed_at FROM dish_recognition_jobs WHERE id = $1`, jobID, ).Scan( &job.ID, &job.UserID, &job.UserPlan, &job.ImageBase64, &job.MimeType, &job.Lang, &job.TargetDate, &job.TargetMealType, &job.Status, &resultJSON, &job.Error, &job.CreatedAt, &job.StartedAt, &job.CompletedAt, ) if queryError != nil { return nil, queryError } if resultJSON != nil { var dishResult ai.DishResult if unmarshalError := json.Unmarshal(resultJSON, &dishResult); unmarshalError == nil { job.Result = &dishResult } } return &job, nil } // UpdateJobStatus transitions a job to a new status and records the result or error. func (repository *PostgresJobRepository) UpdateJobStatus( queryContext context.Context, jobID, status string, result *ai.DishResult, errMsg *string, ) error { var resultJSON []byte if result != nil { marshalledBytes, marshalError := json.Marshal(result) if marshalError != nil { return marshalError } resultJSON = marshalledBytes } switch status { case JobStatusProcessing: _, updateError := repository.pool.Exec(queryContext, `UPDATE dish_recognition_jobs SET status = $1, started_at = now() WHERE id = $2`, status, jobID, ) return updateError default: _, updateError := repository.pool.Exec(queryContext, `UPDATE dish_recognition_jobs SET status = $1, result = $2, error = $3, completed_at = now() WHERE id = $4`, status, resultJSON, errMsg, jobID, ) return updateError } } // QueuePosition counts jobs ahead of createdAt in the same plan's queue. 
func (repository *PostgresJobRepository) QueuePosition( queryContext context.Context, userPlan string, createdAt time.Time, ) (int, error) { var position int queryError := repository.pool.QueryRow(queryContext, `SELECT COUNT(*) FROM dish_recognition_jobs WHERE status IN ('pending', 'processing') AND user_plan = $1 AND created_at < $2`, userPlan, createdAt, ).Scan(&position) return position, queryError } // NotifyJobUpdate sends a PostgreSQL NOTIFY on the job_update channel. func (repository *PostgresJobRepository) NotifyJobUpdate(queryContext context.Context, jobID string) error { _, notifyError := repository.pool.Exec(queryContext, `SELECT pg_notify('job_update', $1)`, jobID) return notifyError } // ListAll returns all recognition jobs for the given user, newest first. func (repository *PostgresJobRepository) ListAll(queryContext context.Context, userID string) ([]*JobSummary, error) { rows, queryError := repository.pool.Query(queryContext, `SELECT id, status, target_date::text, target_meal_type, result, error, created_at FROM dish_recognition_jobs WHERE user_id = $1 ORDER BY created_at DESC`, userID, ) if queryError != nil { return nil, queryError } defer rows.Close() var summaries []*JobSummary for rows.Next() { var summary JobSummary var resultJSON []byte scanError := rows.Scan( &summary.ID, &summary.Status, &summary.TargetDate, &summary.TargetMealType, &resultJSON, &summary.Error, &summary.CreatedAt, ) if scanError != nil { return nil, scanError } if resultJSON != nil { var dishResult ai.DishResult if unmarshalError := json.Unmarshal(resultJSON, &dishResult); unmarshalError == nil { summary.Result = &dishResult } } summaries = append(summaries, &summary) } if rowsError := rows.Err(); rowsError != nil { return nil, rowsError } return summaries, nil } // ListTodayUnlinked returns today's jobs for the given user that have not yet been // linked to any meal_diary entry. 
func (repository *PostgresJobRepository) ListTodayUnlinked(queryContext context.Context, userID string) ([]*JobSummary, error) { rows, queryError := repository.pool.Query(queryContext, `SELECT id, status, target_date::text, target_meal_type, result, error, created_at FROM dish_recognition_jobs WHERE user_id = $1 AND created_at::date = CURRENT_DATE AND id NOT IN ( SELECT job_id FROM meal_diary WHERE job_id IS NOT NULL ) ORDER BY created_at DESC`, userID, ) if queryError != nil { return nil, queryError } defer rows.Close() var summaries []*JobSummary for rows.Next() { var summary JobSummary var resultJSON []byte scanError := rows.Scan( &summary.ID, &summary.Status, &summary.TargetDate, &summary.TargetMealType, &resultJSON, &summary.Error, &summary.CreatedAt, ) if scanError != nil { return nil, scanError } if resultJSON != nil { var dishResult ai.DishResult if unmarshalError := json.Unmarshal(resultJSON, &dishResult); unmarshalError == nil { summary.Result = &dishResult } } summaries = append(summaries, &summary) } if rowsError := rows.Err(); rowsError != nil { return nil, rowsError } return summaries, nil }