Skip to content

Commit 737d77a

Browse files
committed
Fix @transactional support on functions returning Flow
Closes gh-26052
1 parent 1f13516 commit 737d77a

File tree

2 files changed

+48
-4
lines changed

2 files changed

+48
-4
lines changed

spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -352,9 +352,9 @@ protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targe
352352
}
353353
return new ReactiveTransactionSupport(adapter);
354354
});
355-
Publisher<?> publisher = (Publisher<?>) txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
356-
return (isSuspendingFunction ? (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow(publisher) :
357-
KotlinDelegate.awaitSingleOrNull(publisher, ((CoroutinesInvocationCallback) invocation).getContinuation())) : publisher);
355+
Object result = txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
356+
return (isSuspendingFunction ? (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow((Publisher<?>) result) :
357+
KotlinDelegate.awaitSingleOrNull((Publisher<?>) result, ((CoroutinesInvocationCallback) invocation).getContinuation())) : result);
358358
}
359359

360360
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);

spring-tx/src/test/kotlin/org/springframework/transaction/annotation/CoroutinesAnnotationTransactionInterceptorTests.kt

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
package org.springframework.transaction.annotation
1818

1919
import kotlinx.coroutines.delay
20+
import kotlinx.coroutines.flow.Flow
21+
import kotlinx.coroutines.flow.flow
22+
import kotlinx.coroutines.flow.toList
2023
import kotlinx.coroutines.runBlocking
2124
import org.assertj.core.api.Assertions
2225
import org.junit.jupiter.api.Test
@@ -83,14 +86,38 @@ class CoroutinesAnnotationTransactionInterceptorTests {
8386
runBlocking {
8487
try {
8588
proxy.suspendingValueFailure()
89+
Assertions.fail("No exception thrown as expected")
8690
}
8791
catch (ex: IllegalStateException) {
8892
}
89-
9093
}
9194
assertReactiveGetTransactionAndRollbackCount(1)
9295
}
9396

97+
@Test
98+
fun suspendingFlowSuccess() {
99+
val proxyFactory = ProxyFactory()
100+
proxyFactory.setTarget(TestWithCoroutines())
101+
proxyFactory.addAdvice(TransactionInterceptor(rtm, source))
102+
val proxy = proxyFactory.proxy as TestWithCoroutines
103+
runBlocking {
104+
Assertions.assertThat(proxy.suspendingFlowSuccess().toList()).containsExactly("foo", "foo")
105+
}
106+
assertReactiveGetTransactionAndCommitCount(1)
107+
}
108+
109+
@Test
110+
fun flowSuccess() {
111+
val proxyFactory = ProxyFactory()
112+
proxyFactory.setTarget(TestWithCoroutines())
113+
proxyFactory.addAdvice(TransactionInterceptor(rtm, source))
114+
val proxy = proxyFactory.proxy as TestWithCoroutines
115+
runBlocking {
116+
Assertions.assertThat(proxy.flowSuccess().toList()).containsExactly("foo", "foo")
117+
}
118+
assertReactiveGetTransactionAndCommitCount(1)
119+
}
120+
94121
private fun assertReactiveGetTransactionAndCommitCount(expectedCount: Int) {
95122
Assertions.assertThat(rtm.begun).isEqualTo(expectedCount)
96123
Assertions.assertThat(rtm.commits).isEqualTo(expectedCount)
@@ -122,5 +149,22 @@ class CoroutinesAnnotationTransactionInterceptorTests {
122149
delay(10)
123150
throw IllegalStateException()
124151
}
152+
153+
open fun flowSuccess(): Flow<String> {
154+
return flow {
155+
emit("foo")
156+
delay(10)
157+
emit("foo")
158+
}
159+
}
160+
161+
open suspend fun suspendingFlowSuccess(): Flow<String> {
162+
delay(10)
163+
return flow {
164+
emit("foo")
165+
delay(10)
166+
emit("foo")
167+
}
168+
}
125169
}
126170
}

0 commit comments

Comments
 (0)