init_producer_id_request.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package sarama
  2. import "time"
  // InitProducerIDRequest holds the fields of a Kafka InitProducerId request
  // (API key 22). Which fields actually travel on the wire depends on Version;
  // see encode and decode.
  type InitProducerIDRequest struct {
  	// Version selects the wire layout used by encode; decode overwrites it
  	// with the version it was asked to parse.
  	Version int16
  	// TransactionalID is written first, as a nullable string. The pointer is
  	// handed to the encoder as-is, so nil is a permitted value.
  	TransactionalID *string
  	// TransactionTimeout is carried on the wire as an int32 count of whole
  	// milliseconds.
  	TransactionTimeout time.Duration
  	// ProducerID is only encoded and decoded when Version >= 3.
  	ProducerID int64
  	// ProducerEpoch is only encoded and decoded when Version >= 3.
  	ProducerEpoch int16
  }
  10. func (i *InitProducerIDRequest) encode(pe packetEncoder) error {
  11. if i.Version < 2 {
  12. if err := pe.putNullableString(i.TransactionalID); err != nil {
  13. return err
  14. }
  15. } else {
  16. if err := pe.putNullableCompactString(i.TransactionalID); err != nil {
  17. return err
  18. }
  19. }
  20. pe.putInt32(int32(i.TransactionTimeout / time.Millisecond))
  21. if i.Version >= 3 {
  22. pe.putInt64(i.ProducerID)
  23. pe.putInt16(i.ProducerEpoch)
  24. }
  25. if i.Version >= 2 {
  26. pe.putEmptyTaggedFieldArray()
  27. }
  28. return nil
  29. }
  30. func (i *InitProducerIDRequest) decode(pd packetDecoder, version int16) (err error) {
  31. i.Version = version
  32. if i.Version < 2 {
  33. if i.TransactionalID, err = pd.getNullableString(); err != nil {
  34. return err
  35. }
  36. } else {
  37. if i.TransactionalID, err = pd.getCompactNullableString(); err != nil {
  38. return err
  39. }
  40. }
  41. timeout, err := pd.getInt32()
  42. if err != nil {
  43. return err
  44. }
  45. i.TransactionTimeout = time.Duration(timeout) * time.Millisecond
  46. if i.Version >= 3 {
  47. if i.ProducerID, err = pd.getInt64(); err != nil {
  48. return err
  49. }
  50. if i.ProducerEpoch, err = pd.getInt16(); err != nil {
  51. return err
  52. }
  53. }
  54. if i.Version >= 2 {
  55. if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
  56. return err
  57. }
  58. }
  59. return nil
  60. }
  61. func (i *InitProducerIDRequest) key() int16 {
  62. return 22
  63. }
  64. func (i *InitProducerIDRequest) version() int16 {
  65. return i.Version
  66. }
  67. func (i *InitProducerIDRequest) headerVersion() int16 {
  68. if i.Version >= 2 {
  69. return 2
  70. }
  71. return 1
  72. }
  73. func (i *InitProducerIDRequest) isValidVersion() bool {
  74. return i.Version >= 0 && i.Version <= 4
  75. }
  76. func (i *InitProducerIDRequest) requiredVersion() KafkaVersion {
  77. switch i.Version {
  78. case 4:
  79. return V2_7_0_0
  80. case 3:
  81. return V2_5_0_0
  82. case 2:
  83. return V2_4_0_0
  84. case 1:
  85. return V2_0_0_0
  86. case 0:
  87. return V0_11_0_0
  88. default:
  89. return V2_7_0_0
  90. }
  91. }