223 lines
4.5 KiB
Go
223 lines
4.5 KiB
Go
|
package lz4
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"io"
|
||
|
|
||
|
"github.com/pierrec/lz4/v4/internal/lz4block"
|
||
|
"github.com/pierrec/lz4/v4/internal/lz4errors"
|
||
|
"github.com/pierrec/lz4/v4/internal/lz4stream"
|
||
|
)
|
||
|
|
||
|
type crState int
|
||
|
|
||
|
const (
|
||
|
crStateInitial crState = iota
|
||
|
crStateReading
|
||
|
crStateFlushing
|
||
|
crStateDone
|
||
|
)
|
||
|
|
||
|
type CompressingReader struct {
|
||
|
state crState
|
||
|
src io.ReadCloser // source reader
|
||
|
level lz4block.CompressionLevel // how hard to try
|
||
|
frame *lz4stream.Frame // frame being built
|
||
|
in []byte
|
||
|
out ovWriter
|
||
|
handler func(int)
|
||
|
}
|
||
|
|
||
|
// NewCompressingReader creates a reader which reads compressed data from
|
||
|
// raw stream. This makes it a logical opposite of a normal lz4.Reader.
|
||
|
// We require an io.ReadCloser as an underlying source for compatibility
|
||
|
// with Go's http.Request.
|
||
|
func NewCompressingReader(src io.ReadCloser) *CompressingReader {
|
||
|
zrd := &CompressingReader {
|
||
|
frame: lz4stream.NewFrame(),
|
||
|
}
|
||
|
|
||
|
_ = zrd.Apply(DefaultBlockSizeOption, DefaultChecksumOption, defaultOnBlockDone)
|
||
|
zrd.Reset(src)
|
||
|
|
||
|
return zrd
|
||
|
}
|
||
|
|
||
|
// Source exposes the underlying source stream for introspection and control.
|
||
|
func (zrd *CompressingReader) Source() io.ReadCloser {
|
||
|
return zrd.src
|
||
|
}
|
||
|
|
||
|
// Close simply invokes the underlying stream Close method. This method is
|
||
|
// provided for the benefit of Go http client/server, which relies on Close
|
||
|
// for goroutine termination.
|
||
|
func (zrd *CompressingReader) Close() error {
|
||
|
return zrd.src.Close()
|
||
|
}
|
||
|
|
||
|
// Apply applies useful options to the lz4 encoder.
|
||
|
func (zrd *CompressingReader) Apply(options ...Option) (err error) {
|
||
|
if zrd.state != crStateInitial {
|
||
|
return lz4errors.ErrOptionClosedOrError
|
||
|
}
|
||
|
|
||
|
zrd.Reset(zrd.src)
|
||
|
|
||
|
for _, o := range options {
|
||
|
if err = o(zrd); err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (*CompressingReader) private() {}
|
||
|
|
||
|
func (zrd *CompressingReader) init() error {
|
||
|
zrd.frame.InitW(&zrd.out, 1, false)
|
||
|
size := zrd.frame.Descriptor.Flags.BlockSizeIndex()
|
||
|
zrd.in = size.Get()
|
||
|
return zrd.frame.Descriptor.Write(zrd.frame, &zrd.out)
|
||
|
}
|
||
|
|
||
|
// Read allows reading of lz4 compressed data
|
||
|
func (zrd *CompressingReader) Read(p []byte) (n int, err error) {
|
||
|
defer func() {
|
||
|
if err != nil {
|
||
|
zrd.state = crStateDone
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
if !zrd.out.reset(p) {
|
||
|
return len(p), nil
|
||
|
}
|
||
|
|
||
|
switch zrd.state {
|
||
|
case crStateInitial:
|
||
|
err = zrd.init()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
zrd.state = crStateReading
|
||
|
case crStateDone:
|
||
|
return 0, errors.New("This reader is done")
|
||
|
case crStateFlushing:
|
||
|
if zrd.out.dataPos > 0 {
|
||
|
n = zrd.out.dataPos
|
||
|
zrd.out.data = nil
|
||
|
zrd.out.dataPos = 0
|
||
|
return
|
||
|
} else {
|
||
|
zrd.state = crStateDone
|
||
|
return 0, io.EOF
|
||
|
}
|
||
|
}
|
||
|
|
||
|
for zrd.state == crStateReading {
|
||
|
block := zrd.frame.Blocks.Block
|
||
|
|
||
|
var rCount int
|
||
|
rCount, err = io.ReadFull(zrd.src, zrd.in)
|
||
|
switch err {
|
||
|
case nil:
|
||
|
err = block.Compress(
|
||
|
zrd.frame, zrd.in[ : rCount], zrd.level,
|
||
|
).Write(zrd.frame, &zrd.out)
|
||
|
zrd.handler(len(block.Data))
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if zrd.out.dataPos == len(zrd.out.data) {
|
||
|
n = zrd.out.dataPos
|
||
|
zrd.out.dataPos = 0
|
||
|
zrd.out.data = nil
|
||
|
return
|
||
|
}
|
||
|
case io.EOF, io.ErrUnexpectedEOF: // read may be partial
|
||
|
if rCount > 0 {
|
||
|
err = block.Compress(
|
||
|
zrd.frame, zrd.in[ : rCount], zrd.level,
|
||
|
).Write(zrd.frame, &zrd.out)
|
||
|
zrd.handler(len(block.Data))
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
err = zrd.frame.CloseW(&zrd.out, 1)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
zrd.state = crStateFlushing
|
||
|
|
||
|
n = zrd.out.dataPos
|
||
|
zrd.out.dataPos = 0
|
||
|
zrd.out.data = nil
|
||
|
return
|
||
|
default:
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
err = lz4errors.ErrInternalUnhandledState
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Reset makes the stream usable again; mostly handy to reuse lz4 encoder
|
||
|
// instances.
|
||
|
func (zrd *CompressingReader) Reset(src io.ReadCloser) {
|
||
|
zrd.frame.Reset(1)
|
||
|
zrd.state = crStateInitial
|
||
|
zrd.src = src
|
||
|
zrd.out.clear()
|
||
|
}
|
||
|
|
||
|
type ovWriter struct {
|
||
|
data []byte
|
||
|
ov []byte
|
||
|
dataPos int
|
||
|
ovPos int
|
||
|
}
|
||
|
|
||
|
func (wr *ovWriter) Write(p []byte) (n int, err error) {
|
||
|
count := copy(wr.data[wr.dataPos : ], p)
|
||
|
wr.dataPos += count
|
||
|
|
||
|
if count < len(p) {
|
||
|
wr.ov = append(wr.ov, p[count : ]...)
|
||
|
}
|
||
|
|
||
|
return len(p), nil
|
||
|
}
|
||
|
|
||
|
func (wr *ovWriter) reset(out []byte) bool {
|
||
|
ovRem := len(wr.ov) - wr.ovPos
|
||
|
|
||
|
if ovRem >= len(out) {
|
||
|
wr.ovPos += copy(out, wr.ov[wr.ovPos : ])
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
if ovRem > 0 {
|
||
|
copy(out, wr.ov[wr.ovPos : ])
|
||
|
wr.ov = wr.ov[ : 0]
|
||
|
wr.ovPos = 0
|
||
|
wr.dataPos = ovRem
|
||
|
} else if wr.ovPos > 0 {
|
||
|
wr.ov = wr.ov[ : 0]
|
||
|
wr.ovPos = 0
|
||
|
wr.dataPos = 0
|
||
|
}
|
||
|
|
||
|
wr.data = out
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
func (wr *ovWriter) clear() {
|
||
|
wr.data = nil
|
||
|
wr.dataPos = 0
|
||
|
wr.ov = wr.ov[ : 0]
|
||
|
wr.ovPos = 0
|
||
|
}
|