Skip to content

Commit aa9d57e

Browse files
committed
[#2873] Execute a reactive flush before a native query execution
Sometimes, Hibernate ORM needs to flush the session before executing a native query. This commit makes sure that a reactive operation is executed instead of the blocking one from Hiberante ORM.
1 parent 08be46c commit aa9d57e

File tree

5 files changed

+82
-12
lines changed

5 files changed

+82
-12
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/spi/ReactiveAbstractSelectionQuery.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
* Emulate {@link org.hibernate.query.spi.AbstractSelectionQuery}.
5252
* <p>
5353
* Hibernate Reactive implementations already extend another class,
54-
* they cannot extends {@link org.hibernate.query.spi.AbstractSelectionQuery too}.
54+
* they cannot extend {@link org.hibernate.query.spi.AbstractSelectionQuery too}.
5555
* This approach allows us to avoid duplicating code.
5656
* </p>
5757
* @param <R>
@@ -75,7 +75,7 @@ public class ReactiveAbstractSelectionQuery<R> {
7575

7676
private Set<String> fetchProfiles;
7777

78-
private final Runnable beforeQuery;
78+
private final Supplier<CompletionStage<Void>> beforeQuery;
7979

8080
private final Consumer<Boolean> afterQuery;
8181
private final Function<List<R>, R> uniqueElement;
@@ -94,7 +94,7 @@ public ReactiveAbstractSelectionQuery(
9494
Supplier<DomainParameterXref> getDomainParameterXref,
9595
Supplier<Class<?>> getResultType,
9696
Supplier<String> getQueryString,
97-
Runnable beforeQuery,
97+
Supplier<CompletionStage<Void>> beforeQuery,
9898
Consumer<Boolean> afterQuery,
9999
Function<List<R>, R> uniqueElement) {
100100
this(
@@ -122,7 +122,7 @@ public ReactiveAbstractSelectionQuery(
122122
Supplier<DomainParameterXref> getDomainParameterXref,
123123
Supplier<Class<?>> getResultType,
124124
Supplier<String> getQueryString,
125-
Runnable beforeQuery,
125+
Supplier<CompletionStage<Void>> beforeQuery,
126126
Consumer<Boolean> afterQuery,
127127
Function<List<R>, R> uniqueElement,
128128
InterpretationsKeySource interpretationsKeySource) {
@@ -201,8 +201,8 @@ private LockOptions getLockOptions() {
201201

202202
public CompletionStage<List<R>> reactiveList() {
203203
final Set<String> profiles = applyProfiles();
204-
beforeQuery.run();
205-
return doReactiveList()
204+
return beforeQuery.get()
205+
.thenCompose( v -> doReactiveList() )
206206
.handle( (list, error) -> {
207207
handleException( error );
208208
return list;

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sql/internal/ReactiveNativeQueryImpl.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.hibernate.reactive.query.sql.spi.ReactiveNativeQueryImplementor;
4444
import org.hibernate.reactive.query.sql.spi.ReactiveNonSelectQueryPlan;
4545
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
46+
import org.hibernate.reactive.session.ReactiveSession;
4647
import org.hibernate.sql.exec.spi.Callback;
4748
import org.hibernate.type.BasicTypeReference;
4849

@@ -53,8 +54,11 @@
5354
import jakarta.persistence.LockModeType;
5455
import jakarta.persistence.Parameter;
5556
import jakarta.persistence.TemporalType;
56-
import jakarta.persistence.metamodel.Type;
5757
import jakarta.persistence.metamodel.SingularAttribute;
58+
import jakarta.persistence.metamodel.Type;
59+
60+
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
61+
5862

5963
public class ReactiveNativeQueryImpl<R> extends NativeQueryImpl<R>
6064
implements ReactiveNativeQueryImplementor<R> {
@@ -124,13 +128,46 @@ private ReactiveAbstractSelectionQuery<R> createSelectionQueryDelegate(SharedSes
124128
this::getNull,
125129
this::getNull,
126130
this::getQueryString,
127-
this::beforeQuery,
131+
this::reactiveBeforeQuery,
128132
this::afterQuery,
129133
AbstractSelectionQuery::uniqueElement,
130134
null
131135
);
132136
}
133137

138+
protected CompletionStage<Void> reactiveBeforeQuery() {
139+
getQueryParameterBindings().validate();
140+
141+
final var session = getSession();
142+
session.prepareForQueryExecution( requiresTxn( getQueryOptions().getLockOptions().getLockMode() ) );
143+
return reactivePrepareForExecution()
144+
.thenAccept( v -> {
145+
prepareSessionFlushMode( session );
146+
prepareSessionCacheMode( session );
147+
} );
148+
}
149+
150+
protected CompletionStage<Void> reactivePrepareForExecution() {
151+
final var spaces = getSynchronizedQuerySpaces();
152+
if ( spaces == null || spaces.isEmpty() ) {
153+
// We need to flush. The query itself is not required to execute in a
154+
// transaction; if there is no transaction, the flush would throw a
155+
// TransactionRequiredException which would potentially break existing
156+
// apps, so we only do the flush if a transaction is in progress.
157+
if ( shouldFlush() ) {
158+
return ( (ReactiveSession) getSession() )
159+
.reactiveFlush()
160+
.thenAccept( v -> resetCallback() );
161+
}
162+
// Reset the callback before every execution
163+
resetCallback();
164+
}
165+
// Otherwise, the application specified query spaces via the Hibernate
166+
// SynchronizeableQuery and so the query will already perform a partial
167+
// flush according to the defined query spaces - no need for a full flush.
168+
return voidFuture();
169+
}
170+
134171
private CompletionStage<List<R>> doReactiveList() {
135172
return reactiveSelectPlan().reactivePerformList( this );
136173
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveSqmQueryImpl.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@
7373
import jakarta.persistence.TemporalType;
7474
import jakarta.persistence.metamodel.Type;
7575

76+
import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture;
77+
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
78+
7679
/**
7780
* A reactive {@link SqmQueryImpl}
7881
*/
@@ -125,12 +128,22 @@ private ReactiveAbstractSelectionQuery<R> createSelectionQueryDelegate(SharedSes
125128
this::getDomainParameterXref,
126129
this::getResultType,
127130
this::getQueryString,
128-
this::beforeQuery,
131+
this::reactiveBeforeQuery,
129132
this::afterQuery,
130133
AbstractSelectionQuery::uniqueElement
131134
);
132135
}
133136

137+
private CompletionStage<Void> reactiveBeforeQuery() {
138+
try {
139+
beforeQuery();
140+
return voidFuture();
141+
}
142+
catch (Throwable e) {
143+
return failedFuture( e );
144+
}
145+
}
146+
134147
@Override
135148
public CompletionStage<R> reactiveUnique() {
136149
return selectionQueryDelegate.reactiveUnique();

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveSqmSelectionQueryImpl.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import java.util.stream.Stream;
4949

5050
import static org.hibernate.query.spi.SqlOmittingQueryOptions.omitSqlQueryOptions;
51+
import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture;
52+
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
5153

5254
/**
5355
* A reactive {@link SqmSelectionQueryImpl}
@@ -84,15 +86,24 @@ private ReactiveAbstractSelectionQuery<R> createSelectionQueryDelegate(SharedSes
8486
this::getDomainParameterXref,
8587
this::getResultType,
8688
this::getQueryString,
87-
this::beforeQuery,
89+
this::reactiveBeforeQuery,
8890
this::afterQuery,
8991
AbstractSelectionQuery::uniqueElement
9092
);
9193
}
9294

95+
private CompletionStage<Void> reactiveBeforeQuery() {
96+
try {
97+
beforeQuery();
98+
return voidFuture();
99+
}
100+
catch (Throwable e) {
101+
return failedFuture( e );
102+
}
103+
}
104+
93105
private CompletionStage<List<R>> doReactiveList() {
94-
getSession().prepareForQueryExecution( requiresTxn( getQueryOptions().getLockOptions()
95-
.findGreatestLockMode() ) );
106+
getSession().prepareForQueryExecution( requiresTxn( getQueryOptions().getLockOptions().findGreatestLockMode() ) );
96107

97108
final SqmSelectStatement<?> sqmStatement = getSqmStatement();
98109
final boolean containsCollectionFetches = sqmStatement.containsCollectionFetches();

hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,11 @@ public class ReactiveStatelessSessionImpl extends StatelessSessionImpl implement
133133
private final ReactiveConnection reactiveConnection;
134134
private final ReactiveStatelessSessionImpl batchingHelperSession;
135135
private final PersistenceContext persistenceContext;
136+
private final boolean connectionProvided;
136137

137138
public ReactiveStatelessSessionImpl(SessionFactoryImpl factory, SessionCreationOptions options, ReactiveConnection connection) {
138139
super( factory, options );
140+
connectionProvided = options.getConnection() != null;
139141
reactiveConnection = connection;
140142
persistenceContext = new ReactivePersistenceContextAdapter( super.getPersistenceContext() );
141143
batchingHelperSession = new ReactiveStatelessSessionImpl( factory, options, reactiveConnection, persistenceContext );
@@ -151,6 +153,7 @@ private ReactiveStatelessSessionImpl(
151153
ReactiveConnection connection,
152154
PersistenceContext persistenceContext) {
153155
super( factory, options );
156+
connectionProvided = options.getConnection() != null;
154157
this.persistenceContext = persistenceContext;
155158
// StatelessSession should not allow JDBC batching, because that would change
156159
// its "immediate synchronous execution" model into something more like transactional
@@ -1020,6 +1023,12 @@ public void prepareForQueryExecution(boolean requiresTxn) {
10201023
// }
10211024
}
10221025

1026+
@Override
1027+
public boolean isTransactionInProgress() {
1028+
return connectionProvided || ( isOpenOrWaitingForAutoClose()
1029+
&& reactiveConnection.isTransactionInProgress() );
1030+
}
1031+
10231032
@Override
10241033
public <R> ReactiveSqmQueryImplementor<R> createReactiveQuery(String queryString, Class<R> expectedResultType) {
10251034
checkOpen();

0 commit comments

Comments
 (0)