refactor: open boltdb only while using it (#1879)
* refactor: open boltdb only while using it * patch * Update handler_test.go * Update handler_test.go * Update handler_test.go * Update handler.go * timeout * 10 * pr feedback * fixup
This commit is contained in:
parent
310cb79e81
commit
e597046195
2 changed files with 83 additions and 42 deletions
|
@ -27,7 +27,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Handler struct {
|
type Handler struct {
|
||||||
db *bolthold.Store
|
dir string
|
||||||
storage *Storage
|
storage *Storage
|
||||||
router *httprouter.Router
|
router *httprouter.Router
|
||||||
listener net.Listener
|
listener net.Listener
|
||||||
|
@ -62,19 +62,7 @@ func StartHandler(dir, outboundIP string, port uint16, logger logrus.FieldLogger
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
db, err := bolthold.Open(filepath.Join(dir, "bolt.db"), 0o644, &bolthold.Options{
|
h.dir = dir
|
||||||
Encoder: json.Marshal,
|
|
||||||
Decoder: json.Unmarshal,
|
|
||||||
Options: &bbolt.Options{
|
|
||||||
Timeout: 5 * time.Second,
|
|
||||||
NoGrowSync: bbolt.DefaultOptions.NoGrowSync,
|
|
||||||
FreelistType: bbolt.DefaultOptions.FreelistType,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
h.db = db
|
|
||||||
|
|
||||||
storage, err := NewStorage(filepath.Join(dir, "cache"))
|
storage, err := NewStorage(filepath.Join(dir, "cache"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -150,16 +138,21 @@ func (h *Handler) Close() error {
|
||||||
}
|
}
|
||||||
h.listener = nil
|
h.listener = nil
|
||||||
}
|
}
|
||||||
if h.db != nil {
|
|
||||||
err := h.db.Close()
|
|
||||||
if err != nil {
|
|
||||||
retErr = err
|
|
||||||
}
|
|
||||||
h.db = nil
|
|
||||||
}
|
|
||||||
return retErr
|
return retErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Handler) openDB() (*bolthold.Store, error) {
|
||||||
|
return bolthold.Open(filepath.Join(h.dir, "bolt.db"), 0o644, &bolthold.Options{
|
||||||
|
Encoder: json.Marshal,
|
||||||
|
Decoder: json.Unmarshal,
|
||||||
|
Options: &bbolt.Options{
|
||||||
|
Timeout: 5 * time.Second,
|
||||||
|
NoGrowSync: bbolt.DefaultOptions.NoGrowSync,
|
||||||
|
FreelistType: bbolt.DefaultOptions.FreelistType,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// GET /_apis/artifactcache/cache
|
// GET /_apis/artifactcache/cache
|
||||||
func (h *Handler) find(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
|
func (h *Handler) find(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
|
||||||
keys := strings.Split(r.URL.Query().Get("keys"), ",")
|
keys := strings.Split(r.URL.Query().Get("keys"), ",")
|
||||||
|
@ -169,7 +162,14 @@ func (h *Handler) find(w http.ResponseWriter, r *http.Request, _ httprouter.Para
|
||||||
}
|
}
|
||||||
version := r.URL.Query().Get("version")
|
version := r.URL.Query().Get("version")
|
||||||
|
|
||||||
cache, err := h.findCache(keys, version)
|
db, err := h.openDB()
|
||||||
|
if err != nil {
|
||||||
|
h.responseJSON(w, r, 500, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
cache, err := h.findCache(db, keys, version)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.responseJSON(w, r, 500, err)
|
h.responseJSON(w, r, 500, err)
|
||||||
return
|
return
|
||||||
|
@ -183,7 +183,7 @@ func (h *Handler) find(w http.ResponseWriter, r *http.Request, _ httprouter.Para
|
||||||
h.responseJSON(w, r, 500, err)
|
h.responseJSON(w, r, 500, err)
|
||||||
return
|
return
|
||||||
} else if !ok {
|
} else if !ok {
|
||||||
_ = h.db.Delete(cache.ID, cache)
|
_ = db.Delete(cache.ID, cache)
|
||||||
h.responseJSON(w, r, 204)
|
h.responseJSON(w, r, 204)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -206,7 +206,13 @@ func (h *Handler) reserve(w http.ResponseWriter, r *http.Request, _ httprouter.P
|
||||||
|
|
||||||
cache := api.ToCache()
|
cache := api.ToCache()
|
||||||
cache.FillKeyVersionHash()
|
cache.FillKeyVersionHash()
|
||||||
if err := h.db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil {
|
db, err := h.openDB()
|
||||||
|
if err != nil {
|
||||||
|
h.responseJSON(w, r, 500, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil {
|
||||||
if !errors.Is(err, bolthold.ErrNotFound) {
|
if !errors.Is(err, bolthold.ErrNotFound) {
|
||||||
h.responseJSON(w, r, 500, err)
|
h.responseJSON(w, r, 500, err)
|
||||||
return
|
return
|
||||||
|
@ -219,12 +225,12 @@ func (h *Handler) reserve(w http.ResponseWriter, r *http.Request, _ httprouter.P
|
||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
cache.CreatedAt = now
|
cache.CreatedAt = now
|
||||||
cache.UsedAt = now
|
cache.UsedAt = now
|
||||||
if err := h.db.Insert(bolthold.NextSequence(), cache); err != nil {
|
if err := db.Insert(bolthold.NextSequence(), cache); err != nil {
|
||||||
h.responseJSON(w, r, 500, err)
|
h.responseJSON(w, r, 500, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// write back id to db
|
// write back id to db
|
||||||
if err := h.db.Update(cache.ID, cache); err != nil {
|
if err := db.Update(cache.ID, cache); err != nil {
|
||||||
h.responseJSON(w, r, 500, err)
|
h.responseJSON(w, r, 500, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -242,7 +248,13 @@ func (h *Handler) upload(w http.ResponseWriter, r *http.Request, params httprout
|
||||||
}
|
}
|
||||||
|
|
||||||
cache := &Cache{}
|
cache := &Cache{}
|
||||||
if err := h.db.Get(id, cache); err != nil {
|
db, err := h.openDB()
|
||||||
|
if err != nil {
|
||||||
|
h.responseJSON(w, r, 500, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
if err := db.Get(id, cache); err != nil {
|
||||||
if errors.Is(err, bolthold.ErrNotFound) {
|
if errors.Is(err, bolthold.ErrNotFound) {
|
||||||
h.responseJSON(w, r, 400, fmt.Errorf("cache %d: not reserved", id))
|
h.responseJSON(w, r, 400, fmt.Errorf("cache %d: not reserved", id))
|
||||||
return
|
return
|
||||||
|
@ -255,6 +267,7 @@ func (h *Handler) upload(w http.ResponseWriter, r *http.Request, params httprout
|
||||||
h.responseJSON(w, r, 400, fmt.Errorf("cache %v %q: already complete", cache.ID, cache.Key))
|
h.responseJSON(w, r, 400, fmt.Errorf("cache %v %q: already complete", cache.ID, cache.Key))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
db.Close()
|
||||||
start, _, err := parseContentRange(r.Header.Get("Content-Range"))
|
start, _, err := parseContentRange(r.Header.Get("Content-Range"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.responseJSON(w, r, 400, err)
|
h.responseJSON(w, r, 400, err)
|
||||||
|
@ -276,7 +289,13 @@ func (h *Handler) commit(w http.ResponseWriter, r *http.Request, params httprout
|
||||||
}
|
}
|
||||||
|
|
||||||
cache := &Cache{}
|
cache := &Cache{}
|
||||||
if err := h.db.Get(id, cache); err != nil {
|
db, err := h.openDB()
|
||||||
|
if err != nil {
|
||||||
|
h.responseJSON(w, r, 500, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
if err := db.Get(id, cache); err != nil {
|
||||||
if errors.Is(err, bolthold.ErrNotFound) {
|
if errors.Is(err, bolthold.ErrNotFound) {
|
||||||
h.responseJSON(w, r, 400, fmt.Errorf("cache %d: not reserved", id))
|
h.responseJSON(w, r, 400, fmt.Errorf("cache %d: not reserved", id))
|
||||||
return
|
return
|
||||||
|
@ -290,13 +309,22 @@ func (h *Handler) commit(w http.ResponseWriter, r *http.Request, params httprout
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
db.Close()
|
||||||
|
|
||||||
if err := h.storage.Commit(cache.ID, cache.Size); err != nil {
|
if err := h.storage.Commit(cache.ID, cache.Size); err != nil {
|
||||||
h.responseJSON(w, r, 500, err)
|
h.responseJSON(w, r, 500, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
db, err = h.openDB()
|
||||||
|
if err != nil {
|
||||||
|
h.responseJSON(w, r, 500, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
cache.Complete = true
|
cache.Complete = true
|
||||||
if err := h.db.Update(cache.ID, cache); err != nil {
|
if err := db.Update(cache.ID, cache); err != nil {
|
||||||
h.responseJSON(w, r, 500, err)
|
h.responseJSON(w, r, 500, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -332,7 +360,7 @@ func (h *Handler) middleware(handler httprouter.Handle) httprouter.Handle {
|
||||||
}
|
}
|
||||||
|
|
||||||
// if not found, return (nil, nil) instead of an error.
|
// if not found, return (nil, nil) instead of an error.
|
||||||
func (h *Handler) findCache(keys []string, version string) (*Cache, error) {
|
func (h *Handler) findCache(db *bolthold.Store, keys []string, version string) (*Cache, error) {
|
||||||
if len(keys) == 0 {
|
if len(keys) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
@ -344,7 +372,7 @@ func (h *Handler) findCache(keys []string, version string) (*Cache, error) {
|
||||||
}
|
}
|
||||||
cache.FillKeyVersionHash()
|
cache.FillKeyVersionHash()
|
||||||
|
|
||||||
if err := h.db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil {
|
if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil {
|
||||||
if !errors.Is(err, bolthold.ErrNotFound) {
|
if !errors.Is(err, bolthold.ErrNotFound) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -355,7 +383,7 @@ func (h *Handler) findCache(keys []string, version string) (*Cache, error) {
|
||||||
|
|
||||||
for _, prefix := range keys[1:] {
|
for _, prefix := range keys[1:] {
|
||||||
found := false
|
found := false
|
||||||
if err := h.db.ForEach(bolthold.Where("Key").Ge(prefix).And("Version").Eq(version).SortBy("Key"), func(v *Cache) error {
|
if err := db.ForEach(bolthold.Where("Key").Ge(prefix).And("Version").Eq(version).SortBy("Key"), func(v *Cache) error {
|
||||||
if !strings.HasPrefix(v.Key, prefix) {
|
if !strings.HasPrefix(v.Key, prefix) {
|
||||||
return stop
|
return stop
|
||||||
}
|
}
|
||||||
|
@ -378,12 +406,17 @@ func (h *Handler) findCache(keys []string, version string) (*Cache, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) useCache(id int64) {
|
func (h *Handler) useCache(id int64) {
|
||||||
|
db, err := h.openDB()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
cache := &Cache{}
|
cache := &Cache{}
|
||||||
if err := h.db.Get(id, cache); err != nil {
|
if err := db.Get(id, cache); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cache.UsedAt = time.Now().Unix()
|
cache.UsedAt = time.Now().Unix()
|
||||||
_ = h.db.Update(cache.ID, cache)
|
_ = db.Update(cache.ID, cache)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) gcCache() {
|
func (h *Handler) gcCache() {
|
||||||
|
@ -408,8 +441,14 @@ func (h *Handler) gcCache() {
|
||||||
keepTemp = 5 * time.Minute
|
keepTemp = 5 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
|
db, err := h.openDB()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
var caches []*Cache
|
var caches []*Cache
|
||||||
if err := h.db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix())); err != nil {
|
if err := db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix())); err != nil {
|
||||||
h.logger.Warnf("find caches: %v", err)
|
h.logger.Warnf("find caches: %v", err)
|
||||||
} else {
|
} else {
|
||||||
for _, cache := range caches {
|
for _, cache := range caches {
|
||||||
|
@ -417,7 +456,7 @@ func (h *Handler) gcCache() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
h.storage.Remove(cache.ID)
|
h.storage.Remove(cache.ID)
|
||||||
if err := h.db.Delete(cache.ID, cache); err != nil {
|
if err := db.Delete(cache.ID, cache); err != nil {
|
||||||
h.logger.Warnf("delete cache: %v", err)
|
h.logger.Warnf("delete cache: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -426,12 +465,12 @@ func (h *Handler) gcCache() {
|
||||||
}
|
}
|
||||||
|
|
||||||
caches = caches[:0]
|
caches = caches[:0]
|
||||||
if err := h.db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix())); err != nil {
|
if err := db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix())); err != nil {
|
||||||
h.logger.Warnf("find caches: %v", err)
|
h.logger.Warnf("find caches: %v", err)
|
||||||
} else {
|
} else {
|
||||||
for _, cache := range caches {
|
for _, cache := range caches {
|
||||||
h.storage.Remove(cache.ID)
|
h.storage.Remove(cache.ID)
|
||||||
if err := h.db.Delete(cache.ID, cache); err != nil {
|
if err := db.Delete(cache.ID, cache); err != nil {
|
||||||
h.logger.Warnf("delete cache: %v", err)
|
h.logger.Warnf("delete cache: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -440,12 +479,12 @@ func (h *Handler) gcCache() {
|
||||||
}
|
}
|
||||||
|
|
||||||
caches = caches[:0]
|
caches = caches[:0]
|
||||||
if err := h.db.Find(&caches, bolthold.Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix())); err != nil {
|
if err := db.Find(&caches, bolthold.Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix())); err != nil {
|
||||||
h.logger.Warnf("find caches: %v", err)
|
h.logger.Warnf("find caches: %v", err)
|
||||||
} else {
|
} else {
|
||||||
for _, cache := range caches {
|
for _, cache := range caches {
|
||||||
h.storage.Remove(cache.ID)
|
h.storage.Remove(cache.ID)
|
||||||
if err := h.db.Delete(cache.ID, cache); err != nil {
|
if err := db.Delete(cache.ID, cache); err != nil {
|
||||||
h.logger.Warnf("delete cache: %v", err)
|
h.logger.Warnf("delete cache: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,10 @@ func TestHandler(t *testing.T) {
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
t.Run("inpect db", func(t *testing.T) {
|
t.Run("inpect db", func(t *testing.T) {
|
||||||
require.NoError(t, handler.db.Bolt().View(func(tx *bbolt.Tx) error {
|
db, err := handler.openDB()
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer db.Close()
|
||||||
|
require.NoError(t, db.Bolt().View(func(tx *bbolt.Tx) error {
|
||||||
return tx.Bucket([]byte("Cache")).ForEach(func(k, v []byte) error {
|
return tx.Bucket([]byte("Cache")).ForEach(func(k, v []byte) error {
|
||||||
t.Logf("%s: %s", k, v)
|
t.Logf("%s: %s", k, v)
|
||||||
return nil
|
return nil
|
||||||
|
@ -36,7 +39,6 @@ func TestHandler(t *testing.T) {
|
||||||
require.NoError(t, handler.Close())
|
require.NoError(t, handler.Close())
|
||||||
assert.Nil(t, handler.server)
|
assert.Nil(t, handler.server)
|
||||||
assert.Nil(t, handler.listener)
|
assert.Nil(t, handler.listener)
|
||||||
assert.Nil(t, handler.db)
|
|
||||||
_, err := http.Post(fmt.Sprintf("%s/caches/%d", base, 1), "", nil)
|
_, err := http.Post(fmt.Sprintf("%s/caches/%d", base, 1), "", nil)
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue