produce_request.go

package sarama

import "github.com/rcrowley/go-metrics"

// RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements
// it must see before responding. Any of the constants defined here are valid. On broker versions
// prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many
// acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced
// by setting the `min.insync.replicas` value in the broker's configuration).
type RequiredAcks int16

const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAcks = -1
)

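// ProduceRequest is the client-side representation of a Kafka Produce request.
// A minimal sketch of filling one in by hand (hypothetical values; most code
// goes through Sarama's higher-level producer APIs instead):
//
//	req := &ProduceRequest{
//		RequiredAcks: WaitForAll,
//		Timeout:      10000, // broker-side timeout in milliseconds
//	}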
type ProduceRequest struct {
	TransactionalID *string
	RequiredAcks    RequiredAcks
	Timeout         int32
	Version         int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11
	records         map[string]map[int32]Records
}

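// updateMsgSetMetrics walks a legacy MessageSet, feeding per-message
// compression ratios into the global and per-topic histograms, and returns
// the number of records in the set.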
func updateMsgSetMetrics(msgSet *MessageSet, compressionRatioMetric metrics.Histogram,
	topicCompressionRatioMetric metrics.Histogram,
) int64 {
	var topicRecordCount int64
	for _, messageBlock := range msgSet.Messages {
		// Is this a fake "message" wrapping real messages?
		if messageBlock.Msg.Set != nil {
			topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
		} else {
			// A single uncompressed message
			topicRecordCount++
		}
		// Better safe than sorry when computing the compression ratio
		if messageBlock.Msg.compressedSize != 0 {
			compressionRatio := float64(len(messageBlock.Msg.Value)) /
				float64(messageBlock.Msg.compressedSize)
			// Histograms do not support decimal values; multiply by 100 for better precision
			intCompressionRatio := int64(100 * compressionRatio)
			compressionRatioMetric.Update(intCompressionRatio)
			topicCompressionRatioMetric.Update(intCompressionRatio)
		}
	}
	return topicRecordCount
}

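// updateBatchMetrics records the batch's compression ratio (scaled by 100,
// since the histograms only take integers) and returns its record count.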
func updateBatchMetrics(recordBatch *RecordBatch, compressionRatioMetric metrics.Histogram,
	topicCompressionRatioMetric metrics.Histogram,
) int64 {
	if recordBatch.compressedRecords != nil {
		compressionRatio := int64(float64(recordBatch.recordsLen) / float64(len(recordBatch.compressedRecords)) * 100)
		compressionRatioMetric.Update(compressionRatio)
		topicCompressionRatioMetric.Update(compressionRatio)
	}

	return int64(len(recordBatch.Records))
}

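// encode writes the request in the Kafka wire format. When the encoder
// carries a metrics registry, it also updates the batch-size,
// compression-ratio, record-send-rate and records-per-request metrics as a
// side effect of encoding.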
func (r *ProduceRequest) encode(pe packetEncoder) error {
	if r.Version >= 3 {
		if err := pe.putNullableString(r.TransactionalID); err != nil {
			return err
		}
	}
	pe.putInt16(int16(r.RequiredAcks))
	pe.putInt32(r.Timeout)
	metricRegistry := pe.metricRegistry()
	var batchSizeMetric metrics.Histogram
	var compressionRatioMetric metrics.Histogram
	if metricRegistry != nil {
		batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry)
		compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry)
	}
	totalRecordCount := int64(0)

	err := pe.putArrayLength(len(r.records))
	if err != nil {
		return err
	}

	for topic, partitions := range r.records {
		err = pe.putString(topic)
		if err != nil {
			return err
		}
		err = pe.putArrayLength(len(partitions))
		if err != nil {
			return err
		}
		topicRecordCount := int64(0)
		var topicCompressionRatioMetric metrics.Histogram
		if metricRegistry != nil {
			topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry)
		}
		for id, records := range partitions {
			startOffset := pe.offset()
			pe.putInt32(id)
			pe.push(&lengthField{})
			err = records.encode(pe)
			if err != nil {
				return err
			}
			err = pe.pop()
			if err != nil {
				return err
			}
			if metricRegistry != nil {
				if r.Version >= 3 {
					topicRecordCount += updateBatchMetrics(records.RecordBatch, compressionRatioMetric, topicCompressionRatioMetric)
				} else {
					topicRecordCount += updateMsgSetMetrics(records.MsgSet, compressionRatioMetric, topicCompressionRatioMetric)
				}
				batchSize := int64(pe.offset() - startOffset)
				batchSizeMetric.Update(batchSize)
				getOrRegisterTopicHistogram("batch-size", topic, metricRegistry).Update(batchSize)
			}
		}
		if topicRecordCount > 0 {
			getOrRegisterTopicMeter("record-send-rate", topic, metricRegistry).Mark(topicRecordCount)
			getOrRegisterTopicHistogram("records-per-request", topic, metricRegistry).Update(topicRecordCount)
			totalRecordCount += topicRecordCount
		}
	}

	if totalRecordCount > 0 {
		metrics.GetOrRegisterMeter("record-send-rate", metricRegistry).Mark(totalRecordCount)
		getOrRegisterHistogram("records-per-request", metricRegistry).Update(totalRecordCount)
	}

	return nil
}

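// decode parses a ProduceRequest of the given version from the wire format.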
func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
	r.Version = version

	if version >= 3 {
		id, err := pd.getNullableString()
		if err != nil {
			return err
		}
		r.TransactionalID = id
	}
	requiredAcks, err := pd.getInt16()
	if err != nil {
		return err
	}
	r.RequiredAcks = RequiredAcks(requiredAcks)
	if r.Timeout, err = pd.getInt32(); err != nil {
		return err
	}
	topicCount, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	if topicCount == 0 {
		return nil
	}

	r.records = make(map[string]map[int32]Records)
	for i := 0; i < topicCount; i++ {
		topic, err := pd.getString()
		if err != nil {
			return err
		}
		partitionCount, err := pd.getArrayLength()
		if err != nil {
			return err
		}
		r.records[topic] = make(map[int32]Records)

		for j := 0; j < partitionCount; j++ {
			partition, err := pd.getInt32()
			if err != nil {
				return err
			}
			size, err := pd.getInt32()
			if err != nil {
				return err
			}
			recordsDecoder, err := pd.getSubset(int(size))
			if err != nil {
				return err
			}
			var records Records
			if err := records.decode(recordsDecoder); err != nil {
				return err
			}
			r.records[topic][partition] = records
		}
	}

	return nil
}

func (r *ProduceRequest) key() int16 {
	return 0
}

func (r *ProduceRequest) version() int16 {
	return r.Version
}

func (r *ProduceRequest) headerVersion() int16 {
	return 1
}

func (r *ProduceRequest) isValidVersion() bool {
	return r.Version >= 0 && r.Version <= 7
}

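// requiredVersion reports the minimum Kafka broker version needed to
// understand this version of the request.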
func (r *ProduceRequest) requiredVersion() KafkaVersion {
	switch r.Version {
	case 7:
		return V2_1_0_0
	case 6:
		return V2_0_0_0
	case 4, 5:
		return V1_0_0_0
	case 3:
		return V0_11_0_0
	case 2:
		return V0_10_0_0
	case 1:
		return V0_9_0_0
	case 0:
		return V0_8_2_0
	default:
		return V2_1_0_0
	}
}

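// ensureRecords lazily initializes the nested records maps so callers can
// assign to r.records[topic][partition] without nil checks.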
func (r *ProduceRequest) ensureRecords(topic string, partition int32) {
	if r.records == nil {
		r.records = make(map[string]map[int32]Records)
	}

	if r.records[topic] == nil {
		r.records[topic] = make(map[int32]Records)
	}
}

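// AddMessage appends a single message to the legacy MessageSet for the given
// topic/partition, creating the set if needed. This is the message-set path
// used by request versions below 3.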
func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
	r.ensureRecords(topic, partition)
	set := r.records[topic][partition].MsgSet

	if set == nil {
		set = new(MessageSet)
		r.records[topic][partition] = newLegacyRecords(set)
	}

	set.addMessage(msg)
}

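// AddSet replaces any records already stored for the given topic/partition
// with a complete legacy MessageSet.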
func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) {
	r.ensureRecords(topic, partition)
	r.records[topic][partition] = newLegacyRecords(set)
}

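// AddBatch replaces any records already stored for the given topic/partition
// with a RecordBatch, the v2 record format encoded by Version >= 3 requests.
//
// A minimal usage sketch, with a hypothetical topic name and a *RecordBatch
// assumed to have been built elsewhere:
//
//	req := &ProduceRequest{Version: 3, RequiredAcks: WaitForAll, Timeout: 10000}
//	req.AddBatch("events", 0, batch)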
func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch) {
	r.ensureRecords(topic, partition)
	r.records[topic][partition] = newDefaultRecords(batch)
}