Description
The Spring Data Redis team recognises this as an issue, but says it is something the Spring Session team should look into.
See the following issue for a full explanation, including their responses:
spring-projects/spring-data-redis#3075
Can a solution be found?
Original issue below:
I switched from AWS ElastiCache Serverless to AWS ElastiCache Clustered because I kept getting a psubscribe error, and no change I made on the Spring side removed it.
With AWS ElastiCache Clustered the psubscribe error has gone, but after about 15-20 minutes I consistently get the error shown below.
Do you know how I can find out what is causing it and resolve it?
Thanks in advance
ERROR
Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@43b89ef5 was either previously returned or does not belong to this connection provider
Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@4c4f02d6 was either previously returned or does not belong to this connection provider
2024-12-15T15:41:34.277Z ERROR 1 --- [BFFApplication] [ionShutdownHook] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@530629a0 was either previously returned or does not belong to this connection provider
Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@530629a0 was either previously returned or does not belong to this connection provider
at org.springframework.data.redis.connection.lettuce.LettucePoolingConnectionProvider.releaseAsync(LettucePoolingConnectionProvider.java:192) ~[spring-data-redis-3.3.3.jar!/:3.3.3]
at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.releaseAsync(LettuceConnectionFactory.java:1834) ~[spring-data-redis-3.3.3.jar!/:3.3.3]
at org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection$AsyncConnect.lambda$close$3(LettuceReactiveRedisConnection.java:373) ~[spring-data-redis-3.3.3.jar!/:3.3.3]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:45) ~[reactor-core-3.6.9.jar!/:3.6.9]
at reactor.core.publisher.Mono.subscribe(Mono.java:4576) ~[reactor-core-3.6.9.jar!/:3.6.9]
...
at org.springframework.boot.SpringApplicationShutdownHook.run(SpringApplicationShutdownHook.java:114) ~[spring-boot-3.3.3.jar!/:3.3.3]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
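The trace above shows the exception being raised from `LettucePoolingConnectionProvider.releaseAsync`, so the message is produced when a connection is handed back to the pool. The following is a simplified model of that kind of check, not the Spring Data Redis implementation: it assumes the provider remembers which connections it handed out and rejects a release for anything it is not currently tracking. `TrackingPool` and `FakeConnection` are hypothetical names used only for illustration.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a Redis connection; not a Lettuce type.
class FakeConnection(val id: Int)

// Simplified model of a pooling provider that tracks borrowed connections.
// Assumption: a release is rejected for any connection not currently tracked.
class TrackingPool {
    private val borrowed = ConcurrentHashMap.newKeySet<FakeConnection>()
    private val nextId = AtomicInteger()

    fun borrow(): FakeConnection {
        val connection = FakeConnection(nextId.getAndIncrement())
        borrowed.add(connection)
        return connection
    }

    fun release(connection: FakeConnection) {
        // remove() returns false when the connection was never borrowed
        // from this pool or has already been released once.
        check(borrowed.remove(connection)) {
            "Returned connection ${connection.id} was either previously returned " +
                "or does not belong to this connection provider"
        }
    }
}

fun main() {
    val pool = TrackingPool()
    val pooled = pool.borrow()
    pool.release(pooled)                        // first release succeeds

    val doubleRelease = runCatching { pool.release(pooled) }
    println(doubleRelease.isFailure)            // true: already returned

    val foreign = runCatching { pool.release(FakeConnection(99)) }
    println(foreign.isFailure)                  // true: never borrowed from this pool
}
```

In the reported trace the rejected object is a `StatefulRedisClusterPubSubConnectionImpl` and one of the failures occurs on the shutdown hook thread. Which of the two cases in the sketch applies (double release, or a connection the pool never tracked) is not established by the trace alone.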
CONNECTION FACTORY
On AWS, RedisConfigType is Clustered and profileProperties.active is PRODUCTION.
/**
* Establishes a Redis Connection Factory with comprehensive configuration options.
*
* Provides configuration for:
* - Connection pooling and lifecycle management
* - Cluster and standalone deployment modes
* - SSL/TLS security for production environments
* - DNS resolution and caching
* - Performance tuning (thread pools, buffers, queues)
* - High availability features (topology refresh, failover)
*
* The factory supports different deployment profiles:
* - Production: Clustered Redis with SSL
* - Development: Standalone Redis without SSL
*
* @property profileProperties Configuration properties for active deployment profile
* @since 1.0.0
*/
@Configuration
@EnableRedisRepositories(
enableKeyspaceEvents = RedisKeyValueAdapter.EnableKeyspaceEvents.OFF,
keyspaceNotificationsConfigParameter = ""
)
internal class RedisConnectionFactoryConfig(
private val profileProperties: ProfileProperties
) {
/**
* Client resources for Redis connections.
*
* Manages shared resources including:
* - Thread pools for I/O and computation
* - DNS resolution and caching
* - Command latency metrics
* - Connection lifecycle
*
* This is initialized in [reactiveRedisConnectionFactory] and cleaned up in [cleanup].
* Using lateinit as the resources are created after Spring context initialization.
*/
private lateinit var clientResources: ClientResources
companion object {
private val logger = LoggerFactory.getLogger(RedisConnectionFactoryConfig::class.java)
/**
* Timeout configurations for Redis operations.
* - Command timeout: Maximum time for command execution
* - Connect timeout: Maximum time for connection establishment
* - Topology refresh: Interval for cluster topology updates
* - Adaptive refresh: Time window for topology change detection
*/
private const val DEFAULT_COMMAND_TIMEOUT_SECONDS = 10L
private const val DEFAULT_CONNECT_TIMEOUT_SECONDS = 10L
private const val TOPOLOGY_REFRESH_PERIOD_SECONDS = 20L
private const val ADAPTIVE_REFRESH_TIMEOUT_SECONDS = 5L
private const val SHUTDOWN_TIMEOUT_SECONDS = 2L
private const val SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS = 1L
/**
* Connection pool settings to optimize resource usage.
* - Max total: Maximum number of connections in the pool
* - Max/Min idle: Upper/Lower bounds for idle connections
* - Max wait: Maximum time to wait for connection
* - Eviction: Maintenance settings for idle connections
*/
private const val MAX_TOTAL_CONNECTIONS = 100
private const val MAX_IDLE_CONNECTIONS = 60
private const val MIN_IDLE_CONNECTIONS = 20
private const val MAX_WAIT_SECONDS = 120L
private const val EVICTION_RUN_PERIOD_SECONDS = 120L
private const val MIN_EVICTABLE_IDLE_MINUTES = 5L
private const val NUM_TESTS_PER_EVICTION_RUN = 3
/**
* Performance optimization parameters.
* - Decode buffer ratio: Memory allocation for response buffers
* - Request queue size: Maximum pending requests
* - Latency publish interval: Metrics publication frequency
* - Thread pool multiplier: Scaling factor for I/O threads
*/
private const val DECODE_BUFFER_POLICY_RATIO = 0.3F
private const val REQUEST_QUEUE_SIZE = 2500
private const val COMMAND_LATENCY_PUBLISH_MINUTES = 1L
private const val IO_THREAD_POOL_MULTIPLIER = 2
}
/**
* Configures Redis keyspace notifications behavior.
*
* Returns NO_OP action to disable automatic configuration of keyspace
* notifications, preventing potential connection issues during shutdown
* related to Pub/Sub connections in Spring Session.
*
* @return ConfigureRedisAction.NO_OP to disable automatic Redis configuration
*/
@Bean
fun configureRedisAction(): ConfigureRedisAction {
return ConfigureRedisAction.NO_OP
}
/* LETTUCE - reactive RedisConnectionFactory */
/**
* Creates the primary reactive Redis connection factory.
*
* Configures a Lettuce-based connection factory with:
* - Profile-specific Redis deployment mode (clustered/standalone)
* - Connection pooling
* - Client resources (thread pools, DNS resolution)
* - SSL for production environments
*
* @param clusterProperties Cluster node configuration
* @param springDataRedisProperties Redis connection properties
* @return Configured [ReactiveRedisConnectionFactory]
*/
@Bean
@Primary
internal fun reactiveRedisConnectionFactory(
clusterProperties: ClusterConfigurationProperties,
springDataRedisProperties: SpringDataRedisProperties,
): ReactiveRedisConnectionFactory {
val config = createRedisConfiguration(springDataRedisProperties, clusterProperties)
clientResources = createClientResources(springDataRedisProperties.host)
val clientConfig = createLettuceClientConfig(
clientResources,
profileProperties.active,
springDataRedisProperties.type
)
return LettuceConnectionFactory(config, clientConfig).apply {
afterPropertiesSet()
validateConnection = false
setShareNativeConnection(true)
}
}
/**
* Properties class for Redis cluster configuration.
*
* @property nodes List of Redis nodes in format `host:port`
*/
@Component
internal class ClusterConfigurationProperties(
springDataRedisProperties: SpringDataRedisProperties
) {
/**
* Initial collection of known cluster nodes in format `host:port`.
*/
var nodes = listOf(
"${springDataRedisProperties.host}:${springDataRedisProperties.port}",
)
}
/**
* Creates appropriate Redis configuration based on active profile.
*
* @param properties Redis connection properties
* @param clusterProperties Cluster node configuration
* @return [RedisConfiguration] for either clustered or standalone deployment
*/
private fun createRedisConfiguration(
properties: SpringDataRedisProperties,
clusterProperties: ClusterConfigurationProperties
): RedisConfiguration = when {
profileProperties.active == ProfileTypes.PRODUCTION.type &&
properties.type == RedisConfigTypes.CLUSTERED.type -> {
// Redis Cluster for production
RedisClusterConfiguration(clusterProperties.nodes).apply {
password = RedisPassword.of(properties.password)
}
}
else -> {
// Redis Standalone for non-production
RedisStandaloneConfiguration().apply {
hostName = properties.host
port = properties.port
password = RedisPassword.of(properties.password)
}
}
}
/**
* Creates client resources with optimized thread pools and DNS resolution.
*
* @param host Redis host for DNS resolution
* @return Configured [ClientResources]
*/
private fun createClientResources(host: String) = DefaultClientResources.builder()
.ioThreadPoolSize(Runtime.getRuntime().availableProcessors() * IO_THREAD_POOL_MULTIPLIER)
.computationThreadPoolSize(Runtime.getRuntime().availableProcessors())
.socketAddressResolver(createCachingDnsResolver(host))
.commandLatencyRecorder(DefaultCommandLatencyCollector.disabled())
.commandLatencyPublisherOptions { Duration.ofMinutes(COMMAND_LATENCY_PUBLISH_MINUTES) }
.build()
/**
* Creates Lettuce client configuration with pooling and security settings.
*
* Configures:
* - Read preferences (replica preferred)
* - Command timeouts
* - Connection pooling
* - SSL (for production)
*
* @param clientResources Configured client resources for connection management
* @param activeProfile Current deployment profile
* @param redisConfigType Redis deployment type (for example, clustered)
* @return Configured [LettucePoolingClientConfiguration]
*/
private fun createLettuceClientConfig(
clientResources: ClientResources,
activeProfile: String?,
redisConfigType: String?
): LettucePoolingClientConfiguration {
val clusterClientOptions = createClusterClientOptions(activeProfile, redisConfigType)
return LettucePoolingClientConfiguration.builder()
.readFrom(REPLICA_PREFERRED)
.commandTimeout(Duration.ofSeconds(DEFAULT_COMMAND_TIMEOUT_SECONDS))
.clientResources(clientResources)
.clientOptions(clusterClientOptions)
.poolConfig(buildLettucePoolConfig())
.shutdownTimeout(Duration.ofSeconds(SHUTDOWN_TIMEOUT_SECONDS))
.shutdownQuietPeriod(Duration.ofSeconds(SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS))
// enable SSL on the client configuration only for the production profile
.apply {
if (activeProfile == ProfileTypes.PRODUCTION.type) {
useSsl()
}
}
.build()
}
/**
* Creates cluster client options with comprehensive connection settings.
*
* Configures:
* - Auto-reconnect behavior
* - Connection validation
* - Timeout settings
* - Socket options
* - Topology refresh
* - Buffer and queue sizes
* - SSL (for production)
*
* @param activeProfile Current deployment profile
* @param redisConfigType Redis deployment type (for example, clustered)
* @return Configured [ClusterClientOptions]
*/
private fun createClusterClientOptions(activeProfile: String?, redisConfigType: String?): ClusterClientOptions {
val builder = ClusterClientOptions.builder()
.autoReconnect(true)
.pingBeforeActivateConnection(true)
.timeoutOptions(createTimeoutOptions())
.socketOptions(createSocketOptions())
.topologyRefreshOptions(createTopologyRefreshOptions())
.validateClusterNodeMembership(true)
.suspendReconnectOnProtocolFailure(true)
.disconnectedBehavior(DEFAULT_DISCONNECTED_BEHAVIOR)
.decodeBufferPolicy(DecodeBufferPolicies.ratio(DECODE_BUFFER_POLICY_RATIO))
.requestQueueSize(REQUEST_QUEUE_SIZE)
.maxRedirects(DEFAULT_MAX_REDIRECTS)
// NOTE: second call to suspendReconnectOnProtocolFailure in this chain; the last call wins
.suspendReconnectOnProtocolFailure(DEFAULT_SUSPEND_RECONNECT_PROTO_FAIL)
.publishOnScheduler(true)
.protocolVersion(ProtocolVersion.RESP3)
// conditionally use sslOptions if profileProperties.active is 'prod'
if (activeProfile == ProfileTypes.PRODUCTION.type) {
builder.sslOptions(createSslOptions())
}
// conditionally use connected nodes if redisConfigType is 'clustered'
if (redisConfigType == RedisConfigTypes.CLUSTERED.type) {
builder.nodeFilter { node ->
node.isConnected
}
}
return builder.build()
}
/**
* Creates socket options for Redis connections.
*
* Configures:
* - Keep-alive settings
* - TCP no-delay
* - Connection timeouts
*
* @return Configured [SocketOptions]
*/
private fun createSocketOptions() = SocketOptions.builder()
.keepAlive(SocketOptions.DEFAULT_SO_KEEPALIVE)
.tcpNoDelay(SocketOptions.DEFAULT_SO_NO_DELAY)
.connectTimeout(Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_SECONDS))
.build()
/**
* Creates timeout options for Redis commands.
*
* Configures:
* - Fixed timeout duration
* - Command timeout behavior
*
* @return Configured [TimeoutOptions]
*/
private fun createTimeoutOptions() = TimeoutOptions.builder()
.fixedTimeout(Duration.ofSeconds(DEFAULT_COMMAND_TIMEOUT_SECONDS))
.timeoutCommands(true)
.build()
/**
* Creates topology refresh options for Redis cluster.
*
* Configures:
* - Periodic refresh intervals
* - Dynamic refresh sources
* - Stale connection handling
* - Adaptive refresh triggers
*
* @return Configured [ClusterTopologyRefreshOptions]
*/
private fun createTopologyRefreshOptions() = ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh(Duration.ofSeconds(TOPOLOGY_REFRESH_PERIOD_SECONDS))
.dynamicRefreshSources(true)
.closeStaleConnections(true)
.adaptiveRefreshTriggersTimeout(Duration.ofSeconds(ADAPTIVE_REFRESH_TIMEOUT_SECONDS))
.enableAllAdaptiveRefreshTriggers()
.build()
/**
* Creates connection pool configuration.
*
* Configures:
* - Maximum total/idle connections
* - Connection wait times
* - Eviction policies
* - Connection testing
* - Pool behavior (LIFO/FIFO)
*
* @return Configured [GenericObjectPoolConfig]
*/
private fun buildLettucePoolConfig() = GenericObjectPoolConfig<Any>().apply {
maxTotal = MAX_TOTAL_CONNECTIONS
maxIdle = MAX_IDLE_CONNECTIONS
minIdle = MIN_IDLE_CONNECTIONS
setMaxWait(Duration.ofSeconds(MAX_WAIT_SECONDS))
timeBetweenEvictionRuns = Duration.ofSeconds(EVICTION_RUN_PERIOD_SECONDS)
minEvictableIdleTime = Duration.ofMinutes(MIN_EVICTABLE_IDLE_MINUTES)
testOnBorrow = true
testWhileIdle = true
testOnReturn = true
blockWhenExhausted = true
lifo = true
jmxEnabled = false
fairness = true
evictionPolicyClassName = "org.apache.commons.pool2.impl.DefaultEvictionPolicy"
numTestsPerEvictionRun = NUM_TESTS_PER_EVICTION_RUN
}
/**
* Creates SSL options for secure Redis connections.
*
* Configures JDK-based SSL provider for Redis connections
* in production environments.
*
* @return Configured [SslOptions]
*/
private fun createSslOptions(): SslOptions = SslOptions.builder()
.jdkSslProvider()
.build()
/**
* Creates DNS resolver with caching capabilities.
*
* Implements:
* - DNS resolution caching
* - Hostname-to-IP mapping
* - Fallback handling for resolution failures
*
* @param host Redis host to resolve
* @return Configured [MappingSocketAddressResolver]
*/
private fun createCachingDnsResolver(host: String): MappingSocketAddressResolver {
val dnsCache = ConcurrentHashMap<String, Array<InetAddress>>()
val mappingFunction: (HostAndPort) -> HostAndPort = { hostAndPort ->
val addresses = dnsCache.computeIfAbsent(host) {
try {
DnsResolvers.JVM_DEFAULT.resolve(host)
} catch (e: UnknownHostException) {
emptyArray()
}
}
val cacheIP = addresses.firstOrNull()?.hostAddress
if (hostAndPort.hostText == cacheIP) {
HostAndPort.of(host, hostAndPort.port)
} else {
hostAndPort
}
}
return MappingSocketAddressResolver.create(DnsResolvers.JVM_DEFAULT, mappingFunction)
}
}
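One detail of `createCachingDnsResolver` above: the `catch` branch returns an empty array from inside `computeIfAbsent`, so a single failed lookup is stored in the map and reused for as long as the map lives. The following is a minimal sketch, using only the standard library, of a cache that stores successful lookups only. `SuccessOnlyDnsCache`, its `resolver` parameter, and the host name in `main` are hypothetical and not part of Lettuce; the sketch is not wired into a `MappingSocketAddressResolver`, and whether this has any bearing on the reported error is not known.

```kotlin
import java.net.InetAddress
import java.net.UnknownHostException
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: caches successful lookups only, so a transient
// failure is retried on the next call instead of being remembered.
class SuccessOnlyDnsCache(
    // Assumption: the resolver throws UnknownHostException on failure.
    private val resolver: (String) -> Array<InetAddress>
) {
    private val cache = ConcurrentHashMap<String, Array<InetAddress>>()

    fun lookup(host: String): Array<InetAddress> {
        cache[host]?.let { return it }
        val resolved = try {
            resolver(host)
        } catch (e: UnknownHostException) {
            return emptyArray()                 // failure is not cached
        }
        if (resolved.isNotEmpty()) cache[host] = resolved
        return resolved
    }
}

fun main() {
    var calls = 0
    // Fake resolver: fails on the first call, succeeds afterwards.
    val cache = SuccessOnlyDnsCache { host ->
        calls++
        if (calls == 1) throw UnknownHostException(host)
        arrayOf(InetAddress.getByAddress(host, byteArrayOf(10, 0, 0, 1)))
    }
    println(cache.lookup("redis.example.internal").size) // 0: first lookup fails
    println(cache.lookup("redis.example.internal").size) // 1: retried, then cached
    println(cache.lookup("redis.example.internal").size) // 1: served from cache
    println(calls)                                       // 2
}
```

Two threads that miss the cache at the same time may both call the resolver; for a sketch this is accepted in exchange for never storing a failure.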