Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*.manifest text
*.bat text
*.cmd text
*.sh text
*.sh text eol=lf
*.txt text
*.dat text
*.rc text
Expand Down
5 changes: 5 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,9 @@
<PackageVersion Include="Serilog.Sinks.Seq" Version="9.0.0" />
<PackageVersion Include="Swashbuckle.AspNetCore" Version="9.0.1" />
</ItemGroup>
<ItemGroup Label="Packages for SQL Server Database project">
<PackageVersion Include="ErikEJ.DacFX.SqlServer.Rules" Version="3.0.0" />
<PackageVersion Include="ErikEJ.DacFX.TSQLSmellSCA" Version="3.0.0" />
<PackageVersion Include="Microsoft.SqlServer.Dacpacs.Master" Version="160.2.4" />
</ItemGroup>
</Project>
2 changes: 2 additions & 0 deletions Eventuous.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,11 @@
</Folder>
<Folder Name="/Relational/SqlServer/"/>
<Folder Name="/Relational/SqlServer/src/">
<Project Path="src/SqlServer/src/Eventuous.SqlServer.Database/Eventuous.SqlServer.Database.csproj" />
<Project Path="src/SqlServer/src/Eventuous.SqlServer/Eventuous.SqlServer.csproj"/>
</Folder>
<Folder Name="/Relational/SqlServer/test/">
<Project Path="src/SqlServer/test/Eventuous.Tests.SqlServer.Database/Eventuous.Tests.SqlServer.Database.csproj" />
<Project Path="src/SqlServer/test/Eventuous.Tests.SqlServer/Eventuous.Tests.SqlServer.csproj"/>
</Folder>
<Folder Name="/Samples/"/>
Expand Down
7 changes: 7 additions & 0 deletions nuget.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<packageSources>
<clear />
<add key="nuget.org" value="https://api.nuget.org/v3/index.json" />
</packageSources>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<Project Sdk="MSBuild.Sdk.SqlProj/3.2.0">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<OutputType>Database</OutputType>
<DeployToDatabase>True</DeployToDatabase>
<SqlServerVersion>Sql160</SqlServerVersion>
<GenerateCreateScript>True</GenerateCreateScript>
<RunScriptsFromReferences>True</RunScriptsFromReferences>
<RunSqlCodeAnalysis>True</RunSqlCodeAnalysis>
<!-- For additional properties that can be set here, please refer to https://github.com/rr-wfm/MSBuild.Sdk.SqlProj#model-properties -->
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ErikEJ.DacFX.SqlServer.Rules">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="ErikEJ.DacFX.TSQLSmellSCA">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.SqlServer.Dacpacs.Master" DacpacName="master" DatabaseVariableLiteralValue="master" SuppressMissingDependenciesErrors="False" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
CREATE PROCEDURE eventuous.append_events
@stream_name NVARCHAR(850),
@expected_version INT,
@created DATETIME2(7) NULL,
@messages eventuous.StreamMessage READONLY
AS
BEGIN
SET NOCOUNT ON;
SET XACT_ABORT ON;

-- Note: This procedure is wrapped in a transaction by the caller. This explains why there is no explicit transaction here within the procedure.

DECLARE
@current_version INT,
@stream_id INT,
@position BIGINT,
@count_messages INT,
@new_version INT;

-- capture inserted rows to compute final position
DECLARE @inserted TABLE (
GlobalPosition BIGINT
);

SELECT @count_messages = COUNT(1) FROM @messages;

EXEC eventuous.check_stream
@stream_name = @stream_name,
@expected_version = @expected_version,
@current_version = @current_version OUTPUT,
@stream_id = @stream_id OUTPUT;

SET @new_version = @current_version + @count_messages;

BEGIN TRY

/*
If another writer raced us, the unique constraint (StreamId,StreamPosition) will throw here.
Translate to WrongExpectedVersion in the CATCH below.
*/
INSERT INTO eventuous.Messages (
MessageId,
MessageType,
StreamId,
StreamPosition,
JsonData,
JsonMetadata,
Created
)
OUTPUT inserted.GlobalPosition
INTO @inserted (GlobalPosition)
SELECT
message_id,
message_type,
@stream_id,
@current_version + CAST(ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS INT),
json_data,
json_metadata,
ISNULL(@created, SYSUTCDATETIME())
FROM @messages;

UPDATE s
SET [Version] = @new_version
FROM eventuous.Streams s
WHERE s.StreamId = @stream_id
AND s.[Version] = @current_version;

IF @@ROWCOUNT = 0
BEGIN
DECLARE @streamUpdateErrorMessage NVARCHAR(4000) = CONCAT(
N'WrongExpectedVersion: concurrent update detected for stream ',
CAST(@stream_id AS NVARCHAR(20))
);
;THROW 50000, @streamUpdateErrorMessage, 1;
END

END TRY
BEGIN CATCH
DECLARE @errmsg NVARCHAR(2048) = ERROR_MESSAGE();

IF ERROR_NUMBER() IN (
2627, -- Violation of PRIMARY KEY or UNIQUE constraint
2601 -- Cannot insert duplicate key row in object with unique index
)
AND (@errmsg LIKE N'%UQ_StreamIdAndStreamPosition%')
BEGIN
-- Must BEGIN with "WrongExpectedVersion" for the client detection of OptimisticConcurrencyException
DECLARE @clientMsg NVARCHAR(4000) =
N'WrongExpectedVersion: duplicate append for stream '
+ CAST(@stream_id AS NVARCHAR(20))
+ N' with expected_version=' + CAST(@expected_version AS NVARCHAR(20))
+ N'. SQL: ' + @errmsg;

THROW 50000, @clientMsg, 1;
END;
ELSE
BEGIN
;THROW;
END;
END CATCH;

-- final GlobalPosition value to return
SELECT @position = (
SELECT MAX(GlobalPosition)
FROM @inserted
);

SELECT
@new_version current_version,
@position position;
END;
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
CREATE PROCEDURE eventuous.check_stream
@stream_name NVARCHAR(850),
@expected_version INT,
@current_version INT OUTPUT,
@stream_id INT OUTPUT
AS
BEGIN
SET NOCOUNT ON;
SET XACT_ABORT ON;

DECLARE @customErrorMessage NVARCHAR(200);

SELECT
@current_version = [Version],
@stream_id = StreamId
FROM eventuous.Streams
WHERE StreamName = @stream_name;

IF @stream_id IS NULL
BEGIN
IF @expected_version = -2 -- Any
OR @expected_version = -1 -- NoStream
BEGIN
BEGIN TRY
SET @current_version = -1;
INSERT INTO eventuous.Streams (
StreamName,
[Version]
) VALUES (
@stream_name,
@current_version
);

SET @stream_id = SCOPE_IDENTITY();
END TRY
BEGIN CATCH
IF (ERROR_NUMBER() = 2627 OR ERROR_NUMBER() = 2601) AND (SELECT CHARINDEX(N'UQ_StreamName', ERROR_MESSAGE())) > 0
BEGIN
SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion %i, stream already exists', @expected_version);
THROW 50000, @customErrorMessage, 1;
END;
ELSE
BEGIN
;THROW;
END;
END CATCH;
END;
ELSE
BEGIN
;THROW 50001, N'StreamNotFound', 1;
END;
END
ELSE
BEGIN
IF @expected_version != -2 AND @expected_version != @current_version
BEGIN
SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion %i, current version %i', @expected_version, @current_version);
THROW 50000, @customErrorMessage, 1;
END;
END;
END;
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
CREATE PROCEDURE eventuous.read_all_forwards
@from_position BIGINT,
@count INT
AS
BEGIN
SET NOCOUNT ON;
SET XACT_ABORT ON;

SELECT TOP (@count)
m.MessageId,
m.MessageType,
m.StreamPosition,
m.GlobalPosition,
m.JsonData,
m.JsonMetadata,
m.Created,
s.StreamName
FROM eventuous.Messages m
JOIN eventuous.Streams s ON m.StreamId = s.StreamId
WHERE m.GlobalPosition >= @from_position
ORDER BY m.GlobalPosition;
END;
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
CREATE PROCEDURE eventuous.read_stream_backwards
@stream_name NVARCHAR(850),
@from_position INT,
@count INT
AS
BEGIN
SET NOCOUNT ON;
SET XACT_ABORT ON;

DECLARE
@current_version INT,
@stream_id INT;

SELECT
@current_version = [Version],
@stream_id = StreamId
FROM eventuous.Streams
WHERE StreamName = @stream_name;

IF @stream_id IS NULL
BEGIN
;THROW 50001, 'StreamNotFound', 1;
END;

-- nothing to read / invalid request
IF @count <= 0
BEGIN
RETURN;
END;

-- Validate the starting position for backwards read.
IF @from_position < 0 -- A negative starting position is invalid
OR @from_position > @current_version -- A starting position greater than the current version means we're trying to read from beyond the head of the stream
BEGIN
RETURN;
END;

SELECT TOP (@count)
MessageId,
MessageType,
StreamPosition,
GlobalPosition,
JsonData,
JsonMetadata,
Created
FROM eventuous.Messages
WHERE StreamId = @stream_id
AND StreamPosition <= @from_position
ORDER BY StreamPosition DESC;
END;
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
CREATE PROCEDURE eventuous.read_stream_forwards
@stream_name NVARCHAR(850),
@from_position INT,
@count INT
AS
BEGIN
SET NOCOUNT ON;
SET XACT_ABORT ON;

DECLARE
@current_version INT,
@stream_id INT;

SELECT
@current_version = [Version],
@stream_id = StreamId
FROM eventuous.Streams
WHERE StreamName = @stream_name;

IF @stream_id IS NULL
BEGIN
;THROW 50001, 'StreamNotFound', 1;
END;

IF @current_version < @from_position
BEGIN
RETURN;
END;

SELECT TOP (@count)
MessageId,
MessageType,
StreamPosition,
GlobalPosition,
JsonData,
JsonMetadata,
Created
FROM eventuous.Messages
WHERE StreamId = @stream_id
AND StreamPosition >= @from_position
ORDER BY StreamPosition;

END;
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
CREATE PROCEDURE eventuous.read_stream_sub
@stream_id INT,
@stream_name NVARCHAR(850),
@from_position INT,
@count INT
AS
BEGIN
SET NOCOUNT ON;
SET XACT_ABORT ON;

SELECT TOP (@count)
MessageId,
MessageType,
StreamPosition,
GlobalPosition,
JsonData,
JsonMetadata,
Created,
@stream_name StreamName
FROM eventuous.Messages
WHERE StreamId = @stream_id
AND StreamPosition >= @from_position
ORDER BY GlobalPosition;
END;
Loading