Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

### CLI

* Improve performance of `databricks fs cp` command by parallelizing file uploads when
copying directories with the `--recursive` flag.

### Bundles

### Dependency updates
Expand Down
123 changes: 86 additions & 37 deletions cmd/fs/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,94 +9,133 @@ import (
"path"
"path/filepath"
"strings"
"sync"

"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/filer"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)

// Default number of concurrent file copy operations.
const defaultConcurrency = 16

// errInvalidConcurrency is returned when the value of the concurrency
// flag is invalid.
var errInvalidConcurrency = errors.New("--concurrency must be at least 1")

type copy struct {
overwrite bool
recursive bool
overwrite bool
recursive bool
concurrency int

ctx context.Context
sourceFiler filer.Filer
targetFiler filer.Filer
sourceScheme string
targetScheme string

mu sync.Mutex // protect output from concurrent writes
}

func (c *copy) cpWriteCallback(sourceDir, targetDir string) fs.WalkDirFunc {
return func(sourcePath string, d fs.DirEntry, err error) error {
// cpDirToDir recursively copies the contents of a directory to another
// directory.
//
// There is no guarantee on the order in which the files are copied.
//
// The method does not take care of retrying on error; this is considered to
// be the responsibility of the Filer implementation. If a file copy fails,
// the error is returned and the other copies are cancelled.
func (c *copy) cpDirToDir(ctx context.Context, sourceDir, targetDir string) error {
if !c.recursive {
return fmt.Errorf("source path %s is a directory. Please specify the --recursive flag", sourceDir)
}

// Create cancellable context to ensure cleanup and that all goroutines
// are stopped when the function exits on any error path (e.g. permission
// denied when walking the source directory).
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Pool of workers to process copy operations in parallel.
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(c.concurrency)

// Walk the source directory, queueing file copy operations for processing.
sourceFs := filer.NewFS(ctx, c.sourceFiler)
err := fs.WalkDir(sourceFs, sourceDir, func(sourcePath string, d fs.DirEntry, err error) error {
if err != nil {
return err
}

// Compute path relative to the target directory
// Compute path relative to the source directory.
relPath, err := filepath.Rel(sourceDir, sourcePath)
if err != nil {
return err
}
relPath = filepath.ToSlash(relPath)

// Compute target path for the file
// Compute target path for the file.
targetPath := path.Join(targetDir, relPath)

// create directory and return early
// Create the directory synchronously. This must happen before files
// are copied into it, and WalkDir guarantees directories are visited
// before their contents.
if d.IsDir() {
return c.targetFiler.Mkdir(c.ctx, targetPath)
return c.targetFiler.Mkdir(ctx, targetPath)
}

return c.cpFileToFile(sourcePath, targetPath)
}
}

func (c *copy) cpDirToDir(sourceDir, targetDir string) error {
if !c.recursive {
return fmt.Errorf("source path %s is a directory. Please specify the --recursive flag", sourceDir)
// Queue file copy operation for processing.
g.Go(func() error {
if ctx.Err() != nil {
return ctx.Err()
}
return c.cpFileToFile(ctx, sourcePath, targetPath)
})
return nil
})
if err != nil {
return err
}

sourceFs := filer.NewFS(c.ctx, c.sourceFiler)
return fs.WalkDir(sourceFs, sourceDir, c.cpWriteCallback(sourceDir, targetDir))
return g.Wait()
}

func (c *copy) cpFileToDir(sourcePath, targetDir string) error {
func (c *copy) cpFileToDir(ctx context.Context, sourcePath, targetDir string) error {
fileName := filepath.Base(sourcePath)
targetPath := path.Join(targetDir, fileName)

return c.cpFileToFile(sourcePath, targetPath)
return c.cpFileToFile(ctx, sourcePath, targetPath)
}

func (c *copy) cpFileToFile(sourcePath, targetPath string) error {
func (c *copy) cpFileToFile(ctx context.Context, sourcePath, targetPath string) error {
// Get reader for file at source path
r, err := c.sourceFiler.Read(c.ctx, sourcePath)
r, err := c.sourceFiler.Read(ctx, sourcePath)
if err != nil {
return err
}
defer r.Close()

if c.overwrite {
err = c.targetFiler.Write(c.ctx, targetPath, r, filer.OverwriteIfExists)
err = c.targetFiler.Write(ctx, targetPath, r, filer.OverwriteIfExists)
if err != nil {
return err
}
} else {
err = c.targetFiler.Write(c.ctx, targetPath, r)
err = c.targetFiler.Write(ctx, targetPath, r)
// skip if file already exists
if err != nil && errors.Is(err, fs.ErrExist) {
return c.emitFileSkippedEvent(sourcePath, targetPath)
return c.emitFileSkippedEvent(ctx, sourcePath, targetPath)
}
if err != nil {
return err
}
}
return c.emitFileCopiedEvent(sourcePath, targetPath)
return c.emitFileCopiedEvent(ctx, sourcePath, targetPath)
}

// TODO: emit these events on stderr
// TODO: add integration tests for these events
func (c *copy) emitFileSkippedEvent(sourcePath, targetPath string) error {
func (c *copy) emitFileSkippedEvent(ctx context.Context, sourcePath, targetPath string) error {
fullSourcePath := sourcePath
if c.sourceScheme != "" {
fullSourcePath = path.Join(c.sourceScheme+":", sourcePath)
Expand All @@ -109,10 +148,12 @@ func (c *copy) emitFileSkippedEvent(sourcePath, targetPath string) error {
event := newFileSkippedEvent(fullSourcePath, fullTargetPath)
template := "{{.SourcePath}} -> {{.TargetPath}} (skipped; already exists)\n"

return cmdio.RenderWithTemplate(c.ctx, event, "", template)
c.mu.Lock()
defer c.mu.Unlock()
return cmdio.RenderWithTemplate(ctx, event, "", template)
}

func (c *copy) emitFileCopiedEvent(sourcePath, targetPath string) error {
func (c *copy) emitFileCopiedEvent(ctx context.Context, sourcePath, targetPath string) error {
fullSourcePath := sourcePath
if c.sourceScheme != "" {
fullSourcePath = path.Join(c.sourceScheme+":", sourcePath)
Expand All @@ -125,7 +166,9 @@ func (c *copy) emitFileCopiedEvent(sourcePath, targetPath string) error {
event := newFileCopiedEvent(fullSourcePath, fullTargetPath)
template := "{{.SourcePath}} -> {{.TargetPath}}\n"

return cmdio.RenderWithTemplate(c.ctx, event, "", template)
c.mu.Lock()
defer c.mu.Unlock()
return cmdio.RenderWithTemplate(ctx, event, "", template)
}

// hasTrailingDirSeparator checks if a path ends with a directory separator.
Expand Down Expand Up @@ -153,13 +196,20 @@ func newCpCommand() *cobra.Command {
When copying a file, if TARGET_PATH is a directory, the file will be created
inside the directory, otherwise the file is created at TARGET_PATH.
`,
Args: root.ExactArgs(2),
PreRunE: root.MustWorkspaceClient,
Args: root.ExactArgs(2),
}

var c copy
cmd.Flags().BoolVar(&c.overwrite, "overwrite", false, "overwrite existing files")
cmd.Flags().BoolVarP(&c.recursive, "recursive", "r", false, "recursively copy files from directory")
cmd.Flags().IntVar(&c.concurrency, "concurrency", defaultConcurrency, "number of parallel copy operations")

cmd.PreRunE = func(cmd *cobra.Command, args []string) error {
if c.concurrency <= 0 {
return errInvalidConcurrency
}
return root.MustWorkspaceClient(cmd, args)
}

cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
Expand Down Expand Up @@ -187,7 +237,6 @@ func newCpCommand() *cobra.Command {
c.targetScheme = "dbfs"
}

c.ctx = ctx
c.sourceFiler = sourceFiler
c.targetFiler = targetFiler

Expand All @@ -199,7 +248,7 @@ func newCpCommand() *cobra.Command {

// case 1: source path is a directory, then recursively create files at target path
if sourceInfo.IsDir() {
return c.cpDirToDir(sourcePath, targetPath)
return c.cpDirToDir(ctx, sourcePath, targetPath)
}

// If target path has a trailing separator, trim it and let case 2 handle it
Expand All @@ -210,11 +259,11 @@ func newCpCommand() *cobra.Command {
// case 2: source path is a file, and target path is a directory. In this case
// we copy the file to inside the directory
if targetInfo, err := targetFiler.Stat(ctx, targetPath); err == nil && targetInfo.IsDir() {
return c.cpFileToDir(sourcePath, targetPath)
return c.cpFileToDir(ctx, sourcePath, targetPath)
}

// case 3: source path is a file, and target path is a file
return c.cpFileToFile(sourcePath, targetPath)
return c.cpFileToFile(ctx, sourcePath, targetPath)
}

v := newValidArgs()
Expand Down
Loading