1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- package sarama
- import (
- "sync"
- "github.com/klauspost/compress/zstd"
- )
- // zstdMaxBufferedEncoders maximum number of not-in-use zstd encoders
- // If the pool of encoders is exhausted then new encoders will be created on the fly
- const zstdMaxBufferedEncoders = 1
- type ZstdEncoderParams struct {
- Level int
- }
- type ZstdDecoderParams struct {
- }
- var zstdDecMap sync.Map
- var zstdAvailableEncoders sync.Map
- func getZstdEncoderChannel(params ZstdEncoderParams) chan *zstd.Encoder {
- if c, ok := zstdAvailableEncoders.Load(params); ok {
- return c.(chan *zstd.Encoder)
- }
- c, _ := zstdAvailableEncoders.LoadOrStore(params, make(chan *zstd.Encoder, zstdMaxBufferedEncoders))
- return c.(chan *zstd.Encoder)
- }
- func getZstdEncoder(params ZstdEncoderParams) *zstd.Encoder {
- select {
- case enc := <-getZstdEncoderChannel(params):
- return enc
- default:
- encoderLevel := zstd.SpeedDefault
- if params.Level != CompressionLevelDefault {
- encoderLevel = zstd.EncoderLevelFromZstd(params.Level)
- }
- zstdEnc, _ := zstd.NewWriter(nil, zstd.WithZeroFrames(true),
- zstd.WithEncoderLevel(encoderLevel),
- zstd.WithEncoderConcurrency(1))
- return zstdEnc
- }
- }
- func releaseEncoder(params ZstdEncoderParams, enc *zstd.Encoder) {
- select {
- case getZstdEncoderChannel(params) <- enc:
- default:
- }
- }
- func getDecoder(params ZstdDecoderParams) *zstd.Decoder {
- if ret, ok := zstdDecMap.Load(params); ok {
- return ret.(*zstd.Decoder)
- }
- // It's possible to race and create multiple new readers.
- // Only one will survive GC after use.
- zstdDec, _ := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0))
- zstdDecMap.Store(params, zstdDec)
- return zstdDec
- }
- func zstdDecompress(params ZstdDecoderParams, dst, src []byte) ([]byte, error) {
- return getDecoder(params).DecodeAll(src, dst)
- }
- func zstdCompress(params ZstdEncoderParams, dst, src []byte) ([]byte, error) {
- enc := getZstdEncoder(params)
- out := enc.EncodeAll(src, dst)
- releaseEncoder(params, enc)
- return out, nil
- }
|