produce_response.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. package sarama
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. // Protocol, http://kafka.apache.org/protocol.html
  7. // v1
  8. // v2 = v3 = v4
  9. // v5 = v6 = v7
  10. // Produce Response (Version: 7) => [responses] throttle_time_ms
  11. // responses => topic [partition_responses]
  12. // topic => STRING
  13. // partition_responses => partition error_code base_offset log_append_time log_start_offset
  14. // partition => INT32
  15. // error_code => INT16
  16. // base_offset => INT64
  17. // log_append_time => INT64
  18. // log_start_offset => INT64
  19. // throttle_time_ms => INT32
  20. // partition_responses in protocol
  21. type ProduceResponseBlock struct {
  22. Err KError // v0, error_code
  23. Offset int64 // v0, base_offset
  24. Timestamp time.Time // v2, log_append_time, and the broker is configured with `LogAppendTime`
  25. StartOffset int64 // v5, log_start_offset
  26. }
  27. func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
  28. tmp, err := pd.getInt16()
  29. if err != nil {
  30. return err
  31. }
  32. b.Err = KError(tmp)
  33. b.Offset, err = pd.getInt64()
  34. if err != nil {
  35. return err
  36. }
  37. if version >= 2 {
  38. if millis, err := pd.getInt64(); err != nil {
  39. return err
  40. } else if millis != -1 {
  41. b.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
  42. }
  43. }
  44. if version >= 5 {
  45. b.StartOffset, err = pd.getInt64()
  46. if err != nil {
  47. return err
  48. }
  49. }
  50. return nil
  51. }
  52. func (b *ProduceResponseBlock) encode(pe packetEncoder, version int16) (err error) {
  53. pe.putInt16(int16(b.Err))
  54. pe.putInt64(b.Offset)
  55. if version >= 2 {
  56. timestamp := int64(-1)
  57. if !b.Timestamp.Before(time.Unix(0, 0)) {
  58. timestamp = b.Timestamp.UnixNano() / int64(time.Millisecond)
  59. } else if !b.Timestamp.IsZero() {
  60. return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", b.Timestamp)}
  61. }
  62. pe.putInt64(timestamp)
  63. }
  64. if version >= 5 {
  65. pe.putInt64(b.StartOffset)
  66. }
  67. return nil
  68. }
  69. type ProduceResponse struct {
  70. Blocks map[string]map[int32]*ProduceResponseBlock // v0, responses
  71. Version int16
  72. ThrottleTime time.Duration // v1, throttle_time_ms
  73. }
  74. func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
  75. r.Version = version
  76. numTopics, err := pd.getArrayLength()
  77. if err != nil {
  78. return err
  79. }
  80. r.Blocks = make(map[string]map[int32]*ProduceResponseBlock, numTopics)
  81. for i := 0; i < numTopics; i++ {
  82. name, err := pd.getString()
  83. if err != nil {
  84. return err
  85. }
  86. numBlocks, err := pd.getArrayLength()
  87. if err != nil {
  88. return err
  89. }
  90. r.Blocks[name] = make(map[int32]*ProduceResponseBlock, numBlocks)
  91. for j := 0; j < numBlocks; j++ {
  92. id, err := pd.getInt32()
  93. if err != nil {
  94. return err
  95. }
  96. block := new(ProduceResponseBlock)
  97. err = block.decode(pd, version)
  98. if err != nil {
  99. return err
  100. }
  101. r.Blocks[name][id] = block
  102. }
  103. }
  104. if r.Version >= 1 {
  105. millis, err := pd.getInt32()
  106. if err != nil {
  107. return err
  108. }
  109. r.ThrottleTime = time.Duration(millis) * time.Millisecond
  110. }
  111. return nil
  112. }
  113. func (r *ProduceResponse) encode(pe packetEncoder) error {
  114. err := pe.putArrayLength(len(r.Blocks))
  115. if err != nil {
  116. return err
  117. }
  118. for topic, partitions := range r.Blocks {
  119. err = pe.putString(topic)
  120. if err != nil {
  121. return err
  122. }
  123. err = pe.putArrayLength(len(partitions))
  124. if err != nil {
  125. return err
  126. }
  127. for id, prb := range partitions {
  128. pe.putInt32(id)
  129. err = prb.encode(pe, r.Version)
  130. if err != nil {
  131. return err
  132. }
  133. }
  134. }
  135. if r.Version >= 1 {
  136. pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
  137. }
  138. return nil
  139. }
  140. func (r *ProduceResponse) key() int16 {
  141. return 0
  142. }
  143. func (r *ProduceResponse) version() int16 {
  144. return r.Version
  145. }
  146. func (r *ProduceResponse) headerVersion() int16 {
  147. return 0
  148. }
  149. func (r *ProduceResponse) isValidVersion() bool {
  150. return r.Version >= 0 && r.Version <= 7
  151. }
  152. func (r *ProduceResponse) requiredVersion() KafkaVersion {
  153. switch r.Version {
  154. case 7:
  155. return V2_1_0_0
  156. case 6:
  157. return V2_0_0_0
  158. case 4, 5:
  159. return V1_0_0_0
  160. case 3:
  161. return V0_11_0_0
  162. case 2:
  163. return V0_10_0_0
  164. case 1:
  165. return V0_9_0_0
  166. case 0:
  167. return V0_8_2_0
  168. default:
  169. return V2_1_0_0
  170. }
  171. }
  172. func (r *ProduceResponse) throttleTime() time.Duration {
  173. return r.ThrottleTime
  174. }
  175. func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
  176. if r.Blocks == nil {
  177. return nil
  178. }
  179. if r.Blocks[topic] == nil {
  180. return nil
  181. }
  182. return r.Blocks[topic][partition]
  183. }
  184. // Testing API
  185. func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) {
  186. if r.Blocks == nil {
  187. r.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
  188. }
  189. byTopic, ok := r.Blocks[topic]
  190. if !ok {
  191. byTopic = make(map[int32]*ProduceResponseBlock)
  192. r.Blocks[topic] = byTopic
  193. }
  194. block := &ProduceResponseBlock{
  195. Err: err,
  196. }
  197. if r.Version >= 2 {
  198. block.Timestamp = time.Now()
  199. }
  200. byTopic[partition] = block
  201. }