reader.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. package lz4
  2. import (
  3. "bytes"
  4. "io"
  5. "github.com/pierrec/lz4/v4/internal/lz4block"
  6. "github.com/pierrec/lz4/v4/internal/lz4errors"
  7. "github.com/pierrec/lz4/v4/internal/lz4stream"
  8. )
  9. var readerStates = []aState{
  10. noState: newState,
  11. errorState: newState,
  12. newState: readState,
  13. readState: closedState,
  14. closedState: newState,
  15. }
  16. // NewReader returns a new LZ4 frame decoder.
  17. func NewReader(r io.Reader) *Reader {
  18. return newReader(r, false)
  19. }
  20. func newReader(r io.Reader, legacy bool) *Reader {
  21. zr := &Reader{frame: lz4stream.NewFrame()}
  22. zr.state.init(readerStates)
  23. _ = zr.Apply(DefaultConcurrency, defaultOnBlockDone)
  24. zr.Reset(r)
  25. return zr
  26. }
  27. // Reader allows reading an LZ4 stream.
  28. type Reader struct {
  29. state _State
  30. src io.Reader // source reader
  31. num int // concurrency level
  32. frame *lz4stream.Frame // frame being read
  33. data []byte // block buffer allocated in non concurrent mode
  34. reads chan []byte // pending data
  35. idx int // size of pending data
  36. handler func(int)
  37. cum uint32
  38. dict []byte
  39. }
  40. func (*Reader) private() {}
  41. func (r *Reader) Apply(options ...Option) (err error) {
  42. defer r.state.check(&err)
  43. switch r.state.state {
  44. case newState:
  45. case errorState:
  46. return r.state.err
  47. default:
  48. return lz4errors.ErrOptionClosedOrError
  49. }
  50. for _, o := range options {
  51. if err = o(r); err != nil {
  52. return
  53. }
  54. }
  55. return
  56. }
  57. // Size returns the size of the underlying uncompressed data, if set in the stream.
  58. func (r *Reader) Size() int {
  59. switch r.state.state {
  60. case readState, closedState:
  61. if r.frame.Descriptor.Flags.Size() {
  62. return int(r.frame.Descriptor.ContentSize)
  63. }
  64. }
  65. return 0
  66. }
  67. func (r *Reader) isNotConcurrent() bool {
  68. return r.num == 1
  69. }
  70. func (r *Reader) init() error {
  71. err := r.frame.ParseHeaders(r.src)
  72. if err != nil {
  73. return err
  74. }
  75. if !r.frame.Descriptor.Flags.BlockIndependence() {
  76. // We can't decompress dependent blocks concurrently.
  77. // Instead of throwing an error to the user, silently drop concurrency
  78. r.num = 1
  79. }
  80. data, err := r.frame.InitR(r.src, r.num)
  81. if err != nil {
  82. return err
  83. }
  84. r.reads = data
  85. r.idx = 0
  86. size := r.frame.Descriptor.Flags.BlockSizeIndex()
  87. r.data = size.Get()
  88. r.cum = 0
  89. return nil
  90. }
  91. func (r *Reader) Read(buf []byte) (n int, err error) {
  92. defer r.state.check(&err)
  93. switch r.state.state {
  94. case readState:
  95. case closedState, errorState:
  96. return 0, r.state.err
  97. case newState:
  98. // First initialization.
  99. if err = r.init(); r.state.next(err) {
  100. return
  101. }
  102. default:
  103. return 0, r.state.fail()
  104. }
  105. for len(buf) > 0 {
  106. var bn int
  107. if r.idx == 0 {
  108. if r.isNotConcurrent() {
  109. bn, err = r.read(buf)
  110. } else {
  111. lz4block.Put(r.data)
  112. r.data = <-r.reads
  113. if len(r.data) == 0 {
  114. // No uncompressed data: something went wrong or we are done.
  115. err = r.frame.Blocks.ErrorR()
  116. }
  117. }
  118. switch err {
  119. case nil:
  120. case io.EOF:
  121. if er := r.frame.CloseR(r.src); er != nil {
  122. err = er
  123. }
  124. lz4block.Put(r.data)
  125. r.data = nil
  126. return
  127. default:
  128. return
  129. }
  130. }
  131. if bn == 0 {
  132. // Fill buf with buffered data.
  133. bn = copy(buf, r.data[r.idx:])
  134. r.idx += bn
  135. if r.idx == len(r.data) {
  136. // All data read, get ready for the next Read.
  137. r.idx = 0
  138. }
  139. }
  140. buf = buf[bn:]
  141. n += bn
  142. r.handler(bn)
  143. }
  144. return
  145. }
  146. // read uncompresses the next block as follow:
  147. // - if buf has enough room, the block is uncompressed into it directly
  148. // and the lenght of used space is returned
  149. // - else, the uncompress data is stored in r.data and 0 is returned
  150. func (r *Reader) read(buf []byte) (int, error) {
  151. block := r.frame.Blocks.Block
  152. _, err := block.Read(r.frame, r.src, r.cum)
  153. if err != nil {
  154. return 0, err
  155. }
  156. var direct bool
  157. dst := r.data[:cap(r.data)]
  158. if len(buf) >= len(dst) {
  159. // Uncompress directly into buf.
  160. direct = true
  161. dst = buf
  162. }
  163. dst, err = block.Uncompress(r.frame, dst, r.dict, true)
  164. if err != nil {
  165. return 0, err
  166. }
  167. if !r.frame.Descriptor.Flags.BlockIndependence() {
  168. if len(r.dict)+len(dst) > 128*1024 {
  169. preserveSize := 64*1024 - len(dst)
  170. if preserveSize < 0 {
  171. preserveSize = 0
  172. }
  173. r.dict = r.dict[len(r.dict)-preserveSize:]
  174. }
  175. r.dict = append(r.dict, dst...)
  176. }
  177. r.cum += uint32(len(dst))
  178. if direct {
  179. return len(dst), nil
  180. }
  181. r.data = dst
  182. return 0, nil
  183. }
  184. // Reset clears the state of the Reader r such that it is equivalent to its
  185. // initial state from NewReader, but instead reading from reader.
  186. // No access to reader is performed.
  187. func (r *Reader) Reset(reader io.Reader) {
  188. if r.data != nil {
  189. lz4block.Put(r.data)
  190. r.data = nil
  191. }
  192. r.frame.Reset(r.num)
  193. r.state.reset()
  194. r.src = reader
  195. r.reads = nil
  196. }
  197. // WriteTo efficiently uncompresses the data from the Reader underlying source to w.
  198. func (r *Reader) WriteTo(w io.Writer) (n int64, err error) {
  199. switch r.state.state {
  200. case closedState, errorState:
  201. return 0, r.state.err
  202. case newState:
  203. if err = r.init(); r.state.next(err) {
  204. return
  205. }
  206. default:
  207. return 0, r.state.fail()
  208. }
  209. defer r.state.nextd(&err)
  210. var data []byte
  211. if r.isNotConcurrent() {
  212. size := r.frame.Descriptor.Flags.BlockSizeIndex()
  213. data = size.Get()
  214. defer lz4block.Put(data)
  215. }
  216. for {
  217. var bn int
  218. var dst []byte
  219. if r.isNotConcurrent() {
  220. bn, err = r.read(data)
  221. dst = data[:bn]
  222. } else {
  223. lz4block.Put(dst)
  224. dst = <-r.reads
  225. bn = len(dst)
  226. if bn == 0 {
  227. // No uncompressed data: something went wrong or we are done.
  228. err = r.frame.Blocks.ErrorR()
  229. }
  230. }
  231. switch err {
  232. case nil:
  233. case io.EOF:
  234. err = r.frame.CloseR(r.src)
  235. return
  236. default:
  237. return
  238. }
  239. r.handler(bn)
  240. bn, err = w.Write(dst)
  241. n += int64(bn)
  242. if err != nil {
  243. return
  244. }
  245. }
  246. }
  247. // ValidFrameHeader returns a bool indicating if the given bytes slice matches a LZ4 header.
  248. func ValidFrameHeader(in []byte) (bool, error) {
  249. f := lz4stream.NewFrame()
  250. err := f.ParseHeaders(bytes.NewReader(in))
  251. if err == nil {
  252. return true, nil
  253. }
  254. if err == lz4errors.ErrInvalidFrame {
  255. return false, nil
  256. }
  257. return false, err
  258. }