sync_group_response.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package sarama
  2. import "time"
  3. type SyncGroupResponse struct {
  4. // Version defines the protocol version to use for encode and decode
  5. Version int16
  6. // ThrottleTime contains the duration in milliseconds for which the
  7. // request was throttled due to a quota violation, or zero if the request
  8. // did not violate any quota.
  9. ThrottleTime int32
  10. // Err contains the error code, or 0 if there was no error.
  11. Err KError
  12. // MemberAssignment contains the member assignment.
  13. MemberAssignment []byte
  14. }
  15. func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
  16. assignment := new(ConsumerGroupMemberAssignment)
  17. err := decode(r.MemberAssignment, assignment, nil)
  18. return assignment, err
  19. }
  20. func (r *SyncGroupResponse) encode(pe packetEncoder) error {
  21. if r.Version >= 1 {
  22. pe.putInt32(r.ThrottleTime)
  23. }
  24. pe.putInt16(int16(r.Err))
  25. return pe.putBytes(r.MemberAssignment)
  26. }
  27. func (r *SyncGroupResponse) decode(pd packetDecoder, version int16) (err error) {
  28. r.Version = version
  29. if r.Version >= 1 {
  30. if r.ThrottleTime, err = pd.getInt32(); err != nil {
  31. return err
  32. }
  33. }
  34. kerr, err := pd.getInt16()
  35. if err != nil {
  36. return err
  37. }
  38. r.Err = KError(kerr)
  39. r.MemberAssignment, err = pd.getBytes()
  40. return
  41. }
  42. func (r *SyncGroupResponse) key() int16 {
  43. return 14
  44. }
  45. func (r *SyncGroupResponse) version() int16 {
  46. return r.Version
  47. }
  48. func (r *SyncGroupResponse) headerVersion() int16 {
  49. return 0
  50. }
  51. func (r *SyncGroupResponse) isValidVersion() bool {
  52. return r.Version >= 0 && r.Version <= 3
  53. }
  54. func (r *SyncGroupResponse) requiredVersion() KafkaVersion {
  55. switch r.Version {
  56. case 3:
  57. return V2_3_0_0
  58. case 2:
  59. return V2_0_0_0
  60. case 1:
  61. return V0_11_0_0
  62. case 0:
  63. return V0_9_0_0
  64. default:
  65. return V2_3_0_0
  66. }
  67. }
  68. func (r *SyncGroupResponse) throttleTime() time.Duration {
  69. return time.Duration(r.ThrottleTime) * time.Millisecond
  70. }