123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- package sarama
- import "sync"
- // SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
- // broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
- // to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
- //
- // The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
- // durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
- // There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
- //
- // For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
- // be set to true in its configuration.
- type SyncProducer interface {
- // SendMessage produces a given message, and returns only when it either has
- // succeeded or failed to produce. It will return the partition and the offset
- // of the produced message, or an error if the message failed to produce.
- SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
- // SendMessages produces a given set of messages, and returns only when all
- // messages in the set have either succeeded or failed. Note that messages
- // can succeed and fail individually; if some succeed and some fail,
- // SendMessages will return an error.
- SendMessages(msgs []*ProducerMessage) error
- // Close shuts down the producer; you must call this function before a producer
- // object passes out of scope, as it may otherwise leak memory.
- // You must call this before calling Close on the underlying client.
- Close() error
- // TxnStatus return current producer transaction status.
- TxnStatus() ProducerTxnStatusFlag
- // IsTransactional return true when current producer is transactional.
- IsTransactional() bool
- // BeginTxn mark current transaction as ready.
- BeginTxn() error
- // CommitTxn commit current transaction.
- CommitTxn() error
- // AbortTxn abort current transaction.
- AbortTxn() error
- // AddOffsetsToTxn add associated offsets to current transaction.
- AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error
- // AddMessageToTxn add message offsets to current transaction.
- AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error
- }
- type syncProducer struct {
- producer *asyncProducer
- wg sync.WaitGroup
- }
- // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
- func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
- if config == nil {
- config = NewConfig()
- config.Producer.Return.Successes = true
- }
- if err := verifyProducerConfig(config); err != nil {
- return nil, err
- }
- p, err := NewAsyncProducer(addrs, config)
- if err != nil {
- return nil, err
- }
- return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
- }
- // NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
- // necessary to call Close() on the underlying client when shutting down this producer.
- func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
- if err := verifyProducerConfig(client.Config()); err != nil {
- return nil, err
- }
- p, err := NewAsyncProducerFromClient(client)
- if err != nil {
- return nil, err
- }
- return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
- }
- func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
- sp := &syncProducer{producer: p}
- sp.wg.Add(2)
- go withRecover(sp.handleSuccesses)
- go withRecover(sp.handleErrors)
- return sp
- }
- func verifyProducerConfig(config *Config) error {
- if !config.Producer.Return.Errors {
- return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
- }
- if !config.Producer.Return.Successes {
- return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
- }
- return nil
- }
- func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
- expectation := make(chan *ProducerError, 1)
- msg.expectation = expectation
- sp.producer.Input() <- msg
- if pErr := <-expectation; pErr != nil {
- return -1, -1, pErr.Err
- }
- return msg.Partition, msg.Offset, nil
- }
- func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
- expectations := make(chan chan *ProducerError, len(msgs))
- go func() {
- for _, msg := range msgs {
- expectation := make(chan *ProducerError, 1)
- msg.expectation = expectation
- sp.producer.Input() <- msg
- expectations <- expectation
- }
- close(expectations)
- }()
- var errors ProducerErrors
- for expectation := range expectations {
- if pErr := <-expectation; pErr != nil {
- errors = append(errors, pErr)
- }
- }
- if len(errors) > 0 {
- return errors
- }
- return nil
- }
- func (sp *syncProducer) handleSuccesses() {
- defer sp.wg.Done()
- for msg := range sp.producer.Successes() {
- expectation := msg.expectation
- expectation <- nil
- }
- }
- func (sp *syncProducer) handleErrors() {
- defer sp.wg.Done()
- for err := range sp.producer.Errors() {
- expectation := err.Msg.expectation
- expectation <- err
- }
- }
- func (sp *syncProducer) Close() error {
- sp.producer.AsyncClose()
- sp.wg.Wait()
- return nil
- }
- func (sp *syncProducer) IsTransactional() bool {
- return sp.producer.IsTransactional()
- }
- func (sp *syncProducer) BeginTxn() error {
- return sp.producer.BeginTxn()
- }
- func (sp *syncProducer) CommitTxn() error {
- return sp.producer.CommitTxn()
- }
- func (sp *syncProducer) AbortTxn() error {
- return sp.producer.AbortTxn()
- }
- func (sp *syncProducer) AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error {
- return sp.producer.AddOffsetsToTxn(offsets, groupId)
- }
- func (sp *syncProducer) AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error {
- return sp.producer.AddMessageToTxn(msg, groupId, metadata)
- }
- func (p *syncProducer) TxnStatus() ProducerTxnStatusFlag {
- return p.producer.TxnStatus()
- }
|