diff --git a/api/units.go b/api/units.go index 67b4a9537..47ebfa8c4 100644 --- a/api/units.go +++ b/api/units.go @@ -76,6 +76,7 @@ func (ur *unitsResource) set(rw http.ResponseWriter, req *http.Request, item str } var su schema.Unit + newUnit := false dec := json.NewDecoder(req.Body) err := dec.Decode(&su) if err != nil { @@ -105,11 +106,28 @@ func (ur *unitsResource) set(rw http.ResponseWriter, req *http.Request, item str if len(su.Options) == 0 { err := errors.New("unit does not exist and options field empty") sendError(rw, http.StatusConflict, err) + return } else if err := ValidateOptions(su.Options); err != nil { sendError(rw, http.StatusBadRequest, err) + return } else { - ur.create(rw, su.Name, &su) + newUnit = true } + } else if eu.Name == su.Name && len(su.Options) > 0 { + // There is already a unit with the same name + // check the hashes if they do not match then we will + // create a new unit with the same name and later + // the job will be updated to this new unit. + // if su.Options == 0 then probably we don't want to update + // the Unit Options but only its target job state, in + // this case just ignore. + a := schema.MapSchemaUnitOptionsToUnitFile(su.Options) + b := schema.MapSchemaUnitOptionsToUnitFile(eu.Options) + newUnit = !unit.MatchUnitFiles(a, b) + } + + if newUnit { + ur.create(rw, su.Name, &su) return } diff --git a/fleetctl/fleetctl.go b/fleetctl/fleetctl.go index 69513e04f..7cca74c6a 100644 --- a/fleetctl/fleetctl.go +++ b/fleetctl/fleetctl.go @@ -75,9 +75,10 @@ var ( // flags used by all commands globalFlags = struct { - Debug bool - Version bool - Help bool + Debug bool + Version bool + Help bool + currentCommand string ClientDriver string ExperimentalAPI bool @@ -103,6 +104,7 @@ var ( Full bool NoLegend bool NoBlock bool + Replace bool BlockAttempts int Fields string SSHPort int @@ -287,6 +289,10 @@ func main() { } } + // We use this to know in which context are we: + // submit, load or start + globalFlags.currentCommand = cmd.Name + os.Exit(cmd.Run(cmd.Flags.Args())) } @@ -555,7 +561,7 @@ func getUnitFileFromTemplate(uni *unit.UnitNameInfo, fileName string) (*unit.Uni } if tmpl != nil { - warnOnDifferentLocalUnit(fileName, tmpl) + isLocalUnitDifferent(fileName, tmpl, true, false) uf = schema.MapSchemaUnitOptionsToUnitFile(tmpl.Options) log.Debugf("Template Unit(%s) found in registry", uni.Template) } else { @@ -663,6 +669,81 @@ func createUnit(name string, uf *unit.UnitFile) (*schema.Unit, error) { return &u, nil } +// checkReplaceUnitState checks if the unit should be replaced +// It takes a Unit object as a parameter +// It returns 0 on success and if the unit should be replaced, 1 if the +// unit should not be replaced; and any error acountered +func checkReplaceUnitState(unit *schema.Unit) (int, error) { + allowedReplace := map[string][]job.JobState{ + "submit": []job.JobState{ + job.JobStateInactive, + }, + "load": []job.JobState{ + job.JobStateInactive, + job.JobStateLoaded, + }, + "start": []job.JobState{ + job.JobStateInactive, + job.JobStateLoaded, + job.JobStateLaunched, + }, + } + + if allowedJobs, ok := allowedReplace[globalFlags.currentCommand]; ok { + for _, j := range allowedJobs { + if job.JobState(unit.DesiredState) == j { + return 0, nil + } + } + stderr("Warning: can not replace Unit(%s) in state '%s', use the appropriate command", unit.Name, unit.DesiredState) + } else { + return 1, fmt.Errorf("error 'replace' is not supported for this command") + } + + return 1, nil +} + +// checkUnitCreation checks if the unit should be created +// It takes a unit file path as a parameter. +// It returns 0 on success and if the unit should be created, 1 if the +// unit should not be created; and any error acountered +func checkUnitCreation(arg string) (int, error) { + name := unitNameMangle(arg) + + // First, check if there already exists a Unit by the given name in the Registry + unit, err := cAPI.Unit(name) + if err != nil { + return 1, fmt.Errorf("error retrieving Unit(%s) from Registry: %v", name, err) + } + + // check if the unit is running + if unit == nil { + if sharedFlags.Replace { + log.Debugf("Unit(%s) was not found in Registry", name) + } + // Create a new unit + return 0, nil + } + + // if sharedFlags.Replace is not set then we warn + different, err := isLocalUnitDifferent(arg, unit, !sharedFlags.Replace, false) + + // if sharedFlags.Replace is set then we fail for errors + if sharedFlags.Replace { + if err != nil { + return 1, err + } else if different { + return checkReplaceUnitState(unit) + } else { + stdout("Found same Unit(%s) in Registry, nothing to do", unit.Name) + } + } else if different == false { + log.Debugf("Found same Unit(%s) in Registry, no need to recreate it", name) + } + + return 1, nil +} + // lazyCreateUnits iterates over a set of unit names and, for each, attempts to // ensure that a unit by that name exists in the Registry, by checking a number // of conditions and acting on the first one that succeeds, in order of: @@ -680,14 +761,10 @@ func lazyCreateUnits(args []string) error { arg = maybeAppendDefaultUnitType(arg) name := unitNameMangle(arg) - // First, check if there already exists a Unit by the given name in the Registry - u, err := cAPI.Unit(name) + ret, err := checkUnitCreation(arg) if err != nil { - return fmt.Errorf("error retrieving Unit(%s) from Registry: %v", name, err) - } - if u != nil { - log.Debugf("Found Unit(%s) in Registry, no need to recreate it", name) - warnOnDifferentLocalUnit(arg, u) + return err + } else if ret != 0 { continue } @@ -726,24 +803,52 @@ func lazyCreateUnits(args []string) error { return nil } -func warnOnDifferentLocalUnit(loc string, su *schema.Unit) { - suf := schema.MapSchemaUnitOptionsToUnitFile(su.Options) - if _, err := os.Stat(loc); !os.IsNotExist(err) { - luf, err := getUnitFromFile(loc) - if err == nil && luf.Hash() != suf.Hash() { - stderr("WARNING: Unit %s in registry differs from local unit file %s", su.Name, loc) - return +// matchLocalFileAndUnit compares a file with a Unit +// Returns true if the contents of the file matches the unit one, false +// otherwise; and any error ocountered +func matchLocalFileAndUnit(file string, su *schema.Unit) (bool, error) { + result := false + a := schema.MapSchemaUnitOptionsToUnitFile(su.Options) + + _, err := os.Stat(file) + if err == nil { + b, err := getUnitFromFile(file) + if err == nil { + result = unit.MatchUnitFiles(a, b) } } - if uni := unit.NewUnitNameInfo(path.Base(loc)); uni != nil && uni.IsInstance() { - file := path.Join(path.Dir(loc), uni.Template) - if _, err := os.Stat(file); !os.IsNotExist(err) { - tmpl, err := getUnitFromFile(file) - if err == nil && tmpl.Hash() != suf.Hash() { - stderr("WARNING: Unit %s in registry differs from local template unit file %s", su.Name, uni.Template) - } + + return result, err +} + +func isLocalUnitDifferent(file string, su *schema.Unit, warnIfDifferent bool, fatal bool) (bool, error) { + result, err := matchLocalFileAndUnit(file, su) + if err == nil { + if result == false && warnIfDifferent { + stderr("WARNING: Unit %s in registry differs from local unit file %s", su.Name, file) + } + return !result, nil + } else if fatal { + return false, err + } + + info := unit.NewUnitNameInfo(path.Base(file)) + if info == nil { + return false, fmt.Errorf("error extracting information from unit name %s", file) + } else if !info.IsInstance() { + return false, fmt.Errorf("error Unit %s does not seem to be a template unit", file) + } + + templFile := path.Join(path.Dir(file), info.Template) + result, err = matchLocalFileAndUnit(templFile, su) + if err == nil { + if result == false && warnIfDifferent { + stderr("WARNING: Unit %s in registry differs from local template unit file %s", su.Name, info.Template) } + return !result, nil } + + return false, err } func lazyLoadUnits(args []string) ([]*schema.Unit, error) { diff --git a/fleetctl/load.go b/fleetctl/load.go index 9b1127dab..a0f65edda 100644 --- a/fleetctl/load.go +++ b/fleetctl/load.go @@ -43,6 +43,7 @@ func init() { cmdLoadUnits.Flags.BoolVar(&sharedFlags.Sign, "sign", false, "DEPRECATED - this option cannot be used") cmdLoadUnits.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 0, "Wait until the jobs are loaded, performing up to N attempts before giving up. A value of 0 indicates no limit. Does not apply to global units.") cmdLoadUnits.Flags.BoolVar(&sharedFlags.NoBlock, "no-block", false, "Do not wait until the jobs have been loaded before exiting. Always the case for global units.") + cmdLoadUnits.Flags.BoolVar(&sharedFlags.Replace, "replace", false, "Replace the old loaded unit with a new one.") } func runLoadUnits(args []string) (exit int) { diff --git a/fleetctl/start.go b/fleetctl/start.go index 99e03c2d1..9af3318ba 100644 --- a/fleetctl/start.go +++ b/fleetctl/start.go @@ -51,6 +51,7 @@ func init() { cmdStartUnit.Flags.BoolVar(&sharedFlags.Sign, "sign", false, "DEPRECATED - this option cannot be used") cmdStartUnit.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 0, "Wait until the units are launched, performing up to N attempts before giving up. A value of 0 indicates no limit. Does not apply to global units.") cmdStartUnit.Flags.BoolVar(&sharedFlags.NoBlock, "no-block", false, "Do not wait until the units have launched before exiting. Always the case for global units.") + cmdStartUnit.Flags.BoolVar(&sharedFlags.Replace, "replace", false, "Replace the old started unit with a new one.") } func runStartUnit(args []string) (exit int) { diff --git a/fleetctl/submit.go b/fleetctl/submit.go index 1297cff50..6c38a1352 100644 --- a/fleetctl/submit.go +++ b/fleetctl/submit.go @@ -33,6 +33,7 @@ Submit a directory of units with glob matching: func init() { cmdSubmitUnit.Flags.BoolVar(&sharedFlags.Sign, "sign", false, "DEPRECATED - this option cannot be used") + cmdSubmitUnit.Flags.BoolVar(&sharedFlags.Replace, "replace", false, "Replace the old submitted unit with a new one.") } func runSubmitUnits(args []string) (exit int) { diff --git a/functional/unit_action_test.go b/functional/unit_action_test.go index 78a150fd7..2e2f7806b 100644 --- a/functional/unit_action_test.go +++ b/functional/unit_action_test.go @@ -15,12 +15,22 @@ package functional import ( + "fmt" + "io/ioutil" + "os" "strings" "testing" "github.com/coreos/fleet/functional/platform" ) +const ( + tmpHelloService = "/tmp/hello.service" + fxtHelloService = "fixtures/units/hello.service" + tmpFixtures = "/tmp/fixtures" + numUnitsReplace = 9 +) + // TestUnitRunnable is the simplest test possible, deplying a single-node // cluster and ensuring a unit can enter an 'active' state func TestUnitRunnable(t *testing.T) { @@ -167,6 +177,42 @@ func TestUnitRestart(t *testing.T) { } +// TestUnitSubmitReplace() tests whether a command "fleetctl submit --replace +// hello.service" works or not. +func TestUnitSubmitReplace(t *testing.T) { + if err := replaceUnitCommon("submit"); err != nil { + t.Fatal(err) + } + + if err := replaceUnitMultiple("submit", numUnitsReplace); err != nil { + t.Fatal(err) + } +} + +// TestUnitLoadReplace() tests whether a command "fleetctl load --replace +// hello.service" works or not. +func TestUnitLoadReplace(t *testing.T) { + if err := replaceUnitCommon("load"); err != nil { + t.Fatal(err) + } + + if err := replaceUnitMultiple("load", numUnitsReplace); err != nil { + t.Fatal(err) + } +} + +// TestUnitStartReplace() tests whether a command "fleetctl start --replace +// hello.service" works or not. +func TestUnitStartReplace(t *testing.T) { + if err := replaceUnitCommon("start"); err != nil { + t.Fatal(err) + } + + if err := replaceUnitMultiple("start", numUnitsReplace); err != nil { + t.Fatal(err) + } +} + func TestUnitSSHActions(t *testing.T) { cluster, err := platform.NewNspawnCluster("smoke") if err != nil { @@ -224,3 +270,274 @@ func TestUnitSSHActions(t *testing.T) { t.Errorf("Could not find expected string in journal output:\n%s", stdout) } } + +// replaceUnitCommon() tests whether a command "fleetctl {submit,load,start} +// --replace hello.service" works or not. +func replaceUnitCommon(cmd string) error { + // check if cmd is one of the supported commands. + listCmds := []string{"submit", "load", "start"} + found := false + for _, ccmd := range listCmds { + if ccmd == cmd { + found = true + } + } + if !found { + return fmt.Errorf("invalid command %s", cmd) + } + + cluster, err := platform.NewNspawnCluster("smoke") + if err != nil { + return fmt.Errorf("%v", err) + } + defer cluster.Destroy() + + m, err := cluster.CreateMember() + if err != nil { + return fmt.Errorf("%v", err) + } + _, err = cluster.WaitForNMachines(m, 1) + if err != nil { + return fmt.Errorf("%v", err) + } + + // run a command for a unit and assert it shows up + if _, _, err := cluster.Fleetctl(m, cmd, fxtHelloService); err != nil { + return fmt.Errorf("Unable to %s fleet unit: %v", cmd, err) + } + stdout, _, err := cluster.Fleetctl(m, "list-units", "--no-legend") + if err != nil { + return fmt.Errorf("Failed to run list-units: %v", err) + } + units := strings.Split(strings.TrimSpace(stdout), "\n") + if len(units) != 1 { + return fmt.Errorf("Did not find 1 unit in cluster: \n%s", stdout) + } + if err := waitForActiveUnitsReplaceCmds(cluster, m, cmd, 1); err != nil { + return err + } + + // replace the unit and assert it shows up + err = genNewFleetService(tmpHelloService, fxtHelloService, "sleep 2", "sleep 1") + if err != nil { + return fmt.Errorf("Failed to generate a temp fleet service: %v", err) + } + if _, _, err := cluster.Fleetctl(m, cmd, "--replace", tmpHelloService); err != nil { + return fmt.Errorf("Unable to replace fleet unit: %v", err) + } + stdout, _, err = cluster.Fleetctl(m, "list-units", "--no-legend") + if err != nil { + return fmt.Errorf("Failed to run list-units: %v", err) + } + units = strings.Split(strings.TrimSpace(stdout), "\n") + if len(units) != 1 { + return fmt.Errorf("Did not find 1 unit in cluster: \n%s", stdout) + } + if err := waitForActiveUnitsReplaceCmds(cluster, m, cmd, 1); err != nil { + return err + } + os.Remove(tmpHelloService) + + if err := destroyUnitRetrying(cluster, m, fxtHelloService); err != nil { + return fmt.Errorf("Cannot destroy unit %v", fxtHelloService) + } + + return nil +} + +// replaceUnitMultiple() tests whether a command "fleetctl {submit,load,start} +// --replace hello.service" works or not. +func replaceUnitMultiple(cmd string, n int) error { + // check if cmd is one of the supported commands. + listCmds := []string{"submit", "load", "start"} + found := false + for _, ccmd := range listCmds { + if ccmd == cmd { + found = true + } + } + if !found { + return fmt.Errorf("invalid command %s", cmd) + } + + cluster, err := platform.NewNspawnCluster("smoke") + if err != nil { + return fmt.Errorf("%v", err) + } + defer cluster.Destroy() + + m, err := cluster.CreateMember() + if err != nil { + return fmt.Errorf("%v", err) + } + _, err = cluster.WaitForNMachines(m, 1) + if err != nil { + return fmt.Errorf("%v", err) + } + + if _, err := os.Stat(tmpFixtures); os.IsNotExist(err) { + os.Mkdir(tmpFixtures, 0755) + } + + var stdout string + for i := 1; i <= n; i++ { + curHelloService := fmt.Sprintf("/tmp/hello%d.service", i) + tmpHelloFixture := fmt.Sprintf("/tmp/fixtures/hello%d.service", i) + + // generate a new service derived by fixtures, and store it under /tmp + err = copyFile(tmpHelloFixture, fxtHelloService) + if err != nil { + return fmt.Errorf("Failed to copy a temp fleet service: %v", err) + } + + // run a command for a unit and assert it shows up + if _, _, err := cluster.Fleetctl(m, cmd, tmpHelloFixture); err != nil { + return fmt.Errorf("Unable to %s fleet unit: %v", cmd, err) + } + + stdout, _, err = cluster.Fleetctl(m, "list-unit-files", "--no-legend") + if err != nil { + return fmt.Errorf("Failed to run %s: %v", "list-unit-files", err) + } + units := strings.Split(strings.TrimSpace(stdout), "\n") + if len(units) != i { + return fmt.Errorf("Did not find %d units in cluster: \n%s", i, stdout) + } + if err := waitForActiveUnitsReplaceCmds(cluster, m, cmd, i); err != nil { + return err + } + + // generate a new service derived by fixtures, and store it under /tmp + err = genNewFleetService(curHelloService, fxtHelloService, "sleep 2", "sleep 1") + if err != nil { + return fmt.Errorf("Failed to generate a temp fleet service: %v", err) + } + } + + for i := 1; i <= n; i++ { + curHelloService := fmt.Sprintf("/tmp/hello%d.service", i) + + // replace the unit and assert it shows up + if _, _, err = cluster.Fleetctl(m, cmd, "--replace", curHelloService); err != nil { + return fmt.Errorf("Unable to replace fleet unit: %v", err) + } + stdout, _, err = cluster.Fleetctl(m, "list-unit-files", "--no-legend") + if err != nil { + return fmt.Errorf("Failed to run %s: %v", "list-unit-files", err) + } + units := strings.Split(strings.TrimSpace(stdout), "\n") + if len(units) != n { + return fmt.Errorf("Did not find %d units in cluster: \n%s", n, stdout) + } + if err := waitForActiveUnitsReplaceCmds(cluster, m, cmd, i); err != nil { + return err + } + } + + // clean up temp services under /tmp + for i := 1; i <= n; i++ { + curHelloService := fmt.Sprintf("/tmp/hello%d.service", i) + os.Remove(curHelloService) + + if err := destroyUnitRetrying(cluster, m, fxtHelloService); err != nil { + return fmt.Errorf("Cannot destroy unit %v", fxtHelloService) + } + } + os.Remove(tmpFixtures) + + return nil +} + +// copyFile() +func copyFile(newFile, oldFile string) error { + input, err := ioutil.ReadFile(oldFile) + if err != nil { + return err + } + err = ioutil.WriteFile(newFile, []byte(input), 0644) + if err != nil { + return err + } + return nil +} + +// genNewFleetService() is a helper for generating a temporary fleet service +// that reads from oldFile, replaces oldVal with newVal, and stores the result +// to newFile. +func genNewFleetService(newFile, oldFile, newVal, oldVal string) error { + input, err := ioutil.ReadFile(oldFile) + if err != nil { + return err + } + lines := strings.Split(string(input), "\n") + + for i, line := range lines { + if strings.Contains(line, oldVal) { + lines[i] = strings.Replace(line, oldVal, newVal, len(oldVal)) + } + } + output := strings.Join(lines, "\n") + err = ioutil.WriteFile(newFile, []byte(output), 0644) + if err != nil { + return err + } + return nil +} + +// waitForActiveUnitsReplaceCmds() is a wrapper for waiting for N active units. +// The expected number of active units are given as a parameter "count". +// If cmd is "start", it expects that "count" active units are active. +// Otherwise, for "load" or "submit", it expects no active unit. +func waitForActiveUnitsReplaceCmds(cluster platform.Cluster, m platform.Member, cmd string, count int) error { + if cmd == "start" { + units, err := cluster.WaitForNActiveUnits(m, count) + if err != nil { + fmt.Errorf("%v", err) + } + _, found := units["hello.service"] + if len(units) != count || !found { + fmt.Errorf("Expected hello.service to be sole active unit, got %v", units) + } + } else { + // cmd is "load" or "submit", then there's no active unit + units, err := cluster.WaitForNActiveUnits(m, 0) + if err != nil { + fmt.Errorf("%v", err) + } + _, found := units["hello.service"] + if len(units) != 0 || !found { + fmt.Errorf("Expected hello.service to be sole active unit, got %v", units) + } + } + + return nil +} + +// destroyUnitRetrying() destroys the unit and ensure it disappears from the +// unit list. It could take a little time until the unit gets destroyed. +func destroyUnitRetrying(cluster platform.Cluster, m platform.Member, serviceFile string) error { + maxAttempts := 3 + found := false + var stdout string + var err error + for { + if _, _, err := cluster.Fleetctl(m, "destroy", serviceFile); err != nil { + return fmt.Errorf("Failed to destroy unit: %v", err) + } + stdout, _, err = cluster.Fleetctl(m, "list-units", "--no-legend") + if err != nil { + return fmt.Errorf("Failed to run list-units: %v", err) + } + if strings.TrimSpace(stdout) == "" || maxAttempts == 0 { + found = true + break + } + maxAttempts-- + } + + if !found { + return fmt.Errorf("Did not find 0 units in cluster: \n%s", stdout) + } + + return nil +} diff --git a/registry/job.go b/registry/job.go index 8eeda2280..152962df6 100644 --- a/registry/job.go +++ b/registry/job.go @@ -335,14 +335,13 @@ func (r *EtcdRegistry) CreateUnit(u *job.Unit) (err error) { } opts := &etcd.SetOptions{ - PrevExist: etcd.PrevNoExist, + // Since we support replacing Unit state just ignore + // previous keys. + PrevExist: etcd.PrevIgnore, } key := r.prefixed(jobPrefix, u.Name, "object") _, err = r.kAPI.Set(r.ctx(), key, val, opts) if err != nil { - if isEtcdError(err, etcd.ErrorCodeNodeExist) { - err = errors.New("job already exists") - } return } diff --git a/unit/unit.go b/unit/unit.go index 2da58a6bb..6c54c0b91 100644 --- a/unit/unit.go +++ b/unit/unit.go @@ -136,6 +136,16 @@ func (u *UnitFile) Hash() Hash { return Hash(sha1.Sum(u.Bytes())) } +// MatchUnitFiles compares two unitFiles +// Returns true if the units match, false otherwise. +func MatchUnitFiles(a *UnitFile, b *UnitFile) bool { + if a.Hash() == b.Hash() { + return true + } + + return false +} + // RecognizedUnitType determines whether or not the given unit name represents // a recognized unit type. func RecognizedUnitType(name string) bool {