123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- package sarama
- type SyncGroupRequestAssignment struct {
- // MemberId contains the ID of the member to assign.
- MemberId string
- // Assignment contains the member assignment.
- Assignment []byte
- }
- func (a *SyncGroupRequestAssignment) encode(pe packetEncoder, version int16) (err error) {
- if err := pe.putString(a.MemberId); err != nil {
- return err
- }
- if err := pe.putBytes(a.Assignment); err != nil {
- return err
- }
- return nil
- }
- func (a *SyncGroupRequestAssignment) decode(pd packetDecoder, version int16) (err error) {
- if a.MemberId, err = pd.getString(); err != nil {
- return err
- }
- if a.Assignment, err = pd.getBytes(); err != nil {
- return err
- }
- return nil
- }
- type SyncGroupRequest struct {
- // Version defines the protocol version to use for encode and decode
- Version int16
- // GroupId contains the unique group identifier.
- GroupId string
- // GenerationId contains the generation of the group.
- GenerationId int32
- // MemberId contains the member ID assigned by the group.
- MemberId string
- // GroupInstanceId contains the unique identifier of the consumer instance provided by end user.
- GroupInstanceId *string
- // GroupAssignments contains each assignment.
- GroupAssignments []SyncGroupRequestAssignment
- }
- func (s *SyncGroupRequest) encode(pe packetEncoder) (err error) {
- if err := pe.putString(s.GroupId); err != nil {
- return err
- }
- pe.putInt32(s.GenerationId)
- if err := pe.putString(s.MemberId); err != nil {
- return err
- }
- if s.Version >= 3 {
- if err := pe.putNullableString(s.GroupInstanceId); err != nil {
- return err
- }
- }
- if err := pe.putArrayLength(len(s.GroupAssignments)); err != nil {
- return err
- }
- for _, block := range s.GroupAssignments {
- if err := block.encode(pe, s.Version); err != nil {
- return err
- }
- }
- return nil
- }
- func (s *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) {
- s.Version = version
- if s.GroupId, err = pd.getString(); err != nil {
- return err
- }
- if s.GenerationId, err = pd.getInt32(); err != nil {
- return err
- }
- if s.MemberId, err = pd.getString(); err != nil {
- return err
- }
- if s.Version >= 3 {
- if s.GroupInstanceId, err = pd.getNullableString(); err != nil {
- return err
- }
- }
- if numAssignments, err := pd.getArrayLength(); err != nil {
- return err
- } else if numAssignments > 0 {
- s.GroupAssignments = make([]SyncGroupRequestAssignment, numAssignments)
- for i := 0; i < numAssignments; i++ {
- var block SyncGroupRequestAssignment
- if err := block.decode(pd, s.Version); err != nil {
- return err
- }
- s.GroupAssignments[i] = block
- }
- }
- return nil
- }
- func (r *SyncGroupRequest) key() int16 {
- return 14
- }
- func (r *SyncGroupRequest) version() int16 {
- return r.Version
- }
- func (r *SyncGroupRequest) headerVersion() int16 {
- return 1
- }
- func (r *SyncGroupRequest) isValidVersion() bool {
- return r.Version >= 0 && r.Version <= 3
- }
- func (r *SyncGroupRequest) requiredVersion() KafkaVersion {
- switch r.Version {
- case 3:
- return V2_3_0_0
- case 2:
- return V2_0_0_0
- case 1:
- return V0_11_0_0
- case 0:
- return V0_9_0_0
- default:
- return V2_3_0_0
- }
- }
- func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) {
- r.GroupAssignments = append(r.GroupAssignments, SyncGroupRequestAssignment{
- MemberId: memberId,
- Assignment: memberAssignment,
- })
- }
- func (r *SyncGroupRequest) AddGroupAssignmentMember(
- memberId string,
- memberAssignment *ConsumerGroupMemberAssignment,
- ) error {
- bin, err := encode(memberAssignment, nil)
- if err != nil {
- return err
- }
- r.AddGroupAssignment(memberId, bin)
- return nil
- }
|