123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- package sarama
- import "time"
- type DescribeGroupsResponse struct {
- // Version defines the protocol version to use for encode and decode
- Version int16
- // ThrottleTimeMs contains the duration in milliseconds for which the
- // request was throttled due to a quota violation, or zero if the request
- // did not violate any quota.
- ThrottleTimeMs int32
- // Groups contains each described group.
- Groups []*GroupDescription
- }
- func (r *DescribeGroupsResponse) encode(pe packetEncoder) (err error) {
- if r.Version >= 1 {
- pe.putInt32(r.ThrottleTimeMs)
- }
- if err := pe.putArrayLength(len(r.Groups)); err != nil {
- return err
- }
- for _, block := range r.Groups {
- if err := block.encode(pe, r.Version); err != nil {
- return err
- }
- }
- return nil
- }
- func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err error) {
- r.Version = version
- if r.Version >= 1 {
- if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
- return err
- }
- }
- if numGroups, err := pd.getArrayLength(); err != nil {
- return err
- } else if numGroups > 0 {
- r.Groups = make([]*GroupDescription, numGroups)
- for i := 0; i < numGroups; i++ {
- block := &GroupDescription{}
- if err := block.decode(pd, r.Version); err != nil {
- return err
- }
- r.Groups[i] = block
- }
- }
- return nil
- }
- func (r *DescribeGroupsResponse) key() int16 {
- return 15
- }
- func (r *DescribeGroupsResponse) version() int16 {
- return r.Version
- }
- func (r *DescribeGroupsResponse) headerVersion() int16 {
- return 0
- }
- func (r *DescribeGroupsResponse) isValidVersion() bool {
- return r.Version >= 0 && r.Version <= 4
- }
- func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion {
- switch r.Version {
- case 4:
- return V2_4_0_0
- case 3:
- return V2_3_0_0
- case 2:
- return V2_0_0_0
- case 1:
- return V0_11_0_0
- case 0:
- return V0_9_0_0
- default:
- return V2_4_0_0
- }
- }
- func (r *DescribeGroupsResponse) throttleTime() time.Duration {
- return time.Duration(r.ThrottleTimeMs) * time.Millisecond
- }
- // GroupDescription contains each described group.
- type GroupDescription struct {
- // Version defines the protocol version to use for encode and decode
- Version int16
- // Err contains the describe error as the KError type.
- Err KError
- // ErrorCode contains the describe error, or 0 if there was no error.
- ErrorCode int16
- // GroupId contains the group ID string.
- GroupId string
- // State contains the group state string, or the empty string.
- State string
- // ProtocolType contains the group protocol type, or the empty string.
- ProtocolType string
- // Protocol contains the group protocol data, or the empty string.
- Protocol string
- // Members contains the group members.
- Members map[string]*GroupMemberDescription
- // AuthorizedOperations contains a 32-bit bitfield to represent authorized
- // operations for this group.
- AuthorizedOperations int32
- }
- func (gd *GroupDescription) encode(pe packetEncoder, version int16) (err error) {
- gd.Version = version
- pe.putInt16(gd.ErrorCode)
- if err := pe.putString(gd.GroupId); err != nil {
- return err
- }
- if err := pe.putString(gd.State); err != nil {
- return err
- }
- if err := pe.putString(gd.ProtocolType); err != nil {
- return err
- }
- if err := pe.putString(gd.Protocol); err != nil {
- return err
- }
- if err := pe.putArrayLength(len(gd.Members)); err != nil {
- return err
- }
- for _, block := range gd.Members {
- if err := block.encode(pe, gd.Version); err != nil {
- return err
- }
- }
- if gd.Version >= 3 {
- pe.putInt32(gd.AuthorizedOperations)
- }
- return nil
- }
- func (gd *GroupDescription) decode(pd packetDecoder, version int16) (err error) {
- gd.Version = version
- if gd.ErrorCode, err = pd.getInt16(); err != nil {
- return err
- }
- gd.Err = KError(gd.ErrorCode)
- if gd.GroupId, err = pd.getString(); err != nil {
- return err
- }
- if gd.State, err = pd.getString(); err != nil {
- return err
- }
- if gd.ProtocolType, err = pd.getString(); err != nil {
- return err
- }
- if gd.Protocol, err = pd.getString(); err != nil {
- return err
- }
- if numMembers, err := pd.getArrayLength(); err != nil {
- return err
- } else if numMembers > 0 {
- gd.Members = make(map[string]*GroupMemberDescription, numMembers)
- for i := 0; i < numMembers; i++ {
- block := &GroupMemberDescription{}
- if err := block.decode(pd, gd.Version); err != nil {
- return err
- }
- gd.Members[block.MemberId] = block
- }
- }
- if gd.Version >= 3 {
- if gd.AuthorizedOperations, err = pd.getInt32(); err != nil {
- return err
- }
- }
- return nil
- }
- // GroupMemberDescription contains the group members.
- type GroupMemberDescription struct {
- // Version defines the protocol version to use for encode and decode
- Version int16
- // MemberId contains the member ID assigned by the group coordinator.
- MemberId string
- // GroupInstanceId contains the unique identifier of the consumer instance
- // provided by end user.
- GroupInstanceId *string
- // ClientId contains the client ID used in the member's latest join group
- // request.
- ClientId string
- // ClientHost contains the client host.
- ClientHost string
- // MemberMetadata contains the metadata corresponding to the current group
- // protocol in use.
- MemberMetadata []byte
- // MemberAssignment contains the current assignment provided by the group
- // leader.
- MemberAssignment []byte
- }
- func (gmd *GroupMemberDescription) encode(pe packetEncoder, version int16) (err error) {
- gmd.Version = version
- if err := pe.putString(gmd.MemberId); err != nil {
- return err
- }
- if gmd.Version >= 4 {
- if err := pe.putNullableString(gmd.GroupInstanceId); err != nil {
- return err
- }
- }
- if err := pe.putString(gmd.ClientId); err != nil {
- return err
- }
- if err := pe.putString(gmd.ClientHost); err != nil {
- return err
- }
- if err := pe.putBytes(gmd.MemberMetadata); err != nil {
- return err
- }
- if err := pe.putBytes(gmd.MemberAssignment); err != nil {
- return err
- }
- return nil
- }
- func (gmd *GroupMemberDescription) decode(pd packetDecoder, version int16) (err error) {
- gmd.Version = version
- if gmd.MemberId, err = pd.getString(); err != nil {
- return err
- }
- if gmd.Version >= 4 {
- if gmd.GroupInstanceId, err = pd.getNullableString(); err != nil {
- return err
- }
- }
- if gmd.ClientId, err = pd.getString(); err != nil {
- return err
- }
- if gmd.ClientHost, err = pd.getString(); err != nil {
- return err
- }
- if gmd.MemberMetadata, err = pd.getBytes(); err != nil {
- return err
- }
- if gmd.MemberAssignment, err = pd.getBytes(); err != nil {
- return err
- }
- return nil
- }
- func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
- if len(gmd.MemberAssignment) == 0 {
- return nil, nil
- }
- assignment := new(ConsumerGroupMemberAssignment)
- err := decode(gmd.MemberAssignment, assignment, nil)
- return assignment, err
- }
- func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error) {
- if len(gmd.MemberMetadata) == 0 {
- return nil, nil
- }
- metadata := new(ConsumerGroupMemberMetadata)
- err := decode(gmd.MemberMetadata, metadata, nil)
- return metadata, err
- }
|