encoder_decoder.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package sarama
  2. import (
  3. "fmt"
  4. "github.com/rcrowley/go-metrics"
  5. )
  6. // Encoder is the interface that wraps the basic Encode method.
  7. // Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
  8. type encoder interface {
  9. encode(pe packetEncoder) error
  10. }
  11. type encoderWithHeader interface {
  12. encoder
  13. headerVersion() int16
  14. }
  15. // Encode takes an Encoder and turns it into bytes while potentially recording metrics.
  16. func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) {
  17. if e == nil {
  18. return nil, nil
  19. }
  20. var prepEnc prepEncoder
  21. var realEnc realEncoder
  22. err := e.encode(&prepEnc)
  23. if err != nil {
  24. return nil, err
  25. }
  26. if prepEnc.length < 0 || prepEnc.length > int(MaxRequestSize) {
  27. return nil, PacketEncodingError{fmt.Sprintf("invalid request size (%d)", prepEnc.length)}
  28. }
  29. realEnc.raw = make([]byte, prepEnc.length)
  30. realEnc.registry = metricRegistry
  31. err = e.encode(&realEnc)
  32. if err != nil {
  33. return nil, err
  34. }
  35. return realEnc.raw, nil
  36. }
  37. // decoder is the interface that wraps the basic Decode method.
  38. // Anything implementing Decoder can be extracted from bytes using Kafka's encoding rules.
  39. type decoder interface {
  40. decode(pd packetDecoder) error
  41. }
  42. type versionedDecoder interface {
  43. decode(pd packetDecoder, version int16) error
  44. }
  45. // decode takes bytes and a decoder and fills the fields of the decoder from the bytes,
  46. // interpreted using Kafka's encoding rules.
  47. func decode(buf []byte, in decoder, metricRegistry metrics.Registry) error {
  48. if buf == nil {
  49. return nil
  50. }
  51. helper := realDecoder{
  52. raw: buf,
  53. registry: metricRegistry,
  54. }
  55. err := in.decode(&helper)
  56. if err != nil {
  57. return err
  58. }
  59. if helper.off != len(buf) {
  60. return PacketDecodingError{"invalid length"}
  61. }
  62. return nil
  63. }
  64. func versionedDecode(buf []byte, in versionedDecoder, version int16, metricRegistry metrics.Registry) error {
  65. if buf == nil {
  66. return nil
  67. }
  68. helper := realDecoder{
  69. raw: buf,
  70. registry: metricRegistry,
  71. }
  72. err := in.decode(&helper, version)
  73. if err != nil {
  74. return err
  75. }
  76. if helper.off != len(buf) {
  77. return PacketDecodingError{
  78. Info: fmt.Sprintf("invalid length (off=%d, len=%d)", helper.off, len(buf)),
  79. }
  80. }
  81. return nil
  82. }