transaction_manager.go

package sarama

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"
)

// ProducerTxnStatusFlag marks the current transaction status.
type ProducerTxnStatusFlag int16

const (
	// ProducerTxnFlagUninitialized when the txnmgr is created
	ProducerTxnFlagUninitialized ProducerTxnStatusFlag = 1 << iota
	// ProducerTxnFlagInitializing when the txnmgr is initializing
	ProducerTxnFlagInitializing
	// ProducerTxnFlagReady when the txnmgr is ready to receive a transaction
	ProducerTxnFlagReady
	// ProducerTxnFlagInTransaction when a transaction has been started
	ProducerTxnFlagInTransaction
	// ProducerTxnFlagEndTransaction when the transaction is about to be committed or aborted
	ProducerTxnFlagEndTransaction
	// ProducerTxnFlagInError when an abortable or fatal error has occurred
	ProducerTxnFlagInError
	// ProducerTxnFlagCommittingTransaction when committing the txn
	ProducerTxnFlagCommittingTransaction
	// ProducerTxnFlagAbortingTransaction when aborting the txn
	ProducerTxnFlagAbortingTransaction
	// ProducerTxnFlagAbortableError when the producer encounters an abortable error.
	// Must call AbortTxn in this case.
	ProducerTxnFlagAbortableError
	// ProducerTxnFlagFatalError when the producer encounters a fatal error.
	// Must close and recreate the producer.
	ProducerTxnFlagFatalError
)

func (s ProducerTxnStatusFlag) String() string {
	status := make([]string, 0)
	if s&ProducerTxnFlagUninitialized != 0 {
		status = append(status, "ProducerTxnStateUninitialized")
	}
	if s&ProducerTxnFlagInitializing != 0 {
		status = append(status, "ProducerTxnStateInitializing")
	}
	if s&ProducerTxnFlagReady != 0 {
		status = append(status, "ProducerTxnStateReady")
	}
	if s&ProducerTxnFlagInTransaction != 0 {
		status = append(status, "ProducerTxnStateInTransaction")
	}
	if s&ProducerTxnFlagEndTransaction != 0 {
		status = append(status, "ProducerTxnStateEndTransaction")
	}
	if s&ProducerTxnFlagInError != 0 {
		status = append(status, "ProducerTxnStateInError")
	}
	if s&ProducerTxnFlagCommittingTransaction != 0 {
		status = append(status, "ProducerTxnStateCommittingTransaction")
	}
	if s&ProducerTxnFlagAbortingTransaction != 0 {
		status = append(status, "ProducerTxnStateAbortingTransaction")
	}
	if s&ProducerTxnFlagAbortableError != 0 {
		status = append(status, "ProducerTxnStateAbortableError")
	}
	if s&ProducerTxnFlagFatalError != 0 {
		status = append(status, "ProducerTxnStateFatalError")
	}
	return strings.Join(status, "|")
}

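// For example, a producer that hit an abortable error carries the combined
// status ProducerTxnFlagInError|ProducerTxnFlagAbortableError, which renders
// as "ProducerTxnStateInError|ProducerTxnStateAbortableError".
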
// transactionManager keeps the state necessary to ensure idempotent production.
type transactionManager struct {
	producerID      int64
	producerEpoch   int16
	sequenceNumbers map[string]int32
	mutex           sync.Mutex

	transactionalID    string
	transactionTimeout time.Duration
	client             Client

	// True when the Kafka cluster is at least 2.5.0.
	// Used to recover when the producer fails.
	coordinatorSupportsBumpingEpoch bool

	// True when the producer needs to bump its epoch.
	epochBumpRequired bool
	// Record the last seen error.
	lastError error

	// Ensure that status is never accessed with a race condition.
	statusLock sync.RWMutex
	status     ProducerTxnStatusFlag

	// Ensure that only one goroutine will update partitions in the current transaction.
	partitionInTxnLock            sync.Mutex
	pendingPartitionsInCurrentTxn topicPartitionSet
	partitionsInCurrentTxn        topicPartitionSet

	// Offsets to add to the transaction.
	offsetsInCurrentTxn map[string]topicPartitionOffsets
}

const (
	noProducerID    = -1
	noProducerEpoch = -1

	// see publishTxnPartitions comment.
	addPartitionsRetryBackoff = 20 * time.Millisecond
)

// txnmgr allowed transitions.
var producerTxnTransitions = map[ProducerTxnStatusFlag][]ProducerTxnStatusFlag{
	ProducerTxnFlagUninitialized: {
		ProducerTxnFlagReady,
		ProducerTxnFlagInError,
	},
	// When we are initializing
	ProducerTxnFlagInitializing: {
		ProducerTxnFlagInitializing,
		ProducerTxnFlagReady,
		ProducerTxnFlagInError,
	},
	// When we have initialized the transactional producer
	ProducerTxnFlagReady: {
		ProducerTxnFlagInTransaction,
	},
	// When BeginTxn has been called
	ProducerTxnFlagInTransaction: {
		// When calling commit or abort
		ProducerTxnFlagEndTransaction,
		// When an error occurs
		ProducerTxnFlagInError,
	},
	ProducerTxnFlagEndTransaction: {
		// When the epoch is bumped
		ProducerTxnFlagInitializing,
		// When the commit succeeds
		ProducerTxnFlagReady,
		// When an error occurs
		ProducerTxnFlagInError,
	},
	// Need to abort the transaction
	ProducerTxnFlagAbortableError: {
		// Call AbortTxn
		ProducerTxnFlagAbortingTransaction,
		// When an error occurs
		ProducerTxnFlagInError,
	},
	// Need to close the producer
	ProducerTxnFlagFatalError: {
		ProducerTxnFlagFatalError,
	},
}

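// The table is one-way: for example, once a producer reaches
// ProducerTxnFlagFatalError, the only allowed target is ProducerTxnFlagFatalError
// itself, so the producer must be closed and recreated.
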
type topicPartition struct {
	topic     string
	partition int32
}

// Set and map types keyed by topicPartition, so that we don't do a full scan
// every time a partition or an offset is added.
type (
	topicPartitionSet     map[topicPartition]struct{}
	topicPartitionOffsets map[topicPartition]*PartitionOffsetMetadata
)

func (s topicPartitionSet) mapToRequest() map[string][]int32 {
	result := make(map[string][]int32, len(s))
	for tp := range s {
		result[tp.topic] = append(result[tp.topic], tp.partition)
	}
	return result
}

func (s topicPartitionOffsets) mapToRequest() map[string][]*PartitionOffsetMetadata {
	result := make(map[string][]*PartitionOffsetMetadata, len(s))
	for tp, offset := range s {
		result[tp.topic] = append(result[tp.topic], offset)
	}
	return result
}

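// For example, a topicPartitionSet holding {"events", 0} and {"events", 1}
// collapses to map[string][]int32{"events": {0, 1}} (partition order is not
// guaranteed), the shape used by AddPartitionsToTxnRequest.TopicPartitions.
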
// Return true if the current transition is allowed.
func (t *transactionManager) isTransitionValid(target ProducerTxnStatusFlag) bool {
	for status, allowedTransitions := range producerTxnTransitions {
		if status&t.status != 0 {
			for _, allowedTransition := range allowedTransitions {
				if allowedTransition&target != 0 {
					return true
				}
			}
		}
	}
	return false
}

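// Note that both the current status and the target can combine several flags:
// the transition is valid if any flag of the current status allows a move to
// any flag of the target. For example, going from ProducerTxnFlagInTransaction
// to ProducerTxnFlagInError|ProducerTxnFlagAbortableError is allowed because
// InTransaction may transition to InError.
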
// Get the current transaction status.
func (t *transactionManager) currentTxnStatus() ProducerTxnStatusFlag {
	t.statusLock.RLock()
	defer t.statusLock.RUnlock()
	return t.status
}

// Try to transition to a valid status and return an error otherwise.
func (t *transactionManager) transitionTo(target ProducerTxnStatusFlag, err error) error {
	t.statusLock.Lock()
	defer t.statusLock.Unlock()
	if !t.isTransitionValid(target) {
		return ErrTransitionNotAllowed
	}
	if target&ProducerTxnFlagInError != 0 {
		if err == nil {
			return ErrCannotTransitionNilError
		}
		t.lastError = err
	} else {
		t.lastError = nil
	}
	DebugLogger.Printf("txnmgr/transition [%s] transition from %s to %s\n", t.transactionalID, t.status, target)
	t.status = target
	return err
}

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
	key := fmt.Sprintf("%s-%d", topic, partition)
	t.mutex.Lock()
	defer t.mutex.Unlock()
	sequence := t.sequenceNumbers[key]
	t.sequenceNumbers[key] = sequence + 1
	return sequence, t.producerEpoch
}

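// bumpEpoch increments the producer epoch and resets all sequence numbers to
// zero: sequence numbers are scoped to an epoch and restart at 0 whenever the
// epoch is bumped.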
func (t *transactionManager) bumpEpoch() {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	t.producerEpoch++
	for k := range t.sequenceNumbers {
		t.sequenceNumbers[k] = 0
	}
}

func (t *transactionManager) getProducerID() (int64, int16) {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	return t.producerID, t.producerEpoch
}

// Compute the retry backoff, taking the number of remaining attempts into account.
func (t *transactionManager) computeBackoff(attemptsRemaining int) time.Duration {
	if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil {
		maxRetries := t.client.Config().Producer.Transaction.Retry.Max
		retries := maxRetries - attemptsRemaining
		return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries)
	}
	return t.client.Config().Producer.Transaction.Retry.Backoff
}

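// A minimal sketch of a custom BackoffFunc, assuming an exponential policy is
// wanted (illustrative only, not part of this file):
//
//	conf.Producer.Transaction.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
//		return time.Duration(1<<uint(retries)) * 100 * time.Millisecond
//	}
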
// Return true if the txnmgr is transactional.
func (t *transactionManager) isTransactional() bool {
	return t.transactionalID != ""
}

// Add the specified offsets to the current transaction.
func (t *transactionManager) addOffsetsToTxn(offsetsToAdd map[string][]*PartitionOffsetMetadata, groupId string) error {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	if t.currentTxnStatus()&ProducerTxnFlagInTransaction == 0 {
		return ErrTransactionNotReady
	}

	if t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
		return t.lastError
	}

	if _, ok := t.offsetsInCurrentTxn[groupId]; !ok {
		t.offsetsInCurrentTxn[groupId] = topicPartitionOffsets{}
	}

	for topic, offsets := range offsetsToAdd {
		for _, offset := range offsets {
			tp := topicPartition{topic: topic, partition: offset.Partition}
			t.offsetsInCurrentTxn[groupId][tp] = offset
		}
	}
	return nil
}

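// Committing consumed offsets within a transaction is a two-step protocol:
// AddOffsetsToTxn first registers the consumer group with the transaction
// coordinator; TxnOffsetCommit then sends the offsets themselves to the
// group's own coordinator. publishOffsetsToTxn performs both steps, retrying
// each with backoff.
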
// Send the saved offsets to the transaction coordinator.
func (t *transactionManager) publishOffsetsToTxn(offsets topicPartitionOffsets, groupId string) (topicPartitionOffsets, error) {
	// First AddOffsetsToTxn
	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
	exec := func(run func() (bool, error), err error) error {
		for attemptsRemaining >= 0 {
			var retry bool
			retry, err = run()
			if !retry {
				return err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/add-offset-to-txn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return err
	}
	lastError := exec(func() (bool, error) {
		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
		if err != nil {
			return true, err
		}
		request := &AddOffsetsToTxnRequest{
			TransactionalID: t.transactionalID,
			ProducerEpoch:   t.producerEpoch,
			ProducerID:      t.producerID,
			GroupID:         groupId,
		}
		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 2 adds the support for new error code PRODUCER_FENCED.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}
		response, err := coordinator.AddOffsetsToTxn(request)
		if err != nil {
			// If an error occurred, try to refresh the current transaction coordinator.
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, err
		}
		if response == nil {
			// If no response is returned, just retry.
			return true, ErrTxnUnableToParseResponse
		}
		if response.Err == ErrNoError {
			DebugLogger.Printf("txnmgr/add-offset-to-txn [%s] successful add-offset-to-txn with group %s %+v\n",
				t.transactionalID, groupId, response)
			// If there is no error, just exit.
			return false, nil
		}
		switch response.Err {
		case ErrConsumerCoordinatorNotAvailable:
			fallthrough
		case ErrNotCoordinatorForConsumer:
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			fallthrough
		case ErrOffsetsLoadInProgress:
			fallthrough
		case ErrConcurrentTransactions:
			// Retry
		case ErrUnknownProducerID:
			fallthrough
		case ErrInvalidProducerIDMapping:
			return false, t.abortableErrorIfPossible(response.Err)
		case ErrGroupAuthorizationFailed:
			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err)
		default:
			// Others are fatal
			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
		}
		return true, response.Err
	}, nil)
	if lastError != nil {
		return offsets, lastError
	}
	resultOffsets := offsets
	// Then TxnOffsetCommit.
	// Note: the commit is not complete until TxnOffsetCommit returns successfully.
	attemptsRemaining = t.client.Config().Producer.Transaction.Retry.Max
	execTxnOffsetCommit := func(run func() (topicPartitionOffsets, bool, error), err error) (topicPartitionOffsets, error) {
		var r topicPartitionOffsets
		for attemptsRemaining >= 0 {
			var retry bool
			r, retry, err = run()
			if !retry {
				return r, err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/txn-offset-commit [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return r, err
	}
	return execTxnOffsetCommit(func() (topicPartitionOffsets, bool, error) {
		consumerGroupCoordinator, err := t.client.Coordinator(groupId)
		if err != nil {
			return resultOffsets, true, err
		}
		request := &TxnOffsetCommitRequest{
			TransactionalID: t.transactionalID,
			ProducerEpoch:   t.producerEpoch,
			ProducerID:      t.producerID,
			GroupID:         groupId,
			Topics:          offsets.mapToRequest(),
		}
		if t.client.Config().Version.IsAtLeast(V2_1_0_0) {
			// Version 2 adds the committed leader epoch.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}
		responses, err := consumerGroupCoordinator.TxnOffsetCommit(request)
		if err != nil {
			_ = consumerGroupCoordinator.Close()
			_ = t.client.RefreshCoordinator(groupId)
			return resultOffsets, true, err
		}
		if responses == nil {
			return resultOffsets, true, ErrTxnUnableToParseResponse
		}
		var responseErrors []error
		failedTxn := topicPartitionOffsets{}
		for topic, partitionErrors := range responses.Topics {
			for _, partitionError := range partitionErrors {
				switch partitionError.Err {
				case ErrNoError:
					continue
				// If the topic is unknown or the coordinator is loading, retry with the current coordinator
				case ErrRequestTimedOut:
					fallthrough
				case ErrConsumerCoordinatorNotAvailable:
					fallthrough
				case ErrNotCoordinatorForConsumer:
					_ = consumerGroupCoordinator.Close()
					_ = t.client.RefreshCoordinator(groupId)
					fallthrough
				case ErrUnknownTopicOrPartition:
					fallthrough
				case ErrOffsetsLoadInProgress:
					// Do nothing, just retry
				case ErrIllegalGeneration:
					fallthrough
				case ErrUnknownMemberId:
					fallthrough
				case ErrFencedInstancedId:
					fallthrough
				case ErrGroupAuthorizationFailed:
					return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, partitionError.Err)
				default:
					// Others are fatal
					return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, partitionError.Err)
				}
				tp := topicPartition{topic: topic, partition: partitionError.Partition}
				failedTxn[tp] = offsets[tp]
				responseErrors = append(responseErrors, partitionError.Err)
			}
		}
		resultOffsets = failedTxn
		if len(resultOffsets) == 0 {
			DebugLogger.Printf("txnmgr/txn-offset-commit [%s] successful txn-offset-commit with group %s\n",
				t.transactionalID, groupId)
			return resultOffsets, false, nil
		}
		return resultOffsets, true, Wrap(ErrTxnOffsetCommit, responseErrors...)
	}, nil)
}

func (t *transactionManager) initProducerId() (int64, int16, error) {
	isEpochBump := false

	req := &InitProducerIDRequest{}
	if t.isTransactional() {
		req.TransactionalID = &t.transactionalID
		req.TransactionTimeout = t.transactionTimeout
	}

	if t.client.Config().Version.IsAtLeast(V2_5_0_0) {
		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 4 adds the support for new error code PRODUCER_FENCED.
			req.Version = 4
		} else {
			// Version 3 adds ProducerId and ProducerEpoch, allowing producers to try
			// to resume after an INVALID_PRODUCER_EPOCH error.
			req.Version = 3
		}
		isEpochBump = t.producerID != noProducerID && t.producerEpoch != noProducerEpoch
		t.coordinatorSupportsBumpingEpoch = true
		req.ProducerID = t.producerID
		req.ProducerEpoch = t.producerEpoch
	} else if t.client.Config().Version.IsAtLeast(V2_4_0_0) {
		// Version 2 is the first flexible version.
		req.Version = 2
	} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
		// Version 1 is the same as version 0.
		req.Version = 1
	}

	if isEpochBump {
		err := t.transitionTo(ProducerTxnFlagInitializing, nil)
		if err != nil {
			return -1, -1, err
		}
		DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId with current producer ID %d and epoch %d in order to bump the epoch\n",
			t.transactionalID, t.producerID, t.producerEpoch)
	} else {
		DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId for the first time in order to acquire a producer ID\n",
			t.transactionalID)
	}

	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
	exec := func(run func() (int64, int16, bool, error), err error) (int64, int16, error) {
		pid := int64(-1)
		pepoch := int16(-1)
		for attemptsRemaining >= 0 {
			var retry bool
			pid, pepoch, retry, err = run()
			if !retry {
				return pid, pepoch, err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/init-producer-id [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return -1, -1, err
	}
	return exec(func() (int64, int16, bool, error) {
		var err error
		var coordinator *Broker
		if t.isTransactional() {
			coordinator, err = t.client.TransactionCoordinator(t.transactionalID)
		} else {
			coordinator = t.client.LeastLoadedBroker()
		}
		if err != nil {
			return -1, -1, true, err
		}
		response, err := coordinator.InitProducerID(req)
		if err != nil {
			if t.isTransactional() {
				_ = coordinator.Close()
				_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			}
			return -1, -1, true, err
		}
		if response == nil {
			return -1, -1, true, ErrTxnUnableToParseResponse
		}
		if response.Err == ErrNoError {
			if isEpochBump {
				t.sequenceNumbers = make(map[string]int32)
			}
			err := t.transitionTo(ProducerTxnFlagReady, nil)
			if err != nil {
				return -1, -1, true, err
			}
			DebugLogger.Printf("txnmgr/init-producer-id [%s] successful init producer id %+v\n",
				t.transactionalID, response)
			return response.ProducerID, response.ProducerEpoch, false, nil
		}
		switch response.Err {
		// Retriable errors
		case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer, ErrOffsetsLoadInProgress:
			if t.isTransactional() {
				_ = coordinator.Close()
				_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			}
		// Fatal errors
		default:
			return -1, -1, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
		}
		return -1, -1, true, response.Err
	}, nil)
}

// If the Kafka cluster is at least 2.5.0, mark the txnmgr as requiring an
// epoch bump; otherwise mark it as fatal.
func (t *transactionManager) abortableErrorIfPossible(err error) error {
	if t.coordinatorSupportsBumpingEpoch {
		t.epochBumpRequired = true
		return t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err)
	}
	return t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, err)
}

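// This is the KIP-360 recovery path: brokers from 2.5.0 onwards let a producer
// bump its epoch after an abortable error such as ErrUnknownProducerID instead
// of failing fatally, so the same producer can abort and start a new
// transaction.
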
// End the current transaction.
func (t *transactionManager) completeTransaction() error {
	if t.epochBumpRequired {
		err := t.transitionTo(ProducerTxnFlagInitializing, nil)
		if err != nil {
			return err
		}
	} else {
		err := t.transitionTo(ProducerTxnFlagReady, nil)
		if err != nil {
			return err
		}
	}

	t.lastError = nil
	t.epochBumpRequired = false
	t.partitionsInCurrentTxn = topicPartitionSet{}
	t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
	t.offsetsInCurrentTxn = map[string]topicPartitionOffsets{}

	return nil
}

// Send an EndTxn request with the commit flag (true when committing, false when aborting).
func (t *transactionManager) endTxn(commit bool) error {
	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
	exec := func(run func() (bool, error), err error) error {
		for attemptsRemaining >= 0 {
			var retry bool
			retry, err = run()
			if !retry {
				return err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/endtxn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return err
	}
	return exec(func() (bool, error) {
		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
		if err != nil {
			return true, err
		}
		request := &EndTxnRequest{
			TransactionalID:   t.transactionalID,
			ProducerEpoch:     t.producerEpoch,
			ProducerID:        t.producerID,
			TransactionResult: commit,
		}
		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 2 adds the support for new error code PRODUCER_FENCED.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}
		response, err := coordinator.EndTxn(request)
		if err != nil {
			// Always retry on network error
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, err
		}
		if response == nil {
			return true, ErrTxnUnableToParseResponse
		}
		if response.Err == ErrNoError {
			DebugLogger.Printf("txnmgr/endtxn [%s] successful to end txn %+v\n",
				t.transactionalID, response)
			return false, t.completeTransaction()
		}
		switch response.Err {
		// Need to refresh the coordinator
		case ErrConsumerCoordinatorNotAvailable:
			fallthrough
		case ErrNotCoordinatorForConsumer:
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			fallthrough
		case ErrOffsetsLoadInProgress:
			fallthrough
		case ErrConcurrentTransactions:
			// Just retry
		case ErrUnknownProducerID:
			fallthrough
		case ErrInvalidProducerIDMapping:
			return false, t.abortableErrorIfPossible(response.Err)
		// Fatal errors
		default:
			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
		}
		return true, response.Err
	}, nil)
}

// Try to publish the associated offsets for each group,
// then send an EndTxn request to mark the transaction as finished.
func (t *transactionManager) finishTransaction(commit bool) error {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	// Ensure no error when committing or aborting
	if commit && t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
		return t.lastError
	} else if !commit && t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
		return t.lastError
	}

	// If no records have been sent, don't do anything.
	if len(t.partitionsInCurrentTxn) == 0 {
		return t.completeTransaction()
	}

	epochBump := t.epochBumpRequired
	// If we're aborting the transaction, there is no need to add offsets.
	if commit && len(t.offsetsInCurrentTxn) > 0 {
		for group, offsets := range t.offsetsInCurrentTxn {
			newOffsets, err := t.publishOffsetsToTxn(offsets, group)
			if err != nil {
				t.offsetsInCurrentTxn[group] = newOffsets
				return err
			}
			delete(t.offsetsInCurrentTxn, group)
		}
	}

	if t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
		return t.lastError
	}

	if !errors.Is(t.lastError, ErrInvalidProducerIDMapping) {
		err := t.endTxn(commit)
		if err != nil {
			return err
		}
		if !epochBump {
			return nil
		}
	}

	// Reset the producer ID and epoch if needed.
	return t.initializeTransactions()
}

// Called before sending any transactional record;
// won't do anything if the current topic-partition has already been added to the transaction.
func (t *transactionManager) maybeAddPartitionToCurrentTxn(topic string, partition int32) {
	if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
		return
	}

	tp := topicPartition{topic: topic, partition: partition}

	t.partitionInTxnLock.Lock()
	defer t.partitionInTxnLock.Unlock()
	if _, ok := t.partitionsInCurrentTxn[tp]; ok {
		// The partition has already been added
		return
	}
	t.pendingPartitionsInCurrentTxn[tp] = struct{}{}
}

// Make a request to Kafka to add the list of pending partitions to the current transaction.
func (t *transactionManager) publishTxnPartitions() error {
	t.partitionInTxnLock.Lock()
	defer t.partitionInTxnLock.Unlock()

	if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
		return t.lastError
	}

	if len(t.pendingPartitionsInCurrentTxn) == 0 {
		return nil
	}

	// Remove the partitions from the pending set regardless of the result. We use the presence
	// of partitions in the pending set to know when it is not safe to send batches. However, if
	// the partitions failed to be added and we enter an error state, we expect the batches to be
	// aborted anyway. In this case, we must be able to continue sending the batches which are in
	// retry for partitions that were successfully added.
	removeAllPartitionsOnFatalOrAbortedError := func() {
		t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
	}

	// We only want to reduce the backoff when retrying the first AddPartition which errored out due to a
	// CONCURRENT_TRANSACTIONS error, since this means that the previous transaction is still completing and
	// we don't want to wait too long before trying to start the new one.
	//
	// This is only a temporary fix, the long term solution is being tracked in
	// https://issues.apache.org/jira/browse/KAFKA-5482
	retryBackoff := t.client.Config().Producer.Transaction.Retry.Backoff
	computeBackoff := func(attemptsRemaining int) time.Duration {
		if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil {
			maxRetries := t.client.Config().Producer.Transaction.Retry.Max
			retries := maxRetries - attemptsRemaining
			return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries)
		}
		return retryBackoff
	}
	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
	exec := func(run func() (bool, error), err error) error {
		for attemptsRemaining >= 0 {
			var retry bool
			retry, err = run()
			if !retry {
				return err
			}
			backoff := computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/add-partition-to-txn retrying after %dms... (%d attempts remaining) (%s)\n", backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return err
	}
	return exec(func() (bool, error) {
		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
		if err != nil {
			return true, err
		}
		request := &AddPartitionsToTxnRequest{
			TransactionalID: t.transactionalID,
			ProducerID:      t.producerID,
			ProducerEpoch:   t.producerEpoch,
			TopicPartitions: t.pendingPartitionsInCurrentTxn.mapToRequest(),
		}
		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 2 adds the support for new error code PRODUCER_FENCED.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}
		addPartResponse, err := coordinator.AddPartitionsToTxn(request)
		if err != nil {
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, err
		}
		if addPartResponse == nil {
			return true, ErrTxnUnableToParseResponse
		}
		// Remove the partitions that have been successfully added from the pending list.
		var responseErrors []error
		for topic, results := range addPartResponse.Errors {
			for _, response := range results {
				tp := topicPartition{topic: topic, partition: response.Partition}
				switch response.Err {
				case ErrNoError:
					// Mark the partition as added to the transaction
					t.partitionsInCurrentTxn[tp] = struct{}{}
					delete(t.pendingPartitionsInCurrentTxn, tp)
					continue
				case ErrConsumerCoordinatorNotAvailable:
					fallthrough
				case ErrNotCoordinatorForConsumer:
					_ = coordinator.Close()
					_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
					fallthrough
				case ErrUnknownTopicOrPartition:
					fallthrough
				case ErrOffsetsLoadInProgress:
					// Retry this topicPartition
				case ErrConcurrentTransactions:
					if len(t.partitionsInCurrentTxn) == 0 && retryBackoff > addPartitionsRetryBackoff {
						retryBackoff = addPartitionsRetryBackoff
					}
				case ErrOperationNotAttempted:
					fallthrough
				case ErrTopicAuthorizationFailed:
					removeAllPartitionsOnFatalOrAbortedError()
					return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err)
				case ErrUnknownProducerID:
					fallthrough
				case ErrInvalidProducerIDMapping:
					removeAllPartitionsOnFatalOrAbortedError()
					return false, t.abortableErrorIfPossible(response.Err)
				// Fatal errors
				default:
					removeAllPartitionsOnFatalOrAbortedError()
					return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
				}
				responseErrors = append(responseErrors, response.Err)
			}
		}

		// All pending partitions were added successfully.
		if len(t.pendingPartitionsInCurrentTxn) == 0 {
			DebugLogger.Printf("txnmgr/add-partition-to-txn [%s] successful to add partitions txn %+v\n",
				t.transactionalID, addPartResponse)
			return false, nil
		}
		return true, Wrap(ErrAddPartitionsToTxn, responseErrors...)
	}, nil)
}

// Build a new transaction manager sharing the producer's client.
func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
	txnmgr := &transactionManager{
		producerID:    noProducerID,
		producerEpoch: noProducerEpoch,
		client:        client,

		pendingPartitionsInCurrentTxn: topicPartitionSet{},
		partitionsInCurrentTxn:        topicPartitionSet{},
		offsetsInCurrentTxn:           make(map[string]topicPartitionOffsets),
		status:                        ProducerTxnFlagUninitialized,
	}

	if conf.Producer.Idempotent {
		txnmgr.transactionalID = conf.Producer.Transaction.ID
		txnmgr.transactionTimeout = conf.Producer.Transaction.Timeout
		txnmgr.sequenceNumbers = make(map[string]int32)
		txnmgr.mutex = sync.Mutex{}

		var err error
		txnmgr.producerID, txnmgr.producerEpoch, err = txnmgr.initProducerId()
		if err != nil {
			return nil, err
		}
		Logger.Printf("txnmgr/init-producer-id [%s] obtained a ProducerId: %d and ProducerEpoch: %d\n",
			txnmgr.transactionalID, txnmgr.producerID, txnmgr.producerEpoch)
	}

	return txnmgr, nil
}

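// A minimal configuration sketch for obtaining a transactional producer; the
// transactional ID is a placeholder and the exact required settings depend on
// the Config validation rules (illustrative only):
//
//	conf := NewConfig()
//	conf.Version = V2_7_0_0
//	conf.Producer.Idempotent = true
//	conf.Producer.Transaction.ID = "my-txn-id"
//	conf.Net.MaxOpenRequests = 1
//	conf.Producer.RequiredAcks = WaitForAll
//	conf.Producer.Retry.Max = 5
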
// Re-init the producer ID and epoch if needed.
func (t *transactionManager) initializeTransactions() (err error) {
	t.producerID, t.producerEpoch, err = t.initProducerId()
	return
}