@@ -107,23 +107,21 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
107107 var document = await this . AsyncApiDocumentReader . ReadAsync ( responseStream , cancellationToken ) . ConfigureAwait ( false ) ;
108108 if ( document is not V3AsyncApiDocument v3Document ) throw new NotSupportedException ( "Synapse only supports AsyncAPI v3.0.0 at the moment" ) ;
109109 this . Document = v3Document ;
110- var operationId = this . AsyncApi . OperationRef ;
110+ if ( string . IsNullOrWhiteSpace ( this . AsyncApi . Operation ) ) throw new NullReferenceException ( "The 'operation' parameter must be set when performing an AsyncAPI v3 call" ) ;
111+ var operationId = this . AsyncApi . Operation ;
111112 if ( operationId . IsRuntimeExpression ( ) ) operationId = await this . Task . Workflow . Expressions . EvaluateAsync < string > ( operationId , this . Task . Input , this . GetExpressionEvaluationArguments ( ) , cancellationToken ) . ConfigureAwait ( false ) ;
112113 if ( string . IsNullOrWhiteSpace ( operationId ) ) throw new NullReferenceException ( "The operation ref cannot be null or empty" ) ;
113- var operation = this . Document . Operations . FirstOrDefault ( o => o . Key == operationId ) ;
114- if ( operation . Value == null ) throw new NullReferenceException ( $ "Failed to find an operation with id '{ operationId } ' in AsyncAPI document at '{ uri } '") ;
114+ this . Operation = this . Document . Operations . FirstOrDefault ( o => o . Key == operationId ) ;
115+ if ( this . Operation . Value == null ) throw new NullReferenceException ( $ "Failed to find an operation with id '{ operationId } ' in AsyncAPI document at '{ uri } '") ;
115116 if ( this . AsyncApi . Authentication != null ) this . Authorization = await AuthorizationInfo . CreateAsync ( this . AsyncApi . Authentication , this . ServiceProvider , this . Task . Workflow . Definition , cancellationToken ) . ConfigureAwait ( false ) ;
116117 switch ( this . Operation . Value . Action )
117118 {
118119 case V3OperationAction . Receive :
119120 await this . BuildMessagePayloadAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
120121 await this . BuildMessageHeadersAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
121122 break ;
122- case V3OperationAction . Send :
123-
124- break ;
125- default :
126- throw new NotSupportedException ( $ "The specified operation action '{ this . Operation . Value . Action } ' is not supported") ;
123+ case V3OperationAction . Send : break ;
124+ default : throw new NotSupportedException ( $ "The specified operation action '{ this . Operation . Value . Action } ' is not supported") ;
127125 }
128126 }
129127
@@ -134,16 +132,16 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
134132 /// <returns>A new awaitable <see cref="Task"/></returns>
135133 protected virtual async Task BuildMessagePayloadAsync ( CancellationToken cancellationToken = default )
136134 {
137- if ( this . AsyncApi == null || this . Operation == null ) throw new InvalidOperationException ( "The executor must be initialized before execution" ) ;
135+ if ( this . AsyncApi == null || this . Operation . Value == null ) throw new InvalidOperationException ( "The executor must be initialized before execution" ) ;
138136 if ( this . Task . Input == null ) this . MessagePayload = new { } ;
139- if ( this . AsyncApi . Payload == null ) return ;
137+ if ( this . AsyncApi . Message ? . Payload == null ) return ;
140138 var arguments = this . GetExpressionEvaluationArguments ( ) ;
141139 if ( this . Authorization != null )
142140 {
143141 arguments ??= new Dictionary < string , object > ( ) ;
144142 arguments . Add ( "authorization" , this . Authorization ) ;
145143 }
146- this . MessagePayload = await this . Task . Workflow . Expressions . EvaluateAsync < object > ( this . AsyncApi . Payload , this . Task . Input ! , arguments , cancellationToken ) . ConfigureAwait ( false ) ;
144+ this . MessagePayload = await this . Task . Workflow . Expressions . EvaluateAsync < object > ( this . AsyncApi . Message . Payload , this . Task . Input ! , arguments , cancellationToken ) . ConfigureAwait ( false ) ;
147145 }
148146
149147 /// <summary>
@@ -153,30 +151,27 @@ protected virtual async Task BuildMessagePayloadAsync(CancellationToken cancella
153151 /// <returns>A new awaitable <see cref="Task"/></returns>
154152 protected virtual async Task BuildMessageHeadersAsync ( CancellationToken cancellationToken = default )
155153 {
156- if ( this . AsyncApi == null || this . Operation == null ) throw new InvalidOperationException ( "The executor must be initialized before execution" ) ;
157- if ( this . AsyncApi . Headers == null ) return ;
154+ if ( this . AsyncApi == null || this . Operation . Value == null ) throw new InvalidOperationException ( "The executor must be initialized before execution" ) ;
155+ if ( this . AsyncApi . Message ? . Headers == null ) return ;
158156 var arguments = this . GetExpressionEvaluationArguments ( ) ;
159157 if ( this . Authorization != null )
160158 {
161159 arguments ??= new Dictionary < string , object > ( ) ;
162160 arguments . Add ( "authorization" , this . Authorization ) ;
163161 }
164- this . MessageHeaders = await this . Task . Workflow . Expressions . EvaluateAsync < object > ( this . AsyncApi . Headers , this . Task . Input ! , arguments , cancellationToken ) . ConfigureAwait ( false ) ;
162+ this . MessageHeaders = await this . Task . Workflow . Expressions . EvaluateAsync < object > ( this . AsyncApi . Message . Headers , this . Task . Input ! , arguments , cancellationToken ) . ConfigureAwait ( false ) ;
165163 }
166164
167165 /// <inheritdoc/>
168166 protected override Task DoExecuteAsync ( CancellationToken cancellationToken )
169167 {
170168 if ( this . AsyncApi == null || this . Document == null || this . Operation . Value == null ) throw new InvalidOperationException ( "The executor must be initialized before execution" ) ;
171- switch ( this . Operation . Value . Action )
169+ return this . Operation . Value . Action switch
172170 {
173- case V3OperationAction . Receive :
174- return this . DoExecutePublishOperationAsync ( cancellationToken ) ;
175- case V3OperationAction . Send :
176- return this . DoExecuteSubscribeOperationAsync ( cancellationToken ) ;
177- default :
178- throw new NotSupportedException ( $ "The specified operation action '{ this . Operation . Value . Action } ' is not supported") ;
179- }
171+ V3OperationAction . Receive => this . DoExecutePublishOperationAsync ( cancellationToken ) ,
172+ V3OperationAction . Send => this . DoExecuteSubscribeOperationAsync ( cancellationToken ) ,
173+ _ => throw new NotSupportedException ( $ "The specified operation action '{ this . Operation . Value . Action } ' is not supported") ,
174+ } ;
180175 }
181176
182177 /// <summary>
@@ -195,6 +190,7 @@ protected virtual async Task DoExecutePublishOperationAsync(CancellationToken ca
195190 } ;
196191 await using var result = await asyncApiClient . PublishAsync ( parameters , cancellationToken ) . ConfigureAwait ( false ) ;
197192 if ( ! result . IsSuccessful ) throw new Exception ( "Failed to execute the AsyncAPI publish operation" ) ;
193+ await this . SetResultAsync ( null , this . Task . Definition . Then , cancellationToken ) . ConfigureAwait ( false ) ;
198194 }
199195
200196 /// <summary>
@@ -205,10 +201,31 @@ protected virtual async Task DoExecutePublishOperationAsync(CancellationToken ca
205201 protected virtual async Task DoExecuteSubscribeOperationAsync ( CancellationToken cancellationToken )
206202 {
207203 if ( this . AsyncApi == null || this . Document == null || this . Operation . Value == null ) throw new InvalidOperationException ( "The executor must be initialized before execution" ) ;
204+ if ( this . AsyncApi . Subscription == null ) throw new NullReferenceException ( "The 'subscription' must be set when performing an AsyncAPI v3 subscribe operation" ) ;
208205 await using var asyncApiClient = this . AsyncApiClientFactory . CreateFor ( this . Document ) ;
209206 var parameters = new AsyncApiSubscribeOperationParameters ( this . Operation . Key , this . AsyncApi . Server , this . AsyncApi . Protocol ) ;
210207 await using var result = await asyncApiClient . SubscribeAsync ( parameters , cancellationToken ) . ConfigureAwait ( false ) ;
211208 if ( ! result . IsSuccessful ) throw new Exception ( "Failed to execute the AsyncAPI subscribe operation" ) ;
209+ if ( result . Messages == null )
210+ {
211+ await this . SetResultAsync ( null , this . Task . Definition . Then , cancellationToken ) . ConfigureAwait ( false ) ;
212+ return ;
213+ }
214+ var observable = result . Messages ;
215+ if ( this . AsyncApi . Subscription . Consume . For != null ) observable = observable . TakeUntil ( Observable . Timer ( this . AsyncApi . Subscription . Consume . For . ToTimeSpan ( ) ) ) ;
216+ if ( this . AsyncApi . Subscription . Consume . Amount . HasValue ) observable = observable . Take ( this . AsyncApi . Subscription . Consume . Amount . Value ) ;
217+ else if ( ! string . IsNullOrWhiteSpace ( this . AsyncApi . Subscription . Consume . While ) ) observable = observable . Select ( message => Observable . FromAsync ( async ( ) =>
218+ {
219+ var keepGoing = await this . Task . Workflow . Expressions . EvaluateConditionAsync ( this . AsyncApi . Subscription . Consume . While , this . Task . Input ! , this . GetExpressionEvaluationArguments ( ) , cancellationToken ) . ConfigureAwait ( false ) ;
220+ return ( message , keepGoing ) ;
221+ } ) ) . Concat ( ) . TakeWhile ( i => i . keepGoing ) . Select ( i => i . message ) ;
222+ else if ( ! string . IsNullOrWhiteSpace ( this . AsyncApi . Subscription . Consume . Until ) ) observable = observable . Select ( message => Observable . FromAsync ( async ( ) =>
223+ {
224+ var keepGoing = ! ( await this . Task . Workflow . Expressions . EvaluateConditionAsync ( this . AsyncApi . Subscription . Consume . Until , this . Task . Input ! , this . GetExpressionEvaluationArguments ( ) , cancellationToken ) . ConfigureAwait ( false ) ) ;
225+ return ( message , keepGoing ) ;
226+ } ) ) . Concat ( ) . TakeWhile ( i => i . keepGoing ) . Select ( i => i . message ) ;
227+ var messages = await observable . ToAsyncEnumerable ( ) . ToListAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
228+ await this . SetResultAsync ( messages , this . Task . Definition . Then , cancellationToken ) . ConfigureAwait ( false ) ;
212229 }
213230
214231}
0 commit comments