decompress.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package sarama
  2. import (
  3. "bytes"
  4. "fmt"
  5. "sync"
  6. snappy "github.com/eapache/go-xerial-snappy"
  7. "github.com/klauspost/compress/gzip"
  8. "github.com/pierrec/lz4/v4"
  9. )
  10. var (
  11. lz4ReaderPool = sync.Pool{
  12. New: func() interface{} {
  13. return lz4.NewReader(nil)
  14. },
  15. }
  16. gzipReaderPool sync.Pool
  17. bufferPool = sync.Pool{
  18. New: func() interface{} {
  19. return new(bytes.Buffer)
  20. },
  21. }
  22. bytesPool = sync.Pool{
  23. New: func() interface{} {
  24. res := make([]byte, 0, 4096)
  25. return &res
  26. },
  27. }
  28. )
  29. func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
  30. switch cc {
  31. case CompressionNone:
  32. return data, nil
  33. case CompressionGZIP:
  34. var err error
  35. reader, ok := gzipReaderPool.Get().(*gzip.Reader)
  36. if !ok {
  37. reader, err = gzip.NewReader(bytes.NewReader(data))
  38. } else {
  39. err = reader.Reset(bytes.NewReader(data))
  40. }
  41. if err != nil {
  42. return nil, err
  43. }
  44. buffer := bufferPool.Get().(*bytes.Buffer)
  45. _, err = buffer.ReadFrom(reader)
  46. // copy the buffer to a new slice with the correct length
  47. // reuse gzipReader and buffer
  48. gzipReaderPool.Put(reader)
  49. res := make([]byte, buffer.Len())
  50. copy(res, buffer.Bytes())
  51. buffer.Reset()
  52. bufferPool.Put(buffer)
  53. return res, err
  54. case CompressionSnappy:
  55. return snappy.Decode(data)
  56. case CompressionLZ4:
  57. reader, ok := lz4ReaderPool.Get().(*lz4.Reader)
  58. if !ok {
  59. reader = lz4.NewReader(bytes.NewReader(data))
  60. } else {
  61. reader.Reset(bytes.NewReader(data))
  62. }
  63. buffer := bufferPool.Get().(*bytes.Buffer)
  64. _, err := buffer.ReadFrom(reader)
  65. // copy the buffer to a new slice with the correct length
  66. // reuse lz4Reader and buffer
  67. lz4ReaderPool.Put(reader)
  68. res := make([]byte, buffer.Len())
  69. copy(res, buffer.Bytes())
  70. buffer.Reset()
  71. bufferPool.Put(buffer)
  72. return res, err
  73. case CompressionZSTD:
  74. buffer := *bytesPool.Get().(*[]byte)
  75. var err error
  76. buffer, err = zstdDecompress(ZstdDecoderParams{}, buffer, data)
  77. // copy the buffer to a new slice with the correct length and reuse buffer
  78. res := make([]byte, len(buffer))
  79. copy(res, buffer)
  80. buffer = buffer[:0]
  81. bytesPool.Put(&buffer)
  82. return res, err
  83. default:
  84. return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)}
  85. }
  86. }