package sarama

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"
)
// ProducerTxnStatusFlag marks the current transaction status.
type ProducerTxnStatusFlag int16

const (
	// ProducerTxnFlagUninitialized when txnmgr is created
	ProducerTxnFlagUninitialized ProducerTxnStatusFlag = 1 << iota
	// ProducerTxnFlagInitializing when txnmgr is initializing
	ProducerTxnFlagInitializing
	// ProducerTxnFlagReady when txnmgr is ready to receive a transaction
	ProducerTxnFlagReady
	// ProducerTxnFlagInTransaction when a transaction has been started
	ProducerTxnFlagInTransaction
	// ProducerTxnFlagEndTransaction when the transaction will be committed
	ProducerTxnFlagEndTransaction
	// ProducerTxnFlagInError when an abortable or fatal error has occurred
	ProducerTxnFlagInError
	// ProducerTxnFlagCommittingTransaction when committing the txn
	ProducerTxnFlagCommittingTransaction
	// ProducerTxnFlagAbortingTransaction when aborting the txn
	ProducerTxnFlagAbortingTransaction
	// ProducerTxnFlagAbortableError when the producer encounters an abortable error.
	// AbortTxn must be called in this case.
	ProducerTxnFlagAbortableError
	// ProducerTxnFlagFatalError when the producer encounters a fatal error.
	// The producer must be closed and recreated.
	ProducerTxnFlagFatalError
)
func (s ProducerTxnStatusFlag) String() string {
	status := make([]string, 0)
	if s&ProducerTxnFlagUninitialized != 0 {
		status = append(status, "ProducerTxnStateUninitialized")
	}
	if s&ProducerTxnFlagInitializing != 0 {
		status = append(status, "ProducerTxnStateInitializing")
	}
	if s&ProducerTxnFlagReady != 0 {
		status = append(status, "ProducerTxnStateReady")
	}
	if s&ProducerTxnFlagInTransaction != 0 {
		status = append(status, "ProducerTxnStateInTransaction")
	}
	if s&ProducerTxnFlagEndTransaction != 0 {
		status = append(status, "ProducerTxnStateEndTransaction")
	}
	if s&ProducerTxnFlagInError != 0 {
		status = append(status, "ProducerTxnStateInError")
	}
	if s&ProducerTxnFlagCommittingTransaction != 0 {
		status = append(status, "ProducerTxnStateCommittingTransaction")
	}
	if s&ProducerTxnFlagAbortingTransaction != 0 {
		status = append(status, "ProducerTxnStateAbortingTransaction")
	}
	if s&ProducerTxnFlagAbortableError != 0 {
		status = append(status, "ProducerTxnStateAbortableError")
	}
	if s&ProducerTxnFlagFatalError != 0 {
		status = append(status, "ProducerTxnStateFatalError")
	}
	return strings.Join(status, "|")
}
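
// For example, because the flags are bit values, a combined status stringifies
// to each set flag joined by "|":
//
//	s := ProducerTxnFlagInError | ProducerTxnFlagAbortableError
//	_ = s.String() // "ProducerTxnStateInError|ProducerTxnStateAbortableError"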
// transactionManager keeps the state necessary to ensure idempotent production
type transactionManager struct {
	producerID         int64
	producerEpoch      int16
	sequenceNumbers    map[string]int32
	mutex              sync.Mutex
	transactionalID    string
	transactionTimeout time.Duration
	client             Client
	// Set when the kafka cluster is at least 2.5.0.
	// Used to recover when the producer has failed.
	coordinatorSupportsBumpingEpoch bool
	// Set when the producer needs to bump its epoch.
	epochBumpRequired bool
	// Records the last seen error.
	lastError error
	// Ensure that status is never accessed with a race condition.
	statusLock sync.RWMutex
	status     ProducerTxnStatusFlag
	// Ensure that only one goroutine will update partitions in the current transaction.
	partitionInTxnLock            sync.Mutex
	pendingPartitionsInCurrentTxn topicPartitionSet
	partitionsInCurrentTxn        topicPartitionSet
	// Offsets to add to the transaction.
	offsetsInCurrentTxn map[string]topicPartitionOffsets
}
const (
	noProducerID    = -1
	noProducerEpoch = -1
	// see publishTxnPartitions comment.
	addPartitionsRetryBackoff = 20 * time.Millisecond
)
// txnmngr allowed transitions.
var producerTxnTransitions = map[ProducerTxnStatusFlag][]ProducerTxnStatusFlag{
	ProducerTxnFlagUninitialized: {
		ProducerTxnFlagReady,
		ProducerTxnFlagInError,
	},
	// When we are initializing
	ProducerTxnFlagInitializing: {
		ProducerTxnFlagInitializing,
		ProducerTxnFlagReady,
		ProducerTxnFlagInError,
	},
	// When the transactional producer has been initialized
	ProducerTxnFlagReady: {
		ProducerTxnFlagInTransaction,
	},
	// When beginTxn has been called
	ProducerTxnFlagInTransaction: {
		// When calling commit or abort
		ProducerTxnFlagEndTransaction,
		// When an error occurs
		ProducerTxnFlagInError,
	},
	ProducerTxnFlagEndTransaction: {
		// When an epoch bump is required
		ProducerTxnFlagInitializing,
		// When the commit succeeded
		ProducerTxnFlagReady,
		// When an error occurs
		ProducerTxnFlagInError,
	},
	// Need to abort the transaction
	ProducerTxnFlagAbortableError: {
		// Call AbortTxn
		ProducerTxnFlagAbortingTransaction,
		// When an error occurs
		ProducerTxnFlagInError,
	},
	// Need to close the producer
	ProducerTxnFlagFatalError: {
		ProducerTxnFlagFatalError,
	},
}
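
// Per the transition map above, a successful transaction typically cycles
//
//	ProducerTxnFlagReady -> ProducerTxnFlagInTransaction -> ProducerTxnFlagEndTransaction -> ProducerTxnFlagReady
//
// whereas an abortable error must transition ProducerTxnFlagAbortableError ->
// ProducerTxnFlagAbortingTransaction by calling AbortTxn.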
type topicPartition struct {
	topic     string
	partition int32
}

// to ensure that we don't do a full scan every time a partition or an offset is added.
type (
	topicPartitionSet     map[topicPartition]struct{}
	topicPartitionOffsets map[topicPartition]*PartitionOffsetMetadata
)

func (s topicPartitionSet) mapToRequest() map[string][]int32 {
	result := make(map[string][]int32, len(s))
	for tp := range s {
		result[tp.topic] = append(result[tp.topic], tp.partition)
	}
	return result
}
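
// For example, a set containing {"foo", 0}, {"foo", 1} and {"bar", 0} maps to
// the request form
//
//	map[string][]int32{"foo": {0, 1}, "bar": {0}}
//
// (the per-topic partition order is unspecified because map iteration order is random).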
func (s topicPartitionOffsets) mapToRequest() map[string][]*PartitionOffsetMetadata {
	result := make(map[string][]*PartitionOffsetMetadata, len(s))
	for tp, offset := range s {
		result[tp.topic] = append(result[tp.topic], offset)
	}
	return result
}
// Return true if the transition to the target status is allowed.
func (t *transactionManager) isTransitionValid(target ProducerTxnStatusFlag) bool {
	for status, allowedTransitions := range producerTxnTransitions {
		if status&t.status != 0 {
			for _, allowedTransition := range allowedTransitions {
				if allowedTransition&target != 0 {
					return true
				}
			}
		}
	}
	return false
}
// Get current transaction status.
func (t *transactionManager) currentTxnStatus() ProducerTxnStatusFlag {
	t.statusLock.RLock()
	defer t.statusLock.RUnlock()

	return t.status
}

// Try to transition to a valid status and return an error otherwise.
func (t *transactionManager) transitionTo(target ProducerTxnStatusFlag, err error) error {
	t.statusLock.Lock()
	defer t.statusLock.Unlock()

	if !t.isTransitionValid(target) {
		return ErrTransitionNotAllowed
	}

	if target&ProducerTxnFlagInError != 0 {
		if err == nil {
			return ErrCannotTransitionNilError
		}
		t.lastError = err
	} else {
		t.lastError = nil
	}

	DebugLogger.Printf("txnmgr/transition [%s] transition from %s to %s\n", t.transactionalID, t.status, target)

	t.status = target
	return err
}
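
// Both target and the transition table are bit masks, so error states are set
// with a combined flag, e.g.
//
//	_ = t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err)
//
// which records err in lastError and marks the transaction as abortable.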
func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
	key := fmt.Sprintf("%s-%d", topic, partition)
	t.mutex.Lock()
	defer t.mutex.Unlock()
	sequence := t.sequenceNumbers[key]
	t.sequenceNumbers[key] = sequence + 1
	return sequence, t.producerEpoch
}
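
// Sequence numbers are tracked per "topic-partition" key (e.g. "events-0") and are
// returned together with the current producer epoch; bumpEpoch below resets every
// sequence to 0 so the new epoch starts from a clean slate.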
func (t *transactionManager) bumpEpoch() {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	t.producerEpoch++
	for k := range t.sequenceNumbers {
		t.sequenceNumbers[k] = 0
	}
}

func (t *transactionManager) getProducerID() (int64, int16) {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	return t.producerID, t.producerEpoch
}
// Compute the retry backoff, taking the remaining attempts into account.
func (t *transactionManager) computeBackoff(attemptsRemaining int) time.Duration {
	if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil {
		maxRetries := t.client.Config().Producer.Transaction.Retry.Max
		retries := maxRetries - attemptsRemaining
		return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries)
	}
	return t.client.Config().Producer.Transaction.Retry.Backoff
}
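
// For example, with Producer.Transaction.Retry.Max = 5 and 3 attempts remaining,
// a configured BackoffFunc is invoked as BackoffFunc(2, 5); without a BackoffFunc
// the fixed Producer.Transaction.Retry.Backoff duration is used for every retry.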
// Returns true if the txnmngr is transactional.
func (t *transactionManager) isTransactional() bool {
	return t.transactionalID != ""
}
// add specified offsets to current transaction.
func (t *transactionManager) addOffsetsToTxn(offsetsToAdd map[string][]*PartitionOffsetMetadata, groupId string) error {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	if t.currentTxnStatus()&ProducerTxnFlagInTransaction == 0 {
		return ErrTransactionNotReady
	}

	if t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
		return t.lastError
	}

	if _, ok := t.offsetsInCurrentTxn[groupId]; !ok {
		t.offsetsInCurrentTxn[groupId] = topicPartitionOffsets{}
	}

	for topic, offsets := range offsetsToAdd {
		for _, offset := range offsets {
			tp := topicPartition{topic: topic, partition: offset.Partition}
			t.offsetsInCurrentTxn[groupId][tp] = offset
		}
	}
	return nil
}
// send the txnmgr's saved offsets to the transaction coordinator.
func (t *transactionManager) publishOffsetsToTxn(offsets topicPartitionOffsets, groupId string) (topicPartitionOffsets, error) {
	// First AddOffsetsToTxn
	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
	exec := func(run func() (bool, error), err error) error {
		for attemptsRemaining >= 0 {
			var retry bool
			retry, err = run()
			if !retry {
				return err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/add-offset-to-txn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return err
	}
	lastError := exec(func() (bool, error) {
		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
		if err != nil {
			return true, err
		}
		request := &AddOffsetsToTxnRequest{
			TransactionalID: t.transactionalID,
			ProducerEpoch:   t.producerEpoch,
			ProducerID:      t.producerID,
			GroupID:         groupId,
		}
		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 2 adds the support for new error code PRODUCER_FENCED.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}
		response, err := coordinator.AddOffsetsToTxn(request)
		if err != nil {
			// If an error occurred try to refresh current transaction coordinator.
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, err
		}
		if response == nil {
			// If no response is returned just retry.
			return true, ErrTxnUnableToParseResponse
		}
		if response.Err == ErrNoError {
			DebugLogger.Printf("txnmgr/add-offset-to-txn [%s] successful add-offset-to-txn with group %s %+v\n",
				t.transactionalID, groupId, response)
			// If no error, just exit.
			return false, nil
		}
		switch response.Err {
		case ErrConsumerCoordinatorNotAvailable:
			fallthrough
		case ErrNotCoordinatorForConsumer:
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			fallthrough
		case ErrOffsetsLoadInProgress:
			fallthrough
		case ErrConcurrentTransactions:
			// Retry
		case ErrUnknownProducerID:
			fallthrough
		case ErrInvalidProducerIDMapping:
			return false, t.abortableErrorIfPossible(response.Err)
		case ErrGroupAuthorizationFailed:
			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err)
		default:
			// Others are fatal
			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
		}
		return true, response.Err
	}, nil)

	if lastError != nil {
		return offsets, lastError
	}
	resultOffsets := offsets
	// Then TxnOffsetCommit
	// note the result is not completed until the TxnOffsetCommit returns
	attemptsRemaining = t.client.Config().Producer.Transaction.Retry.Max
	execTxnOffsetCommit := func(run func() (topicPartitionOffsets, bool, error), err error) (topicPartitionOffsets, error) {
		var r topicPartitionOffsets
		for attemptsRemaining >= 0 {
			var retry bool
			r, retry, err = run()
			if !retry {
				return r, err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/txn-offset-commit [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return r, err
	}
	return execTxnOffsetCommit(func() (topicPartitionOffsets, bool, error) {
		consumerGroupCoordinator, err := t.client.Coordinator(groupId)
		if err != nil {
			return resultOffsets, true, err
		}
		request := &TxnOffsetCommitRequest{
			TransactionalID: t.transactionalID,
			ProducerEpoch:   t.producerEpoch,
			ProducerID:      t.producerID,
			GroupID:         groupId,
			Topics:          offsets.mapToRequest(),
		}
		if t.client.Config().Version.IsAtLeast(V2_1_0_0) {
			// Version 2 adds the committed leader epoch.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}
		responses, err := consumerGroupCoordinator.TxnOffsetCommit(request)
		if err != nil {
			_ = consumerGroupCoordinator.Close()
			_ = t.client.RefreshCoordinator(groupId)
			return resultOffsets, true, err
		}
		if responses == nil {
			return resultOffsets, true, ErrTxnUnableToParseResponse
		}
		var responseErrors []error
		failedTxn := topicPartitionOffsets{}
		for topic, partitionErrors := range responses.Topics {
			for _, partitionError := range partitionErrors {
				switch partitionError.Err {
				case ErrNoError:
					continue
				// If the topic is unknown or the coordinator is loading, retry with the current coordinator
				case ErrRequestTimedOut:
					fallthrough
				case ErrConsumerCoordinatorNotAvailable:
					fallthrough
				case ErrNotCoordinatorForConsumer:
					_ = consumerGroupCoordinator.Close()
					_ = t.client.RefreshCoordinator(groupId)
					fallthrough
				case ErrUnknownTopicOrPartition:
					fallthrough
				case ErrOffsetsLoadInProgress:
					// Do nothing, just retry
				case ErrIllegalGeneration:
					fallthrough
				case ErrUnknownMemberId:
					fallthrough
				case ErrFencedInstancedId:
					fallthrough
				case ErrGroupAuthorizationFailed:
					return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, partitionError.Err)
				default:
					// Others are fatal
					return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, partitionError.Err)
				}
				tp := topicPartition{topic: topic, partition: partitionError.Partition}
				failedTxn[tp] = offsets[tp]
				responseErrors = append(responseErrors, partitionError.Err)
			}
		}

		resultOffsets = failedTxn

		if len(resultOffsets) == 0 {
			DebugLogger.Printf("txnmgr/txn-offset-commit [%s] successful txn-offset-commit with group %s\n",
				t.transactionalID, groupId)
			return resultOffsets, false, nil
		}
		return resultOffsets, true, Wrap(ErrTxnOffsetCommit, responseErrors...)
	}, nil)
}
func (t *transactionManager) initProducerId() (int64, int16, error) {
	isEpochBump := false

	req := &InitProducerIDRequest{}
	if t.isTransactional() {
		req.TransactionalID = &t.transactionalID
		req.TransactionTimeout = t.transactionTimeout
	}

	if t.client.Config().Version.IsAtLeast(V2_5_0_0) {
		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 4 adds the support for new error code PRODUCER_FENCED.
			req.Version = 4
		} else {
			// Version 3 adds ProducerId and ProducerEpoch, allowing producers to try
			// to resume after an INVALID_PRODUCER_EPOCH error
			req.Version = 3
		}
		isEpochBump = t.producerID != noProducerID && t.producerEpoch != noProducerEpoch
		t.coordinatorSupportsBumpingEpoch = true
		req.ProducerID = t.producerID
		req.ProducerEpoch = t.producerEpoch
	} else if t.client.Config().Version.IsAtLeast(V2_4_0_0) {
		// Version 2 is the first flexible version.
		req.Version = 2
	} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
		// Version 1 is the same as version 0.
		req.Version = 1
	}
	if isEpochBump {
		err := t.transitionTo(ProducerTxnFlagInitializing, nil)
		if err != nil {
			return -1, -1, err
		}
		DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId with current producer ID %d and epoch %d in order to bump the epoch\n",
			t.transactionalID, t.producerID, t.producerEpoch)
	} else {
		DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId for the first time in order to acquire a producer ID\n",
			t.transactionalID)
	}
	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
	exec := func(run func() (int64, int16, bool, error), err error) (int64, int16, error) {
		pid := int64(-1)
		pepoch := int16(-1)
		for attemptsRemaining >= 0 {
			var retry bool
			pid, pepoch, retry, err = run()
			if !retry {
				return pid, pepoch, err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/init-producer-id [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return -1, -1, err
	}
	return exec(func() (int64, int16, bool, error) {
		var err error
		var coordinator *Broker
		if t.isTransactional() {
			coordinator, err = t.client.TransactionCoordinator(t.transactionalID)
		} else {
			coordinator = t.client.LeastLoadedBroker()
		}
		if err != nil {
			return -1, -1, true, err
		}
		response, err := coordinator.InitProducerID(req)
		if err != nil {
			if t.isTransactional() {
				_ = coordinator.Close()
				_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			}
			return -1, -1, true, err
		}
		if response == nil {
			return -1, -1, true, ErrTxnUnableToParseResponse
		}
		if response.Err == ErrNoError {
			if isEpochBump {
				t.sequenceNumbers = make(map[string]int32)
			}
			err := t.transitionTo(ProducerTxnFlagReady, nil)
			if err != nil {
				return -1, -1, true, err
			}
			DebugLogger.Printf("txnmgr/init-producer-id [%s] successful init producer id %+v\n",
				t.transactionalID, response)
			return response.ProducerID, response.ProducerEpoch, false, nil
		}
		switch response.Err {
		// Retriable errors
		case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer, ErrOffsetsLoadInProgress:
			if t.isTransactional() {
				_ = coordinator.Close()
				_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			}
		// Fatal errors
		default:
			return -1, -1, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
		}
		return -1, -1, true, response.Err
	}, nil)
}
// If the kafka cluster is at least 2.5.0, mark the txnmngr as requiring an epoch bump;
// otherwise mark the error as fatal.
func (t *transactionManager) abortableErrorIfPossible(err error) error {
	if t.coordinatorSupportsBumpingEpoch {
		t.epochBumpRequired = true
		return t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err)
	}
	return t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, err)
}
// End current transaction.
func (t *transactionManager) completeTransaction() error {
	if t.epochBumpRequired {
		err := t.transitionTo(ProducerTxnFlagInitializing, nil)
		if err != nil {
			return err
		}
	} else {
		err := t.transitionTo(ProducerTxnFlagReady, nil)
		if err != nil {
			return err
		}
	}

	t.lastError = nil
	t.epochBumpRequired = false
	t.partitionsInCurrentTxn = topicPartitionSet{}
	t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
	t.offsetsInCurrentTxn = map[string]topicPartitionOffsets{}

	return nil
}
// send the EndTxn request with the commit flag (true when committing, false when aborting).
func (t *transactionManager) endTxn(commit bool) error {
	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
	exec := func(run func() (bool, error), err error) error {
		for attemptsRemaining >= 0 {
			var retry bool
			retry, err = run()
			if !retry {
				return err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/endtxn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return err
	}
	return exec(func() (bool, error) {
		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
		if err != nil {
			return true, err
		}
		request := &EndTxnRequest{
			TransactionalID:   t.transactionalID,
			ProducerEpoch:     t.producerEpoch,
			ProducerID:        t.producerID,
			TransactionResult: commit,
		}
		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 2 adds the support for new error code PRODUCER_FENCED.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}
		response, err := coordinator.EndTxn(request)
		if err != nil {
			// Always retry on network error
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, err
		}
		if response == nil {
			return true, ErrTxnUnableToParseResponse
		}
		if response.Err == ErrNoError {
			DebugLogger.Printf("txnmgr/endtxn [%s] successful to end txn %+v\n",
				t.transactionalID, response)
			return false, t.completeTransaction()
		}
		switch response.Err {
		// Need to refresh coordinator
		case ErrConsumerCoordinatorNotAvailable:
			fallthrough
		case ErrNotCoordinatorForConsumer:
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			fallthrough
		case ErrOffsetsLoadInProgress:
			fallthrough
		case ErrConcurrentTransactions:
			// Just retry
		case ErrUnknownProducerID:
			fallthrough
		case ErrInvalidProducerIDMapping:
			return false, t.abortableErrorIfPossible(response.Err)
		// Fatal errors
		default:
			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
		}
		return true, response.Err
	}, nil)
}
// We will try to publish the associated offsets for each group,
// then send an EndTxn request to mark the transaction as finished.
func (t *transactionManager) finishTransaction(commit bool) error {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	// Ensure no error when committing or aborting
	if commit && t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
		return t.lastError
	} else if !commit && t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
		return t.lastError
	}

	// if no records have been sent, don't do anything.
	if len(t.partitionsInCurrentTxn) == 0 {
		return t.completeTransaction()
	}

	epochBump := t.epochBumpRequired
	// If we're aborting the transaction, there is no need to add offsets.
	if commit && len(t.offsetsInCurrentTxn) > 0 {
		for group, offsets := range t.offsetsInCurrentTxn {
			newOffsets, err := t.publishOffsetsToTxn(offsets, group)
			if err != nil {
				t.offsetsInCurrentTxn[group] = newOffsets
				return err
			}
			delete(t.offsetsInCurrentTxn, group)
		}
	}

	if t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
		return t.lastError
	}

	if !errors.Is(t.lastError, ErrInvalidProducerIDMapping) {
		err := t.endTxn(commit)
		if err != nil {
			return err
		}
		if !epochBump {
			return nil
		}
	}
	// reset pid and epoch if needed.
	return t.initializeTransactions()
}
// called before sending any transactional record.
// Does nothing if the current topic-partition has already been added to the transaction.
func (t *transactionManager) maybeAddPartitionToCurrentTxn(topic string, partition int32) {
	if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
		return
	}

	tp := topicPartition{topic: topic, partition: partition}

	t.partitionInTxnLock.Lock()
	defer t.partitionInTxnLock.Unlock()
	if _, ok := t.partitionsInCurrentTxn[tp]; ok {
		// partition is already added
		return
	}

	t.pendingPartitionsInCurrentTxn[tp] = struct{}{}
}
// Makes a request to kafka to add a list of partitions to the current transaction.
func (t *transactionManager) publishTxnPartitions() error {
	t.partitionInTxnLock.Lock()
	defer t.partitionInTxnLock.Unlock()

	if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
		return t.lastError
	}

	if len(t.pendingPartitionsInCurrentTxn) == 0 {
		return nil
	}

	// Remove the partitions from the pending set regardless of the result. We use the presence
	// of partitions in the pending set to know when it is not safe to send batches. However, if
	// the partitions failed to be added and we enter an error state, we expect the batches to be
	// aborted anyway. In this case, we must be able to continue sending the batches which are in
	// retry for partitions that were successfully added.
	removeAllPartitionsOnFatalOrAbortedError := func() {
		t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
	}

	// We only want to reduce the backoff when retrying the first AddPartition which errored out due to a
	// CONCURRENT_TRANSACTIONS error since this means that the previous transaction is still completing and
	// we don't want to wait too long before trying to start the new one.
	//
	// This is only a temporary fix, the long term solution is being tracked in
	// https://issues.apache.org/jira/browse/KAFKA-5482
	retryBackoff := t.client.Config().Producer.Transaction.Retry.Backoff
	computeBackoff := func(attemptsRemaining int) time.Duration {
		if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil {
			maxRetries := t.client.Config().Producer.Transaction.Retry.Max
			retries := maxRetries - attemptsRemaining
			return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries)
		}
		return retryBackoff
	}
	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max

	exec := func(run func() (bool, error), err error) error {
		for attemptsRemaining >= 0 {
			var retry bool
			retry, err = run()
			if !retry {
				return err
			}
			backoff := computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/add-partition-to-txn retrying after %dms... (%d attempts remaining) (%s)\n", backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return err
	}
	return exec(func() (bool, error) {
		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
		if err != nil {
			return true, err
		}
		request := &AddPartitionsToTxnRequest{
			TransactionalID: t.transactionalID,
			ProducerID:      t.producerID,
			ProducerEpoch:   t.producerEpoch,
			TopicPartitions: t.pendingPartitionsInCurrentTxn.mapToRequest(),
		}
		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 2 adds the support for new error code PRODUCER_FENCED.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}
		addPartResponse, err := coordinator.AddPartitionsToTxn(request)
		if err != nil {
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, err
		}

		if addPartResponse == nil {
			return true, ErrTxnUnableToParseResponse
		}

		// remove from the list partitions that have been successfully updated
		var responseErrors []error
		for topic, results := range addPartResponse.Errors {
			for _, response := range results {
				tp := topicPartition{topic: topic, partition: response.Partition}
				switch response.Err {
				case ErrNoError:
					// Mark partition as added to transaction
					t.partitionsInCurrentTxn[tp] = struct{}{}
					delete(t.pendingPartitionsInCurrentTxn, tp)
					continue
				case ErrConsumerCoordinatorNotAvailable:
					fallthrough
				case ErrNotCoordinatorForConsumer:
					_ = coordinator.Close()
					_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
					fallthrough
				case ErrUnknownTopicOrPartition:
					fallthrough
				case ErrOffsetsLoadInProgress:
					// Retry topicPartition
				case ErrConcurrentTransactions:
					if len(t.partitionsInCurrentTxn) == 0 && retryBackoff > addPartitionsRetryBackoff {
						retryBackoff = addPartitionsRetryBackoff
					}
				case ErrOperationNotAttempted:
					fallthrough
				case ErrTopicAuthorizationFailed:
					removeAllPartitionsOnFatalOrAbortedError()
					return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err)
				case ErrUnknownProducerID:
					fallthrough
				case ErrInvalidProducerIDMapping:
					removeAllPartitionsOnFatalOrAbortedError()
					return false, t.abortableErrorIfPossible(response.Err)
				// Fatal errors
				default:
					removeAllPartitionsOnFatalOrAbortedError()
					return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
				}
				responseErrors = append(responseErrors, response.Err)
			}
		}

		// handle end
		if len(t.pendingPartitionsInCurrentTxn) == 0 {
			DebugLogger.Printf("txnmgr/add-partition-to-txn [%s] successful to add partitions txn %+v\n",
				t.transactionalID, addPartResponse)
			return false, nil
		}
		return true, Wrap(ErrAddPartitionsToTxn, responseErrors...)
	}, nil)
}
// Build a new transaction manager sharing producer client.
func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
	txnmgr := &transactionManager{
		producerID:                    noProducerID,
		producerEpoch:                 noProducerEpoch,
		client:                        client,
		pendingPartitionsInCurrentTxn: topicPartitionSet{},
		partitionsInCurrentTxn:        topicPartitionSet{},
		offsetsInCurrentTxn:           make(map[string]topicPartitionOffsets),
		status:                        ProducerTxnFlagUninitialized,
	}

	if conf.Producer.Idempotent {
		txnmgr.transactionalID = conf.Producer.Transaction.ID
		txnmgr.transactionTimeout = conf.Producer.Transaction.Timeout
		txnmgr.sequenceNumbers = make(map[string]int32)
		txnmgr.mutex = sync.Mutex{}

		var err error
		txnmgr.producerID, txnmgr.producerEpoch, err = txnmgr.initProducerId()
		if err != nil {
			return nil, err
		}
		Logger.Printf("txnmgr/init-producer-id [%s] obtained a ProducerId: %d and ProducerEpoch: %d\n",
			txnmgr.transactionalID, txnmgr.producerID, txnmgr.producerEpoch)
	}

	return txnmgr, nil
}
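
// Rough lifecycle, as implemented in this file (a sketch for orientation): the
// producer builds the manager with newTransactionManager, which calls initProducerId
// when idempotence is enabled; while a transaction is open, maybeAddPartitionToCurrentTxn
// collects partitions that publishTxnPartitions then registers with the coordinator, and
// addOffsetsToTxn queues consumer offsets that publishOffsetsToTxn commits; finally,
// finishTransaction(commit) publishes any pending offsets and sends EndTxn,
// re-initializing the producer ID when an epoch bump is required.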
// re-init producer-id and producer-epoch if needed.
func (t *transactionManager) initializeTransactions() (err error) {
	t.producerID, t.producerEpoch, err = t.initProducerId()
	return
}