diff --git a/go.mod b/go.mod index 7a5a4c2f..12563d3c 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/emirpasic/gods v1.18.1 github.com/go-git/go-git/v5 v5.16.4 + github.com/gofrs/flock v0.13.0 github.com/libgit2/git2go/v34 v34.0.0 github.com/minio/sha256-simd v1.0.1 github.com/modelpack/model-spec v0.0.7 @@ -24,7 +25,7 @@ require ( github.com/vbauerster/mpb/v8 v8.11.3 golang.org/x/crypto v0.45.0 golang.org/x/sync v0.18.0 - golang.org/x/sys v0.39.0 + golang.org/x/sys v0.40.0 google.golang.org/grpc v1.78.0 oras.land/oras-go/v2 v2.6.0 ) diff --git a/go.sum b/go.sum index f9a782da..010e3a42 100644 --- a/go.sum +++ b/go.sum @@ -97,6 +97,8 @@ github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9L github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/flock v0.13.0 h1:95JolYOvGMqeH31+FC7D2+uULf6mG61mEZ/A8dRYMzw= +github.com/gofrs/flock v0.13.0/go.mod h1:jxeyy9R1auM5S6JYDBhDt+E2TCo7DkratH4Pgi8P+Z0= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 h1:f+oWsMOmNPc8JmEHVZIycC7hBoQxHH9pNKQORJNozsQ= github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8/go.mod h1:wcDNUvekVysuuOpQKo3191zZyTpiI6se1N1ULghS0sw= @@ -337,8 +339,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= -golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= +golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU= diff --git a/internal/cache/cache.go b/internal/cache/cache.go new file mode 100644 index 00000000..0de35d9d --- /dev/null +++ b/internal/cache/cache.go @@ -0,0 +1,210 @@ +/* + * Copyright 2025 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cache + +import ( + "context" + "encoding/json" + "errors" + "os" + "path/filepath" + "time" + + "github.com/gofrs/flock" +) + +const ( + // TTL is the time-to-live for cached items. + TTL = 24 * time.Hour + + // FileLockRetryDelay is the delay between retries when acquiring file locks. + FileLockRetryDelay = 100 * time.Millisecond +) + +// ErrNotFound is returned when an item is not found in the cache. +var ErrNotFound = errors.New("item not found") + +// Cache is the interface for caching file related information. +type Cache interface { + // Get retrieves an item from the cache. + Get(ctx context.Context, path string) (*Item, error) + + // Put inserts or updates an item in the cache. + Put(ctx context.Context, item *Item) error +} + +// Item represents a cached file item. +type Item struct { + // Path is the absolute path of the file. + Path string `json:"path"` + + // ModTime is the last modification time of the file. + ModTime time.Time `json:"mod_time"` + + // Size is the size of the file in bytes. + Size int64 `json:"size"` + + // Digest is the SHA-256 digest of the file. + Digest string `json:"digest"` + + // CreatedAt is the time when the item was created. + CreatedAt time.Time `json:"created_at"` +} + +// cache is the implementation of the Cache interface. +type cache struct { + // storageDir is the directory where the cache items are stored. + storageDir string + + // flock is the file lock for the cache file. + flock *flock.Flock +} + +// New creates a new cache instance. +func New(storageDir string) (Cache, error) { + c := &cache{ + storageDir: storageDir, + } + + // Ensure cache directory exists. + cacheDir := filepath.Dir(c.storagePath()) + if err := os.MkdirAll(cacheDir, 0755); err != nil { + return nil, err + } + + c.flock = flock.New(c.storagePath()) + return c, nil +} + +// storagePath returns the path to the storage cache file. +func (c *cache) storagePath() string { + return filepath.Join(c.storageDir, "modctl-cache.json") +} + +// readItems reads all items from the cache file without locking. +// The caller must hold the lock. +func (c *cache) readItems() (map[string]*Item, error) { + data, err := os.ReadFile(c.storagePath()) + if err != nil { + // If the file doesn't exist, return an empty map. + if os.IsNotExist(err) { + return make(map[string]*Item), nil + } + return nil, err + } + + // Handle empty file. + if len(data) == 0 { + return make(map[string]*Item), nil + } + + var items []*Item + if err := json.Unmarshal(data, &items); err != nil { + return nil, err + } + + itemMap := make(map[string]*Item, len(items)) + for _, item := range items { + itemMap[item.Path] = item + } + + return itemMap, nil +} + +// writeItems writes items to the cache file without locking. +// The caller must hold the lock. +func (c *cache) writeItems(itemsMap map[string]*Item) error { + items := make([]*Item, 0, len(itemsMap)) + for _, item := range itemsMap { + items = append(items, item) + } + + data, err := json.Marshal(items) + if err != nil { + return err + } + + return os.WriteFile(c.storagePath(), data, 0644) +} + +// prune removes expired items from the map in-place. +func (c *cache) prune(itemsMap map[string]*Item) { + now := time.Now() + for path, item := range itemsMap { + if now.Sub(item.CreatedAt) > TTL { + delete(itemsMap, path) + } + } +} + +// Get retrieves an item from the cache. +func (c *cache) Get(ctx context.Context, path string) (*Item, error) { + // Check context before locking + if err := ctx.Err(); err != nil { + return nil, err + } + + if _, err := c.flock.TryLockContext(ctx, FileLockRetryDelay); err != nil { + return nil, err + } + defer c.flock.Unlock() + + items, err := c.readItems() + if err != nil { + return nil, err + } + + item, ok := items[path] + if !ok { + return nil, ErrNotFound + } + + // If the item is expired, return not found. + if time.Since(item.CreatedAt) > TTL { + return nil, ErrNotFound + } + + return item, nil +} + +// Put inserts or updates an item in the cache. +func (c *cache) Put(ctx context.Context, item *Item) error { + // Check context before locking. + if err := ctx.Err(); err != nil { + return err + } + + if _, err := c.flock.TryLockContext(ctx, FileLockRetryDelay); err != nil { + return err + } + defer c.flock.Unlock() + + // Read existing items. + itemsMap, err := c.readItems() + if err != nil { + return err + } + + // Update or insert the item. + itemsMap[item.Path] = item + + // Prune expired items. + c.prune(itemsMap) + + // Write back to file. + return c.writeItems(itemsMap) +} diff --git a/pkg/backend/build/builder.go b/pkg/backend/build/builder.go index bd2c089a..9c69f432 100644 --- a/pkg/backend/build/builder.go +++ b/pkg/backend/build/builder.go @@ -25,7 +25,6 @@ import ( "io" "os" "path/filepath" - "strconv" "sync" "syscall" "time" @@ -37,12 +36,12 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/sirupsen/logrus" + "github.com/modelpack/modctl/internal/cache" buildconfig "github.com/modelpack/modctl/pkg/backend/build/config" "github.com/modelpack/modctl/pkg/backend/build/hooks" "github.com/modelpack/modctl/pkg/backend/build/interceptor" pkgcodec "github.com/modelpack/modctl/pkg/codec" "github.com/modelpack/modctl/pkg/storage" - "github.com/modelpack/modctl/pkg/xattr" ) // OutputType defines the type of output to generate. @@ -102,12 +101,20 @@ func NewBuilder(outputType OutputType, store storage.Storage, repo, tag string, return nil, err } + // TODO: Use the storage dir specified from user. + cache, err := cache.New(os.TempDir()) + if err != nil { + // Just print the error message because cache is not critical. + logrus.Errorf("failed to create cache: %v", err) + } + return &abstractBuilder{ store: store, repo: repo, tag: tag, strategy: strategy, interceptor: cfg.interceptor, + cache: cache, }, nil } @@ -120,6 +127,8 @@ type abstractBuilder struct { strategy OutputStrategy // interceptor is the interceptor used to intercept the build process. interceptor interceptor.Interceptor + // cache is the cache used to store the file digest. + cache cache.Cache } func (ab *abstractBuilder) BuildLayer(ctx context.Context, mediaType, workDir, path, destPath string, hooks hooks.Hooks) (ocispec.Descriptor, error) { @@ -157,7 +166,7 @@ func (ab *abstractBuilder) BuildLayer(ctx context.Context, mediaType, workDir, p return ocispec.Descriptor{}, fmt.Errorf("failed to encode file: %w", err) } - reader, digest, size, err := computeDigestAndSize(mediaType, path, workDirPath, info, reader, codec) + reader, digest, size, err := ab.computeDigestAndSize(ctx, mediaType, path, workDirPath, info, reader, codec) if err != nil { return ocispec.Descriptor{}, fmt.Errorf("failed to compute digest and size: %w", err) } @@ -237,6 +246,83 @@ func (ab *abstractBuilder) BuildManifest(ctx context.Context, layers []ocispec.D return ab.strategy.OutputManifest(ctx, manifest.MediaType, digest, int64(len(manifestJSON)), bytes.NewReader(manifestJSON), hooks) } +// computeDigestAndSize computes the digest and size for the encoded content, using cache if available. +func (ab *abstractBuilder) computeDigestAndSize(ctx context.Context, mediaType, path, workDirPath string, info os.FileInfo, reader io.Reader, codec pkgcodec.Codec) (io.Reader, string, int64, error) { + // Try to retrieve valid digest from cache for raw model weights. + if mediaType == modelspec.MediaTypeModelWeightRaw { + if digest, size, ok := ab.retrieveCache(ctx, path, info); ok { + return reader, digest, size, nil + } + } + + logrus.Infof("builder: calculating digest for file %s", path) + + hash := sha256.New() + size, err := io.Copy(hash, reader) + if err != nil { + return reader, "", 0, fmt.Errorf("failed to copy content to hash: %w", err) + } + digest := fmt.Sprintf("sha256:%x", hash.Sum(nil)) + + logrus.Infof("builder: calculated digest for file %s [digest: %s]", path, digest) + + // Reset reader for subsequent use. + reader, err = resetReader(reader, path, workDirPath, codec) + if err != nil { + return reader, "", 0, err + } + + // Update cache. + if mediaType == modelspec.MediaTypeModelWeightRaw { + if err := ab.updateCache(ctx, path, info.ModTime(), size, digest); err != nil { + logrus.Warnf("builder: failed to update cache for file %s: %s", path, err) + } + } + + return reader, digest, size, nil +} + +// retrieveCache checks if mtime and size match, then returns the cached digest. +func (ab *abstractBuilder) retrieveCache(ctx context.Context, path string, info os.FileInfo) (string, int64, bool) { + if ab.cache == nil { + return "", 0, false + } + + item, err := ab.cache.Get(ctx, path) + if err != nil { + if !errors.Is(err, cache.ErrNotFound) { + logrus.Errorf("builder: failed to retrieve cache item for file %s: %s", path, err) + } + + return "", 0, false + } + + if item.ModTime != info.ModTime() || item.Size != info.Size() { + logrus.Warnf("builder: cache item for file %s is stale, skip cache", path) + return "", 0, false + } + + logrus.Infof("builder: retrieved from cache for file %s [digest: %s]", path, item.Digest) + return item.Digest, item.Size, true +} + +// updateCache writes mtime, size, and digest to cache. +func (ab *abstractBuilder) updateCache(ctx context.Context, path string, mtime time.Time, size int64, digest string) error { + if ab.cache == nil { + return errors.New("cache is not initialized") + } + + item := &cache.Item{ + Path: path, + ModTime: mtime, + Size: size, + Digest: digest, + CreatedAt: time.Now(), + } + + return ab.cache.Put(ctx, item) +} + // BuildModelConfig builds the model config. func BuildModelConfig(modelConfig *buildconfig.Model, layers []ocispec.Descriptor) (modelspec.Model, error) { if modelConfig == nil { @@ -286,82 +372,6 @@ func BuildModelConfig(modelConfig *buildconfig.Model, layers []ocispec.Descripto }, nil } -// computeDigestAndSize computes the digest and size for the encoded content, using xattrs if available. -func computeDigestAndSize(mediaType, path, workDirPath string, info os.FileInfo, reader io.Reader, codec pkgcodec.Codec) (io.Reader, string, int64, error) { - // Try to retrieve valid digest from xattrs cache. - if pkgcodec.IsRawMediaType(mediaType) { - if digest, size, ok := retrieveCachedDigest(path, info); ok { - return reader, digest, size, nil - } - } - - logrus.Infof("builder: calculating digest for file %s", path) - - hash := sha256.New() - size, err := io.Copy(hash, reader) - if err != nil { - return reader, "", 0, fmt.Errorf("failed to copy content to hash: %w", err) - } - digest := fmt.Sprintf("sha256:%x", hash.Sum(nil)) - - logrus.Infof("builder: calculated digest for file %s [digest: %s]", path, digest) - - // Reset reader for subsequent use. - reader, err = resetReader(reader, path, workDirPath, codec) - if err != nil { - return reader, "", 0, err - } - - // Update xattrs cache. - if pkgcodec.IsRawMediaType(mediaType) { - if err := updateCachedDigest(path, info.ModTime().UnixNano(), size, digest); err != nil { - logrus.Warnf("builder: failed to update xattrs for file %s: %s", path, err) - } - } - - return reader, digest, size, nil -} - -// retrieveCachedDigest checks if mtime and size match, then returns the cached digest. -func retrieveCachedDigest(path string, info os.FileInfo) (string, int64, bool) { - mtimeData, err := xattr.Get(path, xattr.MakeKey(xattr.KeyMtime)) - if err != nil || string(mtimeData) != strconv.FormatInt(info.ModTime().UnixNano(), 10) { - return "", 0, false - } - - sizeData, err := xattr.Get(path, xattr.MakeKey(xattr.KeySize)) - if err != nil { - return "", 0, false - } - cachedSize, err := strconv.ParseInt(string(sizeData), 10, 64) - if err != nil || cachedSize != info.Size() { - return "", 0, false - } - - digestData, err := xattr.Get(path, xattr.MakeKey(xattr.KeySha256)) - if err != nil { - return "", 0, false - } - - digest := string(digestData) - logrus.Infof("builder: retrieved from xattr cache for file %s [digest: %s]", path, digest) - return digest, cachedSize, true -} - -// updateCachedDigest writes mtime, size, and digest to xattrs. -func updateCachedDigest(path string, mtime, size int64, digest string) error { - if err := xattr.Set(path, xattr.MakeKey(xattr.KeyMtime), []byte(strconv.FormatInt(mtime, 10))); err != nil { - return err - } - if err := xattr.Set(path, xattr.MakeKey(xattr.KeySha256), []byte(digest)); err != nil { - return err - } - if err := xattr.Set(path, xattr.MakeKey(xattr.KeySize), []byte(strconv.FormatInt(size, 10))); err != nil { - return err - } - return nil -} - // resetReader resets the reader to the beginning or re-encodes if not seekable. func resetReader(reader io.Reader, path, workDirPath string, codec pkgcodec.Codec) (io.Reader, error) { if seeker, ok := reader.(io.ReadSeeker); ok { diff --git a/pkg/codec/codec.go b/pkg/codec/codec.go index 4b4105f5..537507db 100644 --- a/pkg/codec/codec.go +++ b/pkg/codec/codec.go @@ -72,8 +72,3 @@ func TypeFromMediaType(mediaType string) Type { return "" } - -// IsRawMediaType returns true if the media type is raw. -func IsRawMediaType(mediaType string) bool { - return strings.HasSuffix(mediaType, ".raw") -}