fetch_response.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601
  1. package sarama
  2. import (
  3. "errors"
  4. "sort"
  5. "time"
  6. "github.com/rcrowley/go-metrics"
  7. )
// Sentinel values used where an identifier is absent.
const (
	// invalidLeaderEpoch is -1.
	// NOTE(review): not referenced in this file; presumably the placeholder
	// for an unknown leader epoch elsewhere in the package — confirm.
	invalidLeaderEpoch = -1
	// invalidPreferredReplicaID is -1, the same value
	// FetchResponseBlock.decode stores in PreferredReadReplica when the
	// response version is below 11 and carries no preferred replica.
	invalidPreferredReplicaID = -1
)
// AbortedTransaction describes one aborted transaction as listed in
// FetchResponseBlock.AbortedTransactions. On the wire it is two consecutive
// int64 values: the producer ID followed by the first offset.
type AbortedTransaction struct {
	// ProducerID contains the producer id associated with the aborted transaction.
	ProducerID int64
	// FirstOffset contains the first offset in the aborted transaction.
	FirstOffset int64
}
  18. func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
  19. if t.ProducerID, err = pd.getInt64(); err != nil {
  20. return err
  21. }
  22. if t.FirstOffset, err = pd.getInt64(); err != nil {
  23. return err
  24. }
  25. return nil
  26. }
  27. func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
  28. pe.putInt64(t.ProducerID)
  29. pe.putInt64(t.FirstOffset)
  30. return nil
  31. }
// FetchResponseBlock holds the part of a fetch response that belongs to one
// partition; FetchResponse.Blocks indexes these blocks by topic name and
// partition ID.
type FetchResponseBlock struct {
	// Err contains the error code, or 0 if there was no fetch error.
	Err KError
	// HighWaterMarkOffset contains the current high water mark.
	HighWaterMarkOffset int64
	// LastStableOffset contains the last stable offset (or LSO) of the
	// partition. This is the last offset such that the state of all
	// transactional records prior to this offset have been decided (ABORTED or
	// COMMITTED)
	LastStableOffset int64
	// LastRecordsBatchOffset is overwritten by decode with the result of
	// recordsOffset() for every Records it successfully decodes, so after
	// decoding it reflects the last one read. It stays nil if none was decoded.
	LastRecordsBatchOffset *int64
	// LogStartOffset contains the current log start offset.
	LogStartOffset int64
	// AbortedTransactions contains the aborted transactions.
	AbortedTransactions []*AbortedTransaction
	// PreferredReadReplica contains the preferred read replica for the
	// consumer to use on its next fetch request
	PreferredReadReplica int32
	// RecordsSet contains the record data.
	RecordsSet []*Records

	// Partial is set by decode when the very first Records of the block could
	// not be decoded because the data was insufficient.
	Partial bool
	// Records is set by decode to the first Records appended to RecordsSet.
	//
	// Deprecated: use FetchResponseBlock.RecordsSet
	Records *Records
}
  55. func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
  56. metricRegistry := pd.metricRegistry()
  57. var sizeMetric metrics.Histogram
  58. if metricRegistry != nil {
  59. sizeMetric = getOrRegisterHistogram("consumer-fetch-response-size", metricRegistry)
  60. }
  61. tmp, err := pd.getInt16()
  62. if err != nil {
  63. return err
  64. }
  65. b.Err = KError(tmp)
  66. b.HighWaterMarkOffset, err = pd.getInt64()
  67. if err != nil {
  68. return err
  69. }
  70. if version >= 4 {
  71. b.LastStableOffset, err = pd.getInt64()
  72. if err != nil {
  73. return err
  74. }
  75. if version >= 5 {
  76. b.LogStartOffset, err = pd.getInt64()
  77. if err != nil {
  78. return err
  79. }
  80. }
  81. numTransact, err := pd.getArrayLength()
  82. if err != nil {
  83. return err
  84. }
  85. if numTransact >= 0 {
  86. b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
  87. }
  88. for i := 0; i < numTransact; i++ {
  89. transact := new(AbortedTransaction)
  90. if err = transact.decode(pd); err != nil {
  91. return err
  92. }
  93. b.AbortedTransactions[i] = transact
  94. }
  95. }
  96. if version >= 11 {
  97. b.PreferredReadReplica, err = pd.getInt32()
  98. if err != nil {
  99. return err
  100. }
  101. } else {
  102. b.PreferredReadReplica = -1
  103. }
  104. recordsSize, err := pd.getInt32()
  105. if err != nil {
  106. return err
  107. }
  108. if sizeMetric != nil {
  109. sizeMetric.Update(int64(recordsSize))
  110. }
  111. recordsDecoder, err := pd.getSubset(int(recordsSize))
  112. if err != nil {
  113. return err
  114. }
  115. b.RecordsSet = []*Records{}
  116. for recordsDecoder.remaining() > 0 {
  117. records := &Records{}
  118. if err := records.decode(recordsDecoder); err != nil {
  119. // If we have at least one decoded records, this is not an error
  120. if errors.Is(err, ErrInsufficientData) {
  121. if len(b.RecordsSet) == 0 {
  122. b.Partial = true
  123. }
  124. break
  125. }
  126. return err
  127. }
  128. b.LastRecordsBatchOffset, err = records.recordsOffset()
  129. if err != nil {
  130. return err
  131. }
  132. partial, err := records.isPartial()
  133. if err != nil {
  134. return err
  135. }
  136. n, err := records.numRecords()
  137. if err != nil {
  138. return err
  139. }
  140. if n > 0 || (partial && len(b.RecordsSet) == 0) {
  141. b.RecordsSet = append(b.RecordsSet, records)
  142. if b.Records == nil {
  143. b.Records = records
  144. }
  145. }
  146. overflow, err := records.isOverflow()
  147. if err != nil {
  148. return err
  149. }
  150. if partial || overflow {
  151. break
  152. }
  153. }
  154. return nil
  155. }
  156. func (b *FetchResponseBlock) numRecords() (int, error) {
  157. sum := 0
  158. for _, records := range b.RecordsSet {
  159. count, err := records.numRecords()
  160. if err != nil {
  161. return 0, err
  162. }
  163. sum += count
  164. }
  165. return sum, nil
  166. }
  167. func (b *FetchResponseBlock) isPartial() (bool, error) {
  168. if b.Partial {
  169. return true, nil
  170. }
  171. if len(b.RecordsSet) == 1 {
  172. return b.RecordsSet[0].isPartial()
  173. }
  174. return false, nil
  175. }
  176. func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
  177. pe.putInt16(int16(b.Err))
  178. pe.putInt64(b.HighWaterMarkOffset)
  179. if version >= 4 {
  180. pe.putInt64(b.LastStableOffset)
  181. if version >= 5 {
  182. pe.putInt64(b.LogStartOffset)
  183. }
  184. if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
  185. return err
  186. }
  187. for _, transact := range b.AbortedTransactions {
  188. if err = transact.encode(pe); err != nil {
  189. return err
  190. }
  191. }
  192. }
  193. if version >= 11 {
  194. pe.putInt32(b.PreferredReadReplica)
  195. }
  196. pe.push(&lengthField{})
  197. for _, records := range b.RecordsSet {
  198. err = records.encode(pe)
  199. if err != nil {
  200. return err
  201. }
  202. }
  203. return pe.pop()
  204. }
  205. func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
  206. // I can't find any doc that guarantee the field `fetchResponse.AbortedTransactions` is ordered
  207. // plus Java implementation use a PriorityQueue based on `FirstOffset`. I guess we have to order it ourself
  208. at := b.AbortedTransactions
  209. sort.Slice(
  210. at,
  211. func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset },
  212. )
  213. return at
  214. }
// FetchResponse is the response to a fetch request. Besides the decoded wire
// fields it carries LogAppendTime and Timestamp, which are only consulted by
// the Add* helper methods that build responses by hand.
type FetchResponse struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// ThrottleTime contains the duration in milliseconds for which the request
	// was throttled due to a quota violation, or zero if the request did not
	// violate any quota.
	ThrottleTime time.Duration
	// ErrorCode contains the top level response error code.
	ErrorCode int16
	// SessionID contains the fetch session ID, or 0 if this is not part of a fetch session.
	SessionID int32
	// Blocks contains the response topics.
	Blocks map[string]map[int32]*FetchResponseBlock

	// LogAppendTime is copied into every message and record batch created by
	// the Add* helpers. When it is true, AddMessageWithTimestamp stamps
	// messages with Timestamp instead of the caller-supplied time.
	LogAppendTime bool
	// Timestamp is used as MaxTimestamp of record batches created by the Add*
	// helpers, and as the message timestamp when LogAppendTime is true.
	Timestamp time.Time
}
  231. func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
  232. r.Version = version
  233. if r.Version >= 1 {
  234. throttle, err := pd.getInt32()
  235. if err != nil {
  236. return err
  237. }
  238. r.ThrottleTime = time.Duration(throttle) * time.Millisecond
  239. }
  240. if r.Version >= 7 {
  241. r.ErrorCode, err = pd.getInt16()
  242. if err != nil {
  243. return err
  244. }
  245. r.SessionID, err = pd.getInt32()
  246. if err != nil {
  247. return err
  248. }
  249. }
  250. numTopics, err := pd.getArrayLength()
  251. if err != nil {
  252. return err
  253. }
  254. r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
  255. for i := 0; i < numTopics; i++ {
  256. name, err := pd.getString()
  257. if err != nil {
  258. return err
  259. }
  260. numBlocks, err := pd.getArrayLength()
  261. if err != nil {
  262. return err
  263. }
  264. r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
  265. for j := 0; j < numBlocks; j++ {
  266. id, err := pd.getInt32()
  267. if err != nil {
  268. return err
  269. }
  270. block := new(FetchResponseBlock)
  271. err = block.decode(pd, version)
  272. if err != nil {
  273. return err
  274. }
  275. r.Blocks[name][id] = block
  276. }
  277. }
  278. return nil
  279. }
  280. func (r *FetchResponse) encode(pe packetEncoder) (err error) {
  281. if r.Version >= 1 {
  282. pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
  283. }
  284. if r.Version >= 7 {
  285. pe.putInt16(r.ErrorCode)
  286. pe.putInt32(r.SessionID)
  287. }
  288. err = pe.putArrayLength(len(r.Blocks))
  289. if err != nil {
  290. return err
  291. }
  292. for topic, partitions := range r.Blocks {
  293. err = pe.putString(topic)
  294. if err != nil {
  295. return err
  296. }
  297. err = pe.putArrayLength(len(partitions))
  298. if err != nil {
  299. return err
  300. }
  301. for id, block := range partitions {
  302. pe.putInt32(id)
  303. err = block.encode(pe, r.Version)
  304. if err != nil {
  305. return err
  306. }
  307. }
  308. }
  309. return nil
  310. }
  311. func (r *FetchResponse) key() int16 {
  312. return 1
  313. }
// version returns the protocol version stored in r.Version.
func (r *FetchResponse) version() int16 {
	return r.Version
}
  317. func (r *FetchResponse) headerVersion() int16 {
  318. return 0
  319. }
  320. func (r *FetchResponse) isValidVersion() bool {
  321. return r.Version >= 0 && r.Version <= 11
  322. }
  323. func (r *FetchResponse) requiredVersion() KafkaVersion {
  324. switch r.Version {
  325. case 11:
  326. return V2_3_0_0
  327. case 9, 10:
  328. return V2_1_0_0
  329. case 8:
  330. return V2_0_0_0
  331. case 7:
  332. return V1_1_0_0
  333. case 6:
  334. return V1_0_0_0
  335. case 4, 5:
  336. return V0_11_0_0
  337. case 3:
  338. return V0_10_1_0
  339. case 2:
  340. return V0_10_0_0
  341. case 1:
  342. return V0_9_0_0
  343. case 0:
  344. return V0_8_2_0
  345. default:
  346. return V2_3_0_0
  347. }
  348. }
// throttleTime returns the throttle duration stored in r.ThrottleTime.
func (r *FetchResponse) throttleTime() time.Duration {
	return r.ThrottleTime
}
  352. func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
  353. if r.Blocks == nil {
  354. return nil
  355. }
  356. if r.Blocks[topic] == nil {
  357. return nil
  358. }
  359. return r.Blocks[topic][partition]
  360. }
  361. func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
  362. if r.Blocks == nil {
  363. r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
  364. }
  365. partitions, ok := r.Blocks[topic]
  366. if !ok {
  367. partitions = make(map[int32]*FetchResponseBlock)
  368. r.Blocks[topic] = partitions
  369. }
  370. frb, ok := partitions[partition]
  371. if !ok {
  372. frb = new(FetchResponseBlock)
  373. partitions[partition] = frb
  374. }
  375. frb.Err = err
  376. }
  377. func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
  378. if r.Blocks == nil {
  379. r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
  380. }
  381. partitions, ok := r.Blocks[topic]
  382. if !ok {
  383. partitions = make(map[int32]*FetchResponseBlock)
  384. r.Blocks[topic] = partitions
  385. }
  386. frb, ok := partitions[partition]
  387. if !ok {
  388. frb = new(FetchResponseBlock)
  389. partitions[partition] = frb
  390. }
  391. return frb
  392. }
  393. func encodeKV(key, value Encoder) ([]byte, []byte) {
  394. var kb []byte
  395. var vb []byte
  396. if key != nil {
  397. kb, _ = key.Encode()
  398. }
  399. if value != nil {
  400. vb, _ = value.Encode()
  401. }
  402. return kb, vb
  403. }
  404. func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
  405. frb := r.getOrCreateBlock(topic, partition)
  406. kb, vb := encodeKV(key, value)
  407. if r.LogAppendTime {
  408. timestamp = r.Timestamp
  409. }
  410. msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
  411. msgBlock := &MessageBlock{Msg: msg, Offset: offset}
  412. if len(frb.RecordsSet) == 0 {
  413. records := newLegacyRecords(&MessageSet{})
  414. frb.RecordsSet = []*Records{&records}
  415. }
  416. set := frb.RecordsSet[0].MsgSet
  417. set.Messages = append(set.Messages, msgBlock)
  418. }
  419. func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
  420. frb := r.getOrCreateBlock(topic, partition)
  421. kb, vb := encodeKV(key, value)
  422. if len(frb.RecordsSet) == 0 {
  423. records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
  424. frb.RecordsSet = []*Records{&records}
  425. }
  426. batch := frb.RecordsSet[0].RecordBatch
  427. rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
  428. batch.addRecord(rec)
  429. }
  430. // AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp
  431. // But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse
  432. // Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions
  433. func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
  434. frb := r.getOrCreateBlock(topic, partition)
  435. kb, vb := encodeKV(key, value)
  436. records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
  437. batch := &RecordBatch{
  438. Version: 2,
  439. LogAppendTime: r.LogAppendTime,
  440. FirstTimestamp: timestamp,
  441. MaxTimestamp: r.Timestamp,
  442. FirstOffset: offset,
  443. LastOffsetDelta: 0,
  444. ProducerID: producerID,
  445. IsTransactional: isTransactional,
  446. }
  447. rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
  448. batch.addRecord(rec)
  449. records.RecordBatch = batch
  450. frb.RecordsSet = append(frb.RecordsSet, &records)
  451. }
  452. func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) {
  453. frb := r.getOrCreateBlock(topic, partition)
  454. // batch
  455. batch := &RecordBatch{
  456. Version: 2,
  457. LogAppendTime: r.LogAppendTime,
  458. FirstTimestamp: timestamp,
  459. MaxTimestamp: r.Timestamp,
  460. FirstOffset: offset,
  461. LastOffsetDelta: 0,
  462. ProducerID: producerID,
  463. IsTransactional: true,
  464. Control: true,
  465. }
  466. // records
  467. records := newDefaultRecords(nil)
  468. records.RecordBatch = batch
  469. // record
  470. crAbort := ControlRecord{
  471. Version: 0,
  472. Type: recordType,
  473. }
  474. crKey := &realEncoder{raw: make([]byte, 4)}
  475. crValue := &realEncoder{raw: make([]byte, 6)}
  476. crAbort.encode(crKey, crValue)
  477. rec := &Record{Key: ByteEncoder(crKey.raw), Value: ByteEncoder(crValue.raw), OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
  478. batch.addRecord(rec)
  479. frb.RecordsSet = append(frb.RecordsSet, &records)
  480. }
  481. func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
  482. r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
  483. }
  484. func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
  485. r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
  486. }
  487. func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool) {
  488. r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
  489. }
  490. func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType) {
  491. // define controlRecord key and value
  492. r.AddControlRecordWithTimestamp(topic, partition, offset, producerID, recordType, time.Time{})
  493. }
  494. func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
  495. frb := r.getOrCreateBlock(topic, partition)
  496. if len(frb.RecordsSet) == 0 {
  497. records := newDefaultRecords(&RecordBatch{Version: 2})
  498. frb.RecordsSet = []*Records{&records}
  499. }
  500. batch := frb.RecordsSet[0].RecordBatch
  501. batch.LastOffsetDelta = offset
  502. }
  503. func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
  504. frb := r.getOrCreateBlock(topic, partition)
  505. frb.LastStableOffset = offset
  506. }