Skip to content

Commit 4417909

Browse files
authored
Merge pull request #21 from sapcc/delete-from-target
support deletion of target-side objects without source-side counterpart
2 parents 1df47ea + 04ff6e2 commit 4417909

File tree

11 files changed

+306
-15
lines changed

11 files changed

+306
-15
lines changed

CHANGELOG.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
# v2.4.0 (TBD)
1+
# v2.4 (TBD)
22

33
New features:
4+
- swift-http-import can now clean up objects on the target side that have been deleted on the source side. To enable
5+
this behavior, set the new `jobs[].cleanup.strategy` configuration option to `delete`. Or set it to `report` to report
6+
such objects without deleting them.
47
- Initial support for Swift symlinks has been added. When a Swift source contains an object that is a symlink to another
58
object, the object is also uploaded as a symlink on the target side, thus avoiding duplicate transfers of identical
69
files. In this version, only those symlinks are considered that point to objects which are transferred in the same
@@ -9,7 +12,7 @@ New features:
912
Changes:
1013
- Switch the Swift backend from [ncw/swift](https://github.com/ncw/swift) to
1114
[Schwift](https://github.com/majewsky/schwift). This is important to facilitate some of the new features above.
12-
- When deleting a file on the target side (usually after an upload error), do not log an error if the DELETE request
15+
- When deleting a file on the target side (after an upload error), do not log an error if the DELETE request
1316
returns 404 (Not Found).
1417

1518
Bugfixes:

README.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* [Transfer behavior: Segmenting on the target side](#transfer-behavior-segmenting-on-the-target-side)
1111
* [Transfer behavior: Expiring objects](#transfer-behavior-expiring-objects)
1212
* [Transfer behavior: Symlinks](#transfer-behavior-symlinks)
13+
* [Transfer behavior: Delete objects on the target side](#transfer-behavior-delete-objects-on-the-target-side)
1314
* [Performance](#performance)
1415
* [Log output](#log-output)
1516
* [StatsD metrics](#statsd-metrics)
@@ -324,6 +325,30 @@ However, the link target must be transferred **in the same job**. (This restrict
324325
Otherwise, the symlink will be transferred as a regular object, possibly resulting in duplication of file contents on
325326
the target side.
326327

328+
### Transfer behavior: Delete objects on the target side
329+
330+
By default, swift-http-import will only copy files from the source side to the target side. To enable the deletion of
331+
objects that exist on the target side, but not on the source side, set the `jobs[].cleanup.strategy` configuration
332+
option to `delete`.
333+
[(Link to full example config file)](./examples/transfer-delete-on-target.yaml)
334+
335+
```yaml
336+
jobs:
337+
- from:
338+
url: http://de.archive.ubuntu.com/ubuntu/
339+
to:
340+
container: mirror
341+
object_prefix: ubuntu-repos
342+
cleanup:
343+
strategy: delete
344+
```
345+
346+
Another possible value for `jobs[].cleanup.strategy` is `report`, which will log objects that `delete` would clean
347+
up without actually touching them.
348+
349+
When combined with `jobs[].only` and/or `jobs[].expect`, cleanup will delete all files excluded by those filters, even
350+
if the same file exists on the source side. This is the same behavior as if `--delete-excluded` is given to rsync.
351+
327352
### Performance
328353

329354
By default, only a single worker thread will be transferring files. You can scale this up by including a `workers` section at the top level like so:
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
swift:
2+
auth_url: https://my.keystone.local:5000/v3
3+
user_name: uploader
4+
user_domain_name: Default
5+
project_name: datastore
6+
project_domain_name: Default
7+
password: 20g82rzg235oughq
8+
9+
jobs:
10+
- from:
11+
url: http://de.archive.ubuntu.com/ubuntu/
12+
to:
13+
container: mirror
14+
object_prefix: ubuntu-repos
15+
cleanup:
16+
strategy: delete

main.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,24 +92,38 @@ func runPipeline(config *objects.Configuration, report chan<- actors.ReportEvent
9292

9393
//start the pipeline actors
9494
var wg sync.WaitGroup
95-
queue := make(chan objects.File, 10)
95+
var wgTransfer sync.WaitGroup
96+
queue1 := make(chan objects.File, 10) //will be closed by scraper when it's done
97+
queue2 := make(chan actors.FileInfoForCleaner, 10) //will be closed by us when all transferors are done
9698

9799
actors.Start(&actors.Scraper{
98100
Context: ctx,
99101
Jobs: config.Jobs,
100-
Output: queue,
102+
Output: queue1,
101103
Report: report,
102104
}, &wg)
103105

104106
for i := uint(0); i < config.WorkerCounts.Transfer; i++ {
105107
actors.Start(&actors.Transferor{
106108
Context: ctx,
107-
Input: queue,
109+
Input: queue1,
110+
Output: queue2,
108111
Report: report,
109-
}, &wg)
112+
}, &wg, &wgTransfer)
110113
}
111114

112-
//wait for pipeline actors to finish
115+
actors.Start(&actors.Cleaner{
116+
Context: ctx,
117+
Input: queue2,
118+
Report: report,
119+
}, &wg)
120+
121+
//wait for transfer phase to finish
122+
wgTransfer.Wait()
123+
//signal to cleaner to start its work
124+
close(queue2)
125+
//wait for remaining workers to finish
113126
wg.Wait()
127+
114128
// signal.Reset(os.Interrupt, syscall.SIGTERM)
115129
}

pkg/actors/actor.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,16 @@ type Actor interface {
2929
}
3030

3131
//Start runs the given Actor in its own goroutine.
32-
func Start(a Actor, wg *sync.WaitGroup) {
33-
wg.Add(1)
32+
func Start(a Actor, wgs ...*sync.WaitGroup) {
33+
for _, wg := range wgs {
34+
wg.Add(1)
35+
}
3436
go func() {
35-
defer wg.Done()
37+
defer func() {
38+
for _, wg := range wgs {
39+
wg.Done()
40+
}
41+
}()
3642
a.Run()
3743
}()
3844
}

pkg/actors/cleaner.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*******************************************************************************
2+
*
3+
* Copyright 2018 SAP SE
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You should have received a copy of the License along with this
8+
* program. If not, you may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
*******************************************************************************/
19+
20+
package actors
21+
22+
import (
23+
"context"
24+
"sort"
25+
26+
"github.com/majewsky/schwift"
27+
"github.com/sapcc/swift-http-import/pkg/objects"
28+
"github.com/sapcc/swift-http-import/pkg/util"
29+
)
30+
31+
//FileInfoForCleaner contains information about a transferred file for the Cleaner actor.
32+
type FileInfoForCleaner struct {
33+
objects.File
34+
Failed bool
35+
}
36+
37+
//Cleaner is an actor that cleans up unknown objects on the target side (i.e.
38+
//those objects which do not exist on the source side).
39+
type Cleaner struct {
40+
Context context.Context
41+
Input <-chan FileInfoForCleaner
42+
Report chan<- ReportEvent
43+
}
44+
45+
//Run implements the Actor interface.
46+
func (c *Cleaner) Run() {
47+
isJobFailed := make(map[*objects.Job]bool)
48+
isFileTransferred := make(map[*objects.Job]map[string]bool) //string = object name incl. prefix (if any)
49+
50+
//collect information about transferred files from the transferors
51+
//(we don't need to check Context.Done in the loop; when the process is
52+
//interrupted, main() will close our Input and we will move on)
53+
for info := range c.Input {
54+
//ignore all files in jobs where no cleanup is configured
55+
job := info.File.Job
56+
if job.Cleanup.Strategy == objects.KeepUnknownFiles {
57+
continue
58+
}
59+
60+
if info.Failed {
61+
isJobFailed[job] = true
62+
}
63+
64+
m, exists := isFileTransferred[job]
65+
if !exists {
66+
m = make(map[string]bool)
67+
isFileTransferred[job] = m
68+
}
69+
m[info.File.TargetObject().Name()] = true
70+
}
71+
if c.Context.Err() != nil {
72+
util.Log(util.LogInfo, "skipping cleanup phase: interrupt was received")
73+
return
74+
}
75+
if len(isJobFailed) > 0 {
76+
util.Log(util.LogInfo,
77+
"skipping cleanup phase for %d job(s) because of failed file transfers",
78+
len(isJobFailed))
79+
}
80+
81+
//perform cleanup if it is safe to do so
82+
for job, transferred := range isFileTransferred {
83+
if c.Context.Err() != nil {
84+
//interrupt received
85+
return
86+
}
87+
if !isJobFailed[job] {
88+
c.performCleanup(job, transferred)
89+
}
90+
}
91+
}
92+
93+
func (c *Cleaner) performCleanup(job *objects.Job, isFileTransferred map[string]bool) {
94+
//collect objects to cleanup
95+
var objs []*schwift.Object
96+
for objectName := range job.Target.FileExists {
97+
if isFileTransferred[objectName] {
98+
continue
99+
}
100+
objs = append(objs, job.Target.Container.Object(objectName))
101+
}
102+
sort.Slice(objs, func(i, j int) bool {
103+
return objs[i].Name() < objs[j].Name()
104+
})
105+
106+
//perform cleanup according to selected strategy
107+
switch job.Cleanup.Strategy {
108+
case objects.ReportUnknownFiles:
109+
for _, obj := range objs {
110+
util.Log(util.LogInfo, "found unknown object on target side: %s", obj.FullName())
111+
}
112+
113+
case objects.DeleteUnknownFiles:
114+
numDeleted, _, err := job.Target.Container.Account().BulkDelete(objs, nil, nil)
115+
c.Report <- ReportEvent{IsCleanup: true, CleanedUpObjectCount: int64(numDeleted)}
116+
if err != nil {
117+
util.Log(util.LogError, "cleanup of %d objects on target side failed: %s", len(objs), err.Error())
118+
if berr, ok := err.(schwift.BulkError); ok {
119+
for _, oerr := range berr.ObjectErrors {
120+
util.Log(util.LogError, "DELETE "+oerr.Error())
121+
}
122+
}
123+
}
124+
}
125+
}

pkg/actors/report.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ type ReportEvent struct {
3636

3737
IsFile bool
3838
FileTransferResult objects.TransferResult
39+
40+
IsCleanup bool
41+
CleanedUpObjectCount int64
3942
}
4043

4144
//Report is an actor that counts scraped directories and transferred files.
@@ -63,6 +66,7 @@ func (r *Report) Run() {
6366
filesFound int64
6467
filesFailed int64
6568
filesTransferred int64
69+
filesCleanedUp int64
6670
statter statsd.Statter
6771
)
6872

@@ -95,6 +99,8 @@ func (r *Report) Run() {
9599
case objects.TransferFailed:
96100
filesFailed++
97101
}
102+
case mark.IsCleanup:
103+
filesCleanedUp += mark.CleanedUpObjectCount
98104
}
99105
}
100106

@@ -109,6 +115,7 @@ func (r *Report) Run() {
109115
gauge("last_run.files_found", filesFound, 1.0)
110116
gauge("last_run.files_transfered", filesTransferred, 1.0)
111117
gauge("last_run.files_failed", filesFailed, 1.0)
118+
gauge("last_run.files_cleaned_up", filesCleanedUp, 1.0)
112119
if filesFailed > 0 || directoriesFailed > 0 {
113120
gauge("last_run.success", 0, 1.0)
114121
r.ExitCode = 1
@@ -125,6 +132,9 @@ func (r *Report) Run() {
125132
util.Log(util.LogInfo, "%d files found, %d transferred, %d failed",
126133
filesFound, filesTransferred, filesFailed,
127134
)
135+
if filesCleanedUp > 0 {
136+
util.Log(util.LogInfo, "%d old files cleaned up", filesCleanedUp)
137+
}
128138

129139
duration := time.Since(r.StartTime)
130140
gauge("last_run.duration_seconds", int64(duration.Seconds()), 1.0)

pkg/actors/transfer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
type Transferor struct {
3333
Context context.Context
3434
Input <-chan objects.File
35+
Output chan<- FileInfoForCleaner
3536
Report chan<- ReportEvent
3637
}
3738

@@ -57,6 +58,7 @@ LOOP:
5758
if result == objects.TransferFailed {
5859
filesToRetry = append(filesToRetry, file)
5960
} else {
61+
t.Output <- FileInfoForCleaner{File: file, Failed: false}
6062
t.Report <- ReportEvent{IsFile: true, FileTransferResult: result}
6163
}
6264
}
@@ -74,6 +76,7 @@ LOOP:
7476
if !aborted && t.Context.Err() == nil {
7577
result = file.PerformTransfer()
7678
}
79+
t.Output <- FileInfoForCleaner{File: file, Failed: result == objects.TransferFailed}
7780
t.Report <- ReportEvent{IsFile: true, FileTransferResult: result}
7881
}
7982

pkg/objects/config.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ type JobConfiguration struct {
9696
ImmutableFilePattern string `yaml:"immutable"`
9797
Segmenting *SegmentingConfiguration `yaml:"segmenting"`
9898
Expiration ExpirationConfiguration `yaml:"expiration"`
99+
Cleanup CleanupConfiguration `yaml:"cleanup"`
99100
}
100101

101102
//SegmentingConfiguration contains the "segmenting" section of a JobConfiguration.
@@ -114,6 +115,23 @@ type ExpirationConfiguration struct {
114115
DelaySeconds uint32 `yaml:"delay_seconds"`
115116
}
116117

118+
//CleanupStrategy is an enum of legal values for the jobs[].cleanup.strategy configuration option.
119+
type CleanupStrategy string
120+
121+
const (
122+
//KeepUnknownFiles is the default cleanup strategy.
123+
KeepUnknownFiles CleanupStrategy = ""
124+
//DeleteUnknownFiles is another strategy.
125+
DeleteUnknownFiles CleanupStrategy = "delete"
126+
//ReportUnknownFiles is another strategy.
127+
ReportUnknownFiles CleanupStrategy = "report"
128+
)
129+
130+
//CleanupConfiguration contains the "cleanup" section of a JobConfiguration.
131+
type CleanupConfiguration struct {
132+
Strategy CleanupStrategy `yaml:"strategy"`
133+
}
134+
117135
//SourceUnmarshaler provides a yaml.Unmarshaler implementation for the Source interface.
118136
type SourceUnmarshaler struct {
119137
src Source
@@ -154,6 +172,7 @@ type Job struct {
154172
Matcher Matcher
155173
Segmenting *SegmentingConfiguration
156174
Expiration ExpirationConfiguration
175+
Cleanup CleanupConfiguration
157176
}
158177

159178
//Compile validates the given JobConfiguration, then creates and prepares a Job from it.
@@ -195,11 +214,17 @@ func (cfg JobConfiguration) Compile(name string, swift SwiftLocation) (job *Job,
195214
cfg.Expiration.Enabled = *cfg.Expiration.EnabledIn
196215
}
197216

217+
ufs := cfg.Cleanup.Strategy
218+
if ufs != KeepUnknownFiles && ufs != DeleteUnknownFiles && ufs != ReportUnknownFiles {
219+
errors = append(errors, fmt.Errorf("invalid value for %s.cleanup.strategy: %q", name, ufs))
220+
}
221+
198222
job = &Job{
199223
Source: cfg.Source.src,
200224
Target: cfg.Target,
201225
Segmenting: cfg.Segmenting,
202226
Expiration: cfg.Expiration,
227+
Cleanup: cfg.Cleanup,
203228
}
204229

205230
//compile patterns into regexes

0 commit comments

Comments
 (0)