describe_groups_response.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. package sarama
  2. import "time"
  3. type DescribeGroupsResponse struct {
  4. // Version defines the protocol version to use for encode and decode
  5. Version int16
  6. // ThrottleTimeMs contains the duration in milliseconds for which the
  7. // request was throttled due to a quota violation, or zero if the request
  8. // did not violate any quota.
  9. ThrottleTimeMs int32
  10. // Groups contains each described group.
  11. Groups []*GroupDescription
  12. }
  13. func (r *DescribeGroupsResponse) encode(pe packetEncoder) (err error) {
  14. if r.Version >= 1 {
  15. pe.putInt32(r.ThrottleTimeMs)
  16. }
  17. if err := pe.putArrayLength(len(r.Groups)); err != nil {
  18. return err
  19. }
  20. for _, block := range r.Groups {
  21. if err := block.encode(pe, r.Version); err != nil {
  22. return err
  23. }
  24. }
  25. return nil
  26. }
  27. func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err error) {
  28. r.Version = version
  29. if r.Version >= 1 {
  30. if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
  31. return err
  32. }
  33. }
  34. if numGroups, err := pd.getArrayLength(); err != nil {
  35. return err
  36. } else if numGroups > 0 {
  37. r.Groups = make([]*GroupDescription, numGroups)
  38. for i := 0; i < numGroups; i++ {
  39. block := &GroupDescription{}
  40. if err := block.decode(pd, r.Version); err != nil {
  41. return err
  42. }
  43. r.Groups[i] = block
  44. }
  45. }
  46. return nil
  47. }
  48. func (r *DescribeGroupsResponse) key() int16 {
  49. return 15
  50. }
  51. func (r *DescribeGroupsResponse) version() int16 {
  52. return r.Version
  53. }
  54. func (r *DescribeGroupsResponse) headerVersion() int16 {
  55. return 0
  56. }
  57. func (r *DescribeGroupsResponse) isValidVersion() bool {
  58. return r.Version >= 0 && r.Version <= 4
  59. }
  60. func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion {
  61. switch r.Version {
  62. case 4:
  63. return V2_4_0_0
  64. case 3:
  65. return V2_3_0_0
  66. case 2:
  67. return V2_0_0_0
  68. case 1:
  69. return V0_11_0_0
  70. case 0:
  71. return V0_9_0_0
  72. default:
  73. return V2_4_0_0
  74. }
  75. }
  76. func (r *DescribeGroupsResponse) throttleTime() time.Duration {
  77. return time.Duration(r.ThrottleTimeMs) * time.Millisecond
  78. }
  79. // GroupDescription contains each described group.
  80. type GroupDescription struct {
  81. // Version defines the protocol version to use for encode and decode
  82. Version int16
  83. // Err contains the describe error as the KError type.
  84. Err KError
  85. // ErrorCode contains the describe error, or 0 if there was no error.
  86. ErrorCode int16
  87. // GroupId contains the group ID string.
  88. GroupId string
  89. // State contains the group state string, or the empty string.
  90. State string
  91. // ProtocolType contains the group protocol type, or the empty string.
  92. ProtocolType string
  93. // Protocol contains the group protocol data, or the empty string.
  94. Protocol string
  95. // Members contains the group members.
  96. Members map[string]*GroupMemberDescription
  97. // AuthorizedOperations contains a 32-bit bitfield to represent authorized
  98. // operations for this group.
  99. AuthorizedOperations int32
  100. }
  101. func (gd *GroupDescription) encode(pe packetEncoder, version int16) (err error) {
  102. gd.Version = version
  103. pe.putInt16(gd.ErrorCode)
  104. if err := pe.putString(gd.GroupId); err != nil {
  105. return err
  106. }
  107. if err := pe.putString(gd.State); err != nil {
  108. return err
  109. }
  110. if err := pe.putString(gd.ProtocolType); err != nil {
  111. return err
  112. }
  113. if err := pe.putString(gd.Protocol); err != nil {
  114. return err
  115. }
  116. if err := pe.putArrayLength(len(gd.Members)); err != nil {
  117. return err
  118. }
  119. for _, block := range gd.Members {
  120. if err := block.encode(pe, gd.Version); err != nil {
  121. return err
  122. }
  123. }
  124. if gd.Version >= 3 {
  125. pe.putInt32(gd.AuthorizedOperations)
  126. }
  127. return nil
  128. }
  129. func (gd *GroupDescription) decode(pd packetDecoder, version int16) (err error) {
  130. gd.Version = version
  131. if gd.ErrorCode, err = pd.getInt16(); err != nil {
  132. return err
  133. }
  134. gd.Err = KError(gd.ErrorCode)
  135. if gd.GroupId, err = pd.getString(); err != nil {
  136. return err
  137. }
  138. if gd.State, err = pd.getString(); err != nil {
  139. return err
  140. }
  141. if gd.ProtocolType, err = pd.getString(); err != nil {
  142. return err
  143. }
  144. if gd.Protocol, err = pd.getString(); err != nil {
  145. return err
  146. }
  147. if numMembers, err := pd.getArrayLength(); err != nil {
  148. return err
  149. } else if numMembers > 0 {
  150. gd.Members = make(map[string]*GroupMemberDescription, numMembers)
  151. for i := 0; i < numMembers; i++ {
  152. block := &GroupMemberDescription{}
  153. if err := block.decode(pd, gd.Version); err != nil {
  154. return err
  155. }
  156. gd.Members[block.MemberId] = block
  157. }
  158. }
  159. if gd.Version >= 3 {
  160. if gd.AuthorizedOperations, err = pd.getInt32(); err != nil {
  161. return err
  162. }
  163. }
  164. return nil
  165. }
  // GroupMemberDescription describes one member of a described group.
  type GroupMemberDescription struct {
  	// Version is the protocol version used when encoding and decoding.
  	Version int16
  	// MemberId is the member ID assigned by the group coordinator.
  	MemberId string
  	// GroupInstanceId is the unique identifier of the consumer instance as
  	// provided by the end user; it may be nil. It is only written and read
  	// when Version is at least 4.
  	GroupInstanceId *string
  	// ClientId is the client ID used in the member's latest join group
  	// request.
  	ClientId string
  	// ClientHost is the client host.
  	ClientHost string
  	// MemberMetadata is the raw metadata corresponding to the group protocol
  	// currently in use.
  	MemberMetadata []byte
  	// MemberAssignment is the raw current assignment provided by the group
  	// leader.
  	MemberAssignment []byte
  }
  187. func (gmd *GroupMemberDescription) encode(pe packetEncoder, version int16) (err error) {
  188. gmd.Version = version
  189. if err := pe.putString(gmd.MemberId); err != nil {
  190. return err
  191. }
  192. if gmd.Version >= 4 {
  193. if err := pe.putNullableString(gmd.GroupInstanceId); err != nil {
  194. return err
  195. }
  196. }
  197. if err := pe.putString(gmd.ClientId); err != nil {
  198. return err
  199. }
  200. if err := pe.putString(gmd.ClientHost); err != nil {
  201. return err
  202. }
  203. if err := pe.putBytes(gmd.MemberMetadata); err != nil {
  204. return err
  205. }
  206. if err := pe.putBytes(gmd.MemberAssignment); err != nil {
  207. return err
  208. }
  209. return nil
  210. }
  211. func (gmd *GroupMemberDescription) decode(pd packetDecoder, version int16) (err error) {
  212. gmd.Version = version
  213. if gmd.MemberId, err = pd.getString(); err != nil {
  214. return err
  215. }
  216. if gmd.Version >= 4 {
  217. if gmd.GroupInstanceId, err = pd.getNullableString(); err != nil {
  218. return err
  219. }
  220. }
  221. if gmd.ClientId, err = pd.getString(); err != nil {
  222. return err
  223. }
  224. if gmd.ClientHost, err = pd.getString(); err != nil {
  225. return err
  226. }
  227. if gmd.MemberMetadata, err = pd.getBytes(); err != nil {
  228. return err
  229. }
  230. if gmd.MemberAssignment, err = pd.getBytes(); err != nil {
  231. return err
  232. }
  233. return nil
  234. }
  235. func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
  236. if len(gmd.MemberAssignment) == 0 {
  237. return nil, nil
  238. }
  239. assignment := new(ConsumerGroupMemberAssignment)
  240. err := decode(gmd.MemberAssignment, assignment, nil)
  241. return assignment, err
  242. }
  243. func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error) {
  244. if len(gmd.MemberMetadata) == 0 {
  245. return nil, nil
  246. }
  247. metadata := new(ConsumerGroupMemberMetadata)
  248. err := decode(gmd.MemberMetadata, metadata, nil)
  249. return metadata, err
  250. }