init_producer_id_response.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package sarama
  2. import "time"
  3. type InitProducerIDResponse struct {
  4. ThrottleTime time.Duration
  5. Err KError
  6. Version int16
  7. ProducerID int64
  8. ProducerEpoch int16
  9. }
  10. func (i *InitProducerIDResponse) encode(pe packetEncoder) error {
  11. pe.putInt32(int32(i.ThrottleTime / time.Millisecond))
  12. pe.putInt16(int16(i.Err))
  13. pe.putInt64(i.ProducerID)
  14. pe.putInt16(i.ProducerEpoch)
  15. if i.Version >= 2 {
  16. pe.putEmptyTaggedFieldArray()
  17. }
  18. return nil
  19. }
  20. func (i *InitProducerIDResponse) decode(pd packetDecoder, version int16) (err error) {
  21. i.Version = version
  22. throttleTime, err := pd.getInt32()
  23. if err != nil {
  24. return err
  25. }
  26. i.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
  27. kerr, err := pd.getInt16()
  28. if err != nil {
  29. return err
  30. }
  31. i.Err = KError(kerr)
  32. if i.ProducerID, err = pd.getInt64(); err != nil {
  33. return err
  34. }
  35. if i.ProducerEpoch, err = pd.getInt16(); err != nil {
  36. return err
  37. }
  38. if i.Version >= 2 {
  39. if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
  40. return err
  41. }
  42. }
  43. return nil
  44. }
  45. func (i *InitProducerIDResponse) key() int16 {
  46. return 22
  47. }
  48. func (i *InitProducerIDResponse) version() int16 {
  49. return i.Version
  50. }
  51. func (i *InitProducerIDResponse) headerVersion() int16 {
  52. if i.Version >= 2 {
  53. return 1
  54. }
  55. return 0
  56. }
  57. func (i *InitProducerIDResponse) isValidVersion() bool {
  58. return i.Version >= 0 && i.Version <= 4
  59. }
  60. func (i *InitProducerIDResponse) requiredVersion() KafkaVersion {
  61. switch i.Version {
  62. case 4:
  63. return V2_7_0_0
  64. case 3:
  65. return V2_5_0_0
  66. case 2:
  67. return V2_4_0_0
  68. case 1:
  69. return V2_0_0_0
  70. default:
  71. return V0_11_0_0
  72. }
  73. }
  74. func (r *InitProducerIDResponse) throttleTime() time.Duration {
  75. return r.ThrottleTime
  76. }