1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- package sarama
- import (
- "bytes"
- "fmt"
- "sync"
- snappy "github.com/eapache/go-xerial-snappy"
- "github.com/klauspost/compress/gzip"
- "github.com/pierrec/lz4/v4"
- )
- var (
- lz4ReaderPool = sync.Pool{
- New: func() interface{} {
- return lz4.NewReader(nil)
- },
- }
- gzipReaderPool sync.Pool
- bufferPool = sync.Pool{
- New: func() interface{} {
- return new(bytes.Buffer)
- },
- }
- bytesPool = sync.Pool{
- New: func() interface{} {
- res := make([]byte, 0, 4096)
- return &res
- },
- }
- )
- func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
- switch cc {
- case CompressionNone:
- return data, nil
- case CompressionGZIP:
- var err error
- reader, ok := gzipReaderPool.Get().(*gzip.Reader)
- if !ok {
- reader, err = gzip.NewReader(bytes.NewReader(data))
- } else {
- err = reader.Reset(bytes.NewReader(data))
- }
- if err != nil {
- return nil, err
- }
- buffer := bufferPool.Get().(*bytes.Buffer)
- _, err = buffer.ReadFrom(reader)
- // copy the buffer to a new slice with the correct length
- // reuse gzipReader and buffer
- gzipReaderPool.Put(reader)
- res := make([]byte, buffer.Len())
- copy(res, buffer.Bytes())
- buffer.Reset()
- bufferPool.Put(buffer)
- return res, err
- case CompressionSnappy:
- return snappy.Decode(data)
- case CompressionLZ4:
- reader, ok := lz4ReaderPool.Get().(*lz4.Reader)
- if !ok {
- reader = lz4.NewReader(bytes.NewReader(data))
- } else {
- reader.Reset(bytes.NewReader(data))
- }
- buffer := bufferPool.Get().(*bytes.Buffer)
- _, err := buffer.ReadFrom(reader)
- // copy the buffer to a new slice with the correct length
- // reuse lz4Reader and buffer
- lz4ReaderPool.Put(reader)
- res := make([]byte, buffer.Len())
- copy(res, buffer.Bytes())
- buffer.Reset()
- bufferPool.Put(buffer)
- return res, err
- case CompressionZSTD:
- buffer := *bytesPool.Get().(*[]byte)
- var err error
- buffer, err = zstdDecompress(ZstdDecoderParams{}, buffer, data)
- // copy the buffer to a new slice with the correct length and reuse buffer
- res := make([]byte, len(buffer))
- copy(res, buffer)
- buffer = buffer[:0]
- bytesPool.Put(&buffer)
- return res, err
- default:
- return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)}
- }
- }
|