record.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "time"
  5. )
const (
	// isTransactionalMask has only bit 0x10 set. It is not referenced in this
	// part of the file; presumably it selects a "transactional" flag inside
	// an attributes bit field — confirm at the usage site.
	isTransactionalMask = 0x10
	// controlMask has only bit 0x20 set. It is not referenced in this part of
	// the file; presumably it selects a "control" flag inside an attributes
	// bit field — confirm at the usage site.
	controlMask = 0x20
	// maximumRecordOverhead is the size of five maximum-length 32-bit varints,
	// one maximum-length 64-bit varint and one additional byte.
	// NOTE(review): this looks like an upper bound on the bytes Record.encode
	// writes besides the key, value and header payloads (an int8 attributes
	// byte plus varint-encoded fields) — confirm against the callers.
	maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
)
// RecordHeader is a single key/value pair carried in a Record's Headers list.
// Both parts are raw byte slices; the key is encoded and decoded before the
// value (see the encode and decode methods).
type RecordHeader struct {
	// Key is the header key, written with putVarintBytes.
	Key []byte
	// Value is the header value, written with putVarintBytes.
	Value []byte
}
  16. func (h *RecordHeader) encode(pe packetEncoder) error {
  17. if err := pe.putVarintBytes(h.Key); err != nil {
  18. return err
  19. }
  20. return pe.putVarintBytes(h.Value)
  21. }
  22. func (h *RecordHeader) decode(pd packetDecoder) (err error) {
  23. if h.Key, err = pd.getVarintBytes(); err != nil {
  24. return err
  25. }
  26. if h.Value, err = pd.getVarintBytes(); err != nil {
  27. return err
  28. }
  29. return nil
  30. }
// Record is a single Kafka record. Its encode and decode methods serialize
// the fields in this order: attributes, timestamp delta, offset delta, key,
// value, header count, headers — all wrapped in a length field.
type Record struct {
	// Headers are the record's headers; each one is encoded in slice order
	// after a varint holding the number of headers.
	Headers []*RecordHeader

	// Attributes is written as a single int8.
	Attributes int8
	// TimestampDelta is serialized as a varint count of whole milliseconds,
	// so any sub-millisecond part is dropped by encode.
	TimestampDelta time.Duration
	// OffsetDelta is serialized as a varint.
	OffsetDelta int64
	// Key is the record key, serialized with putVarintBytes.
	Key []byte
	// Value is the record value, serialized with putVarintBytes.
	Value []byte

	// length is pushed onto the encoder/decoder before the other fields and
	// popped after them. NOTE(review): presumably it fills in / validates the
	// record's varint length prefix — confirm in varintLengthField.
	length varintLengthField
}
  41. func (r *Record) encode(pe packetEncoder) error {
  42. pe.push(&r.length)
  43. pe.putInt8(r.Attributes)
  44. pe.putVarint(int64(r.TimestampDelta / time.Millisecond))
  45. pe.putVarint(r.OffsetDelta)
  46. if err := pe.putVarintBytes(r.Key); err != nil {
  47. return err
  48. }
  49. if err := pe.putVarintBytes(r.Value); err != nil {
  50. return err
  51. }
  52. pe.putVarint(int64(len(r.Headers)))
  53. for _, h := range r.Headers {
  54. if err := h.encode(pe); err != nil {
  55. return err
  56. }
  57. }
  58. return pe.pop()
  59. }
  60. func (r *Record) decode(pd packetDecoder) (err error) {
  61. if err = pd.push(&r.length); err != nil {
  62. return err
  63. }
  64. if r.Attributes, err = pd.getInt8(); err != nil {
  65. return err
  66. }
  67. timestamp, err := pd.getVarint()
  68. if err != nil {
  69. return err
  70. }
  71. r.TimestampDelta = time.Duration(timestamp) * time.Millisecond
  72. if r.OffsetDelta, err = pd.getVarint(); err != nil {
  73. return err
  74. }
  75. if r.Key, err = pd.getVarintBytes(); err != nil {
  76. return err
  77. }
  78. if r.Value, err = pd.getVarintBytes(); err != nil {
  79. return err
  80. }
  81. numHeaders, err := pd.getVarint()
  82. if err != nil {
  83. return err
  84. }
  85. if numHeaders >= 0 {
  86. r.Headers = make([]*RecordHeader, numHeaders)
  87. }
  88. for i := int64(0); i < numHeaders; i++ {
  89. hdr := new(RecordHeader)
  90. if err := hdr.decode(pd); err != nil {
  91. return err
  92. }
  93. r.Headers[i] = hdr
  94. }
  95. return pd.pop()
  96. }