215 lines
6.7 KiB
Go
215 lines
6.7 KiB
Go
package recognition
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"github.com/food-ai/backend/internal/adapters/ai"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
// JobRepository provides all DB operations on dish_recognition_jobs.
|
|
type JobRepository interface {
|
|
InsertJob(ctx context.Context, job *Job) error
|
|
GetJobByID(ctx context.Context, jobID string) (*Job, error)
|
|
UpdateJobStatus(ctx context.Context, jobID, status string, result *ai.DishResult, errMsg *string) error
|
|
QueuePosition(ctx context.Context, userPlan string, createdAt time.Time) (int, error)
|
|
NotifyJobUpdate(ctx context.Context, jobID string) error
|
|
ListTodayUnlinked(ctx context.Context, userID string) ([]*JobSummary, error)
|
|
ListAll(ctx context.Context, userID string) ([]*JobSummary, error)
|
|
}
|
|
|
|
// PostgresJobRepository implements JobRepository using a pgxpool.
|
|
type PostgresJobRepository struct {
|
|
pool *pgxpool.Pool
|
|
}
|
|
|
|
// NewJobRepository creates a new PostgresJobRepository.
|
|
func NewJobRepository(pool *pgxpool.Pool) *PostgresJobRepository {
|
|
return &PostgresJobRepository{pool: pool}
|
|
}
|
|
|
|
// InsertJob inserts a new recognition job and populates the ID and CreatedAt fields.
|
|
func (repository *PostgresJobRepository) InsertJob(queryContext context.Context, job *Job) error {
|
|
return repository.pool.QueryRow(queryContext,
|
|
`INSERT INTO dish_recognition_jobs (user_id, user_plan, image_base64, mime_type, lang, target_date, target_meal_type)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
|
RETURNING id, created_at`,
|
|
job.UserID, job.UserPlan, job.ImageBase64, job.MimeType, job.Lang, job.TargetDate, job.TargetMealType,
|
|
).Scan(&job.ID, &job.CreatedAt)
|
|
}
|
|
|
|
// GetJobByID fetches a single job by primary key.
|
|
func (repository *PostgresJobRepository) GetJobByID(queryContext context.Context, jobID string) (*Job, error) {
|
|
var job Job
|
|
var resultJSON []byte
|
|
|
|
queryError := repository.pool.QueryRow(queryContext,
|
|
`SELECT id, user_id, user_plan, image_base64, mime_type, lang,
|
|
target_date::text, target_meal_type, status,
|
|
result, error, created_at, started_at, completed_at
|
|
FROM dish_recognition_jobs WHERE id = $1`,
|
|
jobID,
|
|
).Scan(
|
|
&job.ID, &job.UserID, &job.UserPlan,
|
|
&job.ImageBase64, &job.MimeType, &job.Lang,
|
|
&job.TargetDate, &job.TargetMealType, &job.Status,
|
|
&resultJSON, &job.Error, &job.CreatedAt, &job.StartedAt, &job.CompletedAt,
|
|
)
|
|
if queryError != nil {
|
|
return nil, queryError
|
|
}
|
|
|
|
if resultJSON != nil {
|
|
var dishResult ai.DishResult
|
|
if unmarshalError := json.Unmarshal(resultJSON, &dishResult); unmarshalError == nil {
|
|
job.Result = &dishResult
|
|
}
|
|
}
|
|
|
|
return &job, nil
|
|
}
|
|
|
|
// UpdateJobStatus transitions a job to a new status and records the result or error.
|
|
func (repository *PostgresJobRepository) UpdateJobStatus(
|
|
queryContext context.Context,
|
|
jobID, status string,
|
|
result *ai.DishResult,
|
|
errMsg *string,
|
|
) error {
|
|
var resultJSON []byte
|
|
if result != nil {
|
|
marshalledBytes, marshalError := json.Marshal(result)
|
|
if marshalError != nil {
|
|
return marshalError
|
|
}
|
|
resultJSON = marshalledBytes
|
|
}
|
|
|
|
switch status {
|
|
case JobStatusProcessing:
|
|
_, updateError := repository.pool.Exec(queryContext,
|
|
`UPDATE dish_recognition_jobs SET status = $1, started_at = now() WHERE id = $2`,
|
|
status, jobID,
|
|
)
|
|
return updateError
|
|
default:
|
|
_, updateError := repository.pool.Exec(queryContext,
|
|
`UPDATE dish_recognition_jobs
|
|
SET status = $1, result = $2, error = $3, completed_at = now()
|
|
WHERE id = $4`,
|
|
status, resultJSON, errMsg, jobID,
|
|
)
|
|
return updateError
|
|
}
|
|
}
|
|
|
|
// QueuePosition counts jobs ahead of createdAt in the same plan's queue.
|
|
func (repository *PostgresJobRepository) QueuePosition(
|
|
queryContext context.Context,
|
|
userPlan string,
|
|
createdAt time.Time,
|
|
) (int, error) {
|
|
var position int
|
|
queryError := repository.pool.QueryRow(queryContext,
|
|
`SELECT COUNT(*) FROM dish_recognition_jobs
|
|
WHERE status IN ('pending', 'processing')
|
|
AND user_plan = $1
|
|
AND created_at < $2`,
|
|
userPlan, createdAt,
|
|
).Scan(&position)
|
|
return position, queryError
|
|
}
|
|
|
|
// NotifyJobUpdate sends a PostgreSQL NOTIFY on the job_update channel.
|
|
func (repository *PostgresJobRepository) NotifyJobUpdate(queryContext context.Context, jobID string) error {
|
|
_, notifyError := repository.pool.Exec(queryContext, `SELECT pg_notify('job_update', $1)`, jobID)
|
|
return notifyError
|
|
}
|
|
|
|
// ListAll returns all recognition jobs for the given user, newest first.
|
|
func (repository *PostgresJobRepository) ListAll(queryContext context.Context, userID string) ([]*JobSummary, error) {
|
|
rows, queryError := repository.pool.Query(queryContext,
|
|
`SELECT id, status, target_date::text, target_meal_type,
|
|
result, error, created_at
|
|
FROM dish_recognition_jobs
|
|
WHERE user_id = $1
|
|
ORDER BY created_at DESC`,
|
|
userID,
|
|
)
|
|
if queryError != nil {
|
|
return nil, queryError
|
|
}
|
|
defer rows.Close()
|
|
|
|
var summaries []*JobSummary
|
|
for rows.Next() {
|
|
var summary JobSummary
|
|
var resultJSON []byte
|
|
scanError := rows.Scan(
|
|
&summary.ID, &summary.Status, &summary.TargetDate, &summary.TargetMealType,
|
|
&resultJSON, &summary.Error, &summary.CreatedAt,
|
|
)
|
|
if scanError != nil {
|
|
return nil, scanError
|
|
}
|
|
if resultJSON != nil {
|
|
var dishResult ai.DishResult
|
|
if unmarshalError := json.Unmarshal(resultJSON, &dishResult); unmarshalError == nil {
|
|
summary.Result = &dishResult
|
|
}
|
|
}
|
|
summaries = append(summaries, &summary)
|
|
}
|
|
if rowsError := rows.Err(); rowsError != nil {
|
|
return nil, rowsError
|
|
}
|
|
return summaries, nil
|
|
}
|
|
|
|
// ListTodayUnlinked returns today's jobs for the given user that have not yet been
|
|
// linked to any meal_diary entry.
|
|
func (repository *PostgresJobRepository) ListTodayUnlinked(queryContext context.Context, userID string) ([]*JobSummary, error) {
|
|
rows, queryError := repository.pool.Query(queryContext,
|
|
`SELECT id, status, target_date::text, target_meal_type,
|
|
result, error, created_at
|
|
FROM dish_recognition_jobs
|
|
WHERE user_id = $1
|
|
AND created_at::date = CURRENT_DATE
|
|
AND id NOT IN (
|
|
SELECT job_id FROM meal_diary WHERE job_id IS NOT NULL
|
|
)
|
|
ORDER BY created_at DESC`,
|
|
userID,
|
|
)
|
|
if queryError != nil {
|
|
return nil, queryError
|
|
}
|
|
defer rows.Close()
|
|
|
|
var summaries []*JobSummary
|
|
for rows.Next() {
|
|
var summary JobSummary
|
|
var resultJSON []byte
|
|
scanError := rows.Scan(
|
|
&summary.ID, &summary.Status, &summary.TargetDate, &summary.TargetMealType,
|
|
&resultJSON, &summary.Error, &summary.CreatedAt,
|
|
)
|
|
if scanError != nil {
|
|
return nil, scanError
|
|
}
|
|
if resultJSON != nil {
|
|
var dishResult ai.DishResult
|
|
if unmarshalError := json.Unmarshal(resultJSON, &dishResult); unmarshalError == nil {
|
|
summary.Result = &dishResult
|
|
}
|
|
}
|
|
summaries = append(summaries, &summary)
|
|
}
|
|
if rowsError := rows.Err(); rowsError != nil {
|
|
return nil, rowsError
|
|
}
|
|
return summaries, nil
|
|
}
|