// message_set.go
  1. package sarama
  2. import "errors"
  3. type MessageBlock struct {
  4. Offset int64
  5. Msg *Message
  6. }
  7. // Messages convenience helper which returns either all the
  8. // messages that are wrapped in this block
  9. func (msb *MessageBlock) Messages() []*MessageBlock {
  10. if msb.Msg.Set != nil {
  11. return msb.Msg.Set.Messages
  12. }
  13. return []*MessageBlock{msb}
  14. }
  15. func (msb *MessageBlock) encode(pe packetEncoder) error {
  16. pe.putInt64(msb.Offset)
  17. pe.push(&lengthField{})
  18. err := msb.Msg.encode(pe)
  19. if err != nil {
  20. return err
  21. }
  22. return pe.pop()
  23. }
  24. func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
  25. if msb.Offset, err = pd.getInt64(); err != nil {
  26. return err
  27. }
  28. lengthDecoder := acquireLengthField()
  29. defer releaseLengthField(lengthDecoder)
  30. if err = pd.push(lengthDecoder); err != nil {
  31. return err
  32. }
  33. msb.Msg = new(Message)
  34. if err = msb.Msg.decode(pd); err != nil {
  35. return err
  36. }
  37. if err = pd.pop(); err != nil {
  38. return err
  39. }
  40. return nil
  41. }
  42. type MessageSet struct {
  43. PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
  44. OverflowMessage bool // whether the set on the wire contained an overflow message
  45. Messages []*MessageBlock
  46. }
  47. func (ms *MessageSet) encode(pe packetEncoder) error {
  48. for i := range ms.Messages {
  49. err := ms.Messages[i].encode(pe)
  50. if err != nil {
  51. return err
  52. }
  53. }
  54. return nil
  55. }
  56. func (ms *MessageSet) decode(pd packetDecoder) (err error) {
  57. ms.Messages = nil
  58. for pd.remaining() > 0 {
  59. magic, err := magicValue(pd)
  60. if err != nil {
  61. if errors.Is(err, ErrInsufficientData) {
  62. ms.PartialTrailingMessage = true
  63. return nil
  64. }
  65. return err
  66. }
  67. if magic > 1 {
  68. return nil
  69. }
  70. msb := new(MessageBlock)
  71. err = msb.decode(pd)
  72. if err == nil {
  73. ms.Messages = append(ms.Messages, msb)
  74. } else if errors.Is(err, ErrInsufficientData) {
  75. // As an optimization the server is allowed to return a partial message at the
  76. // end of the message set. Clients should handle this case. So we just ignore such things.
  77. if msb.Offset == -1 {
  78. // This is an overflow message caused by chunked down conversion
  79. ms.OverflowMessage = true
  80. } else {
  81. ms.PartialTrailingMessage = true
  82. }
  83. return nil
  84. } else {
  85. return err
  86. }
  87. }
  88. return nil
  89. }
  90. func (ms *MessageSet) addMessage(msg *Message) {
  91. block := new(MessageBlock)
  92. block.Msg = msg
  93. ms.Messages = append(ms.Messages, block)
  94. }