transfer.sh/server/storage.go

645 lines
15 KiB
Go

package server
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"mime"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/goamz/goamz/s3"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/drive/v3"
"google.golang.org/api/googleapi"
)
type Storage interface {
Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)
Head(token string, filename string) (contentType string, contentLength uint64, err error)
Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
Delete(token string, filename string) error
IsNotExist(err error) bool
Type() string
}
type LocalStorage struct {
Storage
basedir string
logger *log.Logger
}
func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
return &LocalStorage{basedir: basedir, logger: logger}, nil
}
func (s *LocalStorage) Type() string {
return "local"
}
func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
path := filepath.Join(s.basedir, token, filename)
var fi os.FileInfo
if fi, err = os.Lstat(path); err != nil {
return
}
contentLength = uint64(fi.Size())
contentType = mime.TypeByExtension(filepath.Ext(filename))
return
}
func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
path := filepath.Join(s.basedir, token, filename)
// content type , content length
if reader, err = os.Open(path); err != nil {
return
}
var fi os.FileInfo
if fi, err = os.Lstat(path); err != nil {
return
}
contentLength = uint64(fi.Size())
contentType = mime.TypeByExtension(filepath.Ext(filename))
return
}
func (s *LocalStorage) Delete(token string, filename string) (err error) {
metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
os.Remove(metadata)
path := filepath.Join(s.basedir, token, filename)
err = os.Remove(path)
return
}
func (s *LocalStorage) IsNotExist(err error) bool {
if err == nil {
return false
}
return os.IsNotExist(err)
}
func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
var f io.WriteCloser
var err error
path := filepath.Join(s.basedir, token)
if err = os.Mkdir(path, 0700); err != nil && !os.IsExist(err) {
return err
}
if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
return err
}
defer f.Close()
if _, err = io.Copy(f, reader); err != nil {
return err
}
return nil
}
type S3Storage struct {
Storage
bucket *s3.Bucket
logger *log.Logger
noMultipart bool
}
func NewS3Storage(accessKey, secretKey, bucketName, endpoint string, logger *log.Logger, disableMultipart bool) (*S3Storage, error) {
bucket, err := getBucket(accessKey, secretKey, bucketName, endpoint)
if err != nil {
return nil, err
}
return &S3Storage{bucket: bucket, logger: logger, noMultipart: disableMultipart}, nil
}
func (s *S3Storage) Type() string {
return "s3"
}
func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
key := fmt.Sprintf("%s/%s", token, filename)
// content type , content length
response, err := s.bucket.Head(key, map[string][]string{})
if err != nil {
return
}
contentType = response.Header.Get("Content-Type")
contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
if err != nil {
return
}
return
}
func (s *S3Storage) IsNotExist(err error) bool {
if err == nil {
return false
}
s.logger.Printf("IsNotExist: %s, %#v", err.Error(), err)
b := (err.Error() == "The specified key does not exist.")
b = b || (err.Error() == "Access Denied")
return b
}
func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
key := fmt.Sprintf("%s/%s", token, filename)
// content type , content length
response, err := s.bucket.GetResponse(key)
if err != nil {
return
}
contentType = response.Header.Get("Content-Type")
contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
if err != nil {
return
}
reader = response.Body
return
}
func (s *S3Storage) Delete(token string, filename string) (err error) {
metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
s.bucket.Del(metadata)
key := fmt.Sprintf("%s/%s", token, filename)
err = s.bucket.Del(key)
return
}
func (s *S3Storage) putMulti(key string, reader io.Reader, contentType string, contentLength uint64) (err error) {
var (
multi *s3.Multi
parts []s3.Part
)
if multi, err = s.bucket.InitMulti(key, contentType, s3.Private); err != nil {
s.logger.Printf(err.Error())
return
}
// 20 mb parts
partsChan := make(chan interface{})
// partsChan := make(chan s3.Part)
go func() {
// maximize to 20 threads
sem := make(chan int, 20)
index := 1
var wg sync.WaitGroup
for {
// buffered in memory because goamz s3 multi needs seekable reader
var (
buffer []byte = make([]byte, (1<<20)*10)
count int
err error
)
// Amazon expects parts of at least 5MB, except for the last one
if count, err = io.ReadAtLeast(reader, buffer, (1<<20)*5); err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
s.logger.Printf(err.Error())
return
}
// always send minimal 1 part
if err == io.EOF && index > 1 {
s.logger.Printf("Waiting for all parts to finish uploading.")
// wait for all parts to be finished uploading
wg.Wait()
// and close the channel
close(partsChan)
return
}
wg.Add(1)
sem <- 1
// using goroutines because of retries when upload fails
go func(multi *s3.Multi, buffer []byte, index int) {
s.logger.Printf("Uploading part %d %d", index, len(buffer))
defer func() {
s.logger.Printf("Finished part %d %d", index, len(buffer))
wg.Done()
<-sem
}()
partReader := bytes.NewReader(buffer)
var part s3.Part
if part, err = multi.PutPart(index, partReader); err != nil {
s.logger.Printf("Error while uploading part %d %d %s", index, len(buffer), err.Error())
partsChan <- err
return
}
s.logger.Printf("Finished uploading part %d %d", index, len(buffer))
partsChan <- part
}(multi, buffer[:count], index)
index++
}
}()
// wait for all parts to be uploaded
for part := range partsChan {
switch part.(type) {
case s3.Part:
parts = append(parts, part.(s3.Part))
case error:
// abort multi upload
s.logger.Printf("Error during upload, aborting %s.", part.(error).Error())
err = part.(error)
multi.Abort()
return
}
}
s.logger.Printf("Completing upload %d parts", len(parts))
if err = multi.Complete(parts); err != nil {
s.logger.Printf("Error during completing upload %d parts %s", len(parts), err.Error())
return
}
s.logger.Printf("Completed uploading %d", len(parts))
return
}
func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
key := fmt.Sprintf("%s/%s", token, filename)
s.logger.Printf("Uploading file %s to S3 Bucket", filename)
if !s.noMultipart {
err = s.putMulti(key, reader, contentType, contentLength)
} else {
err = s.bucket.PutReader(key, reader, int64(contentLength), contentType, s3.Private, s3.Options{})
}
if err != nil {
return
}
s.logger.Printf("Completed uploading %s", filename)
return
}
type GDrive struct {
service *drive.Service
rootId string
basedir string
localConfigPath string
chunkSize int
logger *log.Logger
}
func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
b, err := ioutil.ReadFile(clientJsonFilepath)
if err != nil {
return nil, err
}
// If modifying these scopes, delete your previously saved client_secret.json.
config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
if err != nil {
return nil, err
}
srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
if err != nil {
return nil, err
}
chunkSize = chunkSize * 1024 * 1024
storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
err = storage.setupRoot()
if err != nil {
return nil, err
}
return storage, nil
}
const GDriveRootConfigFile = "root_id.conf"
const GDriveTokenJsonFile = "token.json"
const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
func (s *GDrive) setupRoot() error {
rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
rootId, err := ioutil.ReadFile(rootFileConfig)
if err != nil && !os.IsNotExist(err) {
return err
}
if string(rootId) != "" {
s.rootId = string(rootId)
return nil
}
dir := &drive.File{
Name: s.basedir,
MimeType: GDriveDirectoryMimeType,
}
di, err := s.service.Files.Create(dir).Fields("id").Do()
if err != nil {
return err
}
s.rootId = di.Id
err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
if err != nil {
return err
}
return nil
}
func (s *GDrive) hasChecksum(f *drive.File) bool {
return f.Md5Checksum != ""
}
func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
}
func (s *GDrive) findId(filename string, token string) (string, error) {
filename = strings.Replace(filename, `'`, `\'`, -1)
filename = strings.Replace(filename, `"`, `\"`, -1)
fileId, tokenId, nextPageToken := "", "", ""
q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
l, err := s.list(nextPageToken, q)
if err != nil {
return "", err
}
for 0 < len(l.Files) {
for _, fi := range l.Files {
tokenId = fi.Id
break
}
if l.NextPageToken == "" {
break
}
l, err = s.list(l.NextPageToken, q)
}
if filename == "" {
return tokenId, nil
} else if tokenId == "" {
return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
}
q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
l, err = s.list(nextPageToken, q)
if err != nil {
return "", err
}
for 0 < len(l.Files) {
for _, fi := range l.Files {
fileId = fi.Id
break
}
if l.NextPageToken == "" {
break
}
l, err = s.list(l.NextPageToken, q)
}
if fileId == "" {
return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
}
return fileId, nil
}
func (s *GDrive) Type() string {
return "gdrive"
}
func (s *GDrive) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
var fileId string
fileId, err = s.findId(filename, token)
if err != nil {
return
}
var fi *drive.File
if fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size").Do(); err != nil {
return
}
contentLength = uint64(fi.Size)
contentType = fi.MimeType
return
}
func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
var fileId string
fileId, err = s.findId(filename, token)
if err != nil {
return
}
var fi *drive.File
fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size", "md5Checksum").Do()
if !s.hasChecksum(fi) {
err = fmt.Errorf("Cannot find file %s/%s", token, filename)
return
}
contentLength = uint64(fi.Size)
contentType = fi.MimeType
ctx := context.Background()
var res *http.Response
res, err = s.service.Files.Get(fileId).Context(ctx).Download()
if err != nil {
return
}
reader = res.Body
return
}
func (s *GDrive) Delete(token string, filename string) (err error) {
metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
s.service.Files.Delete(metadata).Do()
var fileId string
fileId, err = s.findId(filename, token)
if err != nil {
return
}
err = s.service.Files.Delete(fileId).Do()
return
}
func (s *GDrive) IsNotExist(err error) bool {
if err == nil {
return false
}
if err != nil {
if e, ok := err.(*googleapi.Error); ok {
return e.Code == http.StatusNotFound
}
}
return false
}
func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
dirId, err := s.findId("", token)
if err != nil {
return err
}
if dirId == "" {
dir := &drive.File{
Name: token,
Parents: []string{s.rootId},
MimeType: GDriveDirectoryMimeType,
}
di, err := s.service.Files.Create(dir).Fields("id").Do()
if err != nil {
return err
}
dirId = di.Id
}
// Instantiate empty drive file
dst := &drive.File{
Name: filename,
Parents: []string{dirId},
MimeType: contentType,
}
ctx := context.Background()
_, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
if err != nil {
return err
}
return nil
}
// Retrieve a token, saves the token, then returns the generated client.
func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
tok, err := gDriveTokenFromFile(tokenFile)
if err != nil {
tok = getGDriveTokenFromWeb(config, logger)
saveGDriveToken(tokenFile, tok, logger)
}
return config.Client(context.Background(), tok)
}
// Request a token from the web, then returns the retrieved token.
func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
fmt.Printf("Go to the following link in your browser then type the "+
"authorization code: \n%v\n", authURL)
var authCode string
if _, err := fmt.Scan(&authCode); err != nil {
logger.Fatalf("Unable to read authorization code %v", err)
}
tok, err := config.Exchange(context.TODO(), authCode)
if err != nil {
logger.Fatalf("Unable to retrieve token from web %v", err)
}
return tok
}
// Retrieves a token from a local file.
func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
f, err := os.Open(file)
defer f.Close()
if err != nil {
return nil, err
}
tok := &oauth2.Token{}
err = json.NewDecoder(f).Decode(tok)
return tok, err
}
// Saves a token to a file path.
func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
logger.Printf("Saving credential file to: %s\n", path)
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
defer f.Close()
if err != nil {
logger.Fatalf("Unable to cache oauth token: %v", err)
}
json.NewEncoder(f).Encode(token)
}