Skip to content

Commit 14d8597

Browse files
raphwartembilan
authored andcommitted
GH-3872: Add PostgresSubscribableChannel implementation
Fixes #3872 Adds an implementation for a Postres-compatible notification listener for a `JdbcChannelMessageStore`. * Introduce `PostgresChannelMessageTableSubscriber.Subscription` contract * Implement a `PostgresSubscribableChannel` * Add Javadoc and fix code formatting issues. * Handle temporary connection loss. * Add tests for `PostgresChannelMessageTableSubscriber` * Replace NOTIFY command with pg_notify function call. * Code style clean up * Fix some Javadocs * Use `DataSourceInitializer` in the `PostgresChannelMessageTableSubscriberTests` to populate scripts * Use Java text block for DB scripts
1 parent 695e156 commit 14d8597

File tree

9 files changed

+758
-0
lines changed

9 files changed

+758
-0
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,7 @@ project('spring-integration-jdbc') {
712712
dependencies {
713713
api project(':spring-integration-core')
714714
api 'org.springframework:spring-jdbc'
715+
optionalApi "org.postgresql:postgresql:$postgresVersion"
715716

716717
testImplementation "com.h2database:h2:$h2Version"
717718
testImplementation "org.hsqldb:hsqldb:$hsqldbVersion"
@@ -721,6 +722,7 @@ project('spring-integration-jdbc') {
721722
testImplementation "mysql:mysql-connector-java:$mysqlVersion"
722723
testImplementation "org.apache.commons:commons-dbcp2:$commonsDbcp2Version"
723724
testImplementation 'org.testcontainers:mysql'
725+
testImplementation 'org.testcontainers:postgresql'
724726

725727
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
726728
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.channel;
18+
19+
import java.sql.SQLException;
20+
import java.util.Properties;
21+
22+
import org.postgresql.jdbc.PgConnection;
23+
24+
/**
25+
* A connection supplier for a {@link PgConnection} to a Postgres database that is
26+
* to be used for a {@link PostgresChannelMessageTableSubscriber}.
27+
* <p/>
28+
* The supplied connection must <b>not</b> be read from a shared connection pool, typically
29+
* represented by a {@link javax.sql.DataSource}. If a shared connection pool is used, this
30+
* pool might reclaim a connection that was not closed within a given time frame. This
31+
* becomes a problem as a {@link PostgresChannelMessageTableSubscriber} requires a dedicated
32+
* {@link java.sql.Connection} to receive notifications from the Postgres database. This
33+
* connection needs to remain open over a longer period of time. Typically, a
34+
* {@link PgConnection} should be created directly via
35+
* {@link java.sql.Driver#connect(String, Properties)} and a subsequent call
36+
* to {@link java.sql.Connection#unwrap(Class)}.
37+
*
38+
* @author Rafael Winterhalter
39+
*
40+
* @since 6.0
41+
*
42+
* @see PostgresChannelMessageTableSubscriber
43+
*/
44+
@FunctionalInterface
45+
public interface PgConnectionSupplier {
46+
47+
/**
48+
* Supply an open, un-pooled connection to a Postgres database.
49+
* @return A dedicated connection to a Postgres database for listening.
50+
* @throws SQLException If the connection could not be established.
51+
*/
52+
PgConnection get() throws SQLException;
53+
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.channel;
18+
19+
import java.sql.SQLException;
20+
import java.sql.Statement;
21+
import java.util.Map;
22+
import java.util.Set;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.Future;
29+
import java.util.concurrent.TimeUnit;
30+
31+
import org.postgresql.PGNotification;
32+
import org.postgresql.jdbc.PgConnection;
33+
34+
import org.springframework.context.SmartLifecycle;
35+
import org.springframework.core.log.LogAccessor;
36+
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
37+
import org.springframework.integration.util.UUIDConverter;
38+
import org.springframework.lang.Nullable;
39+
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
40+
import org.springframework.util.Assert;
41+
42+
/**
43+
* A subscriber for new messages being received by a Postgres database via a
44+
* {@link JdbcChannelMessageStore}. This subscriber implementation is using
45+
* Postgres' <i>LISTEN</i>/<i>NOTIFY</i> mechanism to allow for receiving push
46+
* notifications for new messages what functions even if a message is written
47+
* and read from different JVMs or {@link JdbcChannelMessageStore}s.
48+
* <p/>
49+
* Note that this subscriber requires an unshared {@link PgConnection} which
50+
* remains open for any lifecycle. It is therefore recommended to execute a single
51+
* subscriber for any JVM. For this reason, this subscriber is region-agnostic.
52+
* To listen for messages for a given region and group id, use a
53+
* {@link Subscription} and register it with this subscriber.
54+
* <p/>
55+
* In order to function, the Postgres database that is used must define a trigger
56+
* for sending notifications upon newly arrived messages. This trigger is defined
57+
* in the <i>schema-postgresql.sql</i> file within this artifact but commented
58+
* out.
59+
*
60+
* @author Rafael Winterhalter
61+
* @author Artem Bilan
62+
*
63+
* @since 6.0
64+
*/
65+
public final class PostgresChannelMessageTableSubscriber implements SmartLifecycle {
66+
67+
private static final LogAccessor LOGGER = new LogAccessor(PostgresChannelMessageTableSubscriber.class);
68+
69+
private final Map<String, Set<Subscription>> subscriptions = new ConcurrentHashMap<>();
70+
71+
private final PgConnectionSupplier connectionSupplier;
72+
73+
private final String tablePrefix;
74+
75+
@Nullable
76+
private ExecutorService executor;
77+
78+
private CountDownLatch latch = new CountDownLatch(0);
79+
80+
private Future<?> future = CompletableFuture.completedFuture(null);
81+
82+
@Nullable
83+
private volatile PgConnection connection;
84+
85+
/**
86+
* Create a new subscriber using the {@link JdbcChannelMessageStore#DEFAULT_TABLE_PREFIX}.
87+
* @param connectionSupplier The connection supplier for the targeted Postgres database.
88+
*/
89+
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier) {
90+
this(connectionSupplier, JdbcChannelMessageStore.DEFAULT_TABLE_PREFIX);
91+
}
92+
93+
/**
94+
* Create a new subscriber.
95+
* @param tablePrefix The table prefix of the {@link JdbcChannelMessageStore} to subscribe to.
96+
* @param connectionSupplier The connection supplier for the targeted Postgres database.
97+
*/
98+
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier, String tablePrefix) {
99+
Assert.notNull(connectionSupplier, "A connectionSupplier must be provided.");
100+
Assert.notNull(tablePrefix, "A table prefix must be set.");
101+
this.connectionSupplier = connectionSupplier;
102+
this.tablePrefix = tablePrefix;
103+
}
104+
105+
/**
106+
* Define an executor to use for listening for new messages. Note that the Postgres SQL driver implements
107+
* listening for notifications as a blocking operation which will permanently block a thread of this executor
108+
* while running.
109+
* @param executor The executor to use or {@code null} if an executor should be created by this class.
110+
*/
111+
public void setExecutor(@Nullable ExecutorService executor) {
112+
this.executor = executor;
113+
}
114+
115+
/**
116+
* Add a new subscription to this subscriber.
117+
* @param subscription The subscription to register.
118+
* @return {@code true} if the subscription was not already added.
119+
*/
120+
public boolean subscribe(Subscription subscription) {
121+
String subscriptionKey = subscription.getRegion() + " " + getKey(subscription.getGroupId());
122+
Set<Subscription> subscriptions =
123+
this.subscriptions.computeIfAbsent(subscriptionKey, __ -> ConcurrentHashMap.newKeySet());
124+
return subscriptions.add(subscription);
125+
}
126+
127+
/**
128+
* Remove a previous subscription from this subscriber.
129+
* @param subscription The subscription to remove.
130+
* @return {@code true} if the subscription was previously registered and is now removed.
131+
*/
132+
public boolean unsubscribe(Subscription subscription) {
133+
String subscriptionKey = subscription.getRegion() + " " + getKey(subscription.getGroupId());
134+
Set<Subscription> subscriptions = this.subscriptions.get(subscriptionKey);
135+
return subscriptions != null && subscriptions.remove(subscription);
136+
}
137+
138+
@Override
139+
public synchronized void start() {
140+
if (this.latch.getCount() > 0) {
141+
return;
142+
}
143+
ExecutorService executor = this.executor;
144+
if (executor == null) {
145+
CustomizableThreadFactory threadFactory =
146+
new CustomizableThreadFactory("postgres-channel-message-table-subscriber-");
147+
threadFactory.setDaemon(true);
148+
executor = Executors.newSingleThreadExecutor(threadFactory);
149+
this.executor = executor;
150+
}
151+
this.latch = new CountDownLatch(1);
152+
this.future = executor.submit(() -> {
153+
try {
154+
while (isActive()) {
155+
try {
156+
PgConnection conn = this.connectionSupplier.get();
157+
try (Statement stmt = conn.createStatement()) {
158+
stmt.execute("LISTEN " + this.tablePrefix.toLowerCase() + "channel_message_notify");
159+
}
160+
catch (Throwable t) {
161+
try {
162+
conn.close();
163+
}
164+
catch (Throwable suppressed) {
165+
t.addSuppressed(suppressed);
166+
}
167+
throw t;
168+
}
169+
this.subscriptions.values()
170+
.forEach(subscriptions -> subscriptions.forEach(Subscription::notifyUpdate));
171+
try {
172+
this.connection = conn;
173+
while (isActive()) {
174+
PGNotification[] notifications = conn.getNotifications(0);
175+
// Unfortunately, there is no good way of interrupting a notification
176+
// poll but by closing its connection.
177+
if (!isActive()) {
178+
return;
179+
}
180+
if (notifications != null) {
181+
for (PGNotification notification : notifications) {
182+
String parameter = notification.getParameter();
183+
Set<Subscription> subscriptions = this.subscriptions.get(parameter);
184+
if (subscriptions == null) {
185+
continue;
186+
}
187+
for (Subscription subscription : subscriptions) {
188+
subscription.notifyUpdate();
189+
}
190+
}
191+
}
192+
}
193+
}
194+
finally {
195+
conn.close();
196+
}
197+
}
198+
catch (Exception e) {
199+
// The getNotifications method does not throw a meaningful message on interruption.
200+
// Therefore, we do not log an error, unless it occurred while active.
201+
if (isActive()) {
202+
LOGGER.error(e, "Failed to poll notifications from Postgres database");
203+
}
204+
}
205+
catch (Throwable t) {
206+
LOGGER.error(t, "Failed to poll notifications from Postgres database");
207+
return;
208+
}
209+
}
210+
}
211+
finally {
212+
this.latch.countDown();
213+
}
214+
});
215+
}
216+
217+
private boolean isActive() {
218+
if (Thread.interrupted()) {
219+
Thread.currentThread().interrupt();
220+
return false;
221+
}
222+
return true;
223+
}
224+
225+
@Override
226+
public synchronized void stop() {
227+
Future<?> future = this.future;
228+
if (future.isDone()) {
229+
return;
230+
}
231+
future.cancel(true);
232+
PgConnection conn = this.connection;
233+
if (conn != null) {
234+
try {
235+
conn.close();
236+
}
237+
catch (SQLException ignored) {
238+
}
239+
}
240+
try {
241+
if (!this.latch.await(5, TimeUnit.SECONDS)) {
242+
throw new IllegalStateException("Failed to stop "
243+
+ PostgresChannelMessageTableSubscriber.class.getName());
244+
}
245+
}
246+
catch (InterruptedException ignored) {
247+
}
248+
}
249+
250+
@Override
251+
public boolean isRunning() {
252+
return this.latch.getCount() > 0;
253+
}
254+
255+
private static String getKey(Object input) {
256+
return input == null ? null : UUIDConverter.getUUID(input).toString();
257+
}
258+
259+
/**
260+
* A subscription to a {@link PostgresChannelMessageTableSubscriber} for
261+
* receiving push notifications for new messages that are added to
262+
* a {@link JdbcChannelMessageStore}.
263+
*/
264+
public interface Subscription {
265+
266+
/**
267+
* Indicate that a message was added to the represented region and
268+
* group id. Note that this method might also be invoked if there are
269+
* no new messages to read, for example if another subscription already
270+
* read those messages or if a new messages might have arrived during
271+
* a temporary connection loss.
272+
*/
273+
void notifyUpdate();
274+
275+
/**
276+
* Return the region for which this subscription receives notifications.
277+
* @return The relevant region of the {@link JdbcChannelMessageStore}.
278+
*/
279+
String getRegion();
280+
281+
/**
282+
* Return the group id for which this subscription receives notifications.
283+
* @return The group id of the {@link PostgresSubscribableChannel}.
284+
*/
285+
Object getGroupId();
286+
287+
}
288+
289+
}

0 commit comments

Comments
 (0)