heartbeat_response.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package sarama
  2. import "time"
  3. type HeartbeatResponse struct {
  4. Version int16
  5. ThrottleTime int32
  6. Err KError
  7. }
  8. func (r *HeartbeatResponse) encode(pe packetEncoder) error {
  9. if r.Version >= 1 {
  10. pe.putInt32(r.ThrottleTime)
  11. }
  12. pe.putInt16(int16(r.Err))
  13. return nil
  14. }
  15. func (r *HeartbeatResponse) decode(pd packetDecoder, version int16) error {
  16. var err error
  17. r.Version = version
  18. if r.Version >= 1 {
  19. if r.ThrottleTime, err = pd.getInt32(); err != nil {
  20. return err
  21. }
  22. }
  23. kerr, err := pd.getInt16()
  24. if err != nil {
  25. return err
  26. }
  27. r.Err = KError(kerr)
  28. return nil
  29. }
  30. func (r *HeartbeatResponse) key() int16 {
  31. return 12
  32. }
  33. func (r *HeartbeatResponse) version() int16 {
  34. return r.Version
  35. }
  36. func (r *HeartbeatResponse) headerVersion() int16 {
  37. return 0
  38. }
  39. func (r *HeartbeatResponse) isValidVersion() bool {
  40. return r.Version >= 0 && r.Version <= 3
  41. }
  42. func (r *HeartbeatResponse) requiredVersion() KafkaVersion {
  43. switch r.Version {
  44. case 3:
  45. return V2_3_0_0
  46. case 2:
  47. return V2_0_0_0
  48. case 1:
  49. return V0_11_0_0
  50. case 0:
  51. return V0_8_2_0
  52. default:
  53. return V2_3_0_0
  54. }
  55. }
  56. func (r *HeartbeatResponse) throttleTime() time.Duration {
  57. return time.Duration(r.ThrottleTime) * time.Millisecond
  58. }