prep_encoder.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "math"
  7. "github.com/rcrowley/go-metrics"
  8. )
  9. type prepEncoder struct {
  10. stack []pushEncoder
  11. length int
  12. }
  13. // primitives
  14. func (pe *prepEncoder) putInt8(in int8) {
  15. pe.length++
  16. }
  17. func (pe *prepEncoder) putInt16(in int16) {
  18. pe.length += 2
  19. }
  20. func (pe *prepEncoder) putInt32(in int32) {
  21. pe.length += 4
  22. }
  23. func (pe *prepEncoder) putInt64(in int64) {
  24. pe.length += 8
  25. }
  26. func (pe *prepEncoder) putVarint(in int64) {
  27. var buf [binary.MaxVarintLen64]byte
  28. pe.length += binary.PutVarint(buf[:], in)
  29. }
  30. func (pe *prepEncoder) putUVarint(in uint64) {
  31. var buf [binary.MaxVarintLen64]byte
  32. pe.length += binary.PutUvarint(buf[:], in)
  33. }
  34. func (pe *prepEncoder) putFloat64(in float64) {
  35. pe.length += 8
  36. }
  37. func (pe *prepEncoder) putArrayLength(in int) error {
  38. if in > math.MaxInt32 {
  39. return PacketEncodingError{fmt.Sprintf("array too long (%d)", in)}
  40. }
  41. pe.length += 4
  42. return nil
  43. }
  44. func (pe *prepEncoder) putCompactArrayLength(in int) {
  45. pe.putUVarint(uint64(in + 1))
  46. }
  47. func (pe *prepEncoder) putBool(in bool) {
  48. pe.length++
  49. }
  50. // arrays
  51. func (pe *prepEncoder) putBytes(in []byte) error {
  52. pe.length += 4
  53. if in == nil {
  54. return nil
  55. }
  56. return pe.putRawBytes(in)
  57. }
  58. func (pe *prepEncoder) putVarintBytes(in []byte) error {
  59. if in == nil {
  60. pe.putVarint(-1)
  61. return nil
  62. }
  63. pe.putVarint(int64(len(in)))
  64. return pe.putRawBytes(in)
  65. }
  66. func (pe *prepEncoder) putCompactBytes(in []byte) error {
  67. pe.putUVarint(uint64(len(in) + 1))
  68. return pe.putRawBytes(in)
  69. }
  70. func (pe *prepEncoder) putCompactString(in string) error {
  71. pe.putCompactArrayLength(len(in))
  72. return pe.putRawBytes([]byte(in))
  73. }
  74. func (pe *prepEncoder) putNullableCompactString(in *string) error {
  75. if in == nil {
  76. pe.putUVarint(0)
  77. return nil
  78. } else {
  79. return pe.putCompactString(*in)
  80. }
  81. }
  82. func (pe *prepEncoder) putRawBytes(in []byte) error {
  83. if len(in) > math.MaxInt32 {
  84. return PacketEncodingError{fmt.Sprintf("byteslice too long (%d)", len(in))}
  85. }
  86. pe.length += len(in)
  87. return nil
  88. }
  89. func (pe *prepEncoder) putNullableString(in *string) error {
  90. if in == nil {
  91. pe.length += 2
  92. return nil
  93. }
  94. return pe.putString(*in)
  95. }
  96. func (pe *prepEncoder) putString(in string) error {
  97. pe.length += 2
  98. if len(in) > math.MaxInt16 {
  99. return PacketEncodingError{fmt.Sprintf("string too long (%d)", len(in))}
  100. }
  101. pe.length += len(in)
  102. return nil
  103. }
  104. func (pe *prepEncoder) putStringArray(in []string) error {
  105. err := pe.putArrayLength(len(in))
  106. if err != nil {
  107. return err
  108. }
  109. for _, str := range in {
  110. if err := pe.putString(str); err != nil {
  111. return err
  112. }
  113. }
  114. return nil
  115. }
  116. func (pe *prepEncoder) putCompactInt32Array(in []int32) error {
  117. if in == nil {
  118. return errors.New("expected int32 array to be non null")
  119. }
  120. pe.putUVarint(uint64(len(in)) + 1)
  121. pe.length += 4 * len(in)
  122. return nil
  123. }
  124. func (pe *prepEncoder) putNullableCompactInt32Array(in []int32) error {
  125. if in == nil {
  126. pe.putUVarint(0)
  127. return nil
  128. }
  129. pe.putUVarint(uint64(len(in)) + 1)
  130. pe.length += 4 * len(in)
  131. return nil
  132. }
  133. func (pe *prepEncoder) putInt32Array(in []int32) error {
  134. err := pe.putArrayLength(len(in))
  135. if err != nil {
  136. return err
  137. }
  138. pe.length += 4 * len(in)
  139. return nil
  140. }
  141. func (pe *prepEncoder) putInt64Array(in []int64) error {
  142. err := pe.putArrayLength(len(in))
  143. if err != nil {
  144. return err
  145. }
  146. pe.length += 8 * len(in)
  147. return nil
  148. }
  149. func (pe *prepEncoder) putEmptyTaggedFieldArray() {
  150. pe.putUVarint(0)
  151. }
  152. func (pe *prepEncoder) offset() int {
  153. return pe.length
  154. }
  155. // stackable
  156. func (pe *prepEncoder) push(in pushEncoder) {
  157. in.saveOffset(pe.length)
  158. pe.length += in.reserveLength()
  159. pe.stack = append(pe.stack, in)
  160. }
  161. func (pe *prepEncoder) pop() error {
  162. in := pe.stack[len(pe.stack)-1]
  163. pe.stack = pe.stack[:len(pe.stack)-1]
  164. if dpe, ok := in.(dynamicPushEncoder); ok {
  165. pe.length += dpe.adjustLength(pe.length)
  166. }
  167. return nil
  168. }
  169. // we do not record metrics during the prep encoder pass
  170. func (pe *prepEncoder) metricRegistry() metrics.Registry {
  171. return nil
  172. }