diff --git a/api/units.go b/api/units.go index 67b4a9537..89e414fef 100644 --- a/api/units.go +++ b/api/units.go @@ -76,6 +76,7 @@ func (ur *unitsResource) set(rw http.ResponseWriter, req *http.Request, item str } var su schema.Unit + newUnit := false dec := json.NewDecoder(req.Body) err := dec.Decode(&su) if err != nil { @@ -111,6 +112,21 @@ func (ur *unitsResource) set(rw http.ResponseWriter, req *http.Request, item str ur.create(rw, su.Name, &su) } return + } else if eu.Name == su.Name && len(su.Options) > 0 { + // There is already a unit with the same name + // check the hashes if they do not match then we will + // create a new unit with the same name and later + // the job will be update to this new unit. + // if su.Options == 0 then probably we don't want to update + // the Unit options but only its target state. + a := schema.MapSchemaUnitOptionsToUnitFile(su.Options) + b := schema.MapSchemaUnitOptionsToUnitFile(eu.Options) + newUnit = !unit.MatchUnitFiles(a, b) + } + + if newUnit { + ur.create(rw, su.Name, &su) + return } if len(su.DesiredState) == 0 { diff --git a/fleetctl/fleetctl.go b/fleetctl/fleetctl.go index 69513e04f..e102d616e 100644 --- a/fleetctl/fleetctl.go +++ b/fleetctl/fleetctl.go @@ -103,6 +103,7 @@ var ( Full bool NoLegend bool NoBlock bool + Replace bool BlockAttempts int Fields string SSHPort int @@ -555,7 +556,7 @@ func getUnitFileFromTemplate(uni *unit.UnitNameInfo, fileName string) (*unit.Uni } if tmpl != nil { - warnOnDifferentLocalUnit(fileName, tmpl) + isLocalUnitDifferent(fileName, tmpl, true, false) uf = schema.MapSchemaUnitOptionsToUnitFile(tmpl.Options) log.Debugf("Template Unit(%s) found in registry", uni.Template) } else { @@ -632,10 +633,11 @@ func findUnits(args []string) (sus []schema.Unit, err error) { return filtered, nil } -func createUnit(name string, uf *unit.UnitFile) (*schema.Unit, error) { +func createUnit(name string, uf *unit.UnitFile, oldUnit *schema.Unit) (*schema.Unit, error) { if uf == nil { return nil, fmt.Errorf("nil unit provided") } + u := schema.Unit{ Name: name, Options: schema.MapUnitFileToSchemaUnitOptions(uf), @@ -663,6 +665,47 @@ func createUnit(name string, uf *unit.UnitFile) (*schema.Unit, error) { return &u, nil } +// checkUnitCreation checks if the unit should be created +// It takes a unit file path as a prameter. +// It returns 0 on success and if the unit should be created, 1 if the +// unit should not be created; and any error acountered +func checkUnitCreation(arg string, old *schema.Unit) (int, error) { + name := unitNameMangle(arg) + + // First, check if there already exists a Unit by the given name in the Registry + unit, err := cAPI.Unit(name) + if err != nil { + return 1, fmt.Errorf("error retrieving Unit(%s) from Registry: %v", name, err) + } + + // check if the unit is running + if unit == nil { + if sharedFlags.Replace { + log.Debugf("Unit(%s) was not found in Registry", name) + } + return 0, nil + } + + // if sharedFlags.Replace is not set then we warn + different, err := isLocalUnitDifferent(arg, unit, !sharedFlags.Replace, false) + + // if sharedFlags.Replace is set then we fail for errors + if sharedFlags.Replace { + if err != nil { + return 1, err + } else if different { + *old = *unit + return 0, nil + } else { + stdout("Found same Unit(%s) in Registry, nothing to do", unit.Name) + } + } + + log.Debugf("Found same Unit(%s) in Registry, no need to recreate it", name) + + return 1, nil +} + // lazyCreateUnits iterates over a set of unit names and, for each, attempts to // ensure that a unit by that name exists in the Registry, by checking a number // of conditions and acting on the first one that succeeds, in order of: @@ -677,17 +720,14 @@ func lazyCreateUnits(args []string) error { errchan := make(chan error) var wg sync.WaitGroup for _, arg := range args { + var oldUnit schema.Unit arg = maybeAppendDefaultUnitType(arg) name := unitNameMangle(arg) - // First, check if there already exists a Unit by the given name in the Registry - u, err := cAPI.Unit(name) + ret, err := checkUnitCreation(arg, &oldUnit) if err != nil { - return fmt.Errorf("error retrieving Unit(%s) from Registry: %v", name, err) - } - if u != nil { - log.Debugf("Found Unit(%s) in Registry, no need to recreate it", name) - warnOnDifferentLocalUnit(arg, u) + return err + } else if ret != 0 { continue } @@ -699,7 +739,7 @@ func lazyCreateUnits(args []string) error { return err } - _, err = createUnit(name, uf) + _, err = createUnit(name, uf, &oldUnit) if err != nil { return err } @@ -726,24 +766,52 @@ func lazyCreateUnits(args []string) error { return nil } -func warnOnDifferentLocalUnit(loc string, su *schema.Unit) { - suf := schema.MapSchemaUnitOptionsToUnitFile(su.Options) - if _, err := os.Stat(loc); !os.IsNotExist(err) { - luf, err := getUnitFromFile(loc) - if err == nil && luf.Hash() != suf.Hash() { - stderr("WARNING: Unit %s in registry differs from local unit file %s", su.Name, loc) - return +// matchLocalFileAndUnit compares a file with a Unit +// Returns true if the contents of the file matches the unit one, false +// otherwise; and any error ocountered +func matchLocalFileAndUnit(file string, su *schema.Unit) (bool, error) { + result := false + a := schema.MapSchemaUnitOptionsToUnitFile(su.Options) + + _, err := os.Stat(file) + if err == nil { + b, err := getUnitFromFile(file) + if err == nil { + result = unit.MatchUnitFiles(a, b) } } - if uni := unit.NewUnitNameInfo(path.Base(loc)); uni != nil && uni.IsInstance() { - file := path.Join(path.Dir(loc), uni.Template) - if _, err := os.Stat(file); !os.IsNotExist(err) { - tmpl, err := getUnitFromFile(file) - if err == nil && tmpl.Hash() != suf.Hash() { - stderr("WARNING: Unit %s in registry differs from local template unit file %s", su.Name, uni.Template) - } + + return result, err +} + +func isLocalUnitDifferent(file string, su *schema.Unit, warnIfDifferent bool, fatal bool) (bool, error) { + result, err := matchLocalFileAndUnit(file, su) + if err == nil { + if result == false && warnIfDifferent { + stderr("WARNING: Unit %s in registry differs from local unit file %s", su.Name, file) } + return !result, nil + } else if fatal { + return false, err + } + + info := unit.NewUnitNameInfo(path.Base(file)) + if info == nil { + return false, fmt.Errorf("error extracting information from unit name %s", file) + } else if !info.IsInstance() { + return false, fmt.Errorf("error unit name %s does not seem to be a template unit", file) } + + templFile := path.Join(path.Dir(file), info.Template) + result, err = matchLocalFileAndUnit(templFile, su) + if err == nil { + if result == false && warnIfDifferent { + stderr("WARNING: Unit %s in registry differs from local template unit file %s", su.Name, info.Template) + } + return !result, nil + } + + return false, err } func lazyLoadUnits(args []string) ([]*schema.Unit, error) { diff --git a/fleetctl/load.go b/fleetctl/load.go index 0f7b0923c..1114f946f 100644 --- a/fleetctl/load.go +++ b/fleetctl/load.go @@ -43,6 +43,7 @@ func init() { cmdLoadUnits.Flags.BoolVar(&sharedFlags.Sign, "sign", false, "DEPRECATED - this option cannot be used") cmdLoadUnits.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 0, "Wait until the jobs are loaded, performing up to N attempts before giving up. A value of 0 indicates no limit. Does not apply to global units.") cmdLoadUnits.Flags.BoolVar(&sharedFlags.NoBlock, "no-block", false, "Do not wait until the jobs have been loaded before exiting. Always the case for global units.") + cmdLoadUnits.Flags.BoolVar(&sharedFlags.Replace, "replace", false, "Replace the old unit and load the new one.") } func runLoadUnits(args []string) (exit int) { diff --git a/fleetctl/start.go b/fleetctl/start.go index 2ded29539..a4236f116 100644 --- a/fleetctl/start.go +++ b/fleetctl/start.go @@ -51,6 +51,7 @@ func init() { cmdStartUnit.Flags.BoolVar(&sharedFlags.Sign, "sign", false, "DEPRECATED - this option cannot be used") cmdStartUnit.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 0, "Wait until the units are launched, performing up to N attempts before giving up. A value of 0 indicates no limit. Does not apply to global units.") cmdStartUnit.Flags.BoolVar(&sharedFlags.NoBlock, "no-block", false, "Do not wait until the units have launched before exiting. Always the case for global units.") + cmdStartUnit.Flags.BoolVar(&sharedFlags.Replace, "replace", false, "Replace the old unit and start the new one.") } func runStartUnit(args []string) (exit int) { diff --git a/fleetctl/submit.go b/fleetctl/submit.go index cbfe03f93..8a1cde01b 100644 --- a/fleetctl/submit.go +++ b/fleetctl/submit.go @@ -33,6 +33,7 @@ Submit a directory of units with glob matching: func init() { cmdSubmitUnit.Flags.BoolVar(&sharedFlags.Sign, "sign", false, "DEPRECATED - this option cannot be used") + cmdSubmitUnit.Flags.BoolVar(&sharedFlags.Replace, "replace", false, "Replace the old unit with the new one.") } func runSubmitUnits(args []string) (exit int) { diff --git a/functional/unit_action_test.go b/functional/unit_action_test.go index 78a150fd7..e6019bcfb 100644 --- a/functional/unit_action_test.go +++ b/functional/unit_action_test.go @@ -15,12 +15,22 @@ package functional import ( + "fmt" + "io/ioutil" + "os" "strings" "testing" "github.com/coreos/fleet/functional/platform" ) +const ( + tmpHelloService = "/tmp/hello.service" + fxtHelloService = "fixtures/units/hello.service" + tmpFixtures = "/tmp/fixtures" + numUnitsReplace = 9 +) + // TestUnitRunnable is the simplest test possible, deplying a single-node // cluster and ensuring a unit can enter an 'active' state func TestUnitRunnable(t *testing.T) { @@ -167,6 +177,42 @@ func TestUnitRestart(t *testing.T) { } +// TestUnitSubmitReplace() tests whether a command "fleetctl submit --replace +// hello.service" works or not. +func TestUnitSubmitReplace(t *testing.T) { + if err := replaceUnitCommon("submit"); err != nil { + t.Fatal(err) + } + + if err := replaceUnitMultiple("submit", numUnitsReplace); err != nil { + t.Fatal(err) + } +} + +// TestUnitLoadReplace() tests whether a command "fleetctl load --replace +// hello.service" works or not. +func TestUnitLoadReplace(t *testing.T) { + if err := replaceUnitCommon("load"); err != nil { + t.Fatal(err) + } + + if err := replaceUnitMultiple("load", numUnitsReplace); err != nil { + t.Fatal(err) + } +} + +// TestUnitStartReplace() tests whether a command "fleetctl start --replace +// hello.service" works or not. +func TestUnitStartReplace(t *testing.T) { + if err := replaceUnitCommon("start"); err != nil { + t.Fatal(err) + } + + if err := replaceUnitMultiple("start", numUnitsReplace); err != nil { + t.Fatal(err) + } +} + func TestUnitSSHActions(t *testing.T) { cluster, err := platform.NewNspawnCluster("smoke") if err != nil { @@ -224,3 +270,240 @@ func TestUnitSSHActions(t *testing.T) { t.Errorf("Could not find expected string in journal output:\n%s", stdout) } } + +// replaceUnitCommon() tests whether a command "fleetctl {submit,load,start} +// --replace hello.service" works or not. +func replaceUnitCommon(cmd string) error { + // check if cmd is one of the supported commands. + listCmds := []string{"submit", "load", "start"} + found := false + for _, ccmd := range listCmds { + if ccmd == cmd { + found = true + } + } + if !found { + return fmt.Errorf("invalid command %s", cmd) + } + + cluster, err := platform.NewNspawnCluster("smoke") + if err != nil { + return fmt.Errorf("%v", err) + } + defer cluster.Destroy() + + m, err := cluster.CreateMember() + if err != nil { + return fmt.Errorf("%v", err) + } + _, err = cluster.WaitForNMachines(m, 1) + if err != nil { + return fmt.Errorf("%v", err) + } + + // run a command for a unit and assert it shows up + if _, _, err := cluster.Fleetctl(m, cmd, fxtHelloService); err != nil { + return fmt.Errorf("Unable to %s fleet unit: %v", cmd, err) + } + stdout, _, err := cluster.Fleetctl(m, "list-units", "--no-legend") + if err != nil { + return fmt.Errorf("Failed to run list-units: %v", err) + } + units := strings.Split(strings.TrimSpace(stdout), "\n") + if len(units) != 1 { + return fmt.Errorf("Did not find 1 unit in cluster: \n%s", stdout) + } + + // replace the unit and assert it shows up + err = genNewFleetService(tmpHelloService, fxtHelloService, "sleep 2", "sleep 1") + if err != nil { + return fmt.Errorf("Failed to generate a temp fleet service: %v", err) + } + if _, _, err := cluster.Fleetctl(m, cmd, "--replace", tmpHelloService); err != nil { + return fmt.Errorf("Unable to replace fleet unit: %v", err) + } + stdout, _, err = cluster.Fleetctl(m, "list-units", "--no-legend") + if err != nil { + return fmt.Errorf("Failed to run list-units: %v", err) + } + units = strings.Split(strings.TrimSpace(stdout), "\n") + if len(units) != 1 { + return fmt.Errorf("Did not find 1 unit in cluster: \n%s", stdout) + } + os.Remove(tmpHelloService) + + if err := destroyUnitRetrying(cluster, m, fxtHelloService); err != nil { + return fmt.Errorf("Cannot destroy unit %v", fxtHelloService) + } + + return nil +} + +// replaceUnitMultiple() tests whether a command "fleetctl {submit,load,start} +// --replace hello.service" works or not. +func replaceUnitMultiple(cmd string, n int) error { + // check if cmd is one of the supported commands. + listCmds := []string{"submit", "load", "start"} + found := false + for _, ccmd := range listCmds { + if ccmd == cmd { + found = true + } + } + if !found { + return fmt.Errorf("invalid command %s", cmd) + } + + var listUnitCmd string + if cmd == "submit" { + listUnitCmd = "list-unit-files" + } else { + listUnitCmd = "list-units" + } + + cluster, err := platform.NewNspawnCluster("smoke") + if err != nil { + return fmt.Errorf("%v", err) + } + defer cluster.Destroy() + + m, err := cluster.CreateMember() + if err != nil { + return fmt.Errorf("%v", err) + } + _, err = cluster.WaitForNMachines(m, 1) + if err != nil { + return fmt.Errorf("%v", err) + } + + if _, err := os.Stat(tmpFixtures); os.IsNotExist(err) { + os.Mkdir(tmpFixtures, 0755) + } + + var stdout string + for i := 1; i <= n; i++ { + curHelloService := fmt.Sprintf("/tmp/hello%d.service", i) + tmpHelloFixture := fmt.Sprintf("/tmp/fixtures/hello%d.service", i) + + // generate a new service derived by fixtures, and store it under /tmp + err = copyFile(tmpHelloFixture, fxtHelloService) + if err != nil { + return fmt.Errorf("Failed to copy a temp fleet service: %v", err) + } + + // run a command for a unit and assert it shows up + if _, _, err := cluster.Fleetctl(m, cmd, tmpHelloFixture); err != nil { + return fmt.Errorf("Unable to %s fleet unit: %v", cmd, err) + } + + stdout, _, err = cluster.Fleetctl(m, listUnitCmd, "--no-legend") + if err != nil { + return fmt.Errorf("Failed to run %s: %v", listUnitCmd, err) + } + units := strings.Split(strings.TrimSpace(stdout), "\n") + if len(units) != i { + return fmt.Errorf("Did not find %d units in cluster: \n%s", i, stdout) + } + + // generate a new service derived by fixtures, and store it under /tmp + err = genNewFleetService(curHelloService, fxtHelloService, "sleep 2", "sleep 1") + if err != nil { + return fmt.Errorf("Failed to generate a temp fleet service: %v", err) + } + } + + for i := 1; i <= n; i++ { + curHelloService := fmt.Sprintf("/tmp/hello%d.service", i) + + // replace the unit and assert it shows up + if _, _, err = cluster.Fleetctl(m, cmd, "--replace", curHelloService); err != nil { + return fmt.Errorf("Unable to replace fleet unit: %v", err) + } + stdout, _, err = cluster.Fleetctl(m, listUnitCmd, "--no-legend") + if err != nil { + return fmt.Errorf("Failed to run %s: %v", listUnitCmd, err) + } + units := strings.Split(strings.TrimSpace(stdout), "\n") + if len(units) != n { + return fmt.Errorf("Did not find %d units in cluster: \n%s", n, stdout) + } + } + + // clean up temp services under /tmp + for i := 1; i <= n; i++ { + curHelloService := fmt.Sprintf("/tmp/hello%d.service", i) + os.Remove(curHelloService) + + if err := destroyUnitRetrying(cluster, m, fxtHelloService); err != nil { + return fmt.Errorf("Cannot destroy unit %v", fxtHelloService) + } + } + os.Remove(tmpFixtures) + + return nil +} + +// copyFile() +func copyFile(newFile, oldFile string) error { + input, err := ioutil.ReadFile(oldFile) + if err != nil { + return err + } + err = ioutil.WriteFile(newFile, []byte(input), 0644) + if err != nil { + return err + } + return nil +} + +// genNewFleetService() is a helper for generating a temporary fleet service +// that reads from oldFile, replaces oldVal with newVal, and stores the result +// to newFile. +func genNewFleetService(newFile, oldFile, newVal, oldVal string) error { + input, err := ioutil.ReadFile(oldFile) + if err != nil { + return err + } + lines := strings.Split(string(input), "\n") + + for i, line := range lines { + if strings.Contains(line, oldVal) { + lines[i] = strings.Replace(line, oldVal, newVal, len(oldVal)) + } + } + output := strings.Join(lines, "\n") + err = ioutil.WriteFile(newFile, []byte(output), 0644) + if err != nil { + return err + } + return nil +} + +// destroyUnitRetrying() destroys the unit and ensure it disappears from the +// unit list. It could take a little time until the unit gets destroyed. +func destroyUnitRetrying(cluster platform.Cluster, m platform.Member, serviceFile string) error { + maxAttempts := 3 + found := false + var stdout string + var err error + for { + if _, _, err := cluster.Fleetctl(m, "destroy", serviceFile); err != nil { + return fmt.Errorf("Failed to destroy unit: %v", err) + } + stdout, _, err = cluster.Fleetctl(m, "list-units", "--no-legend") + if err != nil { + return fmt.Errorf("Failed to run list-units: %v", err) + } + if strings.TrimSpace(stdout) == "" || maxAttempts == 0 { + found = true + break + } + maxAttempts-- + } + + if !found { + return fmt.Errorf("Did not find 0 units in cluster: \n%s", stdout) + } + + return nil +} diff --git a/registry/job.go b/registry/job.go index 8eeda2280..d406947ea 100644 --- a/registry/job.go +++ b/registry/job.go @@ -335,7 +335,7 @@ func (r *EtcdRegistry) CreateUnit(u *job.Unit) (err error) { } opts := &etcd.SetOptions{ - PrevExist: etcd.PrevNoExist, + PrevExist: etcd.PrevIgnore, } key := r.prefixed(jobPrefix, u.Name, "object") _, err = r.kAPI.Set(r.ctx(), key, val, opts) diff --git a/unit/unit.go b/unit/unit.go index 2da58a6bb..6c54c0b91 100644 --- a/unit/unit.go +++ b/unit/unit.go @@ -136,6 +136,16 @@ func (u *UnitFile) Hash() Hash { return Hash(sha1.Sum(u.Bytes())) } +// MatchUnitFiles compares two unitFiles +// Returns true if the units match, false otherwise. +func MatchUnitFiles(a *UnitFile, b *UnitFile) bool { + if a.Hash() == b.Hash() { + return true + } + + return false +} + // RecognizedUnitType determines whether or not the given unit name represents // a recognized unit type. func RecognizedUnitType(name string) bool {