records.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. package sarama
  2. import "fmt"
  3. const (
  4. unknownRecords = iota
  5. legacyRecords
  6. defaultRecords
  7. magicOffset = 16
  8. )
  9. // Records implements a union type containing either a RecordBatch or a legacy MessageSet.
  10. type Records struct {
  11. recordsType int
  12. MsgSet *MessageSet
  13. RecordBatch *RecordBatch
  14. }
  15. func newLegacyRecords(msgSet *MessageSet) Records {
  16. return Records{recordsType: legacyRecords, MsgSet: msgSet}
  17. }
  18. func newDefaultRecords(batch *RecordBatch) Records {
  19. return Records{recordsType: defaultRecords, RecordBatch: batch}
  20. }
  21. // setTypeFromFields sets type of Records depending on which of MsgSet or RecordBatch is not nil.
  22. // The first return value indicates whether both fields are nil (and the type is not set).
  23. // If both fields are not nil, it returns an error.
  24. func (r *Records) setTypeFromFields() (bool, error) {
  25. if r.MsgSet == nil && r.RecordBatch == nil {
  26. return true, nil
  27. }
  28. if r.MsgSet != nil && r.RecordBatch != nil {
  29. return false, fmt.Errorf("both MsgSet and RecordBatch are set, but record type is unknown")
  30. }
  31. r.recordsType = defaultRecords
  32. if r.MsgSet != nil {
  33. r.recordsType = legacyRecords
  34. }
  35. return false, nil
  36. }
  37. func (r *Records) encode(pe packetEncoder) error {
  38. if r.recordsType == unknownRecords {
  39. if empty, err := r.setTypeFromFields(); err != nil || empty {
  40. return err
  41. }
  42. }
  43. switch r.recordsType {
  44. case legacyRecords:
  45. if r.MsgSet == nil {
  46. return nil
  47. }
  48. return r.MsgSet.encode(pe)
  49. case defaultRecords:
  50. if r.RecordBatch == nil {
  51. return nil
  52. }
  53. return r.RecordBatch.encode(pe)
  54. }
  55. return fmt.Errorf("unknown records type: %v", r.recordsType)
  56. }
  57. func (r *Records) setTypeFromMagic(pd packetDecoder) error {
  58. magic, err := magicValue(pd)
  59. if err != nil {
  60. return err
  61. }
  62. r.recordsType = defaultRecords
  63. if magic < 2 {
  64. r.recordsType = legacyRecords
  65. }
  66. return nil
  67. }
  68. func (r *Records) decode(pd packetDecoder) error {
  69. if r.recordsType == unknownRecords {
  70. if err := r.setTypeFromMagic(pd); err != nil {
  71. return err
  72. }
  73. }
  74. switch r.recordsType {
  75. case legacyRecords:
  76. r.MsgSet = &MessageSet{}
  77. return r.MsgSet.decode(pd)
  78. case defaultRecords:
  79. r.RecordBatch = &RecordBatch{}
  80. return r.RecordBatch.decode(pd)
  81. }
  82. return fmt.Errorf("unknown records type: %v", r.recordsType)
  83. }
  84. func (r *Records) numRecords() (int, error) {
  85. if r.recordsType == unknownRecords {
  86. if empty, err := r.setTypeFromFields(); err != nil || empty {
  87. return 0, err
  88. }
  89. }
  90. switch r.recordsType {
  91. case legacyRecords:
  92. if r.MsgSet == nil {
  93. return 0, nil
  94. }
  95. return len(r.MsgSet.Messages), nil
  96. case defaultRecords:
  97. if r.RecordBatch == nil {
  98. return 0, nil
  99. }
  100. return len(r.RecordBatch.Records), nil
  101. }
  102. return 0, fmt.Errorf("unknown records type: %v", r.recordsType)
  103. }
  104. func (r *Records) isPartial() (bool, error) {
  105. if r.recordsType == unknownRecords {
  106. if empty, err := r.setTypeFromFields(); err != nil || empty {
  107. return false, err
  108. }
  109. }
  110. switch r.recordsType {
  111. case unknownRecords:
  112. return false, nil
  113. case legacyRecords:
  114. if r.MsgSet == nil {
  115. return false, nil
  116. }
  117. return r.MsgSet.PartialTrailingMessage, nil
  118. case defaultRecords:
  119. if r.RecordBatch == nil {
  120. return false, nil
  121. }
  122. return r.RecordBatch.PartialTrailingRecord, nil
  123. }
  124. return false, fmt.Errorf("unknown records type: %v", r.recordsType)
  125. }
  126. func (r *Records) isControl() (bool, error) {
  127. if r.recordsType == unknownRecords {
  128. if empty, err := r.setTypeFromFields(); err != nil || empty {
  129. return false, err
  130. }
  131. }
  132. switch r.recordsType {
  133. case legacyRecords:
  134. return false, nil
  135. case defaultRecords:
  136. if r.RecordBatch == nil {
  137. return false, nil
  138. }
  139. return r.RecordBatch.Control, nil
  140. }
  141. return false, fmt.Errorf("unknown records type: %v", r.recordsType)
  142. }
  143. func (r *Records) isOverflow() (bool, error) {
  144. if r.recordsType == unknownRecords {
  145. if empty, err := r.setTypeFromFields(); err != nil || empty {
  146. return false, err
  147. }
  148. }
  149. switch r.recordsType {
  150. case unknownRecords:
  151. return false, nil
  152. case legacyRecords:
  153. if r.MsgSet == nil {
  154. return false, nil
  155. }
  156. return r.MsgSet.OverflowMessage, nil
  157. case defaultRecords:
  158. return false, nil
  159. }
  160. return false, fmt.Errorf("unknown records type: %v", r.recordsType)
  161. }
  162. func (r *Records) recordsOffset() (*int64, error) {
  163. switch r.recordsType {
  164. case unknownRecords:
  165. return nil, nil
  166. case legacyRecords:
  167. return nil, nil
  168. case defaultRecords:
  169. if r.RecordBatch == nil {
  170. return nil, nil
  171. }
  172. return &r.RecordBatch.FirstOffset, nil
  173. }
  174. return nil, fmt.Errorf("unknown records type: %v", r.recordsType)
  175. }
  176. func magicValue(pd packetDecoder) (int8, error) {
  177. return pd.peekInt8(magicOffset)
  178. }
  179. func (r *Records) getControlRecord() (ControlRecord, error) {
  180. if r.RecordBatch == nil || len(r.RecordBatch.Records) == 0 {
  181. return ControlRecord{}, fmt.Errorf("cannot get control record, record batch is empty")
  182. }
  183. firstRecord := r.RecordBatch.Records[0]
  184. controlRecord := ControlRecord{}
  185. err := controlRecord.decode(&realDecoder{raw: firstRecord.Key}, &realDecoder{raw: firstRecord.Value})
  186. if err != nil {
  187. return ControlRecord{}, err
  188. }
  189. return controlRecord, nil
  190. }