| package storage |
| |
| import ( |
| "bufio" |
| "bytes" |
| "context" |
| "fmt" |
| "io" |
| "io/ioutil" |
| |
| storagedriver "github.com/docker/distribution/registry/storage/driver" |
| ) |
| |
// fileReaderBufferSize is the size, in bytes, of the bufio.Reader that
// fileReader places in front of the remote read closer (4 MiB).
//
// TODO(stevvooe): Set an optimal buffer size here. Choosing it correctly
// requires understanding the latency characteristics of the underlying
// network, so we may want to leave it to the driver. For out of process
// drivers, this buffer size will have to be optimized for local
// communication.
const fileReaderBufferSize = 4 * 1024 * 1024
| |
| // remoteFileReader provides a read seeker interface to files stored in |
| // storagedriver. Used to implement part of layer interface and will be used |
| // to implement read side of LayerUpload. |
| type fileReader struct { |
| driver storagedriver.StorageDriver |
| |
| ctx context.Context |
| |
| // identifying fields |
| path string |
| size int64 // size is the total size, must be set. |
| |
| // mutable fields |
| rc io.ReadCloser // remote read closer |
| brd *bufio.Reader // internal buffered io |
| offset int64 // offset is the current read offset |
| err error // terminal error, if set, reader is closed |
| } |
| |
| // newFileReader initializes a file reader for the remote file. The reader |
| // takes on the size and path that must be determined externally with a stat |
| // call. The reader operates optimistically, assuming that the file is already |
| // there. |
| func newFileReader(ctx context.Context, driver storagedriver.StorageDriver, path string, size int64) (*fileReader, error) { |
| return &fileReader{ |
| ctx: ctx, |
| driver: driver, |
| path: path, |
| size: size, |
| }, nil |
| } |
| |
| func (fr *fileReader) Read(p []byte) (n int, err error) { |
| if fr.err != nil { |
| return 0, fr.err |
| } |
| |
| rd, err := fr.reader() |
| if err != nil { |
| return 0, err |
| } |
| |
| n, err = rd.Read(p) |
| fr.offset += int64(n) |
| |
| // Simulate io.EOR error if we reach filesize. |
| if err == nil && fr.offset >= fr.size { |
| err = io.EOF |
| } |
| |
| return n, err |
| } |
| |
| func (fr *fileReader) Seek(offset int64, whence int) (int64, error) { |
| if fr.err != nil { |
| return 0, fr.err |
| } |
| |
| var err error |
| newOffset := fr.offset |
| |
| switch whence { |
| case io.SeekCurrent: |
| newOffset += offset |
| case io.SeekEnd: |
| newOffset = fr.size + offset |
| case io.SeekStart: |
| newOffset = offset |
| } |
| |
| if newOffset < 0 { |
| err = fmt.Errorf("cannot seek to negative position") |
| } else { |
| if fr.offset != newOffset { |
| fr.reset() |
| } |
| |
| // No problems, set the offset. |
| fr.offset = newOffset |
| } |
| |
| return fr.offset, err |
| } |
| |
| func (fr *fileReader) Close() error { |
| return fr.closeWithErr(fmt.Errorf("fileReader: closed")) |
| } |
| |
| // reader prepares the current reader at the lrs offset, ensuring its buffered |
| // and ready to go. |
| func (fr *fileReader) reader() (io.Reader, error) { |
| if fr.err != nil { |
| return nil, fr.err |
| } |
| |
| if fr.rc != nil { |
| return fr.brd, nil |
| } |
| |
| // If we don't have a reader, open one up. |
| rc, err := fr.driver.Reader(fr.ctx, fr.path, fr.offset) |
| if err != nil { |
| switch err := err.(type) { |
| case storagedriver.PathNotFoundError: |
| // NOTE(stevvooe): If the path is not found, we simply return a |
| // reader that returns io.EOF. However, we do not set fr.rc, |
| // allowing future attempts at getting a reader to possibly |
| // succeed if the file turns up later. |
| return ioutil.NopCloser(bytes.NewReader([]byte{})), nil |
| default: |
| return nil, err |
| } |
| } |
| |
| fr.rc = rc |
| |
| if fr.brd == nil { |
| fr.brd = bufio.NewReaderSize(fr.rc, fileReaderBufferSize) |
| } else { |
| fr.brd.Reset(fr.rc) |
| } |
| |
| return fr.brd, nil |
| } |
| |
| // resetReader resets the reader, forcing the read method to open up a new |
| // connection and rebuild the buffered reader. This should be called when the |
| // offset and the reader will become out of sync, such as during a seek |
| // operation. |
| func (fr *fileReader) reset() { |
| if fr.err != nil { |
| return |
| } |
| if fr.rc != nil { |
| fr.rc.Close() |
| fr.rc = nil |
| } |
| } |
| |
| func (fr *fileReader) closeWithErr(err error) error { |
| if fr.err != nil { |
| return fr.err |
| } |
| |
| fr.err = err |
| |
| // close and release reader chain |
| if fr.rc != nil { |
| fr.rc.Close() |
| } |
| |
| fr.rc = nil |
| fr.brd = nil |
| |
| return fr.err |
| } |