writer.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. package lz4
  2. import (
  3. "io"
  4. "github.com/pierrec/lz4/v4/internal/lz4block"
  5. "github.com/pierrec/lz4/v4/internal/lz4errors"
  6. "github.com/pierrec/lz4/v4/internal/lz4stream"
  7. )
// writerStates is the state table handed to the Writer's state machine by
// NewWriter (see zw.state.init). Each entry is keyed by a state and holds
// another state.
// NOTE(review): presumably "key -> state entered on the next successful
// transition"; confirm against the _State implementation.
var writerStates = []aState{
	noState:     newState,
	newState:    writeState,
	writeState:  closedState,
	closedState: newState,
	errorState:  newState,
}
  15. // NewWriter returns a new LZ4 frame encoder.
  16. func NewWriter(w io.Writer) *Writer {
  17. zw := &Writer{frame: lz4stream.NewFrame()}
  18. zw.state.init(writerStates)
  19. _ = zw.Apply(DefaultBlockSizeOption, DefaultChecksumOption, DefaultConcurrency, defaultOnBlockDone)
  20. zw.Reset(w)
  21. return zw
  22. }
// Writer allows writing an LZ4 stream.
//
// Incoming bytes are accumulated in data until a full block is available,
// then compressed and written to src (see Write and write).
type Writer struct {
	state   _State                    // state machine guarding which operations are allowed
	src     io.Writer                 // destination writer (despite the name, this is where output goes)
	level   lz4block.CompressionLevel // how hard to try
	num     int                       // concurrency level; 1 means blocks are compressed on the caller's goroutine
	frame   *lz4stream.Frame          // frame being built
	data    []byte                    // pending data
	idx     int                       // size of pending data
	handler func(int)                 // callback invoked after a block is processed, with len(block.Data); ReadFrom also calls it with the number of bytes read
	legacy  bool                      // forwarded to frame.InitW when the frame is initialised
}
// private is an empty marker method.
// NOTE(review): presumably it lets *Writer satisfy an interface with an
// unexported method so that only this package's types qualify; confirm.
func (*Writer) private() {}
  36. func (w *Writer) Apply(options ...Option) (err error) {
  37. defer w.state.check(&err)
  38. switch w.state.state {
  39. case newState:
  40. case errorState:
  41. return w.state.err
  42. default:
  43. return lz4errors.ErrOptionClosedOrError
  44. }
  45. w.Reset(w.src)
  46. for _, o := range options {
  47. if err = o(w); err != nil {
  48. return
  49. }
  50. }
  51. return
  52. }
  53. func (w *Writer) isNotConcurrent() bool {
  54. return w.num == 1
  55. }
  56. // init sets up the Writer when in newState. It does not change the Writer state.
  57. func (w *Writer) init() error {
  58. w.frame.InitW(w.src, w.num, w.legacy)
  59. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  60. w.data = size.Get()
  61. w.idx = 0
  62. return w.frame.Descriptor.Write(w.frame, w.src)
  63. }
  64. func (w *Writer) Write(buf []byte) (n int, err error) {
  65. defer w.state.check(&err)
  66. switch w.state.state {
  67. case writeState:
  68. case closedState, errorState:
  69. return 0, w.state.err
  70. case newState:
  71. if err = w.init(); w.state.next(err) {
  72. return
  73. }
  74. default:
  75. return 0, w.state.fail()
  76. }
  77. zn := len(w.data)
  78. for len(buf) > 0 {
  79. if w.isNotConcurrent() && w.idx == 0 && len(buf) >= zn {
  80. // Avoid a copy as there is enough data for a block.
  81. if err = w.write(buf[:zn], false); err != nil {
  82. return
  83. }
  84. n += zn
  85. buf = buf[zn:]
  86. continue
  87. }
  88. // Accumulate the data to be compressed.
  89. m := copy(w.data[w.idx:], buf)
  90. n += m
  91. w.idx += m
  92. buf = buf[m:]
  93. if w.idx < len(w.data) {
  94. // Buffer not filled.
  95. return
  96. }
  97. // Buffer full.
  98. if err = w.write(w.data, true); err != nil {
  99. return
  100. }
  101. if !w.isNotConcurrent() {
  102. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  103. w.data = size.Get()
  104. }
  105. w.idx = 0
  106. }
  107. return
  108. }
  109. func (w *Writer) write(data []byte, safe bool) error {
  110. if w.isNotConcurrent() {
  111. block := w.frame.Blocks.Block
  112. err := block.Compress(w.frame, data, w.level).Write(w.frame, w.src)
  113. w.handler(len(block.Data))
  114. return err
  115. }
  116. c := make(chan *lz4stream.FrameDataBlock)
  117. w.frame.Blocks.Blocks <- c
  118. go func(c chan *lz4stream.FrameDataBlock, data []byte, safe bool) {
  119. b := lz4stream.NewFrameDataBlock(w.frame)
  120. c <- b.Compress(w.frame, data, w.level)
  121. <-c
  122. w.handler(len(b.Data))
  123. b.Close(w.frame)
  124. if safe {
  125. // safe to put it back as the last usage of it was FrameDataBlock.Write() called before c is closed
  126. lz4block.Put(data)
  127. }
  128. }(c, data, safe)
  129. return nil
  130. }
  131. // Flush any buffered data to the underlying writer immediately.
  132. func (w *Writer) Flush() (err error) {
  133. switch w.state.state {
  134. case writeState:
  135. case errorState:
  136. return w.state.err
  137. case newState:
  138. if err = w.init(); w.state.next(err) {
  139. return
  140. }
  141. default:
  142. return nil
  143. }
  144. if w.idx > 0 {
  145. // Flush pending data, disable w.data freeing as it is done later on.
  146. if err = w.write(w.data[:w.idx], false); err != nil {
  147. return err
  148. }
  149. w.idx = 0
  150. }
  151. return nil
  152. }
  153. // Close closes the Writer, flushing any unwritten data to the underlying writer
  154. // without closing it.
  155. func (w *Writer) Close() error {
  156. if err := w.Flush(); err != nil {
  157. return err
  158. }
  159. err := w.frame.CloseW(w.src, w.num)
  160. // It is now safe to free the buffer.
  161. if w.data != nil {
  162. lz4block.Put(w.data)
  163. w.data = nil
  164. }
  165. return err
  166. }
  167. // Reset clears the state of the Writer w such that it is equivalent to its
  168. // initial state from NewWriter, but instead writing to writer.
  169. // Reset keeps the previous options unless overwritten by the supplied ones.
  170. // No access to writer is performed.
  171. //
  172. // w.Close must be called before Reset or pending data may be dropped.
  173. func (w *Writer) Reset(writer io.Writer) {
  174. w.frame.Reset(w.num)
  175. w.state.reset()
  176. w.src = writer
  177. }
  178. // ReadFrom efficiently reads from r and compressed into the Writer destination.
  179. func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
  180. switch w.state.state {
  181. case closedState, errorState:
  182. return 0, w.state.err
  183. case newState:
  184. if err = w.init(); w.state.next(err) {
  185. return
  186. }
  187. default:
  188. return 0, w.state.fail()
  189. }
  190. defer w.state.check(&err)
  191. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  192. var done bool
  193. var rn int
  194. data := size.Get()
  195. if w.isNotConcurrent() {
  196. // Keep the same buffer for the whole process.
  197. defer lz4block.Put(data)
  198. }
  199. for !done {
  200. rn, err = io.ReadFull(r, data)
  201. switch err {
  202. case nil:
  203. case io.EOF, io.ErrUnexpectedEOF: // read may be partial
  204. done = true
  205. default:
  206. return
  207. }
  208. n += int64(rn)
  209. err = w.write(data[:rn], true)
  210. if err != nil {
  211. return
  212. }
  213. w.handler(rn)
  214. if !done && !w.isNotConcurrent() {
  215. // The buffer will be returned automatically by go routines (safe=true)
  216. // so get a new one fo the next round.
  217. data = size.Get()
  218. }
  219. }
  220. return
  221. }