From 391dc0a67dada65653fe045e0572aa9a255cf3af Mon Sep 17 00:00:00 2001
From: DWesl <22566757+DWesl@users.noreply.github.com>
Date: Mon, 25 Nov 2019 20:28:13 -0500
Subject: [PATCH 1/5] Add tests for linear algebra properties of remappers.

Properties are taken from Bousserez and Henze (2018): one-sided
invertibility and idempotence.
---
 src/atmos_flux_inversion/tests.py | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/src/atmos_flux_inversion/tests.py b/src/atmos_flux_inversion/tests.py
index 9e61bf5..ab6382e 100644
--- a/src/atmos_flux_inversion/tests.py
+++ b/src/atmos_flux_inversion/tests.py
@@ -3057,6 +3057,35 @@ def test_harder(self):
                              [29, 32, 34],
                              [43, 46, 48]])
 
+    def test_matrix_properties(self):
+        """Test needed properties.
+
+        extensive.T functions as a prolongation operator.
+        Prolonging then reducing some data should be the identity.
+
+        extensive.T @ intensive projects data into the seen subspace.
+        This operation should be idempotent (this follows from the
+        one-sided inverse property).
+        """
+        get_remappers = atmos_flux_inversion.remapper.get_remappers
+        test_sides = range(3, 10)
+        coarsen_to = range(2, 5)
+        for side1, side2, coarsening in itertools.product(
+                test_sides, test_sides, coarsen_to
+        ):
+            with self.subTest(side1=side1, side2=side2,
+                              coarsening=coarsening):
+                extensive, intensive = get_remappers((side1, side2),
+                                                     coarsening)
+                extensive = extensive.reshape(np.prod(extensive.shape[:2]),
+                                              np.prod(extensive.shape[2:]))
+                intensive = intensive.reshape(np.prod(intensive.shape[:2]),
+                                              np.prod(intensive.shape[2:]))
+
+                np_tst.assert_allclose(intensive.dot(extensive.T),
+                                       np.eye(intensive.shape[0]))
+
+                proj_op = extensive.T.dot(intensive)
+                np_tst.assert_allclose(proj_op.dot(proj_op), proj_op)
+
 
 class TestObservationCovariance(unittest2.TestCase):
     """Test the generation of observation covariances."""

From 62cf80ed0226cc5d8fb708209d1f3cd8c17892bd Mon Sep 17 00:00:00 2001
From: DWesl <22566757+DWesl@users.noreply.github.com>
Date: Mon, 25 Nov 2019 21:20:45 -0500
Subject: [PATCH 2/5] Add function to find optimal prolongation operator.

Follows Bousserez and Henze (2018) for minimum aggregation error.
---
 src/atmos_flux_inversion/remapper.py | 42 ++++++++++++
 src/atmos_flux_inversion/tests.py    | 98 ++++++++++++++++++++++++++++
 2 files changed, 140 insertions(+)

diff --git a/src/atmos_flux_inversion/remapper.py b/src/atmos_flux_inversion/remapper.py
index 3a93507..929c4de 100644
--- a/src/atmos_flux_inversion/remapper.py
+++ b/src/atmos_flux_inversion/remapper.py
@@ -8,6 +8,7 @@
 import numpy as np
 from numpy import zeros, newaxis
+from numpy.linalg import inv
 
 DTYPE = np.float64
 
@@ -51,3 +52,44 @@ def get_remappers(domain_size, block_side=3):
         intensive_remapper /= n_nz[:, :, newaxis, newaxis]
 
     return extensive_remapper, intensive_remapper
+
+
+def get_optimal_prolongation(reduction, covariance):
+    """Find the optimal prolongation for the given reduction.
+
+    The optimal prolongation operator depends on both the reduction
+    operator used and the covariance function assumed::
+
+        pro = cov @ red.T @ inv(red @ cov @ red.T)
+
+    This prolongation minimizes the aggregation error for the
+    inversion with the given reduction.
+
+    Parameters
+    ----------
+    reduction: array_like
+    covariance: LinearOperator
+
+    Returns
+    -------
+    prolongation: array_like
+
+    Notes
+    -----
+    The transpose of the `extensive` remapper from
+    :func:`get_remappers` will function as a prolongation operator and
+    corresponds to `covariance` being a multiple of the identity
+    matrix.
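+
+    Since ``red @ pro = (red @ cov @ red.T) @ inv(red @ cov @ red.T)``,
+    the result is a right inverse of `reduction`, and the projection
+    ``pro @ red`` is idempotent; the tests check both properties.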
+
+    References
+    ----------
+    Bousserez, N. and D. Henze, "Optimal and scalable methods to
+    approximate the solutions of large-scale Bayesian problems: theory
+    and application to atmospheric inversions and data assimilation",
+    *Quarterly Journal of the Royal Meteorological Society*,
+    2018. Vol. 144, pp. 365--390. :doi:`10.1002/qj.3209`
+    """
+    cov_dot_red_T = covariance.dot(reduction.T)
+    return cov_dot_red_T.dot(
+        inv(reduction.dot(cov_dot_red_T))
+    )
diff --git a/src/atmos_flux_inversion/tests.py b/src/atmos_flux_inversion/tests.py
index ab6382e..a47c9d9 100644
--- a/src/atmos_flux_inversion/tests.py
+++ b/src/atmos_flux_inversion/tests.py
@@ -3087,6 +3087,104 @@ def test_matrix_properties(self):
                 np_tst.assert_allclose(proj_op.dot(proj_op), proj_op)
 
 
+class TestOptimalProlongation(unittest2.TestCase):
+    """Test properties of the optimal prolongation."""
+
+    def test_opt_matrix_properties(self):
+        """Test matrix properties of the optimal prolongation operator."""
+        remapper = atmos_flux_inversion.remapper
+        get_remappers = remapper.get_remappers
+        get_optimal_prolongation = remapper.get_optimal_prolongation
+        from_function = (atmos_flux_inversion.correlations.
+                         HomogeneousIsotropicCorrelation.from_function)
+        ExpCorr = (atmos_flux_inversion.correlations.
+                   ExponentialCorrelation)
+        test_sides = range(5, 8)
+        coarsen_to = range(3, 5)
+        cov_funs = (
+            lambda n1, n2: scipy.linalg.toeplitz(0.5 ** np.arange(n1 * n2)),
+            lambda n1, n2: from_function(ExpCorr(2), (n1, n2)),
+        )
+
+        for side1, side2, coarsening, cov_fun in itertools.product(
+                test_sides, test_sides, coarsen_to, cov_funs
+        ):
+            with self.subTest(side1=side1, side2=side2,
+                              coarsening=coarsening, cov_fun=cov_fun):
+                extensive, intensive = get_remappers((side1, side2),
+                                                     coarsening)
+                extensive = extensive.reshape(np.prod(extensive.shape[:2]),
+                                              np.prod(extensive.shape[2:]))
+                intensive = intensive.reshape(np.prod(intensive.shape[:2]),
+                                              np.prod(intensive.shape[2:]))
+
+                cov = cov_fun(side1, side2)
+                optimal = get_optimal_prolongation(intensive, cov)
+
+                np_tst.assert_allclose(intensive.dot(optimal),
+                                       np.eye(intensive.shape[0]),
+                                       rtol=1e-7, atol=1e-15)
+
+                proj_op = extensive.T.dot(intensive)
+                opt_proj_op = optimal.dot(intensive)
+                np_tst.assert_allclose(
+                    opt_proj_op.dot(opt_proj_op),
+                    opt_proj_op,
+                    rtol=1e-7, atol=1e-15
+                )
+
+                n_obs = 4
+                obs_op = np.eye(n_obs, intensive.shape[1])
+
+                cov_red = intensive.dot(cov.dot(intensive.T))
+                obs_cov = np.eye(n_obs)
+
+                # Using default prolongation
+                cov_proj_T = cov.dot(proj_op.T)
+                agg_err_base = obs_op.dot((
+                    cov + proj_op.dot(cov.dot(proj_op.T))
+                    # \Pi B = (B^T \Pi^T)^T = (B \Pi^T)^T
+                    - cov_proj_T.T - cov_proj_T
+                ).dot(obs_op.T))
+
+                # Using optimal prolongation
+                obs_op_opt = obs_op.dot(optimal)
+                bht_red_opt = cov_red.dot(obs_op_opt.T)
+                cov_opt_proj_T = cov.dot(opt_proj_op.T)
+                agg_err_opt = obs_op.dot((
+                    cov + opt_proj_op.dot(cov.dot(opt_proj_op.T))
+                    # \Pi B = (B^T \Pi^T)^T = (B \Pi^T)^T
+                    - cov_opt_proj_T.T - cov_opt_proj_T
+                ).dot(obs_op.T))
+                avg_opt = bht_red_opt.dot(
+                    la.solve(
+                        obs_op_opt.dot(bht_red_opt) + agg_err_opt + obs_cov,
+                        obs_op_opt
+                    )
+                )
+
+                # Assert aggregation error using optimal prolongation
+                # operator is not greater than that using naive
+                # prolongation operator.
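+                # That minimization is the defining property of the
+                # optimal operator in Bousserez and Henze (2018).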
+                self.assertLessEqual(la.norm(agg_err_opt),
+                                     la.norm(agg_err_base))
+
+                # Assert optimal aggregation error is difference
+                # between unprojected and projected covariance
+                np_tst.assert_allclose(
+                    agg_err_opt,
+                    obs_op.dot(cov.dot(obs_op.T)) -
+                    obs_op.dot(
+                        opt_proj_op.dot(cov.dot(opt_proj_op.T))
+                        .dot(obs_op.T))
+                )
+
+                # Assert posterior covariance is positive definite
+                cholesky(cov_red - avg_opt.dot(cov_red))
+
+
 class TestObservationCovariance(unittest2.TestCase):
     """Test the generation of observation covariances."""
 

From d1a4e00ce41ca4f814b76731f196892a03940cb6 Mon Sep 17 00:00:00 2001
From: DWesl <22566757+DWesl@users.noreply.github.com>
Date: Mon, 25 Nov 2019 21:27:50 -0500
Subject: [PATCH 3/5] Allow dict(name=value) pattern in flake8 config.

flake8-comprehensions provides the check.
---
 tox.ini | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tox.ini b/tox.ini
index 3fcf207..ec2d483 100644
--- a/tox.ini
+++ b/tox.ini
@@ -92,7 +92,8 @@ commands=
 exclude = .git,__pycache__,build,dist
 max-complexity = 15
 doctests = True
-ignore = D413
+# C408 penalizes dict(name=value) calls
+ignore = D413,C408
 select =
     # pycodestyle
     E,

From 1548a055a427e49dc068cff98d8f46e1e4fb28b8 Mon Sep 17 00:00:00 2001
From: DWesl <22566757+DWesl@users.noreply.github.com>
Date: Wed, 11 Dec 2019 10:16:22 -0500
Subject: [PATCH 4/5] Add temporal "remapper" so I can get matrices to find
 optimal prolongations.

Still need to modify the inversion script.
---
 src/atmos_flux_inversion/remapper.py |  38 ++++++++++
 src/atmos_flux_inversion/tests.py    | 104 +++++++++++++++++++++++++++
 2 files changed, 142 insertions(+)

diff --git a/src/atmos_flux_inversion/remapper.py b/src/atmos_flux_inversion/remapper.py
index 929c4de..6845d85 100644
--- a/src/atmos_flux_inversion/remapper.py
+++ b/src/atmos_flux_inversion/remapper.py
@@ -93,3 +93,41 @@ def get_optimal_prolongation(reduction, covariance):
     return cov_dot_red_T.dot(
         inv(reduction.dot(cov_dot_red_T))
     )
+
+
+def get_temporal_remappers(old_index, n_intervals, interval_type="week"):
+    """Get temporal remappers.
+
+    Tries to emulate :meth:`pandas.DataFrame.resample`.
+
+    Parameters
+    ----------
+    old_index: pd.DatetimeIndex
+    n_intervals: int
+    interval_type: {'week', 'day'}
+
+    Returns
+    -------
+    prolongation: np.ndarray[old, new]
+    reduction: np.ndarray[new, old]
+    """
+    if interval_type == "day":
+        day = old_index.dayofyear
+        key = old_index.year * 1000 + (day - day[0]) // n_intervals
+    elif interval_type == "week":
+        week = old_index.week.values
+        week[week == 53] = 0
+        key = old_index.year * 1000 + np.ceil((week - week[0]) / n_intervals)
+    else:
+        raise ValueError("interval_type must be 'week' or 'day'")
+
+    vals, starts, counts = np.unique(key, return_index=True,
+                                     return_counts=True)
+    out_dim = len(vals)
+    prolongation = np.zeros((out_dim, len(old_index)), dtype=DTYPE)
+    reduction = np.zeros((out_dim, len(old_index)), dtype=DTYPE)
+
+    for i, (start, count) in enumerate(zip(starts, counts)):
+        prolongation[i, start:start + count] = 1
+        reduction[i, start:start + count] = 1. / count
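+    # The transpose below makes prolongation an [old, new] operator that
+    # copies each coarse value to every fine step in its interval;
+    # reduction, left as [new, old], averages those steps instead.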
+
+    return prolongation.T, reduction
diff --git a/src/atmos_flux_inversion/tests.py b/src/atmos_flux_inversion/tests.py
index a47c9d9..d23df09 100644
--- a/src/atmos_flux_inversion/tests.py
+++ b/src/atmos_flux_inversion/tests.py
@@ -3087,6 +3087,110 @@ def test_matrix_properties(self):
                 np_tst.assert_allclose(proj_op.dot(proj_op), proj_op)
 
 
+class TestTemporalRemapper(unittest2.TestCase):
+    """Test properties of the temporal remappers."""
+
+    def test_matrix_properties(self):
+        """Test temporal remapping matrix properties."""
+        get_temporal_remappers = (atmos_flux_inversion.remapper
+                                  .get_temporal_remappers)
+        base_index = pd.date_range("2010-01-01T00:00",
+                                   "2010-02-01T00:00",
+                                   freq="1D")
+
+        for n_int, base_unit in itertools.product(
+                range(1, 5), ("day", "week")
+        ):
+            with self.subTest(n_int=n_int, base_unit=base_unit):
+                prolongation, reduction = get_temporal_remappers(
+                    base_index, n_int, base_unit
+                )
+                self.assertLessEqual(*reduction.shape)
+                self.assertGreaterEqual(*prolongation.shape)
+
+                np_tst.assert_allclose(reduction.dot(prolongation),
+                                       np.eye(reduction.shape[0]))
+                proj_op = prolongation.dot(reduction)
+                np_tst.assert_allclose(proj_op.dot(proj_op),
+                                       proj_op)
+
+    def test_identity(self):
+        """Test temporal remapping handles null case."""
+        get_temporal_remappers = (atmos_flux_inversion.remapper
+                                  .get_temporal_remappers)
+
+        for n_int, base_unit in itertools.product(
+                range(1, 5), ("day", "week")
+        ):
+            with self.subTest(n_int=n_int, base_unit=base_unit):
+                freq = "{n:d}{f:s}".format(
+                    n=n_int, f=base_unit[0].upper()
+                )
+                base_index = pd.date_range("2010-01-01T00:00",
+                                           "2010-02-01T00:00",
+                                           freq=freq)
+
+                prolongation, reduction = get_temporal_remappers(
+                    base_index, n_int, base_unit
+                )
+
+                self.assertEqual(reduction.shape[0], len(base_index))
+                self.assertEqual(reduction.shape[1], len(base_index))
+                np_tst.assert_allclose(reduction, np.eye(len(base_index)))
+                np_tst.assert_allclose(prolongation, np.eye(len(base_index)))
+
+    def test_values(self):
+        """Test temporal remapping gives same answers as resample."""
+        get_temporal_remappers = (atmos_flux_inversion.remapper
+                                  .get_temporal_remappers)
+
+        base_index = pd.date_range("2010-01-01T00:00",
+                                   "2010-02-01T00:00",
+                                   freq="6H")
+
+        for n_int, base_unit, offset in itertools.product(
+                range(1, 5), ("day", "week"), range(4)
+        ):
+            with self.subTest(n_int=n_int, base_unit=base_unit,
+                              offset=offset):
+                freq = "{n:d}{f:s}".format(
+                    n=n_int, f=base_unit[0].upper()
+                )
+                index = base_index + pd.Timedelta(hours=6 * offset)
+                data = pd.DataFrame(
+                    data=range(len(base_index)),
+                    index=index
+                )
+
+                prolongation, reduction = get_temporal_remappers(
+                    index, n_int, base_unit
+                )
+                np_tst.assert_allclose(
+                    reduction.dot(data.values),
+                    data.resample(freq).mean()
+                )
+                np_tst.assert_allclose(
+                    prolongation.T.dot(data.values),
+                    data.resample(freq).sum()
+                )
+
+    def test_fails_bad_unit(self):
+        """Test temporal remapping fails on unrecognized intervals."""
+        get_temporal_remappers = (atmos_flux_inversion.remapper
+                                  .get_temporal_remappers)
+
+        base_index = pd.date_range("2010-01-01T00:00",
+                                   "2010-02-01T00:00",
+                                   freq="1D")
+        for n_int, base_unit in itertools.product(
+                range(1, 5), ("hour", "month", "year")
+        ):
+            with self.subTest(n_int=n_int, base_unit=base_unit):
+                with self.assertRaises(ValueError):
+                    prolongation, reduction = get_temporal_remappers(
+                        base_index, n_int, base_unit
+                    )
+
+
 class TestOptimalProlongation(unittest2.TestCase):
     """Test properties of the optimal prolongation."""
 
From 0c56e4dfa7392dc06735327c7fa04bc7515bc30c Mon Sep 17 00:00:00 2001
From: DWesl <22566757+DWesl@users.noreply.github.com>
Date: Thu, 12 Dec 2019 09:54:48 -0500
Subject: [PATCH 5/5] Use optimal prolongation operators in script.

---
 examples/month_inversion_magic_dask.py | 105 ++++++++++++++++---------
 1 file changed, 67 insertions(+), 38 deletions(-)

diff --git a/examples/month_inversion_magic_dask.py b/examples/month_inversion_magic_dask.py
index e44da65..8b84126 100644
--- a/examples/month_inversion_magic_dask.py
+++ b/examples/month_inversion_magic_dask.py
@@ -33,6 +33,7 @@
 import atmos_flux_inversion.variational
 import atmos_flux_inversion.correlations
 import atmos_flux_inversion.covariances
+import atmos_flux_inversion.remapper
 from atmos_flux_inversion.util import kronecker_product
 from atmos_flux_inversion.linalg import asarray, kron
 from atmos_flux_inversion.noise import gaussian_noise
@@ -558,15 +559,6 @@ def write_progress_message(msg):
      (len(TRUE_FLUXES_MATCHED.coords["dim_y"]),
       len(TRUE_FLUXES_MATCHED.coords["dim_x"])),
      is_cyclic=False))
-# spatial_correlation_remapper = np.full(
-#     # Grid points at full resolution, grid points at reduced resolution
-#     (spatial_correlations.shape[0], 1),
-#     # 1/Number of gridpoints combined in above mapping
-#     1. / spatial_correlations.shape[0])
-# # reduced_spatial_correlations = (
-# #     spatial_correlation_remapper.dot(
-# #         spatial_correlations.dot(spatial_correlation_remapper)))
-import atmos_flux_inversion.remapper
 spatial_obs_op_remapper, spatial_correlation_remapper = (
     atmos_flux_inversion.remapper.get_remappers(
         (len(TRUE_FLUXES_MATCHED.coords["dim_y"]),
@@ -696,6 +688,41 @@ def write_progress_message(msg):
 print(reduced_spatial_covariance)
 flush_output_streams()
 
+write_progress_message("Getting optimal prolongation operators: "
+                       "Assumes separability")
+NUMERICAL_STABILIZER = 1e-5
+spatial_obs_op_remapper_opt = (
+    atmos_flux_inversion.remapper.get_optimal_prolongation(
+        spatial_correlation_remapper.reshape(
+            REDUCED_N_GRID_POINTS, N_GRID_POINTS
+        ),
+        # Using just the spatial covariance produces a singular matrix,
+        # so add the unscaled correlations to allow the matrix to be
+        # inverted.  This is probably due to the inclusion of the oceans
+        # in the domain, which have variances of zero for fluxes
+        # originating from the terrestrial biosphere.
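+        # The 1e-5 weight for the stabilizer is ad hoc; other domains
+        # may need a different magnitude.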
+        spatial_covariance + NUMERICAL_STABILIZER * spatial_correlations,
+    )
+)
+write_progress_message("Got optimal spatial prolongation operator")
+if UNCERTAINTY_TEMPORAL_RESOLUTION.endswith("D"):
+    interval_type = "day"
+elif UNCERTAINTY_TEMPORAL_RESOLUTION.endswith("W"):
+    interval_type = "week"
+else:
+    raise ValueError("UNCERTAINTY_TEMPORAL_RESOLUTION must be in "
+                     "days ('D') or weeks ('W')")
+temporal_prolong, temporal_reduce = (
+    atmos_flux_inversion.remapper.get_temporal_remappers(
+        aligned_influences.indexes["flux_time"],
+        int(UNCERTAINTY_TEMPORAL_RESOLUTION[:-1]),
+        interval_type
+    )
+)
+write_progress_message("Got temporal prolongation operator")
+temporal_prolong_opt = atmos_flux_inversion.remapper.get_optimal_prolongation(
+    temporal_reduce, temporal_correlations
+)
+write_progress_message("Got optimal temporal prolongation operator")
+
+
 ######################################################################
 # Get an observation operator for the month
 #
@@ -707,39 +734,41 @@ def write_progress_message(msg):
 #     "spatial_obs_op_remapper_ds",
 # )
 
-reduced_influences = (
-    aligned_influences
-    .groupby_bins(
-        "dim_x",
-        pd.interval_range(
-            0,
-            (aligned_influences.indexes["dim_x"][-1] +
-             UNCERTAINTY_FLUX_RESOLUTION),
-            freq=UNCERTAINTY_FLUX_RESOLUTION,
-            closed="left")
-        # np.arange(
-        #     -1,
-        #     (aligned_influences.indexes["dim_x"][-1] +
-        #      UNCERTAINTY_FLUX_RESOLUTION),
-        #     UNCERTAINTY_FLUX_RESOLUTION),
-    ).sum("dim_x")
-    .groupby_bins(
-        "dim_y",
-        pd.interval_range(
-            0,
-            (aligned_influences.indexes["dim_y"][-1] +
-             UNCERTAINTY_FLUX_RESOLUTION),
-            freq=UNCERTAINTY_FLUX_RESOLUTION,
-            closed="left")
-        # np.arange(
-        #     -1,
-        #     (aligned_influences.indexes["dim_y"][-1] +
-        #      UNCERTAINTY_FLUX_RESOLUTION),
-        #     UNCERTAINTY_FLUX_RESOLUTION),
-    ).sum("dim_y")
-    .resample(flux_time=UNCERTAINTY_TEMPORAL_RESOLUTION).sum("flux_time")
-).rename(dim_x_bins="reduced_dim_x", dim_y_bins="reduced_dim_y",
-         flux_time="reduced_flux_time")
-reduced_influences.load()
+reduced_influences_array = np.einsum(
+    "ijkl,klmn,jh->ihmn",
+    aligned_influences.values,
+    spatial_obs_op_remapper_opt.reshape(
+        NY, NX, REDUCED_NY, REDUCED_NX,
+    ),
+    temporal_prolong_opt
+)
+reduced_influences = xarray.DataArray(
+    data=reduced_influences_array,
+    dims=("observation", "reduced_flux_time",
+          "reduced_dim_y", "reduced_dim_x"),
+    coords=dict(
+        observation=aligned_influences.coords["observation"],
+        reduced_flux_time=(
+            reduced_temporal_correlation_ds.coords["flux_time"].rename(
+                flux_time="reduced_flux_time"
+            )
+        ),
+        reduced_dim_y=pd.interval_range(
+            0,
+            (aligned_influences.indexes["dim_y"][-1] +
+             UNCERTAINTY_FLUX_RESOLUTION),
+            freq=UNCERTAINTY_FLUX_RESOLUTION,
+            closed="left"
+        ),
+        reduced_dim_x=pd.interval_range(
+            0,
+            (aligned_influences.indexes["dim_x"][-1] +
+             UNCERTAINTY_FLUX_RESOLUTION),
+            freq=UNCERTAINTY_FLUX_RESOLUTION,
+            closed="left"
+        ),
+        wrf_proj=aligned_influences.coords["wrf_proj"],
+    )
+)
 print(datetime.datetime.now(UTC).strftime("%c"),
       "Have influence for monthly average plots")
 flush_output_streams()