161 lines
3.9 KiB
Go
161 lines
3.9 KiB
Go
|
// Copyright 2012 Google Inc. All rights reserved.
|
||
|
// Use of this source code is governed by the Apache 2.0
|
||
|
// license that can be found in the LICENSE file.
|
||
|
|
||
|
package blobstore
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"os"
|
||
|
"sync"
|
||
|
|
||
|
"github.com/golang/protobuf/proto"
|
||
|
"golang.org/x/net/context"
|
||
|
|
||
|
"google.golang.org/appengine"
|
||
|
"google.golang.org/appengine/internal"
|
||
|
|
||
|
blobpb "google.golang.org/appengine/internal/blobstore"
|
||
|
)
|
||
|
|
||
|
// openBlob returns a reader for a blob. It always succeeds; if the blob does
|
||
|
// not exist then an error will be reported upon first read.
|
||
|
func openBlob(c context.Context, blobKey appengine.BlobKey) Reader {
|
||
|
return &reader{
|
||
|
c: c,
|
||
|
blobKey: blobKey,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
const readBufferSize = 256 * 1024
|
||
|
|
||
|
// reader is a blob reader. It implements the Reader interface.
|
||
|
type reader struct {
|
||
|
c context.Context
|
||
|
|
||
|
// Either blobKey or filename is set:
|
||
|
blobKey appengine.BlobKey
|
||
|
filename string
|
||
|
|
||
|
closeFunc func() // is nil if unavailable or already closed.
|
||
|
|
||
|
// buf is the read buffer. r is how much of buf has been read.
|
||
|
// off is the offset of buf[0] relative to the start of the blob.
|
||
|
// An invariant is 0 <= r && r <= len(buf).
|
||
|
// Reads that don't require an RPC call will increment r but not off.
|
||
|
// Seeks may modify r without discarding the buffer, but only if the
|
||
|
// invariant can be maintained.
|
||
|
mu sync.Mutex
|
||
|
buf []byte
|
||
|
r int
|
||
|
off int64
|
||
|
}
|
||
|
|
||
|
func (r *reader) Close() error {
|
||
|
if f := r.closeFunc; f != nil {
|
||
|
f()
|
||
|
}
|
||
|
r.closeFunc = nil
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (r *reader) Read(p []byte) (int, error) {
|
||
|
if len(p) == 0 {
|
||
|
return 0, nil
|
||
|
}
|
||
|
r.mu.Lock()
|
||
|
defer r.mu.Unlock()
|
||
|
if r.r == len(r.buf) {
|
||
|
if err := r.fetch(r.off + int64(r.r)); err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
}
|
||
|
n := copy(p, r.buf[r.r:])
|
||
|
r.r += n
|
||
|
return n, nil
|
||
|
}
|
||
|
|
||
|
func (r *reader) ReadAt(p []byte, off int64) (int, error) {
|
||
|
if len(p) == 0 {
|
||
|
return 0, nil
|
||
|
}
|
||
|
r.mu.Lock()
|
||
|
defer r.mu.Unlock()
|
||
|
// Convert relative offsets to absolute offsets.
|
||
|
ab0 := r.off + int64(r.r)
|
||
|
ab1 := r.off + int64(len(r.buf))
|
||
|
ap0 := off
|
||
|
ap1 := off + int64(len(p))
|
||
|
// Check if we can satisfy the read entirely out of the existing buffer.
|
||
|
if r.off <= ap0 && ap1 <= ab1 {
|
||
|
// Convert off from an absolute offset to a relative offset.
|
||
|
rp0 := int(ap0 - r.off)
|
||
|
return copy(p, r.buf[rp0:]), nil
|
||
|
}
|
||
|
// Restore the original Read/Seek offset after ReadAt completes.
|
||
|
defer r.seek(ab0)
|
||
|
// Repeatedly fetch and copy until we have filled p.
|
||
|
n := 0
|
||
|
for len(p) > 0 {
|
||
|
if err := r.fetch(off + int64(n)); err != nil {
|
||
|
return n, err
|
||
|
}
|
||
|
r.r = copy(p, r.buf)
|
||
|
n += r.r
|
||
|
p = p[r.r:]
|
||
|
}
|
||
|
return n, nil
|
||
|
}
|
||
|
|
||
|
func (r *reader) Seek(offset int64, whence int) (ret int64, err error) {
|
||
|
r.mu.Lock()
|
||
|
defer r.mu.Unlock()
|
||
|
switch whence {
|
||
|
case os.SEEK_SET:
|
||
|
ret = offset
|
||
|
case os.SEEK_CUR:
|
||
|
ret = r.off + int64(r.r) + offset
|
||
|
case os.SEEK_END:
|
||
|
return 0, errors.New("seeking relative to the end of a blob isn't supported")
|
||
|
default:
|
||
|
return 0, fmt.Errorf("invalid Seek whence value: %d", whence)
|
||
|
}
|
||
|
if ret < 0 {
|
||
|
return 0, errors.New("negative Seek offset")
|
||
|
}
|
||
|
return r.seek(ret)
|
||
|
}
|
||
|
|
||
|
// fetch fetches readBufferSize bytes starting at the given offset. On success,
|
||
|
// the data is saved as r.buf.
|
||
|
func (r *reader) fetch(off int64) error {
|
||
|
req := &blobpb.FetchDataRequest{
|
||
|
BlobKey: proto.String(string(r.blobKey)),
|
||
|
StartIndex: proto.Int64(off),
|
||
|
EndIndex: proto.Int64(off + readBufferSize - 1), // EndIndex is inclusive.
|
||
|
}
|
||
|
res := &blobpb.FetchDataResponse{}
|
||
|
if err := internal.Call(r.c, "blobstore", "FetchData", req, res); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if len(res.Data) == 0 {
|
||
|
return io.EOF
|
||
|
}
|
||
|
r.buf, r.r, r.off = res.Data, 0, off
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// seek seeks to the given offset with an effective whence equal to SEEK_SET.
|
||
|
// It discards the read buffer if the invariant cannot be maintained.
|
||
|
func (r *reader) seek(off int64) (int64, error) {
|
||
|
delta := off - r.off
|
||
|
if delta >= 0 && delta < int64(len(r.buf)) {
|
||
|
r.r = int(delta)
|
||
|
return off, nil
|
||
|
}
|
||
|
r.buf, r.r, r.off = nil, 0, off
|
||
|
return off, nil
|
||
|
}
|