Commit 1d5719b2 authored by Mustafa Gezen's avatar Mustafa Gezen
Browse files

make srpmproc more efficient with downloads and uploads. use files from blob storage when possible

parent a7cb7862
......@@ -2,4 +2,6 @@ package blob
type Storage interface {
Write(path string, content []byte)
Read(path string) []byte
Exists(path string) bool
}
......@@ -3,6 +3,7 @@ package gcs
import (
"cloud.google.com/go/storage"
"context"
"io/ioutil"
"log"
)
......@@ -37,3 +38,27 @@ func (g *GCS) Write(path string, content []byte) {
log.Fatalf("could not close gcs writer to source: %v", err)
}
}
func (g *GCS) Read(path string) []byte {
ctx := context.Background()
obj := g.bucket.Object(path)
r, err := obj.NewReader(ctx)
if err != nil {
return nil
}
body, err := ioutil.ReadAll(r)
if err != nil {
return nil
}
return body
}
func (g *GCS) Exists(path string) bool {
ctx := context.Background()
obj := g.bucket.Object(path)
_, err := obj.NewReader(ctx)
return err == nil
}
......@@ -4,7 +4,9 @@ import (
"bytes"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"io/ioutil"
"log"
)
......@@ -35,3 +37,28 @@ func (s *S3) Write(path string, content []byte) {
log.Fatalf("failed to upload file to s3, %v", err)
}
}
func (s *S3) Read(path string) []byte {
obj, err := s.uploader.S3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(path),
})
if err != nil {
return nil
}
body, err := ioutil.ReadAll(obj.Body)
if err != nil {
return nil
}
return body
}
func (s *S3) Exists(path string) bool {
_, err := s.uploader.S3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(path),
})
return err == nil
}
......@@ -101,7 +101,7 @@ func (g *GitMode) RetrieveSource(pd *ProcessData) *modeData {
}
}
func (g *GitMode) WriteSource(md *modeData) {
func (g *GitMode) WriteSource(pd *ProcessData, md *modeData) {
remote, err := md.repo.Remote("upstream")
if err != nil {
log.Fatalf("could not get upstream remote: %v", err)
......@@ -168,27 +168,42 @@ func (g *GitMode) WriteSource(md *modeData) {
hash := strings.TrimSpace(lineInfo[0])
path := strings.TrimSpace(lineInfo[1])
url := fmt.Sprintf("https://git.centos.org/sources/%s/%s/%s", md.rpmFile.Name(), branchName, hash)
log.Printf("downloading %s", url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Fatalf("could not create new http request: %v", err)
}
req.Header.Set("Accept-Encoding", "*")
resp, err := client.Do(req)
if err != nil {
log.Fatalf("could not download dist-git file: %v", err)
}
var body []byte
if md.blobCache[hash] != nil {
body = md.blobCache[hash]
log.Printf("retrieving %s from cache", hash)
} else {
fromBlobStorage := pd.BlobStorage.Read(hash)
if fromBlobStorage != nil {
body = fromBlobStorage
log.Printf("downloading %s from blob storage", hash)
} else {
url := fmt.Sprintf("https://git.centos.org/sources/%s/%s/%s", md.rpmFile.Name(), branchName, hash)
log.Printf("downloading %s", url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Fatalf("could not create new http request: %v", err)
}
req.Header.Set("Accept-Encoding", "*")
resp, err := client.Do(req)
if err != nil {
log.Fatalf("could not download dist-git file: %v", err)
}
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatalf("could not read the whole dist-git file: %v", err)
}
err = resp.Body.Close()
if err != nil {
log.Fatalf("could not close body handle: %v", err)
}
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatalf("could not read the whole dist-git file: %v", err)
}
err = resp.Body.Close()
if err != nil {
log.Fatalf("could not close body handle: %v", err)
md.blobCache[hash] = body
}
f, err := md.worktree.Filesystem.Create(path)
......
......@@ -2,7 +2,7 @@ package internal
type ImportMode interface {
RetrieveSource(pd *ProcessData) *modeData
WriteSource(md *modeData)
WriteSource(pd *ProcessData, md *modeData)
PostProcess(md *modeData)
ImportName(pd *ProcessData, md *modeData) string
}
......@@ -2,6 +2,7 @@ package internal
import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/cavaliercoder/go-rpm"
"github.com/go-git/go-billy/v5/memfs"
......@@ -15,6 +16,7 @@ import (
"hash"
"io/ioutil"
"log"
"os"
"path/filepath"
"regexp"
"strings"
......@@ -57,6 +59,7 @@ type modeData struct {
pushBranch string
branches []string
sourcesToIgnore []*ignoredSource
blobCache map[string][]byte
}
// ProcessRPM checks the RPM specs and discards any remote files
......@@ -69,12 +72,18 @@ type modeData struct {
func ProcessRPM(pd *ProcessData) {
tagImportRegex = regexp.MustCompile(fmt.Sprintf("refs/tags/(imports/(%s.|%s.-.+)/(.*))", pd.ImportBranchPrefix, pd.ImportBranchPrefix))
md := pd.Importer.RetrieveSource(pd)
md.blobCache = map[string][]byte{}
remotePrefix := "rpms"
if pd.ModuleMode {
remotePrefix = "modules"
}
var pushedHashes []string
// already uploaded blobs are skipped
var alreadyUploadedBlobs []string
// if no-dup-mode is enabled then skip already imported versions
var tagIgnoreList []string
if pd.NoDupMode {
......@@ -113,6 +122,7 @@ func ProcessRPM(pd *ProcessData) {
sourceWorktree := *md.worktree
for _, branch := range md.branches {
md.sourcesToIgnore = []*ignoredSource{}
md.repo = &sourceRepo
md.worktree = &sourceWorktree
md.tagBranch = branch
......@@ -202,7 +212,7 @@ func ProcessRPM(pd *ProcessData) {
}
}
pd.Importer.WriteSource(md)
pd.Importer.WriteSource(pd, md)
copyFromFs(md.worktree.Filesystem, w.Filesystem, ".")
md.repo = repo
......@@ -214,8 +224,6 @@ func ProcessRPM(pd *ProcessData) {
executePatchesRpm(pd, md)
}
// already uploaded blobs are skipped
var alreadyUploadedBlobs []string
// get ignored files hash and add to .{name}.metadata
metadataFile := fmt.Sprintf(".%s.metadata", rpmFile.Name())
metadata, err := w.Filesystem.Create(metadataFile)
......@@ -255,8 +263,10 @@ func ProcessRPM(pd *ProcessData) {
if strContains(alreadyUploadedBlobs, path) {
continue
}
pd.BlobStorage.Write(path, sourceFileBts)
log.Printf("wrote %s to blob storage", path)
if !pd.BlobStorage.Exists(path) {
pd.BlobStorage.Write(path, sourceFileBts)
log.Printf("wrote %s to blob storage", path)
}
alreadyUploadedBlobs = append(alreadyUploadedBlobs, path)
}
......@@ -341,5 +351,13 @@ func ProcessRPM(pd *ProcessData) {
if err != nil {
log.Fatalf("could not push to remote: %v", err)
}
hashString := obj.Hash.String()
pushedHashes = append(pushedHashes, fmt.Sprintf("%s:%s", md.pushBranch, hashString))
}
err := json.NewEncoder(os.Stdout).Encode(pushedHashes)
if err != nil {
log.Fatalf("could not print hashes")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment