heartbeat_request.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package sarama
  // HeartbeatRequest is the request body sent under API key 12 (see key).
  // encode writes its fields in declaration order after Version, and decode
  // reads them back in the same order.
  type HeartbeatRequest struct {
  	// Version selects the wire layout: GroupInstanceId is only encoded and
  	// decoded when Version >= 3. isValidVersion accepts 0 through 3.
  	Version int16

  	// GroupId is written first, as a string.
  	GroupId string

  	// GenerationId is written second, as a 32-bit integer.
  	GenerationId int32

  	// MemberId is written third, as a string.
  	MemberId string

  	// GroupInstanceId is written last, as a nullable string, and only for
  	// Version >= 3. A nil pointer is passed through to the encoder as-is.
  	GroupInstanceId *string
  }
  9. func (r *HeartbeatRequest) encode(pe packetEncoder) error {
  10. if err := pe.putString(r.GroupId); err != nil {
  11. return err
  12. }
  13. pe.putInt32(r.GenerationId)
  14. if err := pe.putString(r.MemberId); err != nil {
  15. return err
  16. }
  17. if r.Version >= 3 {
  18. if err := pe.putNullableString(r.GroupInstanceId); err != nil {
  19. return err
  20. }
  21. }
  22. return nil
  23. }
  24. func (r *HeartbeatRequest) decode(pd packetDecoder, version int16) (err error) {
  25. r.Version = version
  26. if r.GroupId, err = pd.getString(); err != nil {
  27. return
  28. }
  29. if r.GenerationId, err = pd.getInt32(); err != nil {
  30. return
  31. }
  32. if r.MemberId, err = pd.getString(); err != nil {
  33. return
  34. }
  35. if r.Version >= 3 {
  36. if r.GroupInstanceId, err = pd.getNullableString(); err != nil {
  37. return
  38. }
  39. }
  40. return nil
  41. }
  42. func (r *HeartbeatRequest) key() int16 {
  43. return 12
  44. }
  45. func (r *HeartbeatRequest) version() int16 {
  46. return r.Version
  47. }
  48. func (r *HeartbeatRequest) headerVersion() int16 {
  49. return 1
  50. }
  51. func (r *HeartbeatRequest) isValidVersion() bool {
  52. return r.Version >= 0 && r.Version <= 3
  53. }
  54. func (r *HeartbeatRequest) requiredVersion() KafkaVersion {
  55. switch r.Version {
  56. case 3:
  57. return V2_3_0_0
  58. case 2:
  59. return V2_0_0_0
  60. case 1:
  61. return V0_11_0_0
  62. case 0:
  63. return V0_8_2_0
  64. default:
  65. return V2_3_0_0
  66. }
  67. }