diff --git a/ipf/glue2/abstractservice.py b/ipf/glue2/abstractservice.py index d4ffd5c..d598b35 100644 --- a/ipf/glue2/abstractservice.py +++ b/ipf/glue2/abstractservice.py @@ -40,13 +40,16 @@ from .service import * from .endpoint import * from ipf.sysinfo import ResourceName +from ipf.ipfinfo import IPFInformation, IPFInformationJson, IPFInformationTxt + ####################################################################################################################### class AbstractService(Data): - def __init__(self, id): + def __init__(self, id, ipfinfo): Data.__init__(self,id) self.services = [] self.handles = [] + self.ipfinfo = ipfinfo def add(self, serv): self.services.append(serv) @@ -55,13 +58,14 @@ class AbstractServiceStep(Step): def __init__(self): Step.__init__(self) - self.requires = [ResourceName] + self.requires = [IPFInformation,ResourceName] self.produces = [AbstractService] self.services = [] def run(self): self.resource_name = self._getInput(ResourceName).resource_name - servlist = AbstractService(self.resource_name) + self.ipfinfo = [self._getInput(IPFInformation)] + servlist = AbstractService(self.resource_name,self.ipfinfo) service_paths = [] try: paths = os.environ["SERVICEPATH"] @@ -248,6 +252,7 @@ def get(self): def toJson(self): doc = {} + doc["PublisherInfo"] = map(lambda ipfinfo: IPFInformationJson(ipfinfo).toJson(), self.data.ipfinfo) doc["StorageService"] = [] doc["ComputingService"] = [] doc["LoginService"] = [] diff --git a/ipf/glue2/accelerator_environment.py b/ipf/glue2/accelerator_environment.py index d23be40..a2661f1 100644 --- a/ipf/glue2/accelerator_environment.py +++ b/ipf/glue2/accelerator_environment.py @@ -62,6 +62,7 @@ def run(self): host_group.ID = "urn:glue2:AcceleratorEnvironment:%s.%s" % (host_group.Name,self.resource_name) host_group.ManagerID = "urn:glue2:ComputingManager:%s" % (self.resource_name) self.debug("host_group.id "+host_group.id) + self.debug("host_group.uas "+str(host_group.UsedAcceleratorSlots)) self._output(AcceleratorEnvironments(self.resource_name,host_groups)) @@ -111,6 +112,10 @@ def _groupHosts(self, hosts): host_group.TotalInstances += host.TotalInstances host_group.UsedInstances += host.UsedInstances host_group.UnavailableInstances += host.UnavailableInstances + #if host_group.UsedAcceleratorSlots is None: + # host_group.UsedAcceleratorSlots = 0 + #if host.UsedAcceleratorSlots is None: + # host.UsedAcceleratorSlots = 0 host_group.UsedAcceleratorSlots += host.UsedAcceleratorSlots if host_group.TotalAcceleratorSlots is None: host_group.TotalAcceleratorSlots = 0 @@ -139,6 +144,13 @@ def _goodHost(self, host): m = re.search("urn:glue2:ComputingShare:(\S+).%s" % self.resource_name,share) if self._includeQueue(m.group(1)): return True + # if the host is associated with a partition, check that it is a good one + if len(host.Partitions) == 0: + return True + partition_list = host.Partitions.split(',') + for share in partition_list: + if self._includeQueue(share): + return True return False ####################################################################################################################### @@ -456,37 +468,37 @@ def toJson(self): ####################################################################################################################### -class AcceleratorEnvironmentOgfJson(ResourceOgfJson): - data_cls = AcceleratorEnvironment - - def __init__(self, data): - ResourceOgfJson.__init__(self,data) - - def get(self): - return json.dumps(self.toJson(),sort_keys=True,indent=4) - - def toJson(self): - doc 
= ResourceOgfJson.toJson(self) - - doc["Platform"] = self.data.Platform - if self.data.PhysicalAccelerators is not None: - doc["PhysicalAccelerators"] = self.data.PhysicalAccelerators - if self.data.LogicalAccelerators is not None: - doc["LogicalAccelerators"] = self.data.LogicalAccelerators - if self.data.Vendor is not None: - doc["Vendor"] = self.data.Vendor - if self.data.Model is not None: - doc["Model"] = self.data.Model - if self.data.Version is not None: - doc["Version"] = self.data.Version - if self.data.ClockSpeed is not None: - doc["ClockSpeed"] = self.data.ClockSpeed - if self.data.Memory is not None: - doc["Memory"] = self.data.Memory - if self.data.ComputeCapability is not None: - doc["ComputeCapability"] = self.data.ComputeCapability - - return doc +#class AcceleratorEnvironmentOgfJson(ResourceOgfJson): +# data_cls = AcceleratorEnvironment +# +# def __init__(self, data): +# ResourceOgfJson.__init__(self,data) +# +# def get(self): +# return json.dumps(self.toJson(),sort_keys=True,indent=4) +# +# def toJson(self): +# doc = ResourceOgfJson.toJson(self) +# +# doc["Platform"] = self.data.Platform +# if self.data.PhysicalAccelerators is not None: +# doc["PhysicalAccelerators"] = self.data.PhysicalAccelerators +# if self.data.LogicalAccelerators is not None: +# doc["LogicalAccelerators"] = self.data.LogicalAccelerators +# if self.data.Vendor is not None: +# doc["Vendor"] = self.data.Vendor +# if self.data.Model is not None: +# doc["Model"] = self.data.Model +# if self.data.Version is not None: +# doc["Version"] = self.data.Version +# if self.data.ClockSpeed is not None: +# doc["ClockSpeed"] = self.data.ClockSpeed +# if self.data.Memory is not None: +# doc["Memory"] = self.data.Memory +# if self.data.ComputeCapability is not None: +# doc["ComputeCapability"] = self.data.ComputeCapability +# +# return doc ####################################################################################################################### @@ -514,10 +526,10 @@ def toJson(self): ####################################################################################################################### -class AcceleratorEnvironments(Data): - def __init__(self, id, accel_envs=[]): - Data.__init__(self,id) - self.accel_envs = accel_envs +#class AcceleratorEnvironments(Data): +# def __init__(self, id, accel_envs=[]): +# Data.__init__(self,id) +# self.accel_envs = accel_envs diff --git a/ipf/glue2/application.py b/ipf/glue2/application.py index 29979cc..9ea953e 100644 --- a/ipf/glue2/application.py +++ b/ipf/glue2/application.py @@ -22,6 +22,7 @@ from ipf.error import StepError from ipf.step import Step from ipf.sysinfo import ResourceName +from ipf.ipfinfo import IPFInformation, IPFInformationJson, IPFInformationTxt from .entity import * @@ -159,12 +160,13 @@ def toJson(self): ####################################################################################################################### class Applications(Data): - def __init__(self, resource_name): + def __init__(self, resource_name, ipfinfo): Data.__init__(self) self.id = resource_name self.environments = [] self.handles = [] self.resource_name = resource_name + self.ipfinfo = ipfinfo def add(self, env, handles): if env.AppVersion is None: @@ -207,6 +209,7 @@ def toJson(self): doc["ApplicationHandle"] = [] for handle in self.data.handles: doc["ApplicationHandle"].append(ApplicationHandleOgfJson(handle).toJson()) + doc["PublisherInfo"] = map(lambda ipfinfo: IPFInformationJson(ipfinfo).toJson(), self.data.ipfinfo) return doc 
####################################################################################################################### @@ -217,13 +220,14 @@ def __init__(self): self.description = "produces a document containing GLUE 2 ApplicationEnvironment and ApplicationHandle" self.time_out = 30 - self.requires = [ResourceName] + self.requires = [IPFInformation,ResourceName] self.produces = [Applications] self.resource_name = None def run(self): self.resource_name = self._getInput(ResourceName).resource_name + self.ipfinfo = [self._getInput(IPFInformation)] self._output(self._run()) def _run(self): diff --git a/ipf/glue2/compute.py b/ipf/glue2/compute.py index 106335f..fa23d26 100644 --- a/ipf/glue2/compute.py +++ b/ipf/glue2/compute.py @@ -25,6 +25,7 @@ from ipf.sysinfo import ResourceName from ipf.step import Step +from ipf.ipfinfo import IPFInformation, IPFInformationJson, IPFInformationTxt from computing_activity import ComputingActivities, ComputingActivityTeraGridXml, ComputingActivityOgfJson from computing_manager import ComputingManager, ComputingManagerTeraGridXml, ComputingManagerOgfJson from computing_manager_accel_info import ComputingManagerAcceleratorInfo, ComputingManagerAcceleratorInfoOgfJson @@ -35,6 +36,8 @@ from execution_environment import ExecutionEnvironmentTeraGridXml from execution_environment import ExecutionEnvironmentOgfJson from accelerator_environment import AcceleratorEnvironments +from accelerator_environment import AcceleratorEnvironmentsOgfJson +from accelerator_environment import AcceleratorEnvironment from accelerator_environment import AcceleratorEnvironmentOgfJson from location import Location, LocationOgfJson, LocationTeraGridXml ####################################################################################################################### @@ -45,13 +48,14 @@ def __init__(self): self.description = "creates a single data containing all nonsensitive compute-related information" self.time_out = 5 - self.requires = [ResourceName,Location, + self.requires = [IPFInformation,ResourceName,Location, ComputingService,ComputingShares,ComputingManager,ExecutionEnvironments,AcceleratorEnvironments,ComputingManagerAcceleratorInfo,ComputingShareAcceleratorInfo] self.produces = [Public] def run(self): public = Public() public.resource_name = self._getInput(ResourceName).resource_name + public.ipfinfo = [self._getInput(IPFInformation)] # the old TeraGridXML wants a site_name, so just derive it public.site_name = public.resource_name[public.resource_name.find(".")+1:] public.location = [self._getInput(Location)] @@ -72,6 +76,7 @@ class Public(Data): def __init__(self): Data.__init__(self) + self.ipfinfo = [] self.location = [] self.service = [] self.share = [] @@ -80,6 +85,9 @@ def __init__(self): self.accelenvironment = [] def fromJson(self, doc): + self.ipfinfo = [] + for idoc in doc.get("Ipfinfo",[]): + self.ipfinfo.append(ipfinfo().fromJson(idoc)) self.location = [] for ldoc in doc.get("Location",[]): self.location.append(Location().fromJson(ldoc)) @@ -159,6 +167,8 @@ def get(self): def toJson(self): doc = {} + if self.data.ipfinfo is not None: + doc["PublisherInfo"] = map(lambda ipfinfo: IPFInformationJson(ipfinfo).toJson(), self.data.ipfinfo) if len(self.data.location) > 0: doc["Location"] = map(lambda location: LocationOgfJson(location).toJson(),self.data.location) if self.data.service is not None: @@ -197,11 +207,12 @@ def __init__(self): self.description = "creates a single data containing all sensitive compute-related information" self.time_out = 5 - self.requires 
= [ResourceName,ComputingActivities] + self.requires = [IPFInformation,ResourceName,ComputingActivities] self.produces = [Private] def run(self): private = Private() + private.ipfinfo = [self._getInput(IPFInformation)] private.resource_name = self._getInput(ResourceName).resource_name # the old TeraGridXML wants a site_name, so just derive it private.site_name = private.resource_name[private.resource_name.find(".")+1:] @@ -280,6 +291,7 @@ def toJson(self): if len(self.data.activity) > 0: doc["ComputingActivity"] = map(lambda activity: ComputingActivityOgfJson(activity).toJson(), self.data.activity) + doc["PublisherInfo"] = map(lambda ipfinfo: IPFInformationJson(ipfinfo).toJson(), self.data.ipfinfo) return doc ####################################################################################################################### diff --git a/ipf/glue2/computing_manager.py b/ipf/glue2/computing_manager.py index 41cb1d2..241c910 100644 --- a/ipf/glue2/computing_manager.py +++ b/ipf/glue2/computing_manager.py @@ -127,21 +127,6 @@ def _addExecutionEnvironment(self, exec_env): def _addAcceleratorEnvironment(self, exec_env): self.ResourceID.append(exec_env.ID) - #self.ComputingManagerAcceleratorInfoID.append(exec_env.ID) - #if exec_env.PhysicalAccelerators is not None: - # if self.TotalPhysicalAccelerators == None: - # self.TotalPhysicalAccelerators = 0 - # self.TotalPhysicalAccelerators = self.TotalPhysicalAccelerators + exec_env.TotalInstances * exec_env.PhysicalAccelerators - #if exec_env.LogicalAccelerators is not None: - # if self.TotalLogicalAccelerators == None: - # self.TotalLogicalAccelerators = 0 - # self.TotalLogicalAcclerators = self.TotalLogicalAccelerators + exec_env.TotalInstances * exec_env.LogicalAccelerators - # self.TotalSlots = self.TotalLogicalAccelerators - - #if len(self.ResourceID) == 1: - # self.Homogeneous = True - #else: - # self.Homogeneous = False def _addComputingShare(self, share): if self.SlotsUsedByLocalJobs == None: diff --git a/ipf/glue2/computing_manager_accel_info.py b/ipf/glue2/computing_manager_accel_info.py index 0a6357e..f56b804 100644 --- a/ipf/glue2/computing_manager_accel_info.py +++ b/ipf/glue2/computing_manager_accel_info.py @@ -93,7 +93,7 @@ def _addAcceleratorEnvironment(self, accel_env): if accel_env.LogicalAccelerators is not None: if self.TotalLogicalAccelerators == None: self.TotalLogicalAccelerators = 0 - self.TotalLogicalAcclerators = self.TotalLogicalAccelerators + accel_env.TotalInstances * accel_env.LogicalAccelerators + self.TotalLogicalAccelerators = self.TotalLogicalAccelerators + accel_env.TotalInstances * accel_env.LogicalAccelerators self.TotalSlots = self.TotalLogicalAccelerators if accel_env.UsedAcceleratorSlots is not None: if self.UsedAcceleratorSlots == None: diff --git a/ipf/glue2/computing_share_accel_info.py b/ipf/glue2/computing_share_accel_info.py index cc52981..a7f49e9 100644 --- a/ipf/glue2/computing_share_accel_info.py +++ b/ipf/glue2/computing_share_accel_info.py @@ -107,11 +107,6 @@ def _addAcceleratorEnvironment(self, accel_env): if self.TotalAcceleratorSlots == None: self.TotalAcceleratorSlots = 0 self.TotalAcceleratorSlots = self.TotalAcceleratorSlots + accel_env.TotalAcceleratorSlots - #self.TotalPhysicalAccelerators = self.TotalPhysicalAccelerators + accel_env.TotalInstances * accel_env.PhysicalAccelerators - #if self.TotalLogicalAccelerators == None: - # self.TotalLogicalAccelerators = 0 - #self.TotalLogicalAcclerators = self.TotalLogicalAccelerators + accel_env.TotalInstances * accel_env.LogicalAccelerators - 
#self.TotalSlots = self.TotalLogicalAccelerators if accel_env.UsedAcceleratorSlots is not None: if self.UsedAcceleratorSlots == None: self.UsedAcceleratorSlots = 0 diff --git a/ipf/glue2/execution_environment.py b/ipf/glue2/execution_environment.py index 1399fd4..15b27a9 100644 --- a/ipf/glue2/execution_environment.py +++ b/ipf/glue2/execution_environment.py @@ -48,6 +48,9 @@ def __init__(self): self._acceptParameter("queues", "An expression describing the queues to include (optional). The syntax is a series of + and - where is either a queue name or a '*'. '+' means include '-' means exclude. The expression is processed in order and the value for a queue at the end determines if it is shown.", False) + self._acceptParameter("partitions", + "An expression describing the partitions to include (optional). The syntax is a series of + and - where is either a partition name or a '*'. '+' means include '-' means exclude. The expression is processed in order and the value for a partition at the end determines if it is shown.", + False) self.resource_name = None @@ -138,6 +141,14 @@ def _goodHost(self, host): m = re.search("urn:glue2:ComputingShare:(\S+).%s" % self.resource_name,share) if self._includeQueue(m.group(1)): return True + + # if the host is associated with a partition, check that it is a good one + if len(host.Partitions) == 0: + return True + partition_list = host.Partitions.split(',') + for share in partition_list: + if self._includeQueue(share): + return True return False ####################################################################################################################### @@ -190,6 +201,9 @@ def __init__(self): self.OSName = sysName.lower() self.OSVersion = release + #for filtering nodes by partition: + self.Partitions = None # string + def __str__(self): return json.dumps(ExecutionEnvironmentOgfJson(self).toJson(),sort_keys=True,indent=4) diff --git a/ipf/glue2/modules.py b/ipf/glue2/modules.py index 14944ef..c2faee5 100644 --- a/ipf/glue2/modules.py +++ b/ipf/glue2/modules.py @@ -238,7 +238,7 @@ def _run(self): self.support_contact = self.params.get("default_support_contact",False) - apps = application.Applications(self.resource_name) + apps = application.Applications(self.resource_name, self.ipfinfo) module_paths = [] try: diff --git a/ipf/glue2/openstack.py b/ipf/glue2/openstack.py index 2375ab7..4862e17 100644 --- a/ipf/glue2/openstack.py +++ b/ipf/glue2/openstack.py @@ -365,7 +365,7 @@ def _run(self): auth_url=auth_url) # restrict this to public images only? 
-        apps = glue2.application.Applications(self.resource_name)
+        apps = glue2.application.Applications(self.resource_name,self.ipfinfo)
         for image in nova.images.list():
             #print(" metadata: %s" % image.metadata)
             #print(dir(image))
diff --git a/ipf/glue2/slurm.py b/ipf/glue2/slurm.py
index bed7f4a..a746527 100644
--- a/ipf/glue2/slurm.py
+++ b/ipf/glue2/slurm.py
@@ -378,7 +378,7 @@ def _run(self):
             raise StepError("scontrol failed: "+output+"\n")
         reservation_strs = output.split("\n\n")
         try:
-            reservations = map(self._getReservation,reservation_strs)
+            reservations = filter(lambda res: self._includeQueue(res.PartitionName),map(self._getReservation,reservation_strs))
         except:
             reservations = []
@@ -482,7 +482,8 @@ def _run(self):
             raise StepError("scontrol failed: "+output+"\n")
         reservation_strs = output.split("\n\n")
         try:
-            reservations = map(self._getReservation,reservation_strs)
+            #reservations = map(self._getReservation,reservation_strs)
+            reservations = filter(lambda res: self._includeQueue(res.PartitionName),map(self._getReservation,reservation_strs))
         except:
             reservations = []
@@ -549,6 +550,9 @@ def _getNode(self, node_str):
         m = re.search("RealMemory=(\S+)",node_str) # MB
         if m is not None:
             node.MainMemorySize = int(m.group(1))
+        m = re.search("Partitions=(\S+)",node_str)
+        if m is not None:
+            node.Partitions = m.group(1)
         m = re.search("State=(\S+)",node_str)
         if m is not None:
             node.TotalInstances = 1
@@ -726,7 +730,8 @@ def _run(self):
             raise StepError("scontrol failed: "+output+"\n")
         reservation_strs = output.split("\n\n")
         try:
-            reservations = map(self._getReservation,reservation_strs)
+            #reservations = map(self._getReservation,reservation_strs)
+            reservations = filter(lambda res: self._includeQueue(res.PartitionName),map(self._getReservation,reservation_strs))
         except:
             reservations = []
@@ -769,7 +774,6 @@ def _run(self):
             accel_env.UsedInstances = sum(map(lambda node_name: node_map[node_name].UsedInstances,node_names))
             accel_env.UnavailableInstances = sum(map(lambda node_name: node_map[node_name].UnavailableInstances, node_names))
-            self.debug("accel_env.PhysicalAccelerators "+accel_env.PhysicalAccelerators)
             # remove nodes that are part of a current reservation so that they aren't counted twice
             for node_name in node_names:
                 try:
@@ -822,9 +826,14 @@ def _getNode(self, node_str):
                 if len(greslist) == 2:
                     node.UsedAcceleratorSlots = int(greslist[1])
                     node.Type = ""
-                elif len(greslist) == 3:
-                    node.UsedAcceleratorSlots = int(greslist[2])
-                    node.Type = greslist[1]
+                elif len(greslist) >= 3:
+                    endindex = greslist[2].find("(")
+                    if endindex == -1:
+                        node.UsedAcceleratorSlots = int(greslist[2])
+                    else:
+                        uas = greslist[2][:endindex]
+                        node.UsedAcceleratorSlots = int(uas)
+                    node.Type = greslist[1]
         m = re.search("State=(\S+)",node_str)
         if m is not None:
diff --git a/ipf/glue2/step.py b/ipf/glue2/step.py
index 8c4d436..403a56a 100644
--- a/ipf/glue2/step.py
+++ b/ipf/glue2/step.py
@@ -34,3 +34,29 @@ def _includeQueue(self, queue_name, no_queue_name_return=False):
             else:
                 self.warning("can't parse part of Queues expression: "+tok)
         return goodSoFar
+
+    def _includePartition(self, partition_name, no_partition_name_return=False):
+        if partition_name == None:
+            return no_partition_name_return
+        if partition_name == "":
+            return no_partition_name_return
+
+        try:
+            expression = self.params["partitions"]
+        except KeyError:
+            return True
+
+        toks = expression.split()
+        goodSoFar = False
+        for tok in toks:
+            if tok[0] == '+':
+                partition = tok[1:]
+                if (partition == "*") or (partition == partition_name):
+                    goodSoFar =
True + elif tok[0] == '-': + partition = tok[1:] + if (partition == "*") or (partition == partition_name): + goodSoFar = False + else: + self.warning("can't parse part of Partitions expression: "+tok) + return goodSoFar diff --git a/ipf/ipfinfo.py b/ipf/ipfinfo.py index 26565ba..97d5c07 100644 --- a/ipf/ipfinfo.py +++ b/ipf/ipfinfo.py @@ -27,6 +27,8 @@ from ipf.paths import IPF_PARENT_PATH, IPF_ETC_PATH, IPF_WORKFLOW_PATHS, IPF_VAR_PATH from ipf.sysinfo import ResourceName +from glue2.entity import * + ####################################################################################################################### class IPFVersionStep(Step): @@ -178,7 +180,8 @@ def _run(self): #defined_workflows.append(workflow_files) workflow_info = {} for workflowfile in workflow_files: - if os.path.isfile(workflowfile): + if workflowfile.endswith("json"): + if os.path.isfile(os.path.join(workflow_dir,workflowfile)): with open(os.path.join(workflow_dir,workflowfile)) as json_data: d = json.load(json_data) info_file = "" @@ -187,12 +190,13 @@ def _run(self): info_file = step["params"]["path"] try: if info_file: - timestamp = os.path.getmtime(os.path.join(IPF_VAR_PATH,info_file)) + timestamp = datetime.datetime.fromtimestamp(os.path.getmtime(os.path.join(IPF_VAR_PATH,info_file))).isoformat() else: timestamp = "" except OSError: timestamp = "" - workflow_info = json.dumps({"name": d["name"], "info_file": info_file, "timestamp": timestamp}) + #workflow_info = json.dumps({"name": d["name"], "info_file": info_file, "timestamp": timestamp}) + workflow_info = {"name": d["name"], "info_file": info_file, "timestamp": timestamp, "workflow_file": workflowfile} defined_workflows.append(workflow_info) return defined_workflows @@ -227,23 +231,60 @@ def __init__(self): self.description = "produces a document with basic information about a host" self.time_out = 5 - self.requires = [IPFVersion,IPFWorkflows,ResourceName] + self.requires = [IPFVersion,IPFWorkflows,SiteName] self.produces = [IPFInformation] def run(self): - self._output(IPFInformation(self._getInput(IPFVersion).ipf_version, - self._getInput(IPFWorkflows).workflows, - self._getInput(ResourceName).resource_name)) + ipfinfo = IPFInformation() + ipfinfo.ipf_version = self._getInput(IPFVersion) + ipfinfo.workflows = self._getInput(IPFWorkflows) + ipfinfo.resource_name = self._getInput(SiteName) + #self._output(IPFInformation(self._getInput(IPFVersion).ipf_version, + # self._getInput(IPFWorkflows).workflows, + # self._getInput(ResourceName).resource_name)) + self._output(ipfinfo) ####################################################################################################################### -class IPFInformation(Data): - def __init__(self, ipf_version, workflows, resource_name): - Data.__init__(self,ipf_version) - self.ipf_version = ipf_version - self.workflows = workflows - self.resource_name = resource_name +#class IPFInformation(Data): +# def __init__(self, ipf_version, workflows, resource_name): +# Data.__init__(self, ipf_version) +# self.ipf_version = ipf_version +# self.workflows = workflows +# self.resource_name = resource_name + +class IPFInformation(Entity): + + DEFAULT_VALIDITY = 60*60*24 # seconds + + def __init__(self): + Entity.__init__(self) + + self.ipf_version = None + self.workflows = None + self.resource_name = None + self.id = None + #self.Address = None # street address (string) + #self.Place = None # town/city (string) + #self.Country = None # (string) + #self.PostCode = None # postal code (string) + #self.Latitude = None # 
degrees
+        #self.Longitude = None # degrees
+
+    def fromJson(self, doc):
+        # Entity
+        if "CreationTime" in doc:
+            self.CreationTime = textToDateTime(doc["CreationTime"])
+        else:
+            self.CreationTime = datetime.datetime.now(tzoffset(0))
+        self.Validity = doc.get("Validity",IPFInformation.DEFAULT_VALIDITY)
+        self.ipf_version = doc.get("ipf_version","unknown")
+        self.type = "IPF"
+        self.ID = "urn:glue2:PublisherInfo:%s" % '-'.join((self.type,self.ipf_version))
+        self.id = self.ID
+        self.workflows = doc.get("workflows","unknown")
+        self.resource_name = doc.get("resource_name","unknown")

#######################################################################################################################

@@ -258,14 +299,34 @@ def get(self):

 #######################################################################################################################

-class IPFInformationJson(Representation):
+class IPFInformationJson(EntityOgfJson):
     data_cls = IPFInformation

     def __init__(self, data):
-        Representation.__init__(self,Representation.MIME_TEXT_PLAIN,data)
+        EntityOgfJson.__init__(self,data)

     def get(self):
-        #return "IPF version %s installed at %s is running the workflows: %s\n" % (self.data.ipf_version,IPF_PARENT_PATH,self.data.workflows)
-        return json.dumps({"IPFVersion": self.data.ipf_version, "Location": IPF_PARENT_PATH, "hostname": self.data.resource_name, "workflows": self.data.workflows})
+        #return json.dumps(self.toJson(),sort_keys=True,indent=4)
+        return self.toJson()
+
+#    def get(self):
+#        #return "IPF version %s installed at %s is running the workflows: %s\n" % (self.data.ipf_version,IPF_PARENT_PATH,self.data.workflows)
+#        return json.loads({"IPFInfo": {"IPFVersion": self.data.ipf_version, "Location": IPF_PARENT_PATH, "hostname": self.data.resource_name, "workflows": self.data.workflows}})
+
+    def toJson(self):
+        doc = EntityOgfJson.toJson(self)
+
+        doc["Location"] = IPF_PARENT_PATH
+        if self.data.ipf_version is not None:
+            #doc["IPFVersion"] = IPFVersionJson(self.data.ipf_version).get()
+            doc["Version"] = IPFVersionTxt(self.data.ipf_version).get()
+        if self.data.resource_name is not None:
+            doc["Hostname"] = SiteNameTxt(self.data.resource_name).get()
+        if self.data.workflows is not None:
+            doc["Workflows"] = IPFWorkflowsTxt(self.data.workflows).get()
+        doc["Type"] = "IPF"
+        s = '-'
+        doc["ID"] = "urn:glue2:PublisherInfo:%s" % s.join((doc["Type"],doc["Version"]))
+        return doc

 #######################################################################################################################
diff --git a/ipf/log.py b/ipf/log.py
index e0630ac..071e76e 100644
--- a/ipf/log.py
+++ b/ipf/log.py
@@ -214,9 +214,12 @@ def handle(self):
         self._seek()
         while line:
             line = self.file.readline()
-            if line:
+            if line.endswith("\n"):
                 self.callback(self.path,line)
-                self._savePosition()
+                self._savePosition()
+            else:
+                break
+

#######################################################################################################################
diff --git a/ipf/xsede/configure_workflows.py b/ipf/xsede/configure_workflows.py
index d7f69ac..1e45d83 100644
--- a/ipf/xsede/configure_workflows.py
+++ b/ipf/xsede/configure_workflows.py
@@ -38,7 +38,7 @@ def configure():
     setResourceName(resource_name,compute_json)
     setLocation(compute_json)
     updateFilePublishPaths(resource_name,compute_json)
-    addXsedeAmqpToCompute(compute_json)
+    publish_to_xsede = addXsedeAmqpToCompute(compute_json)
     writeComputeWorkflow(resource_name,compute_json)
     writePeriodicComputeWorkflow(resource_name)

@@ -57,7 +57,8 @@ def
configure(): setResourceName(resource_name,activity_json) updateActivityLogFile(resource_name,activity_json) updateFilePublishPaths(resource_name,activity_json) - addXsedeAmqpToActivity(activity_json,compute_json) + if (publish_to_xsede): + addXsedeAmqpToActivity(activity_json,compute_json) writeActivityWorkflow(resource_name,activity_json) writeActivityInit(resource_name,module_names,env_vars) @@ -76,8 +77,9 @@ def configure(): updateFilePublishPaths(resource_name,extmodules_json) updateFilePublishPaths(resource_name,services_json) #addXsedeAmqpToModules(modules_json,compute_json) - addXsedeAmqpToExtModules(extmodules_json,compute_json) - addXsedeAmqpToAbstractServices(services_json,compute_json) + if (publish_to_xsede): + addXsedeAmqpToExtModules(extmodules_json,compute_json) + addXsedeAmqpToAbstractServices(services_json,compute_json) #writeModulesWorkflow(resource_name,modules_json) writeExtModulesWorkflow(resource_name,extmodules_json) writeAbstractServicesWorkflow(resource_name,services_json) @@ -87,8 +89,9 @@ def configure(): #writeModulesInit(resource_name,module_names,env_vars) writeExtModulesInit(resource_name,module_names,env_vars) writeAbstractServicesInit(resource_name,module_names,env_vars) - ipfinfo_json = getIPFInfoJson() - addXsedeAmqpToIPFInfo(ipfinfo_json,compute_json) + ipfinfo_json = getIPFInfoJson() + if (publish_to_xsede): + addXsedeAmqpToIPFInfo(ipfinfo_json,compute_json) writeIPFInfoWorkflow(ipfinfo_json) writeIPFInfoInit(resource_name,module_names,env_vars) @@ -171,7 +174,7 @@ def setResourceName(resource_name, workflow_json): def setLocation(compute_json): for step_json in compute_json["steps"]: if step_json["name"] == "ipf.glue2.location.LocationStep": - updateLocationStep(step_json) + updateLocationStep(step_json["params"]["location"]) return raise Exception("didn't find a LocationStep to modify") @@ -228,7 +231,7 @@ def updateFilePublishPaths(resource_name, workflow_json): def addXsedeAmqpToCompute(compute_json, ask=True): answer = options("Do you wish to publish to the XSEDE AMQP service?",["yes","no"],"yes") if answer == "no": - return + return False answer = options("Will you authenticate using an X.509 certificate and key or a username and password?", ["X.509","username/password"],"X.509") if answer == "X.509": @@ -269,6 +272,7 @@ def addXsedeAmqpToCompute(compute_json, ask=True): amqp_step["params"]["publish"] = ["ipf.glue2.compute.PrivateOgfJson"] amqp_step["params"]["exchange"] = "glue2.computing_activities" compute_json["steps"].append(amqp_step) + return True def updateActivityLogFile(resource_name, activity_json): res_name = resource_name.split(".")[0]
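
Below is a standalone sketch, not part of the patch, illustrating how the "partitions" include/exclude expression introduced in ipf/glue2/step.py (_includePartition) and described in the execution_environment.py parameter text is intended to be evaluated. The helper name and example partition names are hypothetical; only the +/- token semantics mirror the patched code, and the real step additionally treats a missing "partitions" parameter as "include everything".

    # Hypothetical, simplified re-statement of the _includePartition semantics.
    # Tokens are processed left to right; the last matching +name/-name (or +*/-*) wins.
    def include_partition(expression, partition_name):
        good_so_far = False
        for tok in expression.split():
            if tok[0] == '+':
                name = tok[1:]
                if name == "*" or name == partition_name:
                    good_so_far = True
            elif tok[0] == '-':
                name = tok[1:]
                if name == "*" or name == partition_name:
                    good_so_far = False
        return good_so_far

    if __name__ == "__main__":
        # "+*" includes every partition, then "-debug" excludes the debug partition
        print(include_partition("+* -debug", "gpu"))    # True
        print(include_partition("+* -debug", "debug"))  # False
        # only the gpu partition is included
        print(include_partition("+gpu", "normal"))      # False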