@@ -87,13 +87,24 @@ func NewClient(k kubernetes.Interface, root *cobra.Command, config *viper.Viper,
8787 return c , nil
8888}
8989
90- // Create creates (and queues) a Kubernetes job with the given name that executes
90+ // RunJob creates (and queues) a Kubernetes job with the given name that executes
// the command and command-line arguments specified by job, using the client's
// container image. job.MemoryGB specifies the memory (in Gi) required for execution.
9393func (c * Client ) RunJob (ctx context.Context , job * cloudrpc.JobSpec ) (* cloudrpc.JobStatus , error ) {
9494 if job .Version != inmap .Version {
9595 return nil , fmt .Errorf ("incorrect InMAP version: %s != %s" , job .Version , inmap .Version )
9696 }
97+
98+ status , err := c .Status (ctx , & cloudrpc.JobName {Name : job .Name , Version : job .Version })
99+ if err != nil {
100+ return nil , err
101+ }
102+ if status .Status != cloudrpc .Status_Failed { //TODO: status.Status != cloudrpc.Status_Missing && {
103+ // Only create the job if it is missing or failed.
104+ c .Delete (ctx , & cloudrpc.JobName {Name : job .Name , Version : job .Version })
105+ return status , nil
106+ }
107+
97108 if err := c .stageInputs (ctx , job ); err != nil {
98109 return nil , err
99110 }
@@ -107,20 +118,11 @@ func (c *Client) RunJob(ctx context.Context, job *cloudrpc.JobSpec) (*cloudrpc.J
107118 k8sJob := createJob (userJobName (user , job .Name ), job .Cmd , job .Args , c .Image , core.ResourceList {
108119 core .ResourceMemory : resource .MustParse (fmt .Sprintf ("%dGi" , job .MemoryGB )),
109120 })
110- k8sJobResult , err : = c .jobControl .Create (k8sJob )
121+ _ , err = c .jobControl .Create (k8sJob )
111122 if err != nil {
112123 return nil , err
113124 }
114- return c .jobStatus (k8sJobResult )
115- }
116-
117- // Status returns the status of the given job.
118- func (c * Client ) Status (ctx context.Context , job * cloudrpc.JobName ) (* cloudrpc.JobStatus , error ) {
119- k8sJob , err := c .getk8sJob (ctx , job )
120- if err != nil {
121- return nil , err
122- }
123- return c .jobStatus (k8sJob )
125+ return c .Status (ctx , & cloudrpc.JobName {Name : job .Name , Version : job .Version })
124126}
125127
126128// Delete deletes the given job.
@@ -167,10 +169,36 @@ func userJobName(user, name string) string {
167169 return strings .Replace (user , "_" , "-" , - 1 ) + "-" + strings .Replace (name , "_" , "-" , - 1 )
168170}
169171
170- func (c * Client ) jobStatus (j * batch.Job ) (* cloudrpc.JobStatus , error ) {
171- return & cloudrpc.JobStatus {
172- Status : j .Status .String (),
173- }, nil
172+ // Status returns the status of the given job.
173+ func (c * Client ) Status (ctx context.Context , job * cloudrpc.JobName ) (* cloudrpc.JobStatus , error ) {
174+ s := new (cloudrpc.JobStatus )
175+ /*k8sJob, err := c.getk8sJob(ctx, job)
176+ if err != nil {
177+ return &cloudrpc.JobStatus{
178+ Status: cloudrpc.Status_Missing,
179+ Message: err.Error(),
180+ }, nil
181+ }
182+ for _, c := range k8sJob.Status.Conditions {
183+ if c.Type == batch.JobComplete && c.Status == core.ConditionTrue {
184+ s.Status = cloudrpc.Status_Complete
185+ s.StartTime = k8sJob.Status.StartTime.Time.Unix()
186+ s.CompletionTime = k8sJob.Status.CompletionTime.Time.Unix()
187+ } else if c.Type == batch.JobFailed && c.Status == core.ConditionTrue {
188+ s.Status = cloudrpc.Status_Failed
189+ }
190+ }
191+ if k8sJob.Status.Active > 0 {
192+ s.Status = cloudrpc.Status_Running
193+ s.StartTime = k8sJob.Status.StartTime.Time.Unix()
194+ }*/
195+ //TODO: err = c.checkOutputs(ctx, name, k8sJob.Spec.Template.Spec.Containers[0].Command)
196+ err := c .checkOutputs (ctx , job .Name , []string {"inmap" , "run" , "steady" })
197+ if err != nil {
198+ s .Status = cloudrpc .Status_Failed
199+ s .Message = fmt .Sprintf ("job completed but the following error occurred when checking outputs: %s" , err )
200+ }
201+ return s , nil
174202}
175203
176204// createJob creates a Kubernetes job specification with the given name that executes the
0 commit comments