sync_producer.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package sarama
  2. import "sync"
  3. // SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
  4. // broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
  5. // to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
  6. //
  7. // The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
  8. // durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
  9. // There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
  10. //
  11. // For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
  12. // be set to true in its configuration.
  13. type SyncProducer interface {
  14. // SendMessage produces a given message, and returns only when it either has
  15. // succeeded or failed to produce. It will return the partition and the offset
  16. // of the produced message, or an error if the message failed to produce.
  17. SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
  18. // SendMessages produces a given set of messages, and returns only when all
  19. // messages in the set have either succeeded or failed. Note that messages
  20. // can succeed and fail individually; if some succeed and some fail,
  21. // SendMessages will return an error.
  22. SendMessages(msgs []*ProducerMessage) error
  23. // Close shuts down the producer; you must call this function before a producer
  24. // object passes out of scope, as it may otherwise leak memory.
  25. // You must call this before calling Close on the underlying client.
  26. Close() error
  27. // TxnStatus return current producer transaction status.
  28. TxnStatus() ProducerTxnStatusFlag
  29. // IsTransactional return true when current producer is transactional.
  30. IsTransactional() bool
  31. // BeginTxn mark current transaction as ready.
  32. BeginTxn() error
  33. // CommitTxn commit current transaction.
  34. CommitTxn() error
  35. // AbortTxn abort current transaction.
  36. AbortTxn() error
  37. // AddOffsetsToTxn add associated offsets to current transaction.
  38. AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error
  39. // AddMessageToTxn add message offsets to current transaction.
  40. AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error
  41. }
  42. type syncProducer struct {
  43. producer *asyncProducer
  44. wg sync.WaitGroup
  45. }
  46. // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
  47. func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
  48. if config == nil {
  49. config = NewConfig()
  50. config.Producer.Return.Successes = true
  51. }
  52. if err := verifyProducerConfig(config); err != nil {
  53. return nil, err
  54. }
  55. p, err := NewAsyncProducer(addrs, config)
  56. if err != nil {
  57. return nil, err
  58. }
  59. return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
  60. }
  61. // NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
  62. // necessary to call Close() on the underlying client when shutting down this producer.
  63. func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
  64. if err := verifyProducerConfig(client.Config()); err != nil {
  65. return nil, err
  66. }
  67. p, err := NewAsyncProducerFromClient(client)
  68. if err != nil {
  69. return nil, err
  70. }
  71. return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
  72. }
  73. func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
  74. sp := &syncProducer{producer: p}
  75. sp.wg.Add(2)
  76. go withRecover(sp.handleSuccesses)
  77. go withRecover(sp.handleErrors)
  78. return sp
  79. }
  80. func verifyProducerConfig(config *Config) error {
  81. if !config.Producer.Return.Errors {
  82. return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
  83. }
  84. if !config.Producer.Return.Successes {
  85. return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
  86. }
  87. return nil
  88. }
  89. func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
  90. expectation := make(chan *ProducerError, 1)
  91. msg.expectation = expectation
  92. sp.producer.Input() <- msg
  93. if pErr := <-expectation; pErr != nil {
  94. return -1, -1, pErr.Err
  95. }
  96. return msg.Partition, msg.Offset, nil
  97. }
  98. func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
  99. expectations := make(chan chan *ProducerError, len(msgs))
  100. go func() {
  101. for _, msg := range msgs {
  102. expectation := make(chan *ProducerError, 1)
  103. msg.expectation = expectation
  104. sp.producer.Input() <- msg
  105. expectations <- expectation
  106. }
  107. close(expectations)
  108. }()
  109. var errors ProducerErrors
  110. for expectation := range expectations {
  111. if pErr := <-expectation; pErr != nil {
  112. errors = append(errors, pErr)
  113. }
  114. }
  115. if len(errors) > 0 {
  116. return errors
  117. }
  118. return nil
  119. }
  120. func (sp *syncProducer) handleSuccesses() {
  121. defer sp.wg.Done()
  122. for msg := range sp.producer.Successes() {
  123. expectation := msg.expectation
  124. expectation <- nil
  125. }
  126. }
  127. func (sp *syncProducer) handleErrors() {
  128. defer sp.wg.Done()
  129. for err := range sp.producer.Errors() {
  130. expectation := err.Msg.expectation
  131. expectation <- err
  132. }
  133. }
  134. func (sp *syncProducer) Close() error {
  135. sp.producer.AsyncClose()
  136. sp.wg.Wait()
  137. return nil
  138. }
  139. func (sp *syncProducer) IsTransactional() bool {
  140. return sp.producer.IsTransactional()
  141. }
  142. func (sp *syncProducer) BeginTxn() error {
  143. return sp.producer.BeginTxn()
  144. }
  145. func (sp *syncProducer) CommitTxn() error {
  146. return sp.producer.CommitTxn()
  147. }
  148. func (sp *syncProducer) AbortTxn() error {
  149. return sp.producer.AbortTxn()
  150. }
  151. func (sp *syncProducer) AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error {
  152. return sp.producer.AddOffsetsToTxn(offsets, groupId)
  153. }
  154. func (sp *syncProducer) AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error {
  155. return sp.producer.AddMessageToTxn(msg, groupId, metadata)
  156. }
  157. func (p *syncProducer) TxnStatus() ProducerTxnStatusFlag {
  158. return p.producer.TxnStatus()
  159. }