package sarama import ( "bytes" "fmt" "sync" snappy "github.com/eapache/go-xerial-snappy" "github.com/klauspost/compress/gzip" "github.com/pierrec/lz4/v4" ) var ( lz4WriterPool = sync.Pool{ New: func() interface{} { return lz4.NewWriter(nil) }, } gzipWriterPool = sync.Pool{ New: func() interface{} { return gzip.NewWriter(nil) }, } gzipWriterPoolForCompressionLevel1 = sync.Pool{ New: func() interface{} { gz, err := gzip.NewWriterLevel(nil, 1) if err != nil { panic(err) } return gz }, } gzipWriterPoolForCompressionLevel2 = sync.Pool{ New: func() interface{} { gz, err := gzip.NewWriterLevel(nil, 2) if err != nil { panic(err) } return gz }, } gzipWriterPoolForCompressionLevel3 = sync.Pool{ New: func() interface{} { gz, err := gzip.NewWriterLevel(nil, 3) if err != nil { panic(err) } return gz }, } gzipWriterPoolForCompressionLevel4 = sync.Pool{ New: func() interface{} { gz, err := gzip.NewWriterLevel(nil, 4) if err != nil { panic(err) } return gz }, } gzipWriterPoolForCompressionLevel5 = sync.Pool{ New: func() interface{} { gz, err := gzip.NewWriterLevel(nil, 5) if err != nil { panic(err) } return gz }, } gzipWriterPoolForCompressionLevel6 = sync.Pool{ New: func() interface{} { gz, err := gzip.NewWriterLevel(nil, 6) if err != nil { panic(err) } return gz }, } gzipWriterPoolForCompressionLevel7 = sync.Pool{ New: func() interface{} { gz, err := gzip.NewWriterLevel(nil, 7) if err != nil { panic(err) } return gz }, } gzipWriterPoolForCompressionLevel8 = sync.Pool{ New: func() interface{} { gz, err := gzip.NewWriterLevel(nil, 8) if err != nil { panic(err) } return gz }, } gzipWriterPoolForCompressionLevel9 = sync.Pool{ New: func() interface{} { gz, err := gzip.NewWriterLevel(nil, 9) if err != nil { panic(err) } return gz }, } ) func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) { switch cc { case CompressionNone: return data, nil case CompressionGZIP: var ( err error buf bytes.Buffer writer *gzip.Writer ) switch level { case CompressionLevelDefault: writer = gzipWriterPool.Get().(*gzip.Writer) defer gzipWriterPool.Put(writer) writer.Reset(&buf) case 1: writer = gzipWriterPoolForCompressionLevel1.Get().(*gzip.Writer) defer gzipWriterPoolForCompressionLevel1.Put(writer) writer.Reset(&buf) case 2: writer = gzipWriterPoolForCompressionLevel2.Get().(*gzip.Writer) defer gzipWriterPoolForCompressionLevel2.Put(writer) writer.Reset(&buf) case 3: writer = gzipWriterPoolForCompressionLevel3.Get().(*gzip.Writer) defer gzipWriterPoolForCompressionLevel3.Put(writer) writer.Reset(&buf) case 4: writer = gzipWriterPoolForCompressionLevel4.Get().(*gzip.Writer) defer gzipWriterPoolForCompressionLevel4.Put(writer) writer.Reset(&buf) case 5: writer = gzipWriterPoolForCompressionLevel5.Get().(*gzip.Writer) defer gzipWriterPoolForCompressionLevel5.Put(writer) writer.Reset(&buf) case 6: writer = gzipWriterPoolForCompressionLevel6.Get().(*gzip.Writer) defer gzipWriterPoolForCompressionLevel6.Put(writer) writer.Reset(&buf) case 7: writer = gzipWriterPoolForCompressionLevel7.Get().(*gzip.Writer) defer gzipWriterPoolForCompressionLevel7.Put(writer) writer.Reset(&buf) case 8: writer = gzipWriterPoolForCompressionLevel8.Get().(*gzip.Writer) defer gzipWriterPoolForCompressionLevel8.Put(writer) writer.Reset(&buf) case 9: writer = gzipWriterPoolForCompressionLevel9.Get().(*gzip.Writer) defer gzipWriterPoolForCompressionLevel9.Put(writer) writer.Reset(&buf) default: writer, err = gzip.NewWriterLevel(&buf, level) if err != nil { return nil, err } } if _, err := writer.Write(data); err != nil { return nil, err } if err := writer.Close(); err != nil { return nil, err } return buf.Bytes(), nil case CompressionSnappy: return snappy.Encode(data), nil case CompressionLZ4: writer := lz4WriterPool.Get().(*lz4.Writer) defer lz4WriterPool.Put(writer) var buf bytes.Buffer writer.Reset(&buf) if _, err := writer.Write(data); err != nil { return nil, err } if err := writer.Close(); err != nil { return nil, err } return buf.Bytes(), nil case CompressionZSTD: return zstdCompress(ZstdEncoderParams{level}, nil, data) default: return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)} } }