diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml
index ee27b38..2a3ce18 100644
--- a/.github/workflows/pythonpackage.yml
+++ b/.github/workflows/pythonpackage.yml
@@ -5,7 +5,7 @@ on:
     branches:
     - master
   pull_request:
-    branches:
+    branches:
     - master
 
 jobs:
diff --git a/paper2020/results_analysis.py b/paper2020/results_analysis.py
index 420c4d8..d52a02d 100644
--- a/paper2020/results_analysis.py
+++ b/paper2020/results_analysis.py
@@ -453,7 +453,7 @@ def plot_fluxes(data_array):
         time_before_observation="flux_time"
     ).rename(
         dict(time_before_observation="flux_time"))
-     for here_infl in INFLUENCE_TEMPORAL_ONLY],
+     for here_infl in INFLUENCE_TEMPORAL_ONLY],
     "observation_time"
 )
 OBSERVATIONAL_CONSTRAINT = ALIGNED_TEMPORAL_INFLUENCES.sum("observation_time")
diff --git a/paper2020/run_inversion_osse.py b/paper2020/run_inversion_osse.py
index 1ce0827..9a7c550 100644
--- a/paper2020/run_inversion_osse.py
+++ b/paper2020/run_inversion_osse.py
@@ -34,6 +34,7 @@ import atmos_flux_inversion.variational
 import atmos_flux_inversion.correlations
 import atmos_flux_inversion.covariances
+import atmos_flux_inversion.remapper
 from atmos_flux_inversion.util import kronecker_product
 from atmos_flux_inversion.linalg import asarray, kron
 from atmos_flux_inversion.noise import gaussian_noise
@@ -563,15 +564,6 @@ def write_progress_message(msg):
     (len(TRUE_FLUXES_MATCHED.coords["dim_y"]),
      len(TRUE_FLUXES_MATCHED.coords["dim_x"])),
     is_cyclic=False))
-# spatial_correlation_remapper = np.full(
-#     # Grid points at full resolution, grid points at reduced resolution
-#     (spatial_correlations.shape[0], 1),
-#     # 1/Number of gridpoints combined in above mapping
-#     1. / spatial_correlations.shape[0])
-# # reduced_spatial_correlations = (
-# #     spatial_correlation_remapper.dot(
-# #         spatial_correlations.dot(spatial_correlation_remapper)))
-import atmos_flux_inversion.remapper
 spatial_obs_op_remapper, spatial_correlation_remapper = (
     atmos_flux_inversion.remapper.get_remappers(
         (len(TRUE_FLUXES_MATCHED.coords["dim_y"]),
@@ -696,6 +688,41 @@ def write_progress_message(msg):
 print(reduced_spatial_covariance)
 flush_output_streams()
 
+write_progress_message("Getting optimal prolongation operators: "
+                       "Assumes separability")
+NUMERICAL_STABILIZER = 1e-5
+spatial_obs_op_remapper_opt = (
+    atmos_flux_inversion.remapper.get_optimal_prolongation(
+        spatial_correlation_remapper.reshape(
+            REDUCED_N_GRID_POINTS, N_GRID_POINTS
+        ),
+        # Using just the spatial covariance produces a singular matrix, so
+        # add the unscaled correlations to allow the matrix to be inverted
+        # This is probably due to the inclusion of the oceans in the
+        # domain, which have variances of zero for fluxes originating from
+        # the terrestrial biosphere.
+        spatial_covariance + NUMERICAL_STABILIZER * spatial_correlations,
+    )
+)
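+# Shape note: get_optimal_prolongation returns
+# cov @ red.T @ inv(red @ cov @ red.T), so spatial_obs_op_remapper_opt
+# has shape (N_GRID_POINTS, REDUCED_N_GRID_POINTS); it is reshaped to
+# (NY, NX, REDUCED_NY, REDUCED_NX) when collapsing the observation
+# operator below.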
+write_progress_message("Got optimal spatial prolongation operator")
+if UNCERTAINTY_TEMPORAL_RESOLUTION.endswith("D"):
+    interval_type = "day"
+elif UNCERTAINTY_TEMPORAL_RESOLUTION.endswith("W"):
+    interval_type = "week"
+temporal_prolong, temporal_reduce = (
+    atmos_flux_inversion.remapper.get_temporal_remappers(
+        aligned_influences.indexes["flux_time"],
+        int(UNCERTAINTY_TEMPORAL_RESOLUTION[:-1]),
+        interval_type
+    )
+)
+write_progress_message("Got temporal prolongation operator")
+temporal_prolong_opt = atmos_flux_inversion.remapper.get_optimal_prolongation(
+    temporal_reduce, temporal_correlations
+)
+write_progress_message("Got optimal temporal prolongation operator")
+
+
 ######################################################################
 # Get an observation operator for the month
 #
@@ -707,39 +734,41 @@ def write_progress_message(msg):
 #     "spatial_obs_op_remapper_ds",
 # )
 
-reduced_influences = (
-    aligned_influences
-    .groupby_bins(
-        "dim_x",
-        pd.interval_range(
+reduced_influences_array = np.einsum(
+    "ijkl,klmn,jh->ihmn",
+    aligned_influences.values,
+    spatial_obs_op_remapper_opt.reshape(
+        NY, NX, REDUCED_NY, REDUCED_NX,
+    ),
+    temporal_prolong_opt
+)
+reduced_influences = xarray.DataArray(
+    data=reduced_influences_array,
+    dims=("observation", "reduced_flux_time",
+          "reduced_dim_y", "reduced_dim_x"),
+    coords=dict(
+        observation=aligned_influences.coords["observation"],
+        reduced_flux_time=(
+            reduced_temporal_correlation_ds.coords["flux_time"].rename(
+                flux_time="reduced_flux_time"
+            )
+        ),
+        reduced_dim_y=pd.interval_range(
+            0.,
+            (aligned_influences.indexes["dim_y"][-1] +
+             UNCERTAINTY_FLUX_RESOLUTION),
+            freq=UNCERTAINTY_FLUX_RESOLUTION, closed="left"
+        ),
+        reduced_dim_x=pd.interval_range(
             0.,
             (aligned_influences.indexes["dim_x"][-1] +
              UNCERTAINTY_FLUX_RESOLUTION),
             freq=UNCERTAINTY_FLUX_RESOLUTION,
-            closed="left")
-        # np.arange(
-        #     -1,
-        #     (aligned_influences.indexes["dim_x"][-1] +
-        #      UNCERTAINTY_FLUX_RESOLUTION),
-        #     UNCERTAINTY_FLUX_RESOLUTION),
-    ).sum("dim_x")
-    .groupby_bins(
-        "dim_y",
-        pd.interval_range(
-            0,
-            (aligned_influences.indexes["dim_y"][-1] +
-             UNCERTAINTY_FLUX_RESOLUTION),
-            freq=UNCERTAINTY_FLUX_RESOLUTION, closed="left")
-        # np.arange(
-        #     -1,
-        #     (aligned_influences.indexes["dim_y"][-1] +
-        #      UNCERTAINTY_FLUX_RESOLUTION),
-        #     UNCERTAINTY_FLUX_RESOLUTION),
-    ).sum("dim_y")
-    .resample(flux_time=UNCERTAINTY_TEMPORAL_RESOLUTION).sum("flux_time")
-).rename(dim_x_bins="reduced_dim_x", dim_y_bins="reduced_dim_y",
-         flux_time="reduced_flux_time")
-reduced_influences.load()
+            closed="left"
+        ),
+        wrf_proj=aligned_influences.coords["wrf_proj"],
+    )
+)
 print(datetime.datetime.now(UTC).strftime("%c"),
       "Have influence for monthly average plots")
 flush_output_streams()
diff --git a/src/atmos_flux_inversion/remapper.py b/src/atmos_flux_inversion/remapper.py
index 3a93507..6845d85 100644
--- a/src/atmos_flux_inversion/remapper.py
+++ b/src/atmos_flux_inversion/remapper.py
@@ -8,6 +8,7 @@
 import numpy as np
 from numpy import zeros, newaxis
+from numpy.linalg import inv
 
 DTYPE = np.float64
 
 
@@ -51,3 +52,82 @@ def get_remappers(domain_size, block_side=3):
             intensive_remapper /= n_nz[:, :, newaxis, newaxis]
 
     return extensive_remapper, intensive_remapper
+
+
+def get_optimal_prolongation(reduction, covariance):
+    """Find the optimal prolongation for the given reduction.
+
+    The optimal prolongation operator depends on both the reduction
+    operator used and the covariance function assumed:
+
+    .. math:: pro = cov @ red^T @ (red @ cov @ red^T)^{-1}
+
+    This prolongation minimizes the aggregation error for the
+    inversion with the given reduction.
+
+    Parameters
+    ----------
+    reduction: array_like
+    covariance: LinearOperator
+
+    Returns
+    -------
+    prolongation: array_like
+
+    Notes
+    -----
+    The transpose of the `extensive` remapper from
+    :func:`get_remappers` will function as a prolongation operator and
+    corresponds to `covariance` being a multiple of the identity
+    matrix.
+
+    References
+    ----------
+    Bousserez, N. and D. Henze "Optimal and scalable methods to
+    approximate the solutions of large-scale Bayesian problems: theory
+    and application to atmospheric inversions and data assimilation"
+    *Quarterly Journal of the Royal Meteorological Society*
+    2018. Vol. 144, pp. 365--390. :doi:`10.1002/qj.3209`
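+
+    Examples
+    --------
+    A minimal sketch, with values chosen purely for illustration: for
+    an identity covariance the optimal prolongation is just the
+    rescaled transpose of an averaging reduction.
+
+    >>> red = np.array([[0.5, 0.5, 0., 0.],
+    ...                 [0., 0., 0.5, 0.5]])
+    >>> pro = get_optimal_prolongation(red, np.eye(4))
+    >>> np.allclose(pro, 2 * red.T)
+    True
+    >>> np.allclose(red.dot(pro), np.eye(2))
+    True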
+    """
+    cov_dot_red_T = covariance.dot(reduction.T)
+    return cov_dot_red_T.dot(
+        inv(reduction.dot(cov_dot_red_T))
+    )
+
+
+def get_temporal_remappers(old_index, n_intervals, interval_type="week"):
+    """Get temporal remappers.
+
+    Tries to emulate :func:`pandas.resample`.
+
+    Parameters
+    ----------
+    old_index: pd.DatetimeIndex
+    n_intervals: int
+    interval_type: {'week', 'day'}
+
+    Returns
+    -------
+    prolongation: np.ndarray[old, new]
+    reduction: np.ndarray[new, old]
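+
+    Examples
+    --------
+    A small sketch, with dates chosen only for illustration, grouping
+    a six-day index into two-day blocks:
+
+    >>> import pandas as pd
+    >>> index = pd.date_range("2010-01-01", "2010-01-06", freq="1D")
+    >>> prolongation, reduction = get_temporal_remappers(index, 2, "day")
+    >>> prolongation.shape, reduction.shape
+    ((6, 3), (3, 6))
+    >>> np.allclose(reduction.dot(prolongation), np.eye(3))
+    True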
+ """ + get_remappers = atmos_flux_inversion.remapper.get_remappers + test_sides = range(3, 10) + coarsen_to = range(2, 5) + for side1, side2, coarsening in itertools.product( + test_sides, test_sides, coarsen_to + ): + with self.subTest(side1=side1, side2=side2, coarsening=coarsening): + extensive, intensive = get_remappers((side1, side2), + coarsening) + extensive = extensive.reshape(np.prod(extensive.shape[:2]), + np.prod(extensive.shape[2:])) + intensive = intensive.reshape(np.prod(intensive.shape[:2]), + np.prod(intensive.shape[2:])) + + np_tst.assert_allclose(intensive.dot(extensive.T), + np.eye(intensive.shape[0])) + + proj_op = extensive.T.dot(intensive) + np_tst.assert_allclose(proj_op.dot(proj_op), proj_op) + + +class TestTemporalRemapper(unittest2.TestCase): + """Test properties of the temporal remappers.""" + + def test_matrix_properties(self): + """Test temporal remapping matrix properties.""" + get_temporal_remappers = (atmos_flux_inversion.remapper + .get_temporal_remappers) + base_index = pd.date_range("2010-01-01T00:00", + "2010-02-01T00:00", + freq="1D") + + for n_int, base_unit in itertools.product( + range(1, 5), ("day", "week") + ): + with self.subTest(n_int=n_int, base_unit=base_unit): + prolongation, reduction = get_temporal_remappers( + base_index, n_int, base_unit + ) + self.assertLessEqual(*reduction.shape) + self.assertGreaterEqual(*prolongation.shape) + + np_tst.assert_allclose(reduction.dot(prolongation), + np.eye(reduction.shape[0])) + proj_op = prolongation.dot(reduction) + np_tst.assert_allclose(proj_op.dot(proj_op), + proj_op) + + def test_identity(self): + """Test temporal remapping handles null case.""" + get_temporal_remappers = (atmos_flux_inversion.remapper + .get_temporal_remappers) + + for n_int, base_unit in itertools.product( + range(1, 5), ("day", "week") + ): + with self.subTest(n_int=n_int, base_unit=base_unit): + freq = "{n:d}{f:s}".format( + n=n_int, f=base_unit[0].upper() + ) + base_index = pd.date_range("2010-01-01T00:00", + "2010-02-01T00:00", + freq=freq) + + prolongation, reduction = get_temporal_remappers( + base_index, n_int, base_unit + ) + + self.assertEqual(reduction.shape[0], len(base_index)) + self.assertEqual(reduction.shape[1], len(base_index)) + np_tst.assert_allclose(reduction, np.eye(len(base_index))) + np_tst.assert_allclose(prolongation, np.eye(len(base_index))) + + def test_values(self): + """Test temporal remapping gives same answers as resample.""" + get_temporal_remappers = (atmos_flux_inversion.remapper + .get_temporal_remappers) + + base_index = pd.date_range("2010-01-01T00:00", + "2010-02-01T00:00", + freq="6H") + + for n_int, base_unit, offset in itertools.product( + range(1, 5), ("day", "week"), range(4) + ): + with self.subTest(n_int=n_int, base_unit=base_unit, offset=offset): + freq = "{n:d}{f:s}".format( + n=n_int, f=base_unit[0].upper() + ) + index = base_index + pd.Timedelta(hours=6 * offset) + data = pd.DataFrame( + data=range(len(base_index)), + index=index + ) + + prolongation, reduction = get_temporal_remappers( + index, n_int, base_unit + ) + np_tst.assert_allclose( + reduction.dot(data.values), + data.resample(freq).mean() + ) + np_tst.assert_allclose( + prolongation.T.dot(data.values), + data.resample(freq).sum() + ) + + def test_fails_bad_unit(self): + """Test temporal remapping fails on unrecognized intervals.""" + get_temporal_remappers = (atmos_flux_inversion.remapper + .get_temporal_remappers) + + base_index = pd.date_range("2010-01-01T00:00", + "2010-02-01T00:00", + freq="1D") + for n_int, 
+                np_tst.assert_allclose(
+                    reduction.dot(data.values),
+                    data.resample(freq).mean()
+                )
+                np_tst.assert_allclose(
+                    prolongation.T.dot(data.values),
+                    data.resample(freq).sum()
+                )
+
+    def test_fails_bad_unit(self):
+        """Test temporal remapping fails on unrecognized intervals."""
+        get_temporal_remappers = (atmos_flux_inversion.remapper
+                                  .get_temporal_remappers)
+
+        base_index = pd.date_range("2010-01-01T00:00",
+                                   "2010-02-01T00:00",
+                                   freq="1D")
+        for n_int, base_unit in itertools.product(
+                range(1, 5), ("hour", "month", "year")
+        ):
+            with self.subTest(n_int=n_int, base_unit=base_unit):
+                with self.assertRaises(ValueError):
+                    prolongation, reduction = get_temporal_remappers(
+                        base_index, n_int, base_unit
+                    )
+
+
+class TestOptimalProlongation(unittest2.TestCase):
+    """Test properties of the optimal prolongation."""
+
+    def test_opt_matrix_properties(self):
+        """Test matrix properties of the optimal prolongation operator."""
+        remapper = atmos_flux_inversion.remapper
+        get_remappers = remapper.get_remappers
+        get_optimal_prolongation = remapper.get_optimal_prolongation
+        from_function = (atmos_flux_inversion.correlations.
+                         HomogeneousIsotropicCorrelation.from_function)
+        ExpCorr = (atmos_flux_inversion.correlations.
+                   ExponentialCorrelation)
+        test_sides = range(5, 8)
+        coarsen_to = range(3, 5)
+        cov_funs = (
+            lambda n1, n2: scipy.linalg.toeplitz(0.5 ** np.arange(n1 * n2)),
+            lambda n1, n2: from_function(ExpCorr(2), (n1, n2)),
+        )
+
+        for side1, side2, coarsening, cov_fun in itertools.product(
+                test_sides, test_sides, coarsen_to, cov_funs
+        ):
+            with self.subTest(side1=side1, side2=side2,
+                              coarsening=coarsening, cov_fun=cov_fun):
+                extensive, intensive = get_remappers((side1, side2),
+                                                     coarsening)
+                extensive = extensive.reshape(np.prod(extensive.shape[:2]),
+                                              np.prod(extensive.shape[2:]))
+                intensive = intensive.reshape(np.prod(intensive.shape[:2]),
+                                              np.prod(intensive.shape[2:]))
+
+                cov = cov_fun(side1, side2)
+                optimal = get_optimal_prolongation(intensive, cov)
+
+                np_tst.assert_allclose(intensive.dot(optimal),
+                                       np.eye(intensive.shape[0]),
+                                       # rtol=1e-4, atol=1e-6)
+                                       rtol=1e-7, atol=1e-15)
+
+                proj_op = extensive.T.dot(intensive)
+                opt_proj_op = optimal.dot(intensive)
+                np_tst.assert_allclose(
+                    opt_proj_op.dot(opt_proj_op),
+                    opt_proj_op,
+                    # rtol=1e-4, atol=1e-6
+                    rtol=1e-7, atol=1e-15
+                )
+
+                n_obs = 4
+                obs_op = np.eye(n_obs, intensive.shape[1])
+
+                cov_red = intensive.dot(cov.dot(intensive.T))
+                obs_cov = np.eye(n_obs)
+
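+                # Aggregation error below is H (I - Pi) B (I - Pi)^T H^T
+                # with Pi the prolongation-reduction projection,
+                # expanded into its four terms.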
+                # Using default prolongation
+                cov_proj_T = cov.dot(proj_op.T)
+                agg_err_base = obs_op.dot((
+                    cov + proj_op.dot(cov.dot(proj_op.T))
+                    # \Pi B = (B^T \Pi^T)^T = (B \Pi^T)^T
+                    - cov_proj_T.T - cov_proj_T
+                ).dot(obs_op.T))
+
+                # Using optimal prolongation
+                obs_op_opt = obs_op.dot(optimal)
+                bht_red_opt = cov_red.dot(obs_op_opt.T)
+                cov_opt_proj_T = cov.dot(opt_proj_op.T)
+                agg_err_opt = obs_op.dot((
+                    cov + opt_proj_op.dot(cov.dot(opt_proj_op.T))
+                    # \Pi B = (B^T \Pi^T)^T = (B \Pi^T)^T
+                    - cov_opt_proj_T.T - cov_opt_proj_T
+                ).dot(obs_op.T))
+                avg_opt = bht_red_opt.dot(
+                    la.solve(
+                        obs_op_opt.dot(bht_red_opt) + agg_err_opt + obs_cov,
+                        obs_op_opt
+                    )
+                )
+
+                # Assert aggregation error using optimal prolongation
+                # operator is not greater than that using naive
+                # prolongation operator.
+                self.assertLessEqual(la.norm(agg_err_opt),
+                                     la.norm(agg_err_base))
+
+                # Assert optimal aggregation error is difference
+                # between unprojected and projected covariance
+                np_tst.assert_allclose(
+                    agg_err_opt,
+                    obs_op.dot(cov.dot(obs_op.T)) -
+                    obs_op.dot(
+                        opt_proj_op.dot(cov.dot(opt_proj_op.T))
+                        .dot(obs_op.T))
+                )
+
+                # Assert posterior covariance is positive definite
+                cholesky(cov_red - avg_opt.dot(cov_red))
+
 
 class TestObservationCovariance(unittest2.TestCase):
     """Test the generation of observation covariances."""
diff --git a/tox.ini b/tox.ini
index 8457e6f..6ef1d96 100644
--- a/tox.ini
+++ b/tox.ini
@@ -101,9 +101,10 @@ exclude = stubs
 max-complexity = 15
 doctests = True
+# C408 penalizes dict(name=value) calls
 ignore = D413, C408
 per-file-ignores =
-    JAMES2019/*.py:E402
+    paper2020/*.py:E402
 select =
     # pycodestyle
     E,