Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions Documentation/chronicle/event-store-subscriptions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Event Store Subscriptions

Event store subscriptions let one event store consume events produced in another event store. You configure subscriptions on a target event store and point each subscription to a source event store and a set of event types.

## list

Lists all event store subscriptions configured for the target event store.

```bash
cratis chronicle event-store-subscriptions list --event-store <TARGET_EVENT_STORE>
```

### Examples

List subscriptions for the `system` event store:

```bash
cratis chronicle event-store-subscriptions list --event-store system
```

List in plain format:

```bash
cratis chronicle event-store-subscriptions list --event-store system --output plain
```

## add

Adds an event store subscription to the target event store.

```bash
cratis chronicle event-store-subscriptions add <SUBSCRIPTION_ID> <SOURCE_EVENT_STORE> <EVENT_TYPES> --event-store <TARGET_EVENT_STORE>
```

### Arguments

| Argument | Description |
|---|---|
| `SUBSCRIPTION_ID` | Unique identifier for the subscription. |
| `SOURCE_EVENT_STORE` | Event store to subscribe from. |
| `EVENT_TYPES` | Comma-separated list of event type IDs to include. |

### Examples

Add a subscription from `default` to `system` for one event type:

```bash
cratis chronicle event-store-subscriptions add orders-from-default default MyCompany.Sales.OrderPlaced --event-store system
```

Add a subscription with multiple event types:

```bash
cratis chronicle event-store-subscriptions add sales-feed default MyCompany.Sales.OrderPlaced,MyCompany.Sales.OrderCancelled --event-store system
```

## remove

Removes an event store subscription from the target event store.

```bash
cratis chronicle event-store-subscriptions remove <SUBSCRIPTION_ID> --event-store <TARGET_EVENT_STORE>
```

The command prompts for confirmation before proceeding. Pass `--yes` to skip the prompt in automated workflows.

### Arguments

| Argument | Description |
|---|---|
| `SUBSCRIPTION_ID` | Identifier of the subscription to remove. Use `event-store-subscriptions list` to retrieve IDs. |

### Options

| Flag | Description |
|---|---|
| `-y, --yes` | Skip confirmation prompt. |

### Examples

Remove a subscription interactively:

```bash
cratis chronicle event-store-subscriptions remove orders-from-default --event-store system
```

Remove without confirmation:

```bash
cratis chronicle event-store-subscriptions remove orders-from-default --event-store system --yes
```
1 change: 1 addition & 0 deletions Documentation/chronicle/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Commands that operate within a specific event store or namespace accept the foll
| `event-types` | List and inspect registered event type definitions. |
| `events` | Query events from an event sequence or retrieve the tail sequence number. |
| `observers` | List, inspect, replay, and retry observers. |
| `event-store-subscriptions` | Manage cross-event-store subscriptions for event forwarding. |
| `failed-partitions` | List and inspect partitions where an observer has failed. |
| `projections` | List and inspect projection definitions. |
| `read-models` | List, query, and inspect read model instances and snapshots. |
Expand Down
2 changes: 2 additions & 0 deletions Documentation/chronicle/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
href: events.md
- name: Observers
href: observers.md
- name: Event Store Subscriptions
href: event-store-subscriptions.md
- name: Failed Partitions
href: failed-partitions.md
- name: Projections
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using context = Cratis.Cli.Integration.Chronicle.for_EventStoreSubscriptions.when_adding_and_removing_event_store_subscription.context;

namespace Cratis.Cli.Integration.Chronicle.for_EventStoreSubscriptions;

[Collection(ChronicleCollection.Name)]
public class when_adding_and_removing_event_store_subscription(context context) : CliGiven<context>(context)
{
public class context : given.a_connected_cli
{
public CliCommandResult AddResult = null!;
public CliCommandResult RemoveResult = null!;
public bool SubscriptionAppearedInList;
public string SubscriptionId = null!;

async Task Because()
{
SubscriptionId = $"integration-test-subscription-{Guid.NewGuid():N}";
AddResult = await RunCliAsync(
"chronicle",
"event-store-subscriptions",
"add",
SubscriptionId,
"system",
"Integration.Test.EventType",
"--event-store",
"system");

var listResult = await RunCliAsync("chronicle", "event-store-subscriptions", "list", "--event-store", "system");
var subscriptions = JsonDocument.Parse(listResult.StandardOutput).RootElement;
var testSubscription = subscriptions.EnumerateArray()
.FirstOrDefault(subscription => subscription.GetProperty("identifier").GetString() == SubscriptionId);
SubscriptionAppearedInList = testSubscription.ValueKind != JsonValueKind.Undefined;

if (SubscriptionAppearedInList)
{
RemoveResult = await RunCliAsync("chronicle", "event-store-subscriptions", "remove", SubscriptionId, "--event-store", "system", "--yes");
}
}
}

[Fact] void should_return_success_for_add() => Context.AddResult.ExitCode.ShouldEqual(ExitCodes.Success);

[Fact] void should_contain_added_message() => Context.AddResult.StandardOutput.ShouldContain("added");

[Fact] void should_show_subscription_in_list() => Context.SubscriptionAppearedInList.ShouldBeTrue();

[Fact] void should_return_success_for_remove() => Context.RemoveResult.ExitCode.ShouldEqual(ExitCodes.Success);

[Fact] void should_contain_removed_message() => Context.RemoveResult.StandardOutput.ShouldContain("removed");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Chronicle.Contracts.Observation.EventStoreSubscriptions;

namespace Cratis.Cli.Commands.Chronicle.EventStoreSubscriptions;

/// <summary>
/// Adds an event store subscription.
/// </summary>
[LlmDescription("Adds an event store subscription to a target event store.")]
[CliCommand("add", "Add an event store subscription", Branch = typeof(ChronicleBranch.EventStoreSubscriptions))]
[CliExample("chronicle", "event-store-subscriptions", "add", "orders-from-default", "default", "MyCompany.Sales.OrderPlaced")]
[LlmOutputAdvice("plain", "Plain outputs a simple confirmation message.")]
[LlmOption("<SUBSCRIPTION_ID>", "string", "The unique subscription identifier (positional)")]
[LlmOption("<SOURCE_EVENT_STORE>", "string", "The source event store to subscribe to (positional)")]
[LlmOption("<EVENT_TYPES>", "string", "Comma-separated event types to include in the subscription (positional)")]
public class AddEventStoreSubscriptionCommand : ChronicleCommand<AddEventStoreSubscriptionSettings>
{
/// <inheritdoc/>
protected override async Task<int> ExecuteCommandAsync(IServices services, AddEventStoreSubscriptionSettings settings, string format)
{
var eventTypes = settings.EventTypes
.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
.Select(eventType => new Cratis.Chronicle.Contracts.Events.EventType { Id = eventType })
.ToList();

await services.EventStoreSubscriptions.Add(new AddEventStoreSubscriptions
{
TargetEventStore = settings.ResolveEventStore(),
Subscriptions =
[
new EventStoreSubscriptionDefinition
{
Identifier = settings.SubscriptionId,
SourceEventStore = settings.SourceEventStore,
EventTypes = eventTypes
}
]
});

OutputFormatter.WriteMessage(format, $"Event store subscription '{settings.SubscriptionId}' added.");
return ExitCodes.Success;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Cratis.Cli.Commands.Chronicle.EventStoreSubscriptions;

/// <summary>
/// Settings for adding an event store subscription.
/// </summary>
public class AddEventStoreSubscriptionSettings : EventStoreSettings
{
/// <summary>
/// Gets or sets the subscription identifier.
/// </summary>
[CommandArgument(0, "<SUBSCRIPTION_ID>")]
[Description("Unique identifier for the subscription")]
public string SubscriptionId { get; set; } = string.Empty;

/// <summary>
/// Gets or sets the source event store.
/// </summary>
[CommandArgument(1, "<SOURCE_EVENT_STORE>")]
[Description("Source event store to subscribe from")]
public string SourceEventStore { get; set; } = string.Empty;

/// <summary>
/// Gets or sets a comma-separated list of event types.
/// </summary>
[CommandArgument(2, "<EVENT_TYPES>")]
[Description("Comma-separated event types to subscribe to")]
public string EventTypes { get; set; } = string.Empty;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Chronicle.Contracts.Observation.EventStoreSubscriptions;

namespace Cratis.Cli.Commands.Chronicle.EventStoreSubscriptions;

/// <summary>
/// Lists event store subscriptions for a target event store.
/// </summary>
[LlmDescription("Lists event store subscriptions configured for the target event store.")]
[CliCommand("list", "List event store subscriptions", Branch = typeof(ChronicleBranch.EventStoreSubscriptions))]
[CliExample("chronicle", "event-store-subscriptions", "list", "--event-store", "system")]
[LlmOutputAdvice("plain", "Use plain for consistency with other listing commands.")]
public class ListEventStoreSubscriptionsCommand : ChronicleCommand<EventStoreSettings>
{
/// <inheritdoc/>
protected override async Task<int> ExecuteCommandAsync(IServices services, EventStoreSettings settings, string format)
{
var subscriptions = await services.EventStoreSubscriptions.GetSubscriptions(new GetEventStoreSubscriptionsRequest
{
TargetEventStore = settings.ResolveEventStore()
});

OutputFormatter.Write(
format,
subscriptions,
["Identifier", "SourceEventStore", "EventTypes"],
subscription =>
[
subscription.Identifier,
subscription.SourceEventStore,
string.Join(", ", (subscription.EventTypes ?? []).Select(eventType => eventType.Id))
]);

return ExitCodes.Success;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Chronicle.Contracts.Observation.EventStoreSubscriptions;

namespace Cratis.Cli.Commands.Chronicle.EventStoreSubscriptions;

/// <summary>
/// Removes an event store subscription.
/// </summary>
[LlmDescription("Removes an event store subscription from a target event store. Destructive — prompts for confirmation unless --yes is specified.")]
[CliCommand("remove", "Remove an event store subscription", Branch = typeof(ChronicleBranch.EventStoreSubscriptions), DynamicCompletion = "event-store-subscriptions")]
[CliExample("chronicle", "event-store-subscriptions", "remove", "orders-from-default")]
[LlmOutputAdvice("plain", "Plain outputs a simple confirmation message.")]
[LlmOption("<SUBSCRIPTION_ID>", "string", "The unique subscription identifier to remove (positional)")]
public class RemoveEventStoreSubscriptionCommand : ChronicleCommand<RemoveEventStoreSubscriptionSettings>
{
/// <inheritdoc/>
protected override async Task<int> ExecuteCommandAsync(IServices services, RemoveEventStoreSubscriptionSettings settings, string format)
{
if (!ConfirmationHelper.ShouldProceed(settings, $"Are you sure you want to remove event store subscription '{settings.SubscriptionId}'?"))
{
OutputFormatter.WriteMessage(format, "Aborted.");
return ExitCodes.Success;
}

await services.EventStoreSubscriptions.Remove(new RemoveEventStoreSubscriptions
{
TargetEventStore = settings.ResolveEventStore(),
SubscriptionIds = [settings.SubscriptionId]
});

OutputFormatter.WriteMessage(format, $"Event store subscription '{settings.SubscriptionId}' removed.");
return ExitCodes.Success;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Cratis.Cli.Commands.Chronicle.EventStoreSubscriptions;

/// <summary>
/// Settings for removing an event store subscription.
/// </summary>
public class RemoveEventStoreSubscriptionSettings : EventStoreSettings
{
/// <summary>
/// Gets or sets the subscription identifier.
/// </summary>
[CommandArgument(0, "<SUBSCRIPTION_ID>")]
[Description("Subscription identifier (from 'cratis chronicle event-store-subscriptions list')")]
public string SubscriptionId { get; set; } = string.Empty;
}
13 changes: 13 additions & 0 deletions Source/Cli/Commands/Completions/DynamicCompleteCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Chronicle.Contracts.Jobs;
using Cratis.Chronicle.Contracts.Observation.EventStoreSubscriptions;
using Cratis.Cli.Commands.Chronicle;

namespace Cratis.Cli.Commands.Completions;
Expand Down Expand Up @@ -155,6 +156,18 @@ protected override async Task<int> ExecuteCommandAsync(IServices services, Dynam

break;

case "event-store-subscriptions":
var subscriptions = await services.EventStoreSubscriptions.GetSubscriptions(new GetEventStoreSubscriptionsRequest
{
TargetEventStore = eventStore
});
foreach (var subscription in subscriptions ?? [])
{
Console.WriteLine(subscription.Identifier);
}

break;

case "contexts":
var config = CliConfiguration.Load();
foreach (var name in config.Contexts.Keys)
Expand Down
6 changes: 5 additions & 1 deletion Source/Cli/Registration/ChronicleBranch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Cratis.Cli.Registration;
/// Chronicle server commands branch. Contains all sub-branches for event stores,
/// observers, events, etc.
/// </summary>
[CliBranch("chronicle", "Commands for interacting with a Chronicle server. Contains sub-branches for event stores, namespaces, event types, events, observers, projections, read models, jobs, failed partitions, recommendations, identities, auth, users, and applications.")]
[CliBranch("chronicle", "Commands for interacting with a Chronicle server. Contains sub-branches for event stores, namespaces, event types, events, observers, event store subscriptions, projections, read models, jobs, failed partitions, recommendations, identities, auth, users, and applications.")]
public static class ChronicleBranch
{
/// <summary>Event store management.</summary>
Expand All @@ -32,6 +32,10 @@ public static class Events { }
[CliBranch("observers", "Manage observers (projections, reactors, reducers, client observers). Supports listing, inspecting, replaying, and recovering failed partitions.")]
public static class Observers { }

/// <summary>Event store subscription management.</summary>
[CliBranch("event-store-subscriptions", "Manage event store subscriptions for cross-store event flow. Supports listing, adding, and removing subscriptions.")]
public static class EventStoreSubscriptions { }

/// <summary>Failed partition inspection.</summary>
[CliBranch("failed-partitions", "List and inspect observer partitions that have failed and are paused. Use to diagnose and recover from processing failures.")]
public static class FailedPartitions { }
Expand Down
Loading