package kafka

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/ThreeDotsLabs/watermill"
	wmkafka "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
)

// Consumer wraps a Watermill Kafka subscriber for consuming a single topic
// within a consumer group.
type Consumer struct {
	subscriber *wmkafka.Subscriber // underlying Watermill subscriber; released by Close
	topic      string              // topic passed to Subscribe by Run
}

// NewConsumer creates a Consumer for the given topic within a consumer group.
//
// brokers is the list of Kafka broker addresses and groupID is the consumer
// group name; both are handed to the Watermill subscriber configuration
// unchanged. The subscriber is built with Watermill's default marshaler and
// default Sarama subscriber config, and is given a watermill.NopLogger.
//
// No subscription is started here; call Run to begin consuming. On failure the
// returned Consumer is nil and the error wraps the subscriber's creation error.
func NewConsumer(brokers []string, groupID, topic string) (*Consumer, error) {
	subscriber, err := wmkafka.NewSubscriber(
		wmkafka.SubscriberConfig{
			Brokers:               brokers,
			ConsumerGroup:         groupID,
			Unmarshaler:           wmkafka.DefaultMarshaler{},
			OverwriteSaramaConfig: wmkafka.DefaultSaramaSubscriberConfig(),
		},
		watermill.NopLogger{},
	)
	if err != nil {
		// Wrap with %w so callers keep errors.Is/errors.As access to the cause.
		return nil, fmt.Errorf("creating kafka subscriber for topic %q: %w", topic, err)
	}

	return &Consumer{subscriber: subscriber, topic: topic}, nil
}

// Run subscribes to the Kafka topic and writes job IDs to the out channel until
// runContext is cancelled or the subscriber's message channel is closed.
// Call this in a dedicated goroutine — it blocks until one of those happens.
//
// Each message payload is forwarded to out as a string. A message is Ack'd
// after its job ID is successfully forwarded to the channel, or Nack'd when the
// context is cancelled before forwarding completes.
//
// If subscribing fails, the error is logged and Run returns immediately.
// Run never closes out.
func (c *Consumer) Run(runContext context.Context, out chan<- string) {
	messages, err := c.subscriber.Subscribe(runContext, c.topic)
	if err != nil {
		slog.Error("kafka consumer subscribe", "topic", c.topic, "err", err)
		return
	}

	for {
		select {
		case msg, ok := <-messages:
			if !ok {
				// The subscriber closed its output channel; nothing more to read.
				return
			}

			// Forwarding may block on a slow receiver, so keep watching the
			// context to avoid hanging on shutdown while holding a message.
			select {
			case out <- string(msg.Payload):
				msg.Ack()
			case <-runContext.Done():
				// The job ID was never handed off; Nack instead of Ack.
				msg.Nack()
				return
			}
		case <-runContext.Done():
			return
		}
	}
}

// Close shuts down the underlying Kafka subscriber.
//
// Close has no error return, so a failure while closing is logged rather than
// silently discarded.
func (c *Consumer) Close() {
	if err := c.subscriber.Close(); err != nil {
		slog.Error("kafka consumer close", "topic", c.topic, "err", err)
	}
}