consumer_group_members.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package sarama
  2. import "errors"
  3. // ConsumerGroupMemberMetadata holds the metadata for consumer group
  4. // https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
  5. type ConsumerGroupMemberMetadata struct {
  6. Version int16
  7. Topics []string
  8. UserData []byte
  9. OwnedPartitions []*OwnedPartition
  10. GenerationID int32
  11. RackID *string
  12. }
  13. func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
  14. pe.putInt16(m.Version)
  15. if err := pe.putStringArray(m.Topics); err != nil {
  16. return err
  17. }
  18. if err := pe.putBytes(m.UserData); err != nil {
  19. return err
  20. }
  21. if m.Version >= 1 {
  22. if err := pe.putArrayLength(len(m.OwnedPartitions)); err != nil {
  23. return err
  24. }
  25. for _, op := range m.OwnedPartitions {
  26. if err := op.encode(pe); err != nil {
  27. return err
  28. }
  29. }
  30. }
  31. if m.Version >= 2 {
  32. pe.putInt32(m.GenerationID)
  33. }
  34. if m.Version >= 3 {
  35. if err := pe.putNullableString(m.RackID); err != nil {
  36. return err
  37. }
  38. }
  39. return nil
  40. }
  41. func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
  42. if m.Version, err = pd.getInt16(); err != nil {
  43. return
  44. }
  45. if m.Topics, err = pd.getStringArray(); err != nil {
  46. return
  47. }
  48. if m.UserData, err = pd.getBytes(); err != nil {
  49. return
  50. }
  51. if m.Version >= 1 {
  52. n, err := pd.getArrayLength()
  53. if err != nil {
  54. // permit missing data here in case of misbehaving 3rd party
  55. // clients who incorrectly marked the member metadata as V1 in
  56. // their JoinGroup request
  57. if errors.Is(err, ErrInsufficientData) {
  58. return nil
  59. }
  60. return err
  61. }
  62. if n > 0 {
  63. m.OwnedPartitions = make([]*OwnedPartition, n)
  64. for i := 0; i < n; i++ {
  65. m.OwnedPartitions[i] = &OwnedPartition{}
  66. if err := m.OwnedPartitions[i].decode(pd); err != nil {
  67. return err
  68. }
  69. }
  70. }
  71. }
  72. if m.Version >= 2 {
  73. if m.GenerationID, err = pd.getInt32(); err != nil {
  74. return err
  75. }
  76. }
  77. if m.Version >= 3 {
  78. if m.RackID, err = pd.getNullableString(); err != nil {
  79. return err
  80. }
  81. }
  82. return nil
  83. }
  84. type OwnedPartition struct {
  85. Topic string
  86. Partitions []int32
  87. }
  88. func (m *OwnedPartition) encode(pe packetEncoder) error {
  89. if err := pe.putString(m.Topic); err != nil {
  90. return err
  91. }
  92. if err := pe.putInt32Array(m.Partitions); err != nil {
  93. return err
  94. }
  95. return nil
  96. }
  97. func (m *OwnedPartition) decode(pd packetDecoder) (err error) {
  98. if m.Topic, err = pd.getString(); err != nil {
  99. return err
  100. }
  101. if m.Partitions, err = pd.getInt32Array(); err != nil {
  102. return err
  103. }
  104. return nil
  105. }
  106. // ConsumerGroupMemberAssignment holds the member assignment for a consume group
  107. // https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
  108. type ConsumerGroupMemberAssignment struct {
  109. Version int16
  110. Topics map[string][]int32
  111. UserData []byte
  112. }
  113. func (m *ConsumerGroupMemberAssignment) encode(pe packetEncoder) error {
  114. pe.putInt16(m.Version)
  115. if err := pe.putArrayLength(len(m.Topics)); err != nil {
  116. return err
  117. }
  118. for topic, partitions := range m.Topics {
  119. if err := pe.putString(topic); err != nil {
  120. return err
  121. }
  122. if err := pe.putInt32Array(partitions); err != nil {
  123. return err
  124. }
  125. }
  126. if err := pe.putBytes(m.UserData); err != nil {
  127. return err
  128. }
  129. return nil
  130. }
  131. func (m *ConsumerGroupMemberAssignment) decode(pd packetDecoder) (err error) {
  132. if m.Version, err = pd.getInt16(); err != nil {
  133. return
  134. }
  135. var topicLen int
  136. if topicLen, err = pd.getArrayLength(); err != nil {
  137. return
  138. }
  139. m.Topics = make(map[string][]int32, topicLen)
  140. for i := 0; i < topicLen; i++ {
  141. var topic string
  142. if topic, err = pd.getString(); err != nil {
  143. return
  144. }
  145. if m.Topics[topic], err = pd.getInt32Array(); err != nil {
  146. return
  147. }
  148. }
  149. if m.UserData, err = pd.getBytes(); err != nil {
  150. return
  151. }
  152. return nil
  153. }