-
Notifications
You must be signed in to change notification settings - Fork 0
Failure Reporting and Dead Letter Queue #87
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,81 +37,7 @@ func NewCollectGithubRepoCmd() *cobra.Command { | |
| compression, _ := cmd.Flags().GetString("compression") | ||
| password, _ := cmd.Flags().GetString("password") | ||
|
|
||
| if format != "datanode" && format != "tim" && format != "trix" && format != "stim" { | ||
| return fmt.Errorf("invalid format: %s (must be 'datanode', 'tim', 'trix', or 'stim')", format) | ||
| } | ||
| if compression != "none" && compression != "gz" && compression != "xz" { | ||
| return fmt.Errorf("invalid compression: %s (must be 'none', 'gz', or 'xz')", compression) | ||
| } | ||
|
|
||
| prompter := ui.NewNonInteractivePrompter(ui.GetVCSQuote) | ||
| prompter.Start() | ||
| defer prompter.Stop() | ||
|
|
||
| var progressWriter io.Writer | ||
| if prompter.IsInteractive() { | ||
| bar := ui.NewProgressBar(-1, "Cloning repository") | ||
| progressWriter = ui.NewProgressWriter(bar) | ||
| } | ||
|
|
||
| dn, err := GitCloner.CloneGitRepository(repoURL, progressWriter) | ||
| if err != nil { | ||
| return fmt.Errorf("error cloning repository: %w", err) | ||
| } | ||
|
|
||
| var data []byte | ||
| if format == "tim" { | ||
| t, err := tim.FromDataNode(dn) | ||
| if err != nil { | ||
| return fmt.Errorf("error creating tim: %w", err) | ||
| } | ||
| data, err = t.ToTar() | ||
| if err != nil { | ||
| return fmt.Errorf("error serializing tim: %w", err) | ||
| } | ||
| } else if format == "stim" { | ||
| if password == "" { | ||
| return fmt.Errorf("password required for stim format") | ||
| } | ||
| t, err := tim.FromDataNode(dn) | ||
| if err != nil { | ||
| return fmt.Errorf("error creating tim: %w", err) | ||
| } | ||
| data, err = t.ToSigil(password) | ||
| if err != nil { | ||
| return fmt.Errorf("error encrypting stim: %w", err) | ||
| } | ||
| } else if format == "trix" { | ||
| data, err = trix.ToTrix(dn, password) | ||
| if err != nil { | ||
| return fmt.Errorf("error serializing trix: %w", err) | ||
| } | ||
| } else { | ||
| data, err = dn.ToTar() | ||
| if err != nil { | ||
| return fmt.Errorf("error serializing DataNode: %w", err) | ||
| } | ||
| } | ||
|
|
||
| compressedData, err := compress.Compress(data, compression) | ||
| if err != nil { | ||
| return fmt.Errorf("error compressing data: %w", err) | ||
| } | ||
|
|
||
| if outputFile == "" { | ||
| outputFile = "repo." + format | ||
| if compression != "none" { | ||
| outputFile += "." + compression | ||
| } | ||
| } | ||
|
|
||
| err = os.WriteFile(outputFile, compressedData, defaultFilePermission) | ||
| if err != nil { | ||
| return fmt.Errorf("error writing DataNode to file: %w", err) | ||
| } | ||
|
|
||
| fmt.Fprintln(cmd.OutOrStdout(), "Repository saved to", outputFile) | ||
| return nil | ||
| return collectRepo(repoURL, outputFile, format, compression, password, cmd) | ||
| }, | ||
| } | ||
| cmd.Flags().String("output", "", "Output file for the DataNode") | ||
|
|
@@ -121,6 +47,84 @@ func NewCollectGithubRepoCmd() *cobra.Command { | |
| return cmd | ||
| } | ||
|
|
||
| func collectRepo(repoURL, outputFile, format, compression, password string, cmd *cobra.Command) error { | ||
| if format != "datanode" && format != "tim" && format != "trix" && format != "stim" { | ||
| return fmt.Errorf("invalid format: %s (must be 'datanode', 'tim', 'trix', or 'stim')", format) | ||
| } | ||
| if compression != "none" && compression != "gz" && compression != "xz" { | ||
| return fmt.Errorf("invalid compression: %s (must be 'none', 'gz', or 'xz')", compression) | ||
| } | ||
|
|
||
| prompter := ui.NewNonInteractivePrompter(ui.GetVCSQuote) | ||
| prompter.Start() | ||
| defer prompter.Stop() | ||
|
|
||
| var progressWriter io.Writer | ||
| if prompter.IsInteractive() { | ||
| bar := ui.NewProgressBar(-1, "Cloning repository") | ||
| progressWriter = ui.NewProgressWriter(bar) | ||
| } | ||
|
|
||
| dn, err := GitCloner.CloneGitRepository(repoURL, progressWriter) | ||
| if err != nil { | ||
| return fmt.Errorf("error cloning repository: %w", err) | ||
| } | ||
|
|
||
| var data []byte | ||
| if format == "tim" { | ||
| t, err := tim.FromDataNode(dn) | ||
| if err != nil { | ||
| return fmt.Errorf("error creating tim: %w", err) | ||
| } | ||
| data, err = t.ToTar() | ||
| if err != nil { | ||
| return fmt.Errorf("error serializing tim: %w", err) | ||
| } | ||
| } else if format == "stim" { | ||
| if password == "" { | ||
| return fmt.Errorf("password required for stim format") | ||
| } | ||
| t, err := tim.FromDataNode(dn) | ||
| if err != nil { | ||
| return fmt.Errorf("error creating tim: %w", err) | ||
| } | ||
| data, err = t.ToSigil(password) | ||
| if err != nil { | ||
| return fmt.Errorf("error encrypting stim: %w", err) | ||
| } | ||
| } else if format == "trix" { | ||
| data, err = trix.ToTrix(dn, password) | ||
| if err != nil { | ||
| return fmt.Errorf("error serializing trix: %w", err) | ||
| } | ||
| } else { | ||
| data, err = dn.ToTar() | ||
| if err != nil { | ||
| return fmt.Errorf("error serializing DataNode: %w", err) | ||
| } | ||
| } | ||
|
|
||
| compressedData, err := compress.Compress(data, compression) | ||
| if err != nil { | ||
| return fmt.Errorf("error compressing data: %w", err) | ||
| } | ||
|
|
||
| if outputFile == "" { | ||
| outputFile = "repo." + format | ||
| if compression != "none" { | ||
| outputFile += "." + compression | ||
| } | ||
| } | ||
|
Comment on lines
+112
to
+117
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The current implementation for generating a default output filename will cause issues when collecting multiple repositories, as each collected repository will overwrite the previous one. The filename is always `repo.<format>` (plus the compression suffix), regardless of which repository is being collected; it should be derived from the repository URL instead. Note: This change requires importing the `path` and `strings` packages.
if outputFile == "" {
// Generate a unique filename from the repo URL to avoid overwriting files.
// e.g., https://github.com/owner/name.git -> name.<format>
base := path.Base(repoURL)
repoName := strings.TrimSuffix(base, path.Ext(base))
outputFile = repoName + "." + format
if compression != "none" {
outputFile += "." + compression
}
} |
||
|
|
||
| err = os.WriteFile(outputFile, compressedData, defaultFilePermission) | ||
| if err != nil { | ||
| return fmt.Errorf("error writing DataNode to file: %w", err) | ||
| } | ||
|
|
||
| fmt.Fprintln(cmd.OutOrStdout(), "Repository saved to", outputFile) | ||
| return nil | ||
| } | ||
|
|
||
| func init() { | ||
| collectGithubCmd.AddCommand(NewCollectGithubRepoCmd()) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,7 +2,9 @@ package cmd | |
|
|
||
| import ( | ||
| "fmt" | ||
| "strings" | ||
|
|
||
| "github.com/Snider/Borg/pkg/failures" | ||
| "github.com/Snider/Borg/pkg/github" | ||
| "github.com/spf13/cobra" | ||
| ) | ||
|
|
@@ -17,13 +19,57 @@ var collectGithubReposCmd = &cobra.Command{ | |
| Short: "Collects all public repositories for a user or organization", | ||
| Args: cobra.ExactArgs(1), | ||
| RunE: func(cmd *cobra.Command, args []string) error { | ||
| failuresDir, _ := cmd.Flags().GetString("failures-dir") | ||
| onFailure, _ := cmd.Flags().GetString("on-failure") | ||
|
|
||
| manager, err := failures.NewManager(failuresDir, "github:repos:"+args[0]) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to create failure manager: %w", err) | ||
| } | ||
| defer manager.Finalize() | ||
|
|
||
| repos, err := GithubClient.GetPublicRepos(cmd.Context(), args[0]) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| for _, repo := range repos { | ||
| fmt.Fprintln(cmd.OutOrStdout(), repo) | ||
|
|
||
| manager.SetTotal(len(repos)) | ||
|
|
||
| attempts := make(map[string]int) | ||
| for i := 0; i < len(repos); i++ { | ||
| repo := repos[i] | ||
| attempts[repo]++ | ||
|
|
||
| fmt.Fprintln(cmd.OutOrStdout(), "Collecting", repo) | ||
| err := collectRepo(repo, "", "datanode", "none", "", cmd) | ||
| if err != nil { | ||
| retryable := !strings.Contains(err.Error(), "not found") | ||
|
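There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Determining if an error is retryable by checking for the substring "not found" in the error message is brittle and not reliable. A better approach would be to use typed errors. The function that performs the cloning (`GitCloner.CloneGitRepository`) could return a sentinel or typed error for the "not found" case, which the caller could then detect with `errors.Is` or `errors.As` instead of matching on the message text.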
||
| manager.RecordFailure(&failures.Failure{ | ||
| URL: repo, | ||
| Error: err.Error(), | ||
| Retryable: retryable, | ||
| Attempts: attempts[repo], | ||
| }) | ||
|
|
||
| if onFailure == "stop" { | ||
| return fmt.Errorf("stopping on first failure: %w", err) | ||
| } else if onFailure == "prompt" { | ||
| fmt.Printf("Failed to collect %s. Would you like to (c)ontinue, (s)top, or (r)etry? ", repo) | ||
| var response string | ||
| fmt.Scanln(&response) | ||
|
Comment on lines
+58
to
+59
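There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Using `fmt.Scanln` to read the user's response is fragile: it returns an error on an empty line or when the input contains more than one space-separated token, and that error is ignored here. Reading the whole line with a `bufio.Scanner` is more robust. Note: This change requires importing the `bufio` and `os` packages.
var response string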
scanner := bufio.NewScanner(os.Stdin)
if scanner.Scan() {
response = scanner.Text()
} |
||
| switch response { | ||
| case "s": | ||
| return fmt.Errorf("stopping on user prompt") | ||
| case "r": | ||
| i-- // Retry the same repo | ||
| continue | ||
| default: | ||
| // Continue | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| }, | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,105 @@ | ||||||||||||||
| package cmd | ||||||||||||||
|
|
||||||||||||||
| import ( | ||||||||||||||
| "encoding/json" | ||||||||||||||
| "fmt" | ||||||||||||||
| "os" | ||||||||||||||
| "path/filepath" | ||||||||||||||
| "time" | ||||||||||||||
|
|
||||||||||||||
| "github.com/Snider/Borg/pkg/failures" | ||||||||||||||
| "github.com/spf13/cobra" | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| var failuresCmd = &cobra.Command{ | ||||||||||||||
| Use: "failures", | ||||||||||||||
| Short: "Manage failures from collection runs", | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| var failuresShowCmd = &cobra.Command{ | ||||||||||||||
| Use: "show [run-directory]", | ||||||||||||||
| Short: "Show a summary of a failure report", | ||||||||||||||
| Args: cobra.ExactArgs(1), | ||||||||||||||
| RunE: func(cmd *cobra.Command, args []string) error { | ||||||||||||||
| reportPath := filepath.Join(args[0], "failures.json") | ||||||||||||||
| data, err := os.ReadFile(reportPath) | ||||||||||||||
| if err != nil { | ||||||||||||||
| return fmt.Errorf("failed to read failure report: %w", err) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| var report failures.FailureReport | ||||||||||||||
| if err := json.Unmarshal(data, &report); err != nil { | ||||||||||||||
| return fmt.Errorf("failed to parse failure report: %w", err) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| fmt.Printf("Collection: %s\n", report.Collection) | ||||||||||||||
| fmt.Printf("Started: %s\n", report.Started.Format(time.RFC3339)) | ||||||||||||||
| fmt.Printf("Completed: %s\n", report.Completed.Format(time.RFC3339)) | ||||||||||||||
| fmt.Printf("Total: %d\n", report.Stats.Total) | ||||||||||||||
| fmt.Printf("Success: %d\n", report.Stats.Success) | ||||||||||||||
| fmt.Printf("Failed: %d\n", report.Stats.Failed) | ||||||||||||||
|
|
||||||||||||||
| if len(report.Failures) > 0 { | ||||||||||||||
| fmt.Println("\nFailures:") | ||||||||||||||
| for _, f := range report.Failures { | ||||||||||||||
| fmt.Printf(" - URL: %s\n", f.URL) | ||||||||||||||
| fmt.Printf(" Error: %s\n", f.Error) | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| return nil | ||||||||||||||
| }, | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| var failuresClearCmd = &cobra.Command{ | ||||||||||||||
| Use: "clear", | ||||||||||||||
| Short: "Clear old failure reports", | ||||||||||||||
| RunE: func(cmd *cobra.Command, args []string) error { | ||||||||||||||
| olderThan, _ := cmd.Flags().GetString("older-than") | ||||||||||||||
| failuresDir, _ := cmd.Flags().GetString("failures-dir") | ||||||||||||||
| if failuresDir == "" { | ||||||||||||||
| failuresDir = ".borg-failures" | ||||||||||||||
| } | ||||||||||||||
|
Comment on lines
+60
to
+62
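There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The default value for the failures directory, `.borg-failures`, is already set where the `--failures-dir` flag is defined, so this empty-string check is redundant and can be removed.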
Suggested change
|
||||||||||||||
|
|
||||||||||||||
| duration, err := time.ParseDuration(olderThan) | ||||||||||||||
| if err != nil { | ||||||||||||||
| return fmt.Errorf("invalid duration for --older-than: %w", err) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| cutoff := time.Now().Add(-duration) | ||||||||||||||
|
|
||||||||||||||
| entries, err := os.ReadDir(failuresDir) | ||||||||||||||
| if err != nil { | ||||||||||||||
| return fmt.Errorf("failed to read failures directory: %w", err) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| for _, entry := range entries { | ||||||||||||||
| if entry.IsDir() { | ||||||||||||||
| runTime, err := time.Parse("2006-01-02T15-04-05", entry.Name()) | ||||||||||||||
| if err != nil { | ||||||||||||||
| // Ignore directories that don't match the timestamp format | ||||||||||||||
| continue | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| if runTime.Before(cutoff) { | ||||||||||||||
| runPath := filepath.Join(failuresDir, entry.Name()) | ||||||||||||||
| fmt.Printf("Removing old failure directory: %s\n", runPath) | ||||||||||||||
| if err := os.RemoveAll(runPath); err != nil { | ||||||||||||||
| fmt.Fprintf(os.Stderr, "failed to remove %s: %v\n", runPath, err) | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| return nil | ||||||||||||||
| }, | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| func init() { | ||||||||||||||
| RootCmd.AddCommand(failuresCmd) | ||||||||||||||
| failuresCmd.AddCommand(failuresShowCmd) | ||||||||||||||
| failuresCmd.AddCommand(failuresClearCmd) | ||||||||||||||
|
|
||||||||||||||
| failuresClearCmd.Flags().String("older-than", "720h", "Clear failures older than this duration (e.g., 7d, 24h)") | ||||||||||||||
| failuresClearCmd.Flags().String("failures-dir", ".borg-failures", "The directory where failures are stored") | ||||||||||||||
| } | ||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency across the tool, the
`collect-github-repo` command should also implement failure reporting, similar to how it's done for `collect-github-repos`. Currently, if this command fails, no failure report is generated, which might be unexpected for users given the new `--on-failure` and `--failures-dir` flags on the parent `collect` command.