real_encoder.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "math"
  6. "github.com/rcrowley/go-metrics"
  7. )
  8. type realEncoder struct {
  9. raw []byte
  10. off int
  11. stack []pushEncoder
  12. registry metrics.Registry
  13. }
  14. // primitives
  15. func (re *realEncoder) putInt8(in int8) {
  16. re.raw[re.off] = byte(in)
  17. re.off++
  18. }
  19. func (re *realEncoder) putInt16(in int16) {
  20. binary.BigEndian.PutUint16(re.raw[re.off:], uint16(in))
  21. re.off += 2
  22. }
  23. func (re *realEncoder) putInt32(in int32) {
  24. binary.BigEndian.PutUint32(re.raw[re.off:], uint32(in))
  25. re.off += 4
  26. }
  27. func (re *realEncoder) putInt64(in int64) {
  28. binary.BigEndian.PutUint64(re.raw[re.off:], uint64(in))
  29. re.off += 8
  30. }
  31. func (re *realEncoder) putVarint(in int64) {
  32. re.off += binary.PutVarint(re.raw[re.off:], in)
  33. }
  34. func (re *realEncoder) putUVarint(in uint64) {
  35. re.off += binary.PutUvarint(re.raw[re.off:], in)
  36. }
  37. func (re *realEncoder) putFloat64(in float64) {
  38. binary.BigEndian.PutUint64(re.raw[re.off:], math.Float64bits(in))
  39. re.off += 8
  40. }
  41. func (re *realEncoder) putArrayLength(in int) error {
  42. re.putInt32(int32(in))
  43. return nil
  44. }
  45. func (re *realEncoder) putCompactArrayLength(in int) {
  46. // 0 represents a null array, so +1 has to be added
  47. re.putUVarint(uint64(in + 1))
  48. }
  49. func (re *realEncoder) putBool(in bool) {
  50. if in {
  51. re.putInt8(1)
  52. return
  53. }
  54. re.putInt8(0)
  55. }
  56. // collection
  57. func (re *realEncoder) putRawBytes(in []byte) error {
  58. copy(re.raw[re.off:], in)
  59. re.off += len(in)
  60. return nil
  61. }
  62. func (re *realEncoder) putBytes(in []byte) error {
  63. if in == nil {
  64. re.putInt32(-1)
  65. return nil
  66. }
  67. re.putInt32(int32(len(in)))
  68. return re.putRawBytes(in)
  69. }
  70. func (re *realEncoder) putVarintBytes(in []byte) error {
  71. if in == nil {
  72. re.putVarint(-1)
  73. return nil
  74. }
  75. re.putVarint(int64(len(in)))
  76. return re.putRawBytes(in)
  77. }
  78. func (re *realEncoder) putCompactBytes(in []byte) error {
  79. re.putUVarint(uint64(len(in) + 1))
  80. return re.putRawBytes(in)
  81. }
  82. func (re *realEncoder) putCompactString(in string) error {
  83. re.putCompactArrayLength(len(in))
  84. return re.putRawBytes([]byte(in))
  85. }
  86. func (re *realEncoder) putNullableCompactString(in *string) error {
  87. if in == nil {
  88. re.putInt8(0)
  89. return nil
  90. }
  91. return re.putCompactString(*in)
  92. }
  93. func (re *realEncoder) putString(in string) error {
  94. re.putInt16(int16(len(in)))
  95. copy(re.raw[re.off:], in)
  96. re.off += len(in)
  97. return nil
  98. }
  99. func (re *realEncoder) putNullableString(in *string) error {
  100. if in == nil {
  101. re.putInt16(-1)
  102. return nil
  103. }
  104. return re.putString(*in)
  105. }
  106. func (re *realEncoder) putStringArray(in []string) error {
  107. err := re.putArrayLength(len(in))
  108. if err != nil {
  109. return err
  110. }
  111. for _, val := range in {
  112. if err := re.putString(val); err != nil {
  113. return err
  114. }
  115. }
  116. return nil
  117. }
  118. func (re *realEncoder) putCompactInt32Array(in []int32) error {
  119. if in == nil {
  120. return errors.New("expected int32 array to be non null")
  121. }
  122. // 0 represents a null array, so +1 has to be added
  123. re.putUVarint(uint64(len(in)) + 1)
  124. for _, val := range in {
  125. re.putInt32(val)
  126. }
  127. return nil
  128. }
  129. func (re *realEncoder) putNullableCompactInt32Array(in []int32) error {
  130. if in == nil {
  131. re.putUVarint(0)
  132. return nil
  133. }
  134. // 0 represents a null array, so +1 has to be added
  135. re.putUVarint(uint64(len(in)) + 1)
  136. for _, val := range in {
  137. re.putInt32(val)
  138. }
  139. return nil
  140. }
  141. func (re *realEncoder) putInt32Array(in []int32) error {
  142. err := re.putArrayLength(len(in))
  143. if err != nil {
  144. return err
  145. }
  146. for _, val := range in {
  147. re.putInt32(val)
  148. }
  149. return nil
  150. }
  151. func (re *realEncoder) putInt64Array(in []int64) error {
  152. err := re.putArrayLength(len(in))
  153. if err != nil {
  154. return err
  155. }
  156. for _, val := range in {
  157. re.putInt64(val)
  158. }
  159. return nil
  160. }
  161. func (re *realEncoder) putEmptyTaggedFieldArray() {
  162. re.putUVarint(0)
  163. }
  164. func (re *realEncoder) offset() int {
  165. return re.off
  166. }
  167. // stacks
  168. func (re *realEncoder) push(in pushEncoder) {
  169. in.saveOffset(re.off)
  170. re.off += in.reserveLength()
  171. re.stack = append(re.stack, in)
  172. }
  173. func (re *realEncoder) pop() error {
  174. // this is go's ugly pop pattern (the inverse of append)
  175. in := re.stack[len(re.stack)-1]
  176. re.stack = re.stack[:len(re.stack)-1]
  177. return in.run(re.off, re.raw)
  178. }
  179. // we do record metrics during the real encoder pass
  180. func (re *realEncoder) metricRegistry() metrics.Registry {
  181. return re.registry
  182. }