|
58 | 58 | targetKsOpts = make(map[string]string) |
59 | 59 | httpClient = throttlebase.SetupHTTPClient(time.Second) |
60 | 60 | sourceThrottlerAppName = throttlerapp.VStreamerName |
61 | | - targetThrottlerAppName = throttlerapp.VReplicationName |
| 61 | + targetThrottlerAppName = throttlerapp.VPlayerName |
62 | 62 | ) |
63 | 63 |
|
64 | 64 | const ( |
@@ -1241,18 +1241,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) { |
1241 | 1241 | for _, tab := range customerTablets { |
1242 | 1242 | waitForRowCountInTablet(t, tab, keyspace, workflow, 5) |
1243 | 1243 | // Confirm that we updated the stats on the target tablets as expected. |
1244 | | - jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"}) |
1245 | | - require.NoError(t, err) |
1246 | | - require.NotEqual(t, "{}", jsVal) |
1247 | | - // The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2} |
1248 | | - vstreamerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int() |
1249 | | - require.Greater(t, vstreamerThrottledCount, int64(0)) |
1250 | | - // We only need to do this stat check once. |
1251 | | - val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"}) |
1252 | | - require.NoError(t, err) |
1253 | | - throttledCount, err := strconv.ParseInt(val, 10, 64) |
1254 | | - require.NoError(t, err) |
1255 | | - require.GreaterOrEqual(t, throttledCount, vstreamerThrottledCount) |
| 1244 | + confirmVReplicationThrottling(t, tab, sourceKs, workflow, sourceThrottlerAppName) |
1256 | 1245 | } |
1257 | 1246 | }) |
1258 | 1247 | t.Run("unthrottle-app-product", func(t *testing.T) { |
@@ -1287,12 +1276,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) { |
1287 | 1276 | for _, tab := range customerTablets { |
1288 | 1277 | waitForRowCountInTablet(t, tab, keyspace, workflow, 8) |
1289 | 1278 | // Confirm that we updated the stats on the target tablets as expected. |
1290 | | - jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"}) |
1291 | | - require.NoError(t, err) |
1292 | | - require.NotEqual(t, "{}", jsVal) |
1293 | | - // The JSON value now looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4} |
1294 | | - vplayerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int() |
1295 | | - require.Greater(t, vplayerThrottledCount, int64(0)) |
| 1279 | + confirmVReplicationThrottling(t, tab, sourceKs, workflow, targetThrottlerAppName) |
1296 | 1280 | } |
1297 | 1281 | }) |
1298 | 1282 | t.Run("unthrottle-app-customer", func(t *testing.T) { |
@@ -1856,3 +1840,52 @@ func waitForInnoDBHistoryLength(t *testing.T, tablet *cluster.VttabletProcess, e |
1856 | 1840 | func releaseInnoDBRowHistory(t *testing.T, dbConn *mysql.Conn) { |
1857 | 1841 | execQuery(t, dbConn, "rollback") |
1858 | 1842 | } |
| 1843 | + |
| 1844 | +// confirmVReplicationThrottling confirms that the throttling related metrics reflect that |
| 1845 | +// the workflow is being throttled as expected, via the expected app name, and that this |
| 1846 | +// is impacting the lag as expected. |
| 1847 | +// The tablet passed should be a target tablet for the given workflow while the keyspace |
| 1848 | +// name provided should be the source keyspace as the target tablet stats note the stream's |
| 1849 | +// source keyspace and shard. |
| 1850 | +func confirmVReplicationThrottling(t *testing.T, tab *cluster.VttabletProcess, keyspace, workflow string, appname throttlerapp.Name) { |
| 1851 | + const ( |
| 1852 | + sleepTime = 5 * time.Second |
| 1853 | + zv = int64(0) |
| 1854 | + ) |
| 1855 | + time.Sleep(sleepTime) // To be sure that we accrue some lag |
| 1856 | + |
| 1857 | + jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"}) |
| 1858 | + require.NoError(t, err) |
| 1859 | + require.NotEqual(t, "{}", jsVal) |
| 1860 | + // The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4} |
| 1861 | + throttledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.%s`, workflow, appname)).Int() |
| 1862 | + require.Greater(t, throttledCount, zv, "JSON value: %s", jsVal) |
| 1863 | + |
| 1864 | + val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"}) |
| 1865 | + require.NoError(t, err) |
| 1866 | + require.NotEqual(t, "", val) |
| 1867 | + throttledCountTotal, err := strconv.ParseInt(val, 10, 64) |
| 1868 | + require.NoError(t, err) |
| 1869 | + require.GreaterOrEqual(t, throttledCountTotal, throttledCount, "Value: %s", val) |
| 1870 | + |
| 1871 | + // We do not calculate replication lag for the vcopier as it's not replicating |
| 1872 | + // events. |
| 1873 | + if appname != throttlerapp.VCopierName { |
| 1874 | + jsVal, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSeconds"}) |
| 1875 | + require.NoError(t, err) |
| 1876 | + require.NotEqual(t, "{}", jsVal) |
| 1877 | + // The JSON value looks like this: {"product.0.cproduct.4": 6} |
| 1878 | + vreplLagSeconds := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.%s\.*`, keyspace, workflow)).Int() |
| 1879 | + require.NoError(t, err) |
| 1880 | + // Take off 1 second to deal with timing issues in the test. |
| 1881 | + minLagSecs := int64(int64(sleepTime.Seconds()) - 1) |
| 1882 | + require.GreaterOrEqual(t, vreplLagSeconds, minLagSecs, "JSON value: %s", jsVal) |
| 1883 | + |
| 1884 | + val, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSecondsMax"}) |
| 1885 | + require.NoError(t, err) |
| 1886 | + require.NotEqual(t, "", val) |
| 1887 | + vreplLagSecondsMax, err := strconv.ParseInt(val, 10, 64) |
| 1888 | + require.NoError(t, err) |
| 1889 | + require.GreaterOrEqual(t, vreplLagSecondsMax, vreplLagSeconds, "Value: %s", val) |
| 1890 | + } |
| 1891 | +} |
0 commit comments