compressing_reader.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package lz4
  2. import (
  3. "errors"
  4. "io"
  5. "github.com/pierrec/lz4/v4/internal/lz4block"
  6. "github.com/pierrec/lz4/v4/internal/lz4errors"
  7. "github.com/pierrec/lz4/v4/internal/lz4stream"
  8. )
  9. type crState int
  10. const (
  11. crStateInitial crState = iota
  12. crStateReading
  13. crStateFlushing
  14. crStateDone
  15. )
  16. type CompressingReader struct {
  17. state crState
  18. src io.ReadCloser // source reader
  19. level lz4block.CompressionLevel // how hard to try
  20. frame *lz4stream.Frame // frame being built
  21. in []byte
  22. out ovWriter
  23. handler func(int)
  24. }
  25. // NewCompressingReader creates a reader which reads compressed data from
  26. // raw stream. This makes it a logical opposite of a normal lz4.Reader.
  27. // We require an io.ReadCloser as an underlying source for compatibility
  28. // with Go's http.Request.
  29. func NewCompressingReader(src io.ReadCloser) *CompressingReader {
  30. zrd := &CompressingReader {
  31. frame: lz4stream.NewFrame(),
  32. }
  33. _ = zrd.Apply(DefaultBlockSizeOption, DefaultChecksumOption, defaultOnBlockDone)
  34. zrd.Reset(src)
  35. return zrd
  36. }
  37. // Source exposes the underlying source stream for introspection and control.
  38. func (zrd *CompressingReader) Source() io.ReadCloser {
  39. return zrd.src
  40. }
  41. // Close simply invokes the underlying stream Close method. This method is
  42. // provided for the benefit of Go http client/server, which relies on Close
  43. // for goroutine termination.
  44. func (zrd *CompressingReader) Close() error {
  45. return zrd.src.Close()
  46. }
  47. // Apply applies useful options to the lz4 encoder.
  48. func (zrd *CompressingReader) Apply(options ...Option) (err error) {
  49. if zrd.state != crStateInitial {
  50. return lz4errors.ErrOptionClosedOrError
  51. }
  52. zrd.Reset(zrd.src)
  53. for _, o := range options {
  54. if err = o(zrd); err != nil {
  55. return
  56. }
  57. }
  58. return
  59. }
  60. func (*CompressingReader) private() {}
  61. func (zrd *CompressingReader) init() error {
  62. zrd.frame.InitW(&zrd.out, 1, false)
  63. size := zrd.frame.Descriptor.Flags.BlockSizeIndex()
  64. zrd.in = size.Get()
  65. return zrd.frame.Descriptor.Write(zrd.frame, &zrd.out)
  66. }
  67. // Read allows reading of lz4 compressed data
  68. func (zrd *CompressingReader) Read(p []byte) (n int, err error) {
  69. defer func() {
  70. if err != nil {
  71. zrd.state = crStateDone
  72. }
  73. }()
  74. if !zrd.out.reset(p) {
  75. return len(p), nil
  76. }
  77. switch zrd.state {
  78. case crStateInitial:
  79. err = zrd.init()
  80. if err != nil {
  81. return
  82. }
  83. zrd.state = crStateReading
  84. case crStateDone:
  85. return 0, errors.New("This reader is done")
  86. case crStateFlushing:
  87. if zrd.out.dataPos > 0 {
  88. n = zrd.out.dataPos
  89. zrd.out.data = nil
  90. zrd.out.dataPos = 0
  91. return
  92. } else {
  93. zrd.state = crStateDone
  94. return 0, io.EOF
  95. }
  96. }
  97. for zrd.state == crStateReading {
  98. block := zrd.frame.Blocks.Block
  99. var rCount int
  100. rCount, err = io.ReadFull(zrd.src, zrd.in)
  101. switch err {
  102. case nil:
  103. err = block.Compress(
  104. zrd.frame, zrd.in[ : rCount], zrd.level,
  105. ).Write(zrd.frame, &zrd.out)
  106. zrd.handler(len(block.Data))
  107. if err != nil {
  108. return
  109. }
  110. if zrd.out.dataPos == len(zrd.out.data) {
  111. n = zrd.out.dataPos
  112. zrd.out.dataPos = 0
  113. zrd.out.data = nil
  114. return
  115. }
  116. case io.EOF, io.ErrUnexpectedEOF: // read may be partial
  117. if rCount > 0 {
  118. err = block.Compress(
  119. zrd.frame, zrd.in[ : rCount], zrd.level,
  120. ).Write(zrd.frame, &zrd.out)
  121. zrd.handler(len(block.Data))
  122. if err != nil {
  123. return
  124. }
  125. }
  126. err = zrd.frame.CloseW(&zrd.out, 1)
  127. if err != nil {
  128. return
  129. }
  130. zrd.state = crStateFlushing
  131. n = zrd.out.dataPos
  132. zrd.out.dataPos = 0
  133. zrd.out.data = nil
  134. return
  135. default:
  136. return
  137. }
  138. }
  139. err = lz4errors.ErrInternalUnhandledState
  140. return
  141. }
  142. // Reset makes the stream usable again; mostly handy to reuse lz4 encoder
  143. // instances.
  144. func (zrd *CompressingReader) Reset(src io.ReadCloser) {
  145. zrd.frame.Reset(1)
  146. zrd.state = crStateInitial
  147. zrd.src = src
  148. zrd.out.clear()
  149. }
  150. type ovWriter struct {
  151. data []byte
  152. ov []byte
  153. dataPos int
  154. ovPos int
  155. }
  156. func (wr *ovWriter) Write(p []byte) (n int, err error) {
  157. count := copy(wr.data[wr.dataPos : ], p)
  158. wr.dataPos += count
  159. if count < len(p) {
  160. wr.ov = append(wr.ov, p[count : ]...)
  161. }
  162. return len(p), nil
  163. }
  164. func (wr *ovWriter) reset(out []byte) bool {
  165. ovRem := len(wr.ov) - wr.ovPos
  166. if ovRem >= len(out) {
  167. wr.ovPos += copy(out, wr.ov[wr.ovPos : ])
  168. return false
  169. }
  170. if ovRem > 0 {
  171. copy(out, wr.ov[wr.ovPos : ])
  172. wr.ov = wr.ov[ : 0]
  173. wr.ovPos = 0
  174. wr.dataPos = ovRem
  175. } else if wr.ovPos > 0 {
  176. wr.ov = wr.ov[ : 0]
  177. wr.ovPos = 0
  178. wr.dataPos = 0
  179. }
  180. wr.data = out
  181. return true
  182. }
  183. func (wr *ovWriter) clear() {
  184. wr.data = nil
  185. wr.dataPos = 0
  186. wr.ov = wr.ov[ : 0]
  187. wr.ovPos = 0
  188. }