Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/lib/connection_plugin_chain_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import { HostMonitoring2PluginFactory } from "./plugins/efm2/host_monitoring2_pl
import { BlueGreenPluginFactory } from "./plugins/bluegreen/blue_green_plugin_factory";
import { GlobalDbFailoverPluginFactory } from "./plugins/gdb_failover/global_db_failover_plugin_factory";
import { FullServicesContainer } from "./utils/full_services_container";
import { GdbReadWriteSplittingPluginFactory } from "./plugins/read_write_splitting/gdb_read_write_splitting_plugin_factory";

/*
Type alias used for plugin factory sorting. It holds a reference to a plugin
Expand All @@ -65,6 +66,7 @@ export class ConnectionPluginChainBuilder {
["staleDns", { factory: StaleDnsPluginFactory, weight: 500 }],
["bg", { factory: BlueGreenPluginFactory, weight: 550 }],
["readWriteSplitting", { factory: ReadWriteSplittingPluginFactory, weight: 600 }],
["gdbReadWriteSplitting", { factory: GdbReadWriteSplittingPluginFactory, weight: 610 }],
["failover", { factory: FailoverPluginFactory, weight: 700 }],
["failover2", { factory: Failover2PluginFactory, weight: 710 }],
["gdbFailover", { factory: GlobalDbFailoverPluginFactory, weight: 720 }],
Expand All @@ -87,6 +89,7 @@ export class ConnectionPluginChainBuilder {
[StaleDnsPluginFactory, 500],
[BlueGreenPluginFactory, 550],
[ReadWriteSplittingPluginFactory, 600],
[GdbReadWriteSplittingPluginFactory, 610],
[FailoverPluginFactory, 700],
[Failover2PluginFactory, 710],
[GlobalDbFailoverPluginFactory, 720],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

import { ReadWriteSplittingPlugin } from "./read_write_splitting_plugin";
import { WrapperProperties } from "../../wrapper_property";
import { HostInfo } from "../../host_info";
import { RdsUtils } from "../../utils/rds_utils";
import { ReadWriteSplittingError } from "../../utils/errors";
import { Messages } from "../../utils/messages";
import { logger } from "../../../logutils";
import { ClientWrapper } from "../../client_wrapper";
import { equalsIgnoreCase } from "../../utils/utils";

export class GdbReadWriteSplittingPlugin extends ReadWriteSplittingPlugin {
protected readonly rdsUtils: RdsUtils = new RdsUtils();

protected restrictWriterToHomeRegion: boolean;
protected restrictReaderToHomeRegion: boolean;

protected isInitialized: boolean = false;
protected homeRegion: string;

protected initSettings(initHostInfo: HostInfo, properties: Map<string, any>): void {
if (this.isInitialized) {
return;
}
this.restrictWriterToHomeRegion = WrapperProperties.GDB_RW_RESTRICT_WRITER_TO_HOME_REGION.get(properties);
this.restrictReaderToHomeRegion = WrapperProperties.GDB_RW_RESTRICT_READER_TO_HOME_REGION.get(properties);

this.homeRegion = WrapperProperties.GDB_RW_HOME_REGION.get(properties);
if (!this.homeRegion) {
const rdsUrlType = this.rdsUtils.identifyRdsType(initHostInfo.host);
if (rdsUrlType.hasRegion) {
this.homeRegion = this.rdsUtils.getRdsRegion(initHostInfo.host);
}
}

if (!this.homeRegion) {
throw new ReadWriteSplittingError(Messages.get("GdbReadWriteSplittingPlugin.missingHomeRegion", initHostInfo.host));
}

logger.debug(Messages.get("GdbReadWriteSplittingPlugin.parameterValue", "gdbRwHomeRegion", this.homeRegion));

this.isInitialized = true;
}

override async connect(
hostInfo: HostInfo,
props: Map<string, any>,
isInitialConnection: boolean,
connectFunc: () => Promise<ClientWrapper>
): Promise<ClientWrapper> {
this.initSettings(hostInfo, props);
return super.connect(hostInfo, props, isInitialConnection, connectFunc);
}

override setWriterClient(writerTargetClient: ClientWrapper | undefined, writerHostInfo: HostInfo) {
if (
this.restrictWriterToHomeRegion &&
writerHostInfo != null &&
!equalsIgnoreCase(this.rdsUtils.getRdsRegion(writerHostInfo.host), this.homeRegion)
) {
throw new ReadWriteSplittingError(
Messages.get("GdbReadWriteSplittingPlugin.cantConnectWriterOutOfHomeRegion", writerHostInfo.host, this.homeRegion)
);
}
super.setWriterClient(writerTargetClient, writerHostInfo);
}

protected getReaderHostCandidates(): HostInfo[] {
if (this.restrictReaderToHomeRegion) {
const hostsInRegion: HostInfo[] = this.pluginService
.getHosts()
.filter((x) => equalsIgnoreCase(this.rdsUtils.getRdsRegion(x.host), this.homeRegion));

if (hostsInRegion.length === 0) {
throw new ReadWriteSplittingError(Messages.get("GdbReadWriteSplittingPlugin.noAvailableReadersInHomeRegion", this.homeRegion));
}
return hostsInRegion;
}
return super.getReaderHostCandidates();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

import { ConnectionPluginFactory } from "../../plugin_factory";
import { PluginService } from "../../plugin_service";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think we can remove this import

import { ConnectionPlugin } from "../../connection_plugin";
import { AwsWrapperError } from "../../utils/errors";
import { Messages } from "../../utils/messages";
import { FullServicesContainer } from "../../utils/full_services_container";

export class GdbReadWriteSplittingPluginFactory extends ConnectionPluginFactory {
private static gdbReadWriteSplittingPlugin: any;

async getInstance(servicesContainer: FullServicesContainer, properties: Map<string, any>): Promise<ConnectionPlugin> {
try {
if (!GdbReadWriteSplittingPluginFactory.gdbReadWriteSplittingPlugin) {
GdbReadWriteSplittingPluginFactory.gdbReadWriteSplittingPlugin = await import("./gdb_read_write_splitting_plugin");
}
return new GdbReadWriteSplittingPluginFactory.gdbReadWriteSplittingPlugin.GdbReadWriteSplittingPlugin(
servicesContainer.getPluginService(),
properties
);
} catch (error: any) {
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "gdbReadWriteSplittingPlugin"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
limitations under the License.
*/

import { AbstractConnectionPlugin } from "../../abstract_connection_plugin";
import { HostInfo, FailoverError, HostRole } from "../../index";
import { HostInfo, HostRole } from "../../index";
import { PluginService } from "../../plugin_service";
import { HostListProviderService } from "../../host_list_provider_service";
import { Messages } from "../../utils/messages";
Expand Down Expand Up @@ -65,7 +64,7 @@ export class ReadWriteSplittingPlugin extends AbstractReadWriteSplittingPlugin {
if (!isInitialConnection || this._hostListProviderService?.isStaticHostListProvider()) {
return result;
}
const currentRole = this.pluginService.getCurrentHostInfo()?.role;
const currentRole = await this.pluginService.getHostRole(result);

if (currentRole == HostRole.UNKNOWN) {
logAndThrowError(Messages.get("ReadWriteSplittingPlugin.errorVerifyingInitialHostRole"));
Expand Down Expand Up @@ -182,7 +181,7 @@ export class ReadWriteSplittingPlugin extends AbstractReadWriteSplittingPlugin {
}
}

protected getReaderHostCandidates(): HostInfo[] | undefined {
protected getReaderHostCandidates(): HostInfo[] {
return this.pluginService.getHosts();
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
2 changes: 2 additions & 0 deletions common/lib/utils/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export class FailoverFailedError extends FailoverError {}

export class TransactionResolutionUnknownError extends FailoverError {}

export class ReadWriteSplittingError extends AwsWrapperError {}

export class LoginError extends AwsWrapperError {}

export class AwsTimeoutError extends AwsWrapperError {}
Expand Down
8 changes: 7 additions & 1 deletion common/lib/utils/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,13 @@ const MESSAGES: Record<string, string> = {
"GlobalDbFailoverPlugin.currentFailoverMode": "Current Global DB failover mode: %s",
"GlobalDbFailoverPlugin.failoverElapsed": "Global DB failover elapsed time: %s ms",
"GlobalDbFailoverPlugin.candidateNull": "Candidate host is null for role: %s",
"GlobalDbFailoverPlugin.unableToConnect": "Unable to establish a connection during Global DB failover."
"GlobalDbFailoverPlugin.unableToConnect": "Unable to establish a connection during Global DB failover.",
"GdbReadWriteSplittingPlugin.missingHomeRegion":
"Unable to parse home region from endpoint '%s'. Please ensure you have set the 'gdbRwHomeRegion' connection parameter.",
"GdbReadWriteSplittingPlugin.cantConnectWriterOutOfHomeRegion":
"Writer connection to '%s' is not allowed since it is out of home region '%s'.",
"GdbReadWriteSplittingPlugin.noAvailableReadersInHomeRegion": "No available reader nodes in home region '%s'.",
"GdbReadWriteSplittingPlugin.parameterValue": "%s=%s"
};

export class Messages {
Expand Down
18 changes: 18 additions & 0 deletions common/lib/wrapper_property.ts
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,24 @@ export class WrapperProperties {
["writer", "none"]
);

static readonly GDB_RW_HOME_REGION = new WrapperProperty<string>(
"gdbRwHomeRegion",
"Specifies the home region for read/write splitting.",
null
);

static readonly GDB_RW_RESTRICT_WRITER_TO_HOME_REGION = new WrapperProperty<boolean>(
"gdbRwRestrictWriterToHomeRegion",
"Prevents connections to a writer node outside of the defined home region.",
true
);

static readonly GDB_RW_RESTRICT_READER_TO_HOME_REGION = new WrapperProperty<boolean>(
"gdbRwRestrictReaderToHomeRegion",
"Prevents connections to a reader node outside of the defined home region.",
true
);

private static readonly PREFIXES = [
WrapperProperties.MONITORING_PROPERTY_PREFIX,
ClusterTopologyMonitorImpl.MONITORING_PROPERTY_PREFIX,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ describe("iam authentication", () => {
beforeEach(async () => {
logger.info(`Test started: ${expect.getState().currentTestName}`);
jest.useFakeTimers({
doNotFake: ["nextTick"]
doNotFake: ["nextTick", "setTimeout", "setInterval", "clearTimeout", "clearInterval", "setImmediate", "clearImmediate"]
});
client = null;
env = await TestEnvironment.getCurrent();
Expand Down
Loading