package sarama

import (
	"encoding/binary"
	"errors"
	"fmt"
	"math"
	"sync"
	"time"

	"github.com/eapache/go-resiliency/breaker"
	"github.com/eapache/queue"
	"github.com/rcrowley/go-metrics"
)

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks and message loss: it will not be garbage-collected automatically when it passes
// out of scope and buffered messages may not be flushed.
type AsyncProducer interface {
	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before the process
	// shuts down, or you may lose messages. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the input channel for the user to write messages to that they
	// wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError

	// IsTransactional returns true when the current producer is transactional.
	IsTransactional() bool

	// TxnStatus returns the current producer transaction status.
	TxnStatus() ProducerTxnStatusFlag

	// BeginTxn marks the current transaction as ready.
	BeginTxn() error

	// CommitTxn commits the current transaction.
	CommitTxn() error

	// AbortTxn aborts the current transaction.
	AbortTxn() error

	// AddOffsetsToTxn adds the associated offsets to the current transaction.
	AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error

	// AddMessageToTxn adds the message offsets to the current transaction.
	AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error
}

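// A minimal usage sketch (not part of the API; the broker address, topic name,
// and message count are placeholders): write to Input() and drain Successes()
// and Errors() in the same select loop so the producer never blocks on an
// unread channel. The nil-channel trick disables the send case once done.
//
//	config := NewConfig()
//	config.Producer.Return.Successes = true
//	producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//	sent, confirmed := 0, 0
//	for confirmed < 10 {
//		var input chan<- *ProducerMessage
//		if sent < 10 {
//			input = producer.Input() // sending on a nil channel blocks, so this case is skipped once sent == 10
//		}
//		select {
//		case input <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}:
//			sent++
//		case msg := <-producer.Successes():
//			Logger.Printf("delivered to %s/%d at offset %d", msg.Topic, msg.Partition, msg.Offset)
//			confirmed++
//		case err := <-producer.Errors():
//			Logger.Println("delivery failed:", err)
//			confirmed++
//		}
//	}
//	producer.AsyncClose() // then keep draining Successes() and Errors() until both are closed
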
type asyncProducer struct {
	client Client
	conf   *Config

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	inFlight                  sync.WaitGroup

	brokers    map[*Broker]*brokerProducer
	brokerRefs map[*brokerProducer]int
	brokerLock sync.Mutex

	txnmgr *transactionManager
	txLock sync.Mutex

	metricsRegistry metrics.Registry
}

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}
	return newAsyncProducer(client)
}

// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on them.
	cli := &nopCloserClient{client}
	return newAsyncProducer(cli)
}

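// A sketch of sharing one Client between a producer and other components
// (an assumption about the caller's setup, not a requirement of this package).
// Because deferred calls run last-in-first-out, the producer is closed before
// the client, matching the ordering required by the Close documentation:
//
//	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer client.Close() // runs after producer.Close()
//	producer, err := NewAsyncProducerFromClient(client)
//	if err != nil {
//		panic(err)
//	}
//	defer producer.Close()
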
func newAsyncProducer(client Client) (AsyncProducer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	txnmgr, err := newTransactionManager(client.Config(), client)
	if err != nil {
		return nil, err
	}

	p := &asyncProducer{
		client:          client,
		conf:            client.Config(),
		errors:          make(chan *ProducerError),
		input:           make(chan *ProducerMessage),
		successes:       make(chan *ProducerMessage),
		retries:         make(chan *ProducerMessage),
		brokers:         make(map[*Broker]*brokerProducer),
		brokerRefs:      make(map[*brokerProducer]int),
		txnmgr:          txnmgr,
		metricsRegistry: newCleanupRegistry(client.Config().MetricRegistry),
	}

	// launch our singleton dispatchers
	go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)

	return p, nil
}

type flagSet int8

const (
	syn       flagSet = 1 << iota // first message from partitionProducer to brokerProducer
	fin                           // final message from partitionProducer to brokerProducer and back
	shutdown                      // start the shutdown process
	endtxn                        // mark the end of a transaction
	committxn                     // commit the current transaction
	aborttxn                      // abort the current transaction
)

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []RecordHeader

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field; it is only to be used for
	// pass-through data.
	Metadata interface{}

	// The fields below this point are filled in by the producer as the message is processed.

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp can vary in behavior depending on broker configuration, being
	// in either one of the CreateTime or LogAppendTime modes (default CreateTime),
	// and requiring version at least 0.10.0.
	//
	// When configured to CreateTime, the timestamp is specified by the producer
	// either by explicitly setting this field, or when the message is added
	// to a produce set.
	//
	// When configured to LogAppendTime, the timestamp is assigned to the message
	// by the broker. This is only guaranteed to be defined if the message was
	// successfully delivered and RequiredAcks is not NoResponse.
	Timestamp time.Time

	retries        int
	flags          flagSet
	expectation    chan *ProducerError
	sequenceNumber int32
	producerEpoch  int16
	hasSequence    bool
}

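// A sketch of constructing a message (topic, key, header, and metadata values
// are placeholders): only Topic is required; Key, Headers and Metadata are optional.
//
//	msg := &ProducerMessage{
//		Topic:    "my_topic",
//		Key:      StringEncoder("user-42"), // with the default (hash) partitioner, equal keys land on the same partition
//		Value:    ByteEncoder([]byte(`{"n": 1}`)),
//		Headers:  []RecordHeader{{Key: []byte("trace-id"), Value: []byte("abc123")}},
//		Metadata: "request-7", // passed through untouched to the Successes and Errors channels
//	}
//	producer.Input() <- msg
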
const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.

func (m *ProducerMessage) ByteSize(version int) int {
	var size int
	if version >= 2 {
		size = maximumRecordOverhead
		for _, h := range m.Headers {
			size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
		}
	} else {
		size = producerMessageOverhead
	}
	if m.Key != nil {
		size += m.Key.Length()
	}
	if m.Value != nil {
		size += m.Value.Length()
	}
	return size
}

func (m *ProducerMessage) clear() {
	m.flags = 0
	m.retries = 0
	m.sequenceNumber = 0
	m.producerEpoch = 0
	m.hasSequence = false
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}

func (pe ProducerError) Unwrap() error {
	return pe.Err
}

// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}

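// A sketch of inspecting the batched errors returned from Close (the type
// assertion is the caller's responsibility; nothing here is required by the API):
//
//	if err := producer.Close(); err != nil {
//		if pErrs, ok := err.(ProducerErrors); ok {
//			for _, pErr := range pErrs {
//				Logger.Printf("failed to deliver to %s: %v", pErr.Msg.Topic, pErr.Err)
//			}
//		}
//	}
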
func (p *asyncProducer) IsTransactional() bool {
	return p.txnmgr.isTransactional()
}

func (p *asyncProducer) AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error {
	offsets := make(map[string][]*PartitionOffsetMetadata)
	offsets[msg.Topic] = []*PartitionOffsetMetadata{
		{
			Partition: msg.Partition,
			Offset:    msg.Offset + 1,
			Metadata:  metadata,
		},
	}
	return p.AddOffsetsToTxn(offsets, groupId)
}

func (p *asyncProducer) AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error {
	p.txLock.Lock()
	defer p.txLock.Unlock()

	if !p.IsTransactional() {
		DebugLogger.Printf("producer/txnmgr [%s] attempt to call AddOffsetsToTxn on a non-transactional producer\n", p.txnmgr.transactionalID)
		return ErrNonTransactedProducer
	}

	DebugLogger.Printf("producer/txnmgr [%s] add offsets to transaction\n", p.txnmgr.transactionalID)
	return p.txnmgr.addOffsetsToTxn(offsets, groupId)
}

func (p *asyncProducer) TxnStatus() ProducerTxnStatusFlag {
	return p.txnmgr.currentTxnStatus()
}

func (p *asyncProducer) BeginTxn() error {
	p.txLock.Lock()
	defer p.txLock.Unlock()

	if !p.IsTransactional() {
		DebugLogger.Println("producer/txnmgr attempt to call BeginTxn on a non-transactional producer")
		return ErrNonTransactedProducer
	}

	return p.txnmgr.transitionTo(ProducerTxnFlagInTransaction, nil)
}

func (p *asyncProducer) CommitTxn() error {
	p.txLock.Lock()
	defer p.txLock.Unlock()

	if !p.IsTransactional() {
		DebugLogger.Printf("producer/txnmgr [%s] attempt to call CommitTxn on a non-transactional producer\n", p.txnmgr.transactionalID)
		return ErrNonTransactedProducer
	}

	DebugLogger.Printf("producer/txnmgr [%s] committing transaction\n", p.txnmgr.transactionalID)
	err := p.finishTransaction(true)
	if err != nil {
		return err
	}
	DebugLogger.Printf("producer/txnmgr [%s] transaction committed\n", p.txnmgr.transactionalID)
	return nil
}

func (p *asyncProducer) AbortTxn() error {
	p.txLock.Lock()
	defer p.txLock.Unlock()

	if !p.IsTransactional() {
		DebugLogger.Printf("producer/txnmgr [%s] attempt to call AbortTxn on a non-transactional producer\n", p.txnmgr.transactionalID)
		return ErrNonTransactedProducer
	}
	DebugLogger.Printf("producer/txnmgr [%s] aborting transaction\n", p.txnmgr.transactionalID)
	err := p.finishTransaction(false)
	if err != nil {
		return err
	}
	DebugLogger.Printf("producer/txnmgr [%s] transaction aborted\n", p.txnmgr.transactionalID)
	return nil
}

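// A rough transactional flow (the config fields and values below are
// assumptions about a typical setup; consult the Config documentation for the
// exact requirements of an idempotent, transactional producer):
//
//	config := NewConfig()
//	config.Version = V0_11_0_0
//	config.Producer.Idempotent = true
//	config.Producer.RequiredAcks = WaitForAll
//	config.Producer.Return.Successes = true
//	config.Net.MaxOpenRequests = 1
//	config.Producer.Transaction.ID = "my-transactional-id"
//	producer, _ := NewAsyncProducer([]string{"localhost:9092"}, config)
//
//	if err := producer.BeginTxn(); err != nil {
//		panic(err)
//	}
//	producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("in-txn")}
//	<-producer.Successes()
//	if err := producer.CommitTxn(); err != nil {
//		_ = producer.AbortTxn() // on commit failure, attempt to roll the transaction back
//	}
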
func (p *asyncProducer) finishTransaction(commit bool) error {
	p.inFlight.Add(1)
	if commit {
		p.input <- &ProducerMessage{flags: endtxn | committxn}
	} else {
		p.input <- &ProducerMessage{flags: endtxn | aborttxn}
	}
	p.inFlight.Wait()
	return p.txnmgr.finishTransaction(commit)
}

func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}

func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}

func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}

func (p *asyncProducer) Close() error {
	p.AsyncClose()

	if p.conf.Producer.Return.Successes {
		go withRecover(func() {
			for range p.successes {
			}
		})
	}

	var pErrs ProducerErrors
	if p.conf.Producer.Return.Errors {
		for event := range p.errors {
			pErrs = append(pErrs, event)
		}
	} else {
		<-p.errors
	}

	if len(pErrs) > 0 {
		return pErrs
	}
	return nil
}

func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}

// singleton
// dispatches messages by topic
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&endtxn != 0 {
			var err error
			if msg.flags&committxn != 0 {
				err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagCommittingTransaction, nil)
			} else {
				err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagAbortingTransaction, nil)
			}
			if err != nil {
				Logger.Printf("producer/txnmgr unable to end transaction %s", err)
			}
			p.inFlight.Done()
			continue
		}

		if msg.flags&shutdown != 0 {
			shuttingDown = true
			p.inFlight.Done()
			continue
		}

		if msg.retries == 0 {
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			p.inFlight.Add(1)

			// Ignore retried messages, they are already in the transaction.
			// Can't produce new records when the transaction is not started.
			if p.IsTransactional() && p.txnmgr.currentTxnStatus()&ProducerTxnFlagInTransaction == 0 {
				Logger.Printf("attempt to send message when transaction is not started or is in ending state, got %d, expect %d\n", p.txnmgr.currentTxnStatus(), ProducerTxnFlagInTransaction)
				p.returnError(msg, ErrTransactionNotReady)
				continue
			}
		}

		for _, interceptor := range p.conf.Producer.Interceptors {
			msg.safelyApplyInterceptor(interceptor)
		}

		version := 1
		if p.conf.Version.IsAtLeast(V0_11_0_0) {
			version = 2
		} else if msg.Headers != nil {
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
			continue
		}

		size := msg.ByteSize(version)
		if size > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ConfigurationError(fmt.Sprintf("Attempt to produce message larger than configured Producer.MaxMessageBytes: %d > %d", size, p.conf.Producer.MaxMessageBytes)))
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}
}

// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	tp := &topicProducer{
		parent:      p,
		topic:       topic,
		input:       input,
		breaker:     breaker.New(3, 1, 10*time.Second),
		handlers:    make(map[int32]chan<- *ProducerMessage),
		partitioner: p.conf.Producer.Partitioner(topic),
	}
	go withRecover(tp.dispatch)
	return input
}

func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		if msg.retries == 0 {
			if err := tp.partitionMessage(msg); err != nil {
				tp.parent.returnError(msg, err)
				continue
			}
		}

		handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
	}

	for _, handler := range tp.handlers {
		close(handler)
	}
}

func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
	var partitions []int32

	err := tp.breaker.Run(func() (err error) {
		requiresConsistency := false
		if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
			requiresConsistency = ep.MessageRequiresConsistency(msg)
		} else {
			requiresConsistency = tp.partitioner.RequiresConsistency()
		}

		if requiresConsistency {
			partitions, err = tp.parent.client.Partitions(msg.Topic)
		} else {
			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
		}

		return
	})
	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))

	if numPartitions == 0 {
		return ErrLeaderNotAvailable
	}

	choice, err := tp.partitioner.Partition(msg, numPartitions)

	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		return ErrInvalidPartition
	}

	msg.Partition = partitions[choice]

	return nil
}

// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader         *Broker
	breaker        *breaker.Breaker
	brokerProducer *brokerProducer

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}

type partitionRetryState struct {
	buf          []*ProducerMessage
	expectChaser bool
}

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	pp := &partitionProducer{
		parent:    p,
		topic:     topic,
		partition: partition,
		input:     input,

		breaker:    breaker.New(3, 1, 10*time.Second),
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
	}
	go withRecover(pp.dispatch)
	return input
}

func (pp *partitionProducer) backoff(retries int) {
	var backoff time.Duration
	if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
		maxRetries := pp.parent.conf.Producer.Retry.Max
		backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
	} else {
		backoff = pp.parent.conf.Producer.Retry.Backoff
	}
	if backoff > 0 {
		time.Sleep(backoff)
	}
}

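// The backoff used above is either the fixed Producer.Retry.Backoff or, when
// set, the caller-supplied Producer.Retry.BackoffFunc. A sketch of an
// exponential policy (the values are purely illustrative):
//
//	config := NewConfig()
//	config.Producer.Retry.Max = 5
//	config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
//		return time.Duration(100*(1<<retries)) * time.Millisecond // doubles with each retry level
//	}
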
func (pp *partitionProducer) updateLeaderIfBrokerProducerIsNil(msg *ProducerMessage) error {
	if pp.brokerProducer == nil {
		if err := pp.updateLeader(); err != nil {
			pp.parent.returnError(msg, err)
			pp.backoff(msg.retries)
			return err
		}
		Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	}

	return nil
}

func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}

	defer func() {
		if pp.brokerProducer != nil {
			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
		}
	}()

	for msg := range pp.input {
		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
			select {
			case <-pp.brokerProducer.abandoned:
				// a message on the abandoned channel means that our current broker selection is out of date
				Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
				pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
				pp.brokerProducer = nil
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
			default:
				// producer connection is still open.
			}
		}

		if msg.retries > pp.highWatermark {
			if err := pp.updateLeaderIfBrokerProducerIsNil(msg); err != nil {
				continue
			}

			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			pp.backoff(msg.retries)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's just a fin)
				if msg.flags&fin == fin {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&fin == fin {
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees

		if err := pp.updateLeaderIfBrokerProducerIsNil(msg); err != nil {
			continue
		}

		// Now that we know we have a broker to actually try and send this message to, generate the sequence
		// number for it.
		// All messages being retried (sent or not) have already had their retry count updated
		// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
		if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
			msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
			msg.hasSequence = true
		}

		if pp.parent.IsTransactional() {
			pp.parent.txnmgr.maybeAddPartitionToCurrentTxn(pp.topic, pp.partition)
		}

		pp.brokerProducer.input <- msg
	}
}

func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
	pp.brokerProducer = nil
}

func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.brokerProducer.input <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}

func (pp *partitionProducer) updateLeader() error {
	return pp.breaker.Run(func() (err error) {
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
			return err
		}

		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
			return err
		}

		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

		return nil
	})
}

// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		pending   = make(chan *brokerProducerResponse)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		// Use a wait group to know if we still have in flight requests
		var wg sync.WaitGroup

		for set := range bridge {
			request := set.buildRequest()

			// Count the in flight requests to know when we can close the pending channel safely
			wg.Add(1)
			// Capture the current set to forward in the callback
			sendResponse := func(set *produceSet) ProduceCallback {
				return func(response *ProduceResponse, err error) {
					// Forward the response to make sure we do not block the responseReceiver
					pending <- &brokerProducerResponse{
						set: set,
						err: err,
						res: response,
					}
					wg.Done()
				}
			}(set)

			if p.IsTransactional() {
				// Add the partition to the transaction before sending the current batch
				err := p.txnmgr.publishTxnPartitions()
				if err != nil {
					// Request failed to be sent
					sendResponse(nil, err)
					continue
				}
			}

			// Use AsyncProduce vs Produce to not block waiting for the response
			// so that we can pipeline multiple produce requests and achieve higher throughput, see:
			// https://kafka.apache.org/protocol#protocol_network
			err := broker.AsyncProduce(request, sendResponse)
			if err != nil {
				// Request failed to be sent
				sendResponse(nil, err)
				continue
			}

			// Callback is not called when using NoResponse
			if p.conf.Producer.RequiredAcks == NoResponse {
				// Provide the expected nil response
				sendResponse(nil, nil)
			}
		}
		// Wait for all in flight requests to close the pending channel safely
		wg.Wait()
		close(pending)
	})

	// In order to avoid a deadlock when closing the broker on a network or malformed response error,
	// we use an intermediate channel to buffer and send pending responses in order.
	// This is because the AsyncProduce callback inside the bridge is invoked from the broker's
	// responseReceiver goroutine, and closing the broker requires that goroutine to be finished.
	go withRecover(func() {
		buf := queue.New()
		for {
			if buf.Length() == 0 {
				res, ok := <-pending
				if !ok {
					// We are done forwarding the last pending response
					close(responses)
					return
				}
				buf.Add(res)
			}
			// Send the head pending response or buffer another one
			// so that we never block the callback
			headRes := buf.Peek().(*brokerProducerResponse)
			select {
			case res, ok := <-pending:
				if !ok {
					continue
				}
				buf.Add(res)
				continue
			case responses <- headRes:
				buf.Remove()
				continue
			}
		}
	})

	if p.conf.Producer.Retry.Max <= 0 {
		bp.abandoned = make(chan struct{})
	}

	return bp
}

type brokerProducerResponse struct {
	set *produceSet
	err error
	res *ProduceResponse
}

// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     chan *ProducerMessage
	output    chan<- *produceSet
	responses <-chan *brokerProducerResponse
	abandoned chan struct{}

	buffer     *produceSet
	timer      *time.Timer
	timerFired bool

	closing        error
	currentRetries map[string]map[int32]error
}

func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	var timerChan <-chan time.Time
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg, ok := <-bp.input:
			if !ok {
				Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
				bp.shutdown()
				return
			}

			if msg == nil {
				continue
			}

			if msg.flags&syn == syn {
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
					bp.broker.ID(), msg.Topic, msg.Partition)
				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil
				bp.parent.inFlight.Done()
				continue
			}

			if reason := bp.needsRetry(msg); reason != nil {
				bp.parent.retryMessage(msg, reason)

				if bp.closing == nil && msg.flags&fin == fin {
					// we were retrying this partition but we can start processing again
					delete(bp.currentRetries[msg.Topic], msg.Partition)
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
						bp.broker.ID(), msg.Topic, msg.Partition)
				}

				continue
			}

			if msg.flags&fin == fin {
				// New broker producer that was caught up by the retry loop
				bp.parent.retryMessage(msg, ErrShuttingDown)
				DebugLogger.Printf("producer/broker/%d state change to [dying-%d] on %s/%d\n",
					bp.broker.ID(), msg.retries, msg.Topic, msg.Partition)
				continue
			}

			if bp.buffer.wouldOverflow(msg) {
				Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
				if err := bp.waitForSpace(msg, false); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}

			if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
				// The epoch was reset, need to roll the buffer over
				Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
				if err := bp.waitForSpace(msg, true); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}
			if err := bp.buffer.add(msg); err != nil {
				bp.parent.returnError(msg, err)
				continue
			}

			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
				bp.timer = time.NewTimer(bp.parent.conf.Producer.Flush.Frequency)
				timerChan = bp.timer.C
			}
		case <-timerChan:
			bp.timerFired = true
		case output <- bp.buffer:
			bp.rollOver()
			timerChan = nil
		case response, ok := <-bp.responses:
			if ok {
				bp.handleResponse(response)
			}
		}

		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}
	}
}

func (bp *brokerProducer) shutdown() {
	for !bp.buffer.empty() {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
		case bp.output <- bp.buffer:
			bp.rollOver()
		}
	}
	close(bp.output)
	// Drain responses from the bridge goroutine
	for response := range bp.responses {
		bp.handleResponse(response)
	}
	// No more brokerProducer related goroutine should be running
	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
	if bp.closing != nil {
		return bp.closing
	}

	return bp.currentRetries[msg.Topic][msg.Partition]
}

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
	for {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
			// handling a response can change our state, so re-check some things
			if reason := bp.needsRetry(msg); reason != nil {
				return reason
			} else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
				return nil
			}
		case bp.output <- bp.buffer:
			bp.rollOver()
			return nil
		}
	}
}

func (bp *brokerProducer) rollOver() {
	if bp.timer != nil {
		bp.timer.Stop()
	}
	bp.timer = nil
	bp.timerFired = false
	bp.buffer = newProduceSet(bp.parent)
}

func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
	if response.err != nil {
		bp.handleError(response.set, response.err)
	} else {
		bp.handleSuccess(response.set, response.res)
	}

	if bp.buffer.empty() {
		bp.rollOver() // this can happen if the response invalidated our buffer
	}
}

func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
	// we iterate through the blocks in the request set, not the response, so that we notice
	// if the response is missing a block completely
	var retryTopics []string
	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			bp.parent.returnSuccesses(pSet.msgs)
			return
		}

		block := response.GetBlock(topic, partition)
		if block == nil {
			bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
			return
		}

		switch block.Err {
		// Success
		case ErrNoError:
			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
				for _, msg := range pSet.msgs {
					msg.Timestamp = block.Timestamp
				}
			}
			for i, msg := range pSet.msgs {
				msg.Offset = block.Offset + int64(i)
			}
			bp.parent.returnSuccesses(pSet.msgs)
		// Duplicate
		case ErrDuplicateSequenceNumber:
			bp.parent.returnSuccesses(pSet.msgs)
		// Retriable errors
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
				bp.parent.returnErrors(pSet.msgs, block.Err)
			} else {
				retryTopics = append(retryTopics, topic)
			}
		// Other non-retriable errors
		default:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
			}
			bp.parent.returnErrors(pSet.msgs, block.Err)
		}
	})

	if len(retryTopics) > 0 {
		if bp.parent.conf.Producer.Idempotent {
			err := bp.parent.client.RefreshMetadata(retryTopics...)
			if err != nil {
				Logger.Printf("Failed refreshing metadata because of %v\n", err)
			}
		}

		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			block := response.GetBlock(topic, partition)
			if block == nil {
				// handled in the previous "eachPartition" loop
				return
			}

			switch block.Err {
			case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
				Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
					bp.broker.ID(), topic, partition, block.Err)
				if bp.currentRetries[topic] == nil {
					bp.currentRetries[topic] = make(map[int32]error)
				}
				bp.currentRetries[topic][partition] = block.Err
				if bp.parent.conf.Producer.Idempotent {
					go bp.parent.retryBatch(topic, partition, pSet, block.Err)
				} else {
					bp.parent.retryMessages(pSet.msgs, block.Err)
				}
				// dropping the following messages has the side effect of incrementing their retry count
				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
			}
		})
	}
}

func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
	Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
	produceSet := newProduceSet(p)
	produceSet.msgs[topic] = make(map[int32]*partitionSet)
	produceSet.msgs[topic][partition] = pSet
	produceSet.bufferBytes += pSet.bufferBytes
	produceSet.bufferCount += len(pSet.msgs)
	for _, msg := range pSet.msgs {
		if msg.retries >= p.conf.Producer.Retry.Max {
			p.returnErrors(pSet.msgs, kerr)
			return
		}
		msg.retries++
	}

	// it's expected that a metadata refresh has been requested prior to calling retryBatch
	leader, err := p.client.Leader(topic, partition)
	if err != nil {
		Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
		for _, msg := range pSet.msgs {
			p.returnError(msg, kerr)
		}
		return
	}
	bp := p.getBrokerProducer(leader)
	bp.output <- produceSet
	p.unrefBrokerProducer(leader, bp)
}

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
	var target PacketEncodingError
	if errors.As(err, &target) {
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.returnErrors(pSet.msgs, err)
		})
	} else {
		Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
		bp.parent.abandonBrokerConnection(bp.broker)
		_ = bp.broker.Close()
		bp.closing = err
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.rollOver()
	}
}

// singleton
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
	var msg *ProducerMessage
	buf := queue.New()

	for {
		if buf.Length() == 0 {
			msg = <-p.retries
		} else {
			select {
			case msg = <-p.retries:
			case p.input <- buf.Peek().(*ProducerMessage):
				buf.Remove()
				continue
			}
		}

		if msg == nil {
			return
		}

		buf.Add(msg)
	}
}

// utility functions

func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}
	p.inFlight.Wait()

	err := p.client.Close()
	if err != nil {
		Logger.Println("producer/shutdown failed to close the embedded client:", err)
	}

	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)

	p.metricsRegistry.UnregisterAll()
}

func (p *asyncProducer) bumpIdempotentProducerEpoch() {
	_, epoch := p.txnmgr.getProducerID()
	if epoch == math.MaxInt16 {
		Logger.Println("producer/txnmanager epoch exhausted, requesting new producer ID")
		txnmgr, err := newTransactionManager(p.conf, p.client)
		if err != nil {
			Logger.Println(err)
			return
		}

		p.txnmgr = txnmgr
	} else {
		p.txnmgr.bumpEpoch()
	}
}

func (p *asyncProducer) maybeTransitionToErrorState(err error) error {
	if errors.Is(err, ErrClusterAuthorizationFailed) ||
		errors.Is(err, ErrProducerFenced) ||
		errors.Is(err, ErrUnsupportedVersion) ||
		errors.Is(err, ErrTransactionalIDAuthorizationFailed) {
		return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, err)
	}
	if p.txnmgr.coordinatorSupportsBumpingEpoch && p.txnmgr.currentTxnStatus()&ProducerTxnFlagEndTransaction == 0 {
		p.txnmgr.epochBumpRequired = true
	}
	return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err)
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
	if p.IsTransactional() {
		_ = p.maybeTransitionToErrorState(err)
	}
	// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
	// will never see a message with this number, so we can never continue the sequence.
	if !p.IsTransactional() && msg.hasSequence {
		Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
		p.bumpIdempotentProducerEpoch()
	}

	msg.clear()
	pErr := &ProducerError{Msg: msg, Err: err}
	if p.conf.Producer.Return.Errors {
		p.errors <- pErr
	} else {
		Logger.Println(pErr)
	}
	p.inFlight.Done()
}

func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.returnError(msg, err)
	}
}

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if p.conf.Producer.Return.Successes {
			msg.clear()
			p.successes <- msg
		}
		p.inFlight.Done()
	}
}

func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
	if msg.retries >= p.conf.Producer.Retry.Max {
		p.returnError(msg, err)
	} else {
		msg.retries++
		p.retries <- msg
	}
}

func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.retryMessage(msg, err)
	}
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	p.brokerRefs[bp]++

	return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	p.brokerRefs[bp]--
	if p.brokerRefs[bp] == 0 {
		close(bp.input)
		delete(p.brokerRefs, bp)

		if p.brokers[broker] == bp {
			delete(p.brokers, broker)
		}
	}
}

func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bc, ok := p.brokers[broker]
	if ok && bc.abandoned != nil {
		close(bc.abandoned)
	}

	delete(p.brokers, broker)
}