offset_response.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package sarama
  2. import "time"
  3. type OffsetResponseBlock struct {
  4. Err KError
  5. // Offsets contains the result offsets (for V0/V1 compatibility)
  6. Offsets []int64 // Version 0
  7. // Timestamp contains the timestamp associated with the returned offset.
  8. Timestamp int64 // Version 1
  9. // Offset contains the returned offset.
  10. Offset int64 // Version 1
  11. // LeaderEpoch contains the current leader epoch of the partition.
  12. LeaderEpoch int32
  13. }
  14. func (b *OffsetResponseBlock) decode(pd packetDecoder, version int16) (err error) {
  15. tmp, err := pd.getInt16()
  16. if err != nil {
  17. return err
  18. }
  19. b.Err = KError(tmp)
  20. if version == 0 {
  21. b.Offsets, err = pd.getInt64Array()
  22. return err
  23. }
  24. if version >= 1 {
  25. b.Timestamp, err = pd.getInt64()
  26. if err != nil {
  27. return err
  28. }
  29. b.Offset, err = pd.getInt64()
  30. if err != nil {
  31. return err
  32. }
  33. // For backwards compatibility put the offset in the offsets array too
  34. b.Offsets = []int64{b.Offset}
  35. }
  36. if version >= 4 {
  37. if b.LeaderEpoch, err = pd.getInt32(); err != nil {
  38. return err
  39. }
  40. }
  41. return nil
  42. }
  43. func (b *OffsetResponseBlock) encode(pe packetEncoder, version int16) (err error) {
  44. pe.putInt16(int16(b.Err))
  45. if version == 0 {
  46. return pe.putInt64Array(b.Offsets)
  47. }
  48. if version >= 1 {
  49. pe.putInt64(b.Timestamp)
  50. pe.putInt64(b.Offset)
  51. }
  52. if version >= 4 {
  53. pe.putInt32(b.LeaderEpoch)
  54. }
  55. return nil
  56. }
  57. type OffsetResponse struct {
  58. Version int16
  59. ThrottleTimeMs int32
  60. Blocks map[string]map[int32]*OffsetResponseBlock
  61. }
  62. func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) {
  63. if version >= 2 {
  64. r.ThrottleTimeMs, err = pd.getInt32()
  65. if err != nil {
  66. return err
  67. }
  68. }
  69. numTopics, err := pd.getArrayLength()
  70. if err != nil {
  71. return err
  72. }
  73. r.Blocks = make(map[string]map[int32]*OffsetResponseBlock, numTopics)
  74. for i := 0; i < numTopics; i++ {
  75. name, err := pd.getString()
  76. if err != nil {
  77. return err
  78. }
  79. numBlocks, err := pd.getArrayLength()
  80. if err != nil {
  81. return err
  82. }
  83. r.Blocks[name] = make(map[int32]*OffsetResponseBlock, numBlocks)
  84. for j := 0; j < numBlocks; j++ {
  85. id, err := pd.getInt32()
  86. if err != nil {
  87. return err
  88. }
  89. block := new(OffsetResponseBlock)
  90. err = block.decode(pd, version)
  91. if err != nil {
  92. return err
  93. }
  94. r.Blocks[name][id] = block
  95. }
  96. }
  97. return nil
  98. }
  99. func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock {
  100. if r.Blocks == nil {
  101. return nil
  102. }
  103. if r.Blocks[topic] == nil {
  104. return nil
  105. }
  106. return r.Blocks[topic][partition]
  107. }
  108. /*
  109. // [0 0 0 1 ntopics
  110. 0 8 109 121 95 116 111 112 105 99 topic
  111. 0 0 0 1 npartitions
  112. 0 0 0 0 id
  113. 0 0
  114. 0 0 0 1 0 0 0 0
  115. 0 1 1 1 0 0 0 1
  116. 0 8 109 121 95 116 111 112
  117. 105 99 0 0 0 1 0 0
  118. 0 0 0 0 0 0 0 1
  119. 0 0 0 0 0 1 1 1] <nil>
  120. */
  121. func (r *OffsetResponse) encode(pe packetEncoder) (err error) {
  122. if r.Version >= 2 {
  123. pe.putInt32(r.ThrottleTimeMs)
  124. }
  125. if err = pe.putArrayLength(len(r.Blocks)); err != nil {
  126. return err
  127. }
  128. for topic, partitions := range r.Blocks {
  129. if err = pe.putString(topic); err != nil {
  130. return err
  131. }
  132. if err = pe.putArrayLength(len(partitions)); err != nil {
  133. return err
  134. }
  135. for partition, block := range partitions {
  136. pe.putInt32(partition)
  137. if err = block.encode(pe, r.version()); err != nil {
  138. return err
  139. }
  140. }
  141. }
  142. return nil
  143. }
  144. func (r *OffsetResponse) key() int16 {
  145. return 2
  146. }
  147. func (r *OffsetResponse) version() int16 {
  148. return r.Version
  149. }
  150. func (r *OffsetResponse) headerVersion() int16 {
  151. return 0
  152. }
  153. func (r *OffsetResponse) isValidVersion() bool {
  154. return r.Version >= 0 && r.Version <= 4
  155. }
  156. func (r *OffsetResponse) requiredVersion() KafkaVersion {
  157. switch r.Version {
  158. case 4:
  159. return V2_1_0_0
  160. case 3:
  161. return V2_0_0_0
  162. case 2:
  163. return V0_11_0_0
  164. case 1:
  165. return V0_10_1_0
  166. case 0:
  167. return V0_8_2_0
  168. default:
  169. return V2_0_0_0
  170. }
  171. }
  172. func (r *OffsetResponse) throttleTime() time.Duration {
  173. return time.Duration(r.ThrottleTimeMs) * time.Millisecond
  174. }
  175. // testing API
  176. func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) {
  177. if r.Blocks == nil {
  178. r.Blocks = make(map[string]map[int32]*OffsetResponseBlock)
  179. }
  180. byTopic, ok := r.Blocks[topic]
  181. if !ok {
  182. byTopic = make(map[int32]*OffsetResponseBlock)
  183. r.Blocks[topic] = byTopic
  184. }
  185. byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}, Offset: offset}
  186. }