sync_group_request.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package sarama
  2. type SyncGroupRequestAssignment struct {
  3. // MemberId contains the ID of the member to assign.
  4. MemberId string
  5. // Assignment contains the member assignment.
  6. Assignment []byte
  7. }
  8. func (a *SyncGroupRequestAssignment) encode(pe packetEncoder, version int16) (err error) {
  9. if err := pe.putString(a.MemberId); err != nil {
  10. return err
  11. }
  12. if err := pe.putBytes(a.Assignment); err != nil {
  13. return err
  14. }
  15. return nil
  16. }
  17. func (a *SyncGroupRequestAssignment) decode(pd packetDecoder, version int16) (err error) {
  18. if a.MemberId, err = pd.getString(); err != nil {
  19. return err
  20. }
  21. if a.Assignment, err = pd.getBytes(); err != nil {
  22. return err
  23. }
  24. return nil
  25. }
  26. type SyncGroupRequest struct {
  27. // Version defines the protocol version to use for encode and decode
  28. Version int16
  29. // GroupId contains the unique group identifier.
  30. GroupId string
  31. // GenerationId contains the generation of the group.
  32. GenerationId int32
  33. // MemberId contains the member ID assigned by the group.
  34. MemberId string
  35. // GroupInstanceId contains the unique identifier of the consumer instance provided by end user.
  36. GroupInstanceId *string
  37. // GroupAssignments contains each assignment.
  38. GroupAssignments []SyncGroupRequestAssignment
  39. }
  40. func (s *SyncGroupRequest) encode(pe packetEncoder) (err error) {
  41. if err := pe.putString(s.GroupId); err != nil {
  42. return err
  43. }
  44. pe.putInt32(s.GenerationId)
  45. if err := pe.putString(s.MemberId); err != nil {
  46. return err
  47. }
  48. if s.Version >= 3 {
  49. if err := pe.putNullableString(s.GroupInstanceId); err != nil {
  50. return err
  51. }
  52. }
  53. if err := pe.putArrayLength(len(s.GroupAssignments)); err != nil {
  54. return err
  55. }
  56. for _, block := range s.GroupAssignments {
  57. if err := block.encode(pe, s.Version); err != nil {
  58. return err
  59. }
  60. }
  61. return nil
  62. }
  63. func (s *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) {
  64. s.Version = version
  65. if s.GroupId, err = pd.getString(); err != nil {
  66. return err
  67. }
  68. if s.GenerationId, err = pd.getInt32(); err != nil {
  69. return err
  70. }
  71. if s.MemberId, err = pd.getString(); err != nil {
  72. return err
  73. }
  74. if s.Version >= 3 {
  75. if s.GroupInstanceId, err = pd.getNullableString(); err != nil {
  76. return err
  77. }
  78. }
  79. if numAssignments, err := pd.getArrayLength(); err != nil {
  80. return err
  81. } else if numAssignments > 0 {
  82. s.GroupAssignments = make([]SyncGroupRequestAssignment, numAssignments)
  83. for i := 0; i < numAssignments; i++ {
  84. var block SyncGroupRequestAssignment
  85. if err := block.decode(pd, s.Version); err != nil {
  86. return err
  87. }
  88. s.GroupAssignments[i] = block
  89. }
  90. }
  91. return nil
  92. }
  93. func (r *SyncGroupRequest) key() int16 {
  94. return 14
  95. }
  96. func (r *SyncGroupRequest) version() int16 {
  97. return r.Version
  98. }
  99. func (r *SyncGroupRequest) headerVersion() int16 {
  100. return 1
  101. }
  102. func (r *SyncGroupRequest) isValidVersion() bool {
  103. return r.Version >= 0 && r.Version <= 3
  104. }
  105. func (r *SyncGroupRequest) requiredVersion() KafkaVersion {
  106. switch r.Version {
  107. case 3:
  108. return V2_3_0_0
  109. case 2:
  110. return V2_0_0_0
  111. case 1:
  112. return V0_11_0_0
  113. case 0:
  114. return V0_9_0_0
  115. default:
  116. return V2_3_0_0
  117. }
  118. }
  119. func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) {
  120. r.GroupAssignments = append(r.GroupAssignments, SyncGroupRequestAssignment{
  121. MemberId: memberId,
  122. Assignment: memberAssignment,
  123. })
  124. }
  125. func (r *SyncGroupRequest) AddGroupAssignmentMember(
  126. memberId string,
  127. memberAssignment *ConsumerGroupMemberAssignment,
  128. ) error {
  129. bin, err := encode(memberAssignment, nil)
  130. if err != nil {
  131. return err
  132. }
  133. r.AddGroupAssignment(memberId, bin)
  134. return nil
  135. }