Skip to content

Commit 09a3c18

Browse files
committed
Merge remote-tracking branch 'origin/master'
2 parents d134cda + 080f523 commit 09a3c18

File tree

2 files changed

+31
-4
lines changed

2 files changed

+31
-4
lines changed

hsweb-core/src/main/java/org/hswebframework/web/event/DefaultAsyncEvent.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,23 @@ public class DefaultAsyncEvent implements AsyncEvent {
1515

1616
public synchronized void async(Publisher<?> publisher) {
1717
hasListener = true;
18-
this.async = async.then(Mono.from(publisher).then());
18+
this.async = async.then(Mono.fromDirect(publisher).then());
1919
}
2020

2121
@Override
2222
public synchronized void first(Publisher<?> publisher) {
2323
hasListener = true;
24-
this.first = Mono.from(publisher).then(first);
24+
this.first = Mono.fromDirect(publisher).then(first);
2525
}
2626

2727
@Override
2828
public void transformFirst(Function<Mono<?>, Publisher<?>> mapper) {
29-
this.first = Mono.from(mapper.apply(this.first));
29+
this.first = Mono.fromDirect(mapper.apply(this.first));
3030
}
3131

3232
@Override
3333
public void transform(Function<Mono<?>, Publisher<?>> mapper) {
34-
this.async = Mono.from(mapper.apply(this.async));
34+
this.async = Mono.fromDirect(mapper.apply(this.async));
3535
}
3636

3737
@Override
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package org.hswebframework.web.event;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.junit.Test;
5+
import reactor.core.publisher.Flux;
6+
import reactor.core.publisher.Mono;
7+
8+
@Slf4j
9+
public class EventTest {
10+
11+
@Test
12+
public void testMonoFrom() {
13+
Flux<String> source = Flux.just("1", "2", "3").doOnNext(s -> log.info("get {}", s));
14+
15+
Mono.from(source).subscribe();
16+
17+
Mono.fromDirect(source).subscribe();
18+
}
19+
20+
@Test
21+
public void testAsync() {
22+
Flux<String> source = Flux.just("1", "2", "3").doOnNext(s -> log.info("get {}", s));
23+
AsyncEvent event = new DefaultAsyncEvent();
24+
event.async(source);
25+
event.getAsync().subscribe();
26+
}
27+
}

0 commit comments

Comments
 (0)