2 changes: 1 addition & 1 deletion .github/workflows/pythonpackage.yml
@@ -5,7 +5,7 @@ on:
    branches:
      - master
  pull_request:
-    branches:
+    branches:
      - master

jobs:
2 changes: 1 addition & 1 deletion paper2020/results_analysis.py
@@ -453,7 +453,7 @@ def plot_fluxes(data_array):
time_before_observation="flux_time"
).rename(
dict(time_before_observation="flux_time"))
-for here_infl in INFLUENCE_TEMPORAL_ONLY],
+for here_infl in INFLUENCE_TEMPORAL_ONLY],
"observation_time"
)
OBSERVATIONAL_CONSTRAINT = ALIGNED_TEMPORAL_INFLUENCES.sum("observation_time")
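
A toy sketch of the rename/concatenate/sum pattern this hunk touches; the dimension names mirror the diff, but the data and sizes are invented:

import numpy as np
import xarray

# Four fake influence functions, each indexed by observation time and
# time before the observation.
pieces = [
    xarray.DataArray(
        np.ones((2, 3)),
        dims=("observation_time", "time_before_observation"),
    ).rename(dict(time_before_observation="flux_time"))
    for _ in range(4)
]
# Concatenate along observation_time, then sum it out, as the script
# does to build its observational constraint.
aligned = xarray.concat(pieces, "observation_time")
constraint = aligned.sum("observation_time")  # dims: ("flux_time",)
print(constraint.sizes)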
105 changes: 67 additions & 38 deletions paper2020/run_inversion_osse.py
@@ -34,6 +34,7 @@
import atmos_flux_inversion.variational
import atmos_flux_inversion.correlations
import atmos_flux_inversion.covariances
+import atmos_flux_inversion.remapper
from atmos_flux_inversion.util import kronecker_product
from atmos_flux_inversion.linalg import asarray, kron
from atmos_flux_inversion.noise import gaussian_noise
@@ -563,15 +564,6 @@ def write_progress_message(msg):
(len(TRUE_FLUXES_MATCHED.coords["dim_y"]),
len(TRUE_FLUXES_MATCHED.coords["dim_x"])),
is_cyclic=False))
-# spatial_correlation_remapper = np.full(
-#     # Grid points at full resolution, grid points at reduced resolution
-#     (spatial_correlations.shape[0], 1),
-#     # 1/Number of gridpoints combined in above mapping
-#     1. / spatial_correlations.shape[0])
-# # reduced_spatial_correlations = (
-# #     spatial_correlation_remapper.dot(
-# #         spatial_correlations.dot(spatial_correlation_remapper)))
-import atmos_flux_inversion.remapper
spatial_obs_op_remapper, spatial_correlation_remapper = (
atmos_flux_inversion.remapper.get_remappers(
(len(TRUE_FLUXES_MATCHED.coords["dim_y"]),
@@ -696,6 +688,41 @@ def write_progress_message(msg):
print(reduced_spatial_covariance)
flush_output_streams()

write_progress_message("Getting optimal prolongation operators: "
"Assumes separability")
NUMERICAL_STABILIZER = 1e-5
spatial_obs_op_remapper_opt = (
atmos_flux_inversion.remapper.get_optimal_prolongation(
spatial_correlation_remapper.reshape(
REDUCED_N_GRID_POINTS, N_GRID_POINTS
),
# Using just the spatial covariance produces a singular matrix, so
# add the unscaled correlations to allow the matrix to be inverted
# This is probably due to the inclusion of the oceans in the
# domain, which have variances of zero for fluxes originating from
# the terrestrial biosphere.
spatial_covariance + NUMERICAL_STABILIZER * spatial_correlations,
)
)
write_progress_message("Got optimal spatial prolongation operator")
if UNCERTAINTY_TEMPORAL_RESOLUTION.endswith("D"):
interval_type = "day"
elif UNCERTAINTY_TEMPORAL_RESOLUTION.endswith("W"):
interval_type = "week"
temporal_prolong, temporal_reduce = (
atmos_flux_inversion.remapper.get_temporal_remappers(
aligned_influences.indexes["flux_time"],
int(UNCERTAINTY_TEMPORAL_RESOLUTION[:-1]),
interval_type
)
)
write_progress_message("Got temporal prolongation operator")
temporal_prolong_opt = atmos_flux_inversion.remapper.get_optimal_prolongation(
temporal_reduce, temporal_correlations
)
write_progress_message("Got optimal temporal prolongation operator")


######################################################################
# Get an observation operator for the month
#
@@ -707,39 +734,41 @@ def write_progress_message(msg):
# "spatial_obs_op_remapper_ds",
# )

-reduced_influences = (
-    aligned_influences
-    .groupby_bins(
-        "dim_x",
-        pd.interval_range(
-            0.,
-            (aligned_influences.indexes["dim_x"][-1] +
-             UNCERTAINTY_FLUX_RESOLUTION),
-            freq=UNCERTAINTY_FLUX_RESOLUTION,
-            closed="left")
-        # np.arange(
-        #     -1,
-        #     (aligned_influences.indexes["dim_x"][-1] +
-        #      UNCERTAINTY_FLUX_RESOLUTION),
-        #     UNCERTAINTY_FLUX_RESOLUTION),
-    ).sum("dim_x")
-    .groupby_bins(
-        "dim_y",
-        pd.interval_range(
-            0,
-            (aligned_influences.indexes["dim_y"][-1] +
-             UNCERTAINTY_FLUX_RESOLUTION),
-            freq=UNCERTAINTY_FLUX_RESOLUTION, closed="left")
-        # np.arange(
-        #     -1,
-        #     (aligned_influences.indexes["dim_y"][-1] +
-        #      UNCERTAINTY_FLUX_RESOLUTION),
-        #     UNCERTAINTY_FLUX_RESOLUTION),
-    ).sum("dim_y")
-    .resample(flux_time=UNCERTAINTY_TEMPORAL_RESOLUTION).sum("flux_time")
-).rename(dim_x_bins="reduced_dim_x", dim_y_bins="reduced_dim_y",
-         flux_time="reduced_flux_time")
-reduced_influences.load()
+reduced_influences_array = np.einsum(
+    "ijkl,klmn,jh->ihmn",
+    aligned_influences.values,
+    spatial_obs_op_remapper_opt.reshape(
+        NY, NX, REDUCED_NY, REDUCED_NX,
+    ),
+    temporal_prolong_opt
+)
+reduced_influences = xarray.DataArray(
+    data=reduced_influences_array,
+    dims=("observation", "reduced_flux_time",
+          "reduced_dim_y", "reduced_dim_x"),
+    coords=dict(
+        observation=aligned_influences.coords["observation"],
+        reduced_flux_time=(
+            reduced_temporal_correlation_ds.coords["flux_time"].rename(
+                flux_time="reduced_flux_time"
+            )
+        ),
+        reduced_dim_y=pd.interval_range(
+            0.,
+            (aligned_influences.indexes["dim_y"][-1] +
+             UNCERTAINTY_FLUX_RESOLUTION),
+            freq=UNCERTAINTY_FLUX_RESOLUTION, closed="left"
+        ),
+        reduced_dim_x=pd.interval_range(
+            0.,
+            (aligned_influences.indexes["dim_x"][-1] +
+             UNCERTAINTY_FLUX_RESOLUTION),
+            freq=UNCERTAINTY_FLUX_RESOLUTION,
+            closed="left"
+        ),
+        wrf_proj=aligned_influences.coords["wrf_proj"],
+    )
+)
print(datetime.datetime.now(UTC).strftime("%c"),
"Have influence for monthly average plots")
flush_output_streams()
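
A shape-level sketch of the einsum contraction added above, with toy dimensions; every name below is invented, with spatial_prolongation standing in for spatial_obs_op_remapper_opt.reshape(NY, NX, REDUCED_NY, REDUCED_NX) and temporal_prolongation for temporal_prolong_opt:

import numpy as np

n_obs, n_time, ny, nx = 4, 10, 6, 6
reduced_ny, reduced_nx, n_reduced_time = 2, 2, 5

influences = np.random.standard_normal((n_obs, n_time, ny, nx))
# Maps fine grid cells (ny, nx) to coarse ones (reduced_ny, reduced_nx).
spatial_prolongation = np.random.standard_normal(
    (ny, nx, reduced_ny, reduced_nx))
# Maps old times to new times, shape (n_time, n_reduced_time).
temporal_prolongation = np.random.standard_normal((n_time, n_reduced_time))

# Sums over flux_time (j) and the fine spatial dimensions (k, l).
reduced = np.einsum("ijkl,klmn,jh->ihmn",
                    influences, spatial_prolongation, temporal_prolongation)
print(reduced.shape)  # (4, 5, 2, 2): observations kept, time and space reduced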
80 changes: 80 additions & 0 deletions src/atmos_flux_inversion/remapper.py
@@ -8,6 +8,7 @@

import numpy as np
from numpy import zeros, newaxis
from numpy.linalg import inv

DTYPE = np.float64

@@ -51,3 +52,82 @@ def get_remappers(domain_size, block_side=3):
    intensive_remapper /= n_nz[:, :, newaxis, newaxis]

    return extensive_remapper, intensive_remapper


def get_optimal_prolongation(reduction, covariance):
    """Find the optimal prolongation for the given reduction.

    The optimal prolongation operator depends on both the reduction
    operator used and the covariance function assumed:

    .. math:: P = C R^T (R C R^T)^{-1}

    where :math:`R` is the reduction operator and :math:`C` is the
    covariance.  This prolongation minimizes the aggregation error
    for the inversion with the given reduction.

    Parameters
    ----------
    reduction: array_like
    covariance: LinearOperator

    Returns
    -------
    prolongation: array_like

    Notes
    -----
    The transpose of the `extensive` remapper from
    :func:`get_remappers` will function as a prolongation operator and
    corresponds to `covariance` being a multiple of the identity
    matrix.

    References
    ----------
    Bousserez, N. and D. Henze, "Optimal and scalable methods to
    approximate the solutions of large-scale Bayesian problems: theory
    and application to atmospheric inversions and data assimilation,"
    *Quarterly Journal of the Royal Meteorological Society*,
    2018. Vol. 144, pp. 365--390. :doi:`10.1002/qj.3209`
    """
    cov_dot_red_T = covariance.dot(reduction.T)
    return cov_dot_red_T.dot(
        inv(reduction.dot(cov_dot_red_T))
    )
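
A quick sketch checking the claim in the Notes: with an identity covariance, the formula turns a block-averaging reduction into the transpose of the block-summing (extensive) operator. The names here are invented:

import numpy as np
from numpy.linalg import inv

block, n_reduced = 3, 2
n_full = block * n_reduced
# Averaging reduction (rows sum to one) and summing (extensive) operator.
averaging = np.kron(np.eye(n_reduced), np.full((1, block), 1. / block))
summing = np.kron(np.eye(n_reduced), np.ones((1, block)))

covariance = np.eye(n_full)  # identity covariance, as in the Notes
cov_dot_red_T = covariance.dot(averaging.T)
prolongation = cov_dot_red_T.dot(inv(averaging.dot(cov_dot_red_T)))

print(np.allclose(prolongation, summing.T))  # True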


def get_temporal_remappers(old_index, n_intervals, interval_type="week"):
    """Get temporal remappers.

    Tries to emulate :meth:`pandas.DataFrame.resample`.

    Parameters
    ----------
    old_index: pd.DatetimeIndex
    n_intervals: int
    interval_type: {'week', 'day'}

    Returns
    -------
    prolongation: np.ndarray[old, new]
    reduction: np.ndarray[new, old]
    """
    if interval_type == "day":
        day = old_index.dayofyear
        key = old_index.year * 1000 + (day - day[0]) // n_intervals
    elif interval_type == "week":
        week = old_index.week.values
        week[week == 53] = 0
        key = old_index.year * 1000 + np.ceil((week - week[0]) / n_intervals)
    else:
        raise ValueError("interval_type must be 'week' or 'day'")

    vals, starts, counts = np.unique(key, return_index=True,
                                     return_counts=True)
    out_dim = len(vals)
    prolongation = np.zeros((out_dim, len(old_index)), dtype=DTYPE)
    reduction = np.zeros((out_dim, len(old_index)), dtype=DTYPE)

    for i, (start, count) in enumerate(zip(starts, counts)):
        prolongation[i, start:start + count] = 1
        reduction[i, start:start + count] = 1. / count

    return prolongation.T, reduction
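
A hedged usage sketch, assuming a pandas version where ``DatetimeIndex.week`` still exists (as this function requires); the index and variable names are invented. Two weeks of hourly times span three ISO weeks, so the operators have three coarse rows:

import numpy as np
import pandas as pd

flux_times = pd.date_range("2010-07-01", "2010-07-14T23:00", freq="1H")
prolongation, reduction = get_temporal_remappers(flux_times, 1, "week")

hourly = np.random.standard_normal(len(flux_times))
weekly_means = reduction.dot(hourly)          # one value per ISO week
spread_back = prolongation.dot(weekly_means)  # constant within each week
print(prolongation.shape, reduction.shape)    # (336, 3) (3, 336)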