Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

Enhancements and Fixes
----------------------

- Added functionality to the DatalinkRecordMixin and the DatalinkResultsMixin
to handle basic parsing of JSON entries [#709]


Deprecations and Removals
Expand Down
11 changes: 11 additions & 0 deletions docs/dal/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,17 @@ DatalinkResults using
>>> next(datalink.bysemantics("#this")).content_type
'application/fits'

Cloud Access Information [subject to change]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
As data becomes available from different cloud providers, some services are
including cloud access information in the associated Datalink in JSON format.
The ``get_cloud_params`` and ``iter_get_cloud_params`` functions for
Records and Results respectively return an ``astropy.Table`` with the
parameters specified to access data via the cloud service provider specified.

More generic functions, ``parse_json_params`` and ``iter_parse_json_params``,
act directly on the Record or Results object and can parse JSON columns with
a given column name, key, and optionally parameters to match.

Server-side processing
----------------------
Expand Down
206 changes: 206 additions & 0 deletions pyvo/dal/adhoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import warnings
import copy
import requests
import json

from .query import DALResults, DALQuery, DALService, Record
from .exceptions import DALServiceError
Expand Down Expand Up @@ -396,6 +397,71 @@ def iter_datalinks(self, preserve_order=False):
yield from self._iter_datalinks_from_dlblock(
preserve_order=preserve_order)

def iter_get_cloud_params(
self,
provider: str,
colname: str = "cloud_access",
verbose: bool = False,
**match_params
):
"""
Iterate over all Records in a DalResult and return parsed cloud parameters.

Parameters
----------
provider : str
Name of the data provider.
colname : str, optional
The column containing cloud access JSON, by default "cloud_access".
verbose : bool, optional
Whether to print debug text, by default False.
**match_params : str, optional
Further parameters on which to match beyond provider.

Returns
-------
astropy.Table
A table containing the JSON parameters separated into columns, each
row corresponding to matching JSON entries from each Record.

"""
for irow, dl_results in enumerate(self.iter_datalinks()):

products = dl_results.bysemantics("#this")

for jrow, row in enumerate(products):
# if no colname column, there is nothing to do
jsontxt = row._guess_cloud_column(colname=colname)
if jsontxt:
access_points = row.parse_json_params(
json_txt=jsontxt,
json_key=provider,
verbose=verbose,
**match_params
)
access_points.add_column([jrow]*len(access_points), name="datalink_row", index=0)
if jrow == 0:
new_dl_table = access_points
else:
for row in access_points.iterrows():
new_dl_table.add_row(row)
else:
# no json column, continue
if verbose:
print(f'No column {colname} found for Results row {irow}, datalink row {jrow}')
new_dl_table = TableElement(VOTableFile()).to_table()

# do the json parsing
cloud_params = new_dl_table
cloud_params.add_column([irow]*len(cloud_params), name="record_row", index=0)
if irow == 0:
new_table = cloud_params
else:
for row in cloud_params.iterrows():
new_table.add_row(row)

return new_table


class DatalinkRecordMixin:
"""
Expand Down Expand Up @@ -445,6 +511,146 @@ def getdataset(self, timeout=None):
# this should go to Record.getdataset()
return super().getdataset(timeout=timeout)

@staticmethod
def parse_json_params(
json_txt: str,
json_key: str,
verbose: bool = False,
**match_params
):
"""Parse information stored as JSON by key

Parameters
----------
json_txt : str
Text interpreted as JSON
json_key : str
The primary key by which to filter JSON results
verbose : bool, optional
Whether to print progress and errors, by default False
**match_params : str, optional
Further parameters on which to match beyond json_key.

Returns
-------
astropy.Table
A table containing the JSON parameters separated into columns, each
row representing a matching JSON entry.

"""

# init results table (avoiding adding import of astropy.table.Table)
new_table = TableElement(VOTableFile()).to_table()

jsonDict = json.loads(json_txt)
if json_key not in jsonDict and verbose:
print(f'No key "{json_key}" found in json_txt given.')
else:
# Expected format is a dictionary of providers as keys, with lists
# of dictionaries, for example:
# {"json_key1": [{"param1": p1val1, "param2": p2val1},
# {"param1": p1val2, "param2": p2val2}]
# "json_key2": [{"param1": p1val3, "param2": p2val3}]
# }
jkey_params = jsonDict[json_key]
if isinstance(jkey_params, dict):
jkey_params = [jkey_params]
checks = []
col_init = False
for params in jkey_params:
for k, value in match_params.items():
checks.append(params.get(k, value) == value)

if all(checks):
if not col_init:
colnames = list(params.keys())
colvals = [[] for _ in colnames]
col_init = True
for idx, val in enumerate(params.values()):
colvals[idx].append(val)
try:
new_table.add_columns(cols=colvals, names=colnames)
except UnboundLocalError:
pass

return new_table

def _guess_cloud_column(self, colname="cloud_access"):
"""returns a guess for a URI to a data product in row.

This tries a few heuristics based on how cloud access or records might
be marked up. This will return None if row does not look as if
it contained a cloud access column.
"""
if hasattr(self, colname):
return getattr(self, colname)

if colname in self:
return self[colname]

cloud_access = self.getbyutype("adhoc:cloudstorage")
if cloud_access:
return cloud_access

cloud_access = self.getbyucd("meta.ref.cloudstorage")
if cloud_access:
return cloud_access

def get_cloud_params(
self,
provider: str,
colname: str = "cloud_access",
verbose: bool = False,
**match_params
):
"""Parse cloud information stored as JSON by provider

Parameters
----------
provider: str
Name of the data provider
colname: str, optional
The column name containing the cloud access JSON, by default "cloud_access"
verbose: bool, optional
If True, print progress and debug text, by default False
**match_params
Further parameters on which to match beyond provider.

Return
------
astropy.Table
A table containing the JSON parameters separated into columns,
each row being a unique JSON entry and/or from a different DatalinkRecord.

"""
dl_results = self.getdatalink()
products = dl_results.bysemantics("#this")

for irow, row in enumerate(products):
# if no colname column, there is nothing to do
cloud_json = row._guess_cloud_column(colname=colname)
if cloud_json:
access_points = row.parse_json_params(
json_txt=cloud_json,
json_key=provider,
verbose=verbose,
**match_params
)
access_points.add_column([irow]*len(access_points), name="datalink_row", index=0)
if irow == 0:
new_table = access_points
else:
for row in access_points.iterrows():
new_table.add_row(row)
else:
# no json column, return None
if verbose:
print(f'No column {colname} found for row {irow}')
new_table = None
break

return new_table


class DatalinkService(DALService, AvailabilityMixin, CapabilityMixin):
"""
Expand Down
Loading
Loading