// Listing: end_txn_request.go (1.2 KB)

package sarama
  // EndTxnRequest is the request body for Kafka API key 26 (see key). Its
  // fields are serialized by encode in the order TransactionalID, ProducerID,
  // ProducerEpoch, TransactionResult; Version is not part of the body.
  type EndTxnRequest struct {
  	// Version is the protocol version reported by version(); isValidVersion
  	// accepts 0 through 2.
  	Version int16
  	// TransactionalID identifies the transaction; it is written as a string.
  	TransactionalID string
  	// ProducerID is written as a 64-bit integer.
  	ProducerID int64
  	// ProducerEpoch is written as a 16-bit integer.
  	ProducerEpoch int16
  	// TransactionResult is written as a single boolean.
  	// NOTE(review): presumably true means commit and false means abort —
  	// confirm against the Kafka EndTxn protocol definition.
  	TransactionResult bool
  }
  9. func (a *EndTxnRequest) encode(pe packetEncoder) error {
  10. if err := pe.putString(a.TransactionalID); err != nil {
  11. return err
  12. }
  13. pe.putInt64(a.ProducerID)
  14. pe.putInt16(a.ProducerEpoch)
  15. pe.putBool(a.TransactionResult)
  16. return nil
  17. }
  18. func (a *EndTxnRequest) decode(pd packetDecoder, version int16) (err error) {
  19. if a.TransactionalID, err = pd.getString(); err != nil {
  20. return err
  21. }
  22. if a.ProducerID, err = pd.getInt64(); err != nil {
  23. return err
  24. }
  25. if a.ProducerEpoch, err = pd.getInt16(); err != nil {
  26. return err
  27. }
  28. if a.TransactionResult, err = pd.getBool(); err != nil {
  29. return err
  30. }
  31. return nil
  32. }
  33. func (a *EndTxnRequest) key() int16 {
  34. return 26
  35. }
  36. func (a *EndTxnRequest) version() int16 {
  37. return a.Version
  38. }
  39. func (r *EndTxnRequest) headerVersion() int16 {
  40. return 1
  41. }
  42. func (a *EndTxnRequest) isValidVersion() bool {
  43. return a.Version >= 0 && a.Version <= 2
  44. }
  45. func (a *EndTxnRequest) requiredVersion() KafkaVersion {
  46. switch a.Version {
  47. case 2:
  48. return V2_7_0_0
  49. case 1:
  50. return V2_0_0_0
  51. default:
  52. return V0_11_0_0
  53. }
  54. }