alter_partition_reassignments_request.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package sarama
  // alterPartitionReassignmentsBlock is the per-partition payload stored inside
  // an AlterPartitionReassignmentsRequest.
  type alterPartitionReassignmentsBlock struct {
  	// replicas is the int32 list that encode writes as a nullable compact
  	// array and that decode fills from a compact array.
  	// NOTE(review): presumably the target replica broker IDs for the
  	// partition; confirm against the Kafka protocol definition.
  	replicas []int32
  }
  5. func (b *alterPartitionReassignmentsBlock) encode(pe packetEncoder) error {
  6. if err := pe.putNullableCompactInt32Array(b.replicas); err != nil {
  7. return err
  8. }
  9. pe.putEmptyTaggedFieldArray()
  10. return nil
  11. }
  12. func (b *alterPartitionReassignmentsBlock) decode(pd packetDecoder) (err error) {
  13. if b.replicas, err = pd.getCompactInt32Array(); err != nil {
  14. return err
  15. }
  16. return nil
  17. }
  18. type AlterPartitionReassignmentsRequest struct {
  19. TimeoutMs int32
  20. blocks map[string]map[int32]*alterPartitionReassignmentsBlock
  21. Version int16
  22. }
  23. func (r *AlterPartitionReassignmentsRequest) encode(pe packetEncoder) error {
  24. pe.putInt32(r.TimeoutMs)
  25. pe.putCompactArrayLength(len(r.blocks))
  26. for topic, partitions := range r.blocks {
  27. if err := pe.putCompactString(topic); err != nil {
  28. return err
  29. }
  30. pe.putCompactArrayLength(len(partitions))
  31. for partition, block := range partitions {
  32. pe.putInt32(partition)
  33. if err := block.encode(pe); err != nil {
  34. return err
  35. }
  36. }
  37. pe.putEmptyTaggedFieldArray()
  38. }
  39. pe.putEmptyTaggedFieldArray()
  40. return nil
  41. }
  42. func (r *AlterPartitionReassignmentsRequest) decode(pd packetDecoder, version int16) (err error) {
  43. r.Version = version
  44. if r.TimeoutMs, err = pd.getInt32(); err != nil {
  45. return err
  46. }
  47. topicCount, err := pd.getCompactArrayLength()
  48. if err != nil {
  49. return err
  50. }
  51. if topicCount > 0 {
  52. r.blocks = make(map[string]map[int32]*alterPartitionReassignmentsBlock)
  53. for i := 0; i < topicCount; i++ {
  54. topic, err := pd.getCompactString()
  55. if err != nil {
  56. return err
  57. }
  58. partitionCount, err := pd.getCompactArrayLength()
  59. if err != nil {
  60. return err
  61. }
  62. r.blocks[topic] = make(map[int32]*alterPartitionReassignmentsBlock)
  63. for j := 0; j < partitionCount; j++ {
  64. partition, err := pd.getInt32()
  65. if err != nil {
  66. return err
  67. }
  68. block := &alterPartitionReassignmentsBlock{}
  69. if err := block.decode(pd); err != nil {
  70. return err
  71. }
  72. r.blocks[topic][partition] = block
  73. if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
  74. return err
  75. }
  76. }
  77. if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
  78. return err
  79. }
  80. }
  81. }
  82. if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
  83. return err
  84. }
  85. return
  86. }
  87. func (r *AlterPartitionReassignmentsRequest) key() int16 {
  88. return 45
  89. }
  90. func (r *AlterPartitionReassignmentsRequest) version() int16 {
  91. return r.Version
  92. }
  93. func (r *AlterPartitionReassignmentsRequest) headerVersion() int16 {
  94. return 2
  95. }
  96. func (r *AlterPartitionReassignmentsRequest) isValidVersion() bool {
  97. return r.Version == 0
  98. }
  99. func (r *AlterPartitionReassignmentsRequest) requiredVersion() KafkaVersion {
  100. return V2_4_0_0
  101. }
  102. func (r *AlterPartitionReassignmentsRequest) AddBlock(topic string, partitionID int32, replicas []int32) {
  103. if r.blocks == nil {
  104. r.blocks = make(map[string]map[int32]*alterPartitionReassignmentsBlock)
  105. }
  106. if r.blocks[topic] == nil {
  107. r.blocks[topic] = make(map[int32]*alterPartitionReassignmentsBlock)
  108. }
  109. r.blocks[topic][partitionID] = &alterPartitionReassignmentsBlock{replicas}
  110. }