diff --git a/pkg/artifactcache/handler.go b/pkg/artifactcache/handler.go index f11def6..ac4755e 100644 --- a/pkg/artifactcache/handler.go +++ b/pkg/artifactcache/handler.go @@ -27,7 +27,7 @@ const ( ) type Handler struct { - db *bolthold.Store + dir string storage *Storage router *httprouter.Router listener net.Listener @@ -62,19 +62,7 @@ func StartHandler(dir, outboundIP string, port uint16, logger logrus.FieldLogger return nil, err } - db, err := bolthold.Open(filepath.Join(dir, "bolt.db"), 0o644, &bolthold.Options{ - Encoder: json.Marshal, - Decoder: json.Unmarshal, - Options: &bbolt.Options{ - Timeout: 5 * time.Second, - NoGrowSync: bbolt.DefaultOptions.NoGrowSync, - FreelistType: bbolt.DefaultOptions.FreelistType, - }, - }) - if err != nil { - return nil, err - } - h.db = db + h.dir = dir storage, err := NewStorage(filepath.Join(dir, "cache")) if err != nil { @@ -150,16 +138,21 @@ func (h *Handler) Close() error { } h.listener = nil } - if h.db != nil { - err := h.db.Close() - if err != nil { - retErr = err - } - h.db = nil - } return retErr } +func (h *Handler) openDB() (*bolthold.Store, error) { + return bolthold.Open(filepath.Join(h.dir, "bolt.db"), 0o644, &bolthold.Options{ + Encoder: json.Marshal, + Decoder: json.Unmarshal, + Options: &bbolt.Options{ + Timeout: 5 * time.Second, + NoGrowSync: bbolt.DefaultOptions.NoGrowSync, + FreelistType: bbolt.DefaultOptions.FreelistType, + }, + }) +} + // GET /_apis/artifactcache/cache func (h *Handler) find(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { keys := strings.Split(r.URL.Query().Get("keys"), ",") @@ -169,7 +162,14 @@ func (h *Handler) find(w http.ResponseWriter, r *http.Request, _ httprouter.Para } version := r.URL.Query().Get("version") - cache, err := h.findCache(keys, version) + db, err := h.openDB() + if err != nil { + h.responseJSON(w, r, 500, err) + return + } + defer db.Close() + + cache, err := h.findCache(db, keys, version) if err != nil { h.responseJSON(w, r, 500, err) return @@ -183,7 +183,7 @@ func (h *Handler) find(w http.ResponseWriter, r *http.Request, _ httprouter.Para h.responseJSON(w, r, 500, err) return } else if !ok { - _ = h.db.Delete(cache.ID, cache) + _ = db.Delete(cache.ID, cache) h.responseJSON(w, r, 204) return } @@ -206,7 +206,13 @@ func (h *Handler) reserve(w http.ResponseWriter, r *http.Request, _ httprouter.P cache := api.ToCache() cache.FillKeyVersionHash() - if err := h.db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil { + db, err := h.openDB() + if err != nil { + h.responseJSON(w, r, 500, err) + return + } + defer db.Close() + if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil { if !errors.Is(err, bolthold.ErrNotFound) { h.responseJSON(w, r, 500, err) return @@ -219,12 +225,12 @@ func (h *Handler) reserve(w http.ResponseWriter, r *http.Request, _ httprouter.P now := time.Now().Unix() cache.CreatedAt = now cache.UsedAt = now - if err := h.db.Insert(bolthold.NextSequence(), cache); err != nil { + if err := db.Insert(bolthold.NextSequence(), cache); err != nil { h.responseJSON(w, r, 500, err) return } // write back id to db - if err := h.db.Update(cache.ID, cache); err != nil { + if err := db.Update(cache.ID, cache); err != nil { h.responseJSON(w, r, 500, err) return } @@ -242,7 +248,13 @@ func (h *Handler) upload(w http.ResponseWriter, r *http.Request, params httprout } cache := &Cache{} - if err := h.db.Get(id, cache); err != nil { + db, err := h.openDB() + if err != nil { + h.responseJSON(w, r, 500, err) + return + } + defer db.Close() + if err := db.Get(id, cache); err != nil { if errors.Is(err, bolthold.ErrNotFound) { h.responseJSON(w, r, 400, fmt.Errorf("cache %d: not reserved", id)) return @@ -255,6 +267,7 @@ func (h *Handler) upload(w http.ResponseWriter, r *http.Request, params httprout h.responseJSON(w, r, 400, fmt.Errorf("cache %v %q: already complete", cache.ID, cache.Key)) return } + db.Close() start, _, err := parseContentRange(r.Header.Get("Content-Range")) if err != nil { h.responseJSON(w, r, 400, err) @@ -276,7 +289,13 @@ func (h *Handler) commit(w http.ResponseWriter, r *http.Request, params httprout } cache := &Cache{} - if err := h.db.Get(id, cache); err != nil { + db, err := h.openDB() + if err != nil { + h.responseJSON(w, r, 500, err) + return + } + defer db.Close() + if err := db.Get(id, cache); err != nil { if errors.Is(err, bolthold.ErrNotFound) { h.responseJSON(w, r, 400, fmt.Errorf("cache %d: not reserved", id)) return @@ -290,13 +309,22 @@ func (h *Handler) commit(w http.ResponseWriter, r *http.Request, params httprout return } + db.Close() + if err := h.storage.Commit(cache.ID, cache.Size); err != nil { h.responseJSON(w, r, 500, err) return } + db, err = h.openDB() + if err != nil { + h.responseJSON(w, r, 500, err) + return + } + defer db.Close() + cache.Complete = true - if err := h.db.Update(cache.ID, cache); err != nil { + if err := db.Update(cache.ID, cache); err != nil { h.responseJSON(w, r, 500, err) return } @@ -332,7 +360,7 @@ func (h *Handler) middleware(handler httprouter.Handle) httprouter.Handle { } // if not found, return (nil, nil) instead of an error. -func (h *Handler) findCache(keys []string, version string) (*Cache, error) { +func (h *Handler) findCache(db *bolthold.Store, keys []string, version string) (*Cache, error) { if len(keys) == 0 { return nil, nil } @@ -344,7 +372,7 @@ func (h *Handler) findCache(keys []string, version string) (*Cache, error) { } cache.FillKeyVersionHash() - if err := h.db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil { + if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil { if !errors.Is(err, bolthold.ErrNotFound) { return nil, err } @@ -355,7 +383,7 @@ func (h *Handler) findCache(keys []string, version string) (*Cache, error) { for _, prefix := range keys[1:] { found := false - if err := h.db.ForEach(bolthold.Where("Key").Ge(prefix).And("Version").Eq(version).SortBy("Key"), func(v *Cache) error { + if err := db.ForEach(bolthold.Where("Key").Ge(prefix).And("Version").Eq(version).SortBy("Key"), func(v *Cache) error { if !strings.HasPrefix(v.Key, prefix) { return stop } @@ -378,12 +406,17 @@ func (h *Handler) findCache(keys []string, version string) (*Cache, error) { } func (h *Handler) useCache(id int64) { + db, err := h.openDB() + if err != nil { + return + } + defer db.Close() cache := &Cache{} - if err := h.db.Get(id, cache); err != nil { + if err := db.Get(id, cache); err != nil { return } cache.UsedAt = time.Now().Unix() - _ = h.db.Update(cache.ID, cache) + _ = db.Update(cache.ID, cache) } func (h *Handler) gcCache() { @@ -408,8 +441,14 @@ func (h *Handler) gcCache() { keepTemp = 5 * time.Minute ) + db, err := h.openDB() + if err != nil { + return + } + defer db.Close() + var caches []*Cache - if err := h.db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix())); err != nil { + if err := db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix())); err != nil { h.logger.Warnf("find caches: %v", err) } else { for _, cache := range caches { @@ -417,7 +456,7 @@ func (h *Handler) gcCache() { continue } h.storage.Remove(cache.ID) - if err := h.db.Delete(cache.ID, cache); err != nil { + if err := db.Delete(cache.ID, cache); err != nil { h.logger.Warnf("delete cache: %v", err) continue } @@ -426,12 +465,12 @@ func (h *Handler) gcCache() { } caches = caches[:0] - if err := h.db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix())); err != nil { + if err := db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix())); err != nil { h.logger.Warnf("find caches: %v", err) } else { for _, cache := range caches { h.storage.Remove(cache.ID) - if err := h.db.Delete(cache.ID, cache); err != nil { + if err := db.Delete(cache.ID, cache); err != nil { h.logger.Warnf("delete cache: %v", err) continue } @@ -440,12 +479,12 @@ func (h *Handler) gcCache() { } caches = caches[:0] - if err := h.db.Find(&caches, bolthold.Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix())); err != nil { + if err := db.Find(&caches, bolthold.Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix())); err != nil { h.logger.Warnf("find caches: %v", err) } else { for _, cache := range caches { h.storage.Remove(cache.ID) - if err := h.db.Delete(cache.ID, cache); err != nil { + if err := db.Delete(cache.ID, cache); err != nil { h.logger.Warnf("delete cache: %v", err) continue } diff --git a/pkg/artifactcache/handler_test.go b/pkg/artifactcache/handler_test.go index 7c6840a..35ec753 100644 --- a/pkg/artifactcache/handler_test.go +++ b/pkg/artifactcache/handler_test.go @@ -25,7 +25,10 @@ func TestHandler(t *testing.T) { defer func() { t.Run("inpect db", func(t *testing.T) { - require.NoError(t, handler.db.Bolt().View(func(tx *bbolt.Tx) error { + db, err := handler.openDB() + require.NoError(t, err) + defer db.Close() + require.NoError(t, db.Bolt().View(func(tx *bbolt.Tx) error { return tx.Bucket([]byte("Cache")).ForEach(func(k, v []byte) error { t.Logf("%s: %s", k, v) return nil @@ -36,7 +39,6 @@ func TestHandler(t *testing.T) { require.NoError(t, handler.Close()) assert.Nil(t, handler.server) assert.Nil(t, handler.listener) - assert.Nil(t, handler.db) _, err := http.Post(fmt.Sprintf("%s/caches/%d", base, 1), "", nil) assert.Error(t, err) })