AWS Clustered issue - how can I resolve in Spring - redis.connection.PoolException error #3298

Open
@dreamstar-enterprises

The Spring Data Redis team does recognise this as an issue, but says it is something the Spring Session team should look into.

See the linked issue for a full explanation, including their responses:

spring-projects/spring-data-redis#3075

Can a solution be found?

Original issue below:

I switched from AWS ElastiCache Serverless to AWS ElastiCache Clustered because I kept getting a psubscribe error, and no change I made in Spring would remove it.

With ElastiCache Clustered the psubscribe error has gone, but after about 15-20 minutes I consistently get the error shown under ERROR below.

Do you know how I can find out what is causing it and resolve it?

Thanks in advance
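
One way to narrow down the cause, offered as a diagnostic sketch rather than a confirmed fix, is to temporarily run with a non-pooling Lettuce client configuration. The exception is thrown by `LettucePoolingConnectionProvider`, so if it disappears without pooling, the pool bookkeeping is implicated. The helper name `nonPoolingClusterFactory` and its parameters are hypothetical, and the 10-second timeout simply mirrors the value used in the configuration below.

```kotlin
import io.lettuce.core.ReadFrom
import org.springframework.data.redis.connection.RedisClusterConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import java.time.Duration

// Hypothetical helper: builds a cluster connection factory WITHOUT commons-pool2,
// so LettucePoolingConnectionProvider is never used to acquire or release connections.
fun nonPoolingClusterFactory(nodes: List<String>, useTls: Boolean): LettuceConnectionFactory {
    val clientConfig = LettuceClientConfiguration.builder()
        .readFrom(ReadFrom.REPLICA_PREFERRED)
        .commandTimeout(Duration.ofSeconds(10))
        // useSsl() flips a flag on the builder; the base builder is still used for build().
        .apply { if (useTls) useSsl() }
        .build()

    // nodes are "host:port" strings, as in ClusterConfigurationProperties.
    return LettuceConnectionFactory(RedisClusterConfiguration(nodes), clientConfig)
}
```

The returned factory still has to be registered as a Spring bean so that the container initializes and closes it. This only isolates the pooling layer and says nothing about throughput under production load.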

ERROR

Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@43b89ef5 was either previously returned or does not belong to this connection provider

Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@4c4f02d6 was either previously returned or does not belong to this connection provider

2024-12-15T15:41:34.277Z ERROR 1 --- [BFFApplication] [ionShutdownHook] reactor.core.publisher.Operators : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@530629a0 was either previously returned or does not belong to this connection provider

Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@530629a0 was either previously returned or does not belong to this connection provider

at org.springframework.data.redis.connection.lettuce.LettucePoolingConnectionProvider.releaseAsync(LettucePoolingConnectionProvider.java:192) ~[spring-data-redis-3.3.3.jar!/:3.3.3]

at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.releaseAsync(LettuceConnectionFactory.java:1834) ~[spring-data-redis-3.3.3.jar!/:3.3.3]

at org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection$AsyncConnect.lambda$close$3(LettuceReactiveRedisConnection.java:373) ~[spring-data-redis-3.3.3.jar!/:3.3.3]

at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:45) ~[reactor-core-3.6.9.jar!/:3.6.9]

at reactor.core.publisher.Mono.subscribe(Mono.java:4576) ~[reactor-core-3.6.9.jar!/:3.6.9]
...

at org.springframework.boot.SpringApplicationShutdownHook.run(SpringApplicationShutdownHook.java:114) ~[spring-boot-3.3.3.jar!/:3.3.3]

at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
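
The message in the trace describes a bookkeeping failure: the provider was asked to release a connection it no longer tracks. The following is a simplified stand-in written for illustration, not the Spring Data Redis source; `ToyPoolingProvider` and `Connection` are hypothetical names. It shows how releasing the same connection twice, or releasing a connection obtained from a different provider, produces this kind of error.

```kotlin
// Simplified model of a pooling provider's bookkeeping. NOT the real implementation.
class PoolException(message: String) : RuntimeException(message)

// No equals() override, so set membership below is by object identity.
class Connection(val id: Int)

class ToyPoolingProvider {
    // Connections currently borrowed from this provider.
    private val borrowed = mutableSetOf<Connection>()
    private var nextId = 0

    fun acquire(): Connection = Connection(nextId++).also { borrowed.add(it) }

    fun release(connection: Connection) {
        // remove() returns false if the connection is unknown or was already released.
        if (!borrowed.remove(connection)) {
            throw PoolException(
                "Returned connection ${connection.id} was either previously returned " +
                    "or does not belong to this connection provider"
            )
        }
    }
}

fun main() {
    val provider = ToyPoolingProvider()
    val connection = provider.acquire()
    provider.release(connection)           // first release succeeds
    try {
        provider.release(connection)       // second release of the same connection fails
    } catch (e: PoolException) {
        println(e.message)
    }
}
```

In the trace above the release happens on the shutdown hook thread, so a second release of a Pub/Sub connection during context shutdown would fit this pattern. That is an inference from the message, not something the trace proves.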

CONNECTION FACTORY

On AWS, RedisConfigType is CLUSTERED and profileProperties.active is PRODUCTION.

/**
 * Establishes a Redis Connection Factory with comprehensive configuration options.
 *
 * Provides configuration for:
 * - Connection pooling and lifecycle management
 * - Cluster and standalone deployment modes
 * - SSL/TLS security for production environments
 * - DNS resolution and caching
 * - Performance tuning (thread pools, buffers, queues)
 * - High availability features (topology refresh, failover)
 *
 * The factory supports different deployment profiles:
 * - Production: Clustered Redis with SSL
 * - Development: Standalone Redis without SSL
 *
 * @property profileProperties Configuration properties for active deployment profile
 * @since 1.0.0
 */
@Configuration
@EnableRedisRepositories(
    enableKeyspaceEvents = RedisKeyValueAdapter.EnableKeyspaceEvents.OFF,
    keyspaceNotificationsConfigParameter = ""
)
internal class RedisConnectionFactoryConfig(
    private val profileProperties: ProfileProperties
) {

    /**
     * Client resources for Redis connections.
     *
     * Manages shared resources including:
     * - Thread pools for I/O and computation
     * - DNS resolution and caching
     * - Command latency metrics
     * - Connection lifecycle
     *
     * This is initialized in [reactiveRedisConnectionFactory] and cleaned up in [cleanup].
     * Using lateinit as the resources are created after Spring context initialization.
     */
    private lateinit var clientResources: ClientResources

    companion object {

        private val logger = LoggerFactory.getLogger(RedisConnectionFactoryConfig::class.java)

        /**
         * Timeout configurations for Redis operations.
         * - Command timeout: Maximum time for command execution
         * - Connect timeout: Maximum time for connection establishment
         * - Topology refresh: Interval for cluster topology updates
         * - Adaptive refresh: Time window for topology change detection
         */
        private const val DEFAULT_COMMAND_TIMEOUT_SECONDS = 10L
        private const val DEFAULT_CONNECT_TIMEOUT_SECONDS = 10L
        private const val TOPOLOGY_REFRESH_PERIOD_SECONDS = 20L
        private const val ADAPTIVE_REFRESH_TIMEOUT_SECONDS = 5L
        private const val SHUTDOWN_TIMEOUT_SECONDS = 2L
        private const val SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS = 1L

        /**
         * Connection pool settings to optimize resource usage.
         * - Max total: Maximum number of connections in the pool
         * - Max/Min idle: Upper/Lower bounds for idle connections
         * - Max wait: Maximum time to wait for connection
         * - Eviction: Maintenance settings for idle connections
         */
        private const val MAX_TOTAL_CONNECTIONS = 100
        private const val MAX_IDLE_CONNECTIONS = 60
        private const val MIN_IDLE_CONNECTIONS = 20
        private const val MAX_WAIT_SECONDS = 120L
        private const val EVICTION_RUN_PERIOD_SECONDS = 120L
        private const val MIN_EVICTABLE_IDLE_MINUTES = 5L
        private const val NUM_TESTS_PER_EVICTION_RUN = 3

        /**
         * Performance optimization parameters.
         * - Decode buffer ratio: Memory allocation for response buffers
         * - Request queue size: Maximum pending requests
         * - Latency publish interval: Metrics publication frequency
         * - Thread pool multiplier: Scaling factor for I/O threads
         */
        private const val DECODE_BUFFER_POLICY_RATIO = 0.3F
        private const val REQUEST_QUEUE_SIZE = 2500
        private const val COMMAND_LATENCY_PUBLISH_MINUTES = 1L
        private const val IO_THREAD_POOL_MULTIPLIER = 2
    }

    /**
     * Configures Redis keyspace notifications behavior.
     *
     * Returns NO_OP action to disable automatic configuration of keyspace
     * notifications, preventing potential connection issues during shutdown
     * related to Pub/Sub connections in Spring Session.
     *
     * @return ConfigureRedisAction.NO_OP to disable automatic Redis configuration
     */
    @Bean
    fun configureRedisAction(): ConfigureRedisAction {
        return ConfigureRedisAction.NO_OP
    }

    /* LETTUCE - reactive RedisConnectionFactory */
    /**
     * Creates the primary reactive Redis connection factory.
     *
     * Configures a Lettuce-based connection factory with:
     * - Profile-specific Redis deployment mode (clustered/standalone)
     * - Connection pooling
     * - Client resources (thread pools, DNS resolution)
     * - SSL for production environments
     *
     * @param clusterProperties Cluster node configuration
     * @param springDataRedisProperties Redis connection properties
     * @return Configured [ReactiveRedisConnectionFactory]
     */
    @Bean
    @Primary
    internal fun reactiveRedisConnectionFactory(
        clusterProperties: ClusterConfigurationProperties,
        springDataRedisProperties: SpringDataRedisProperties,
    ): ReactiveRedisConnectionFactory {
        val config = createRedisConfiguration(springDataRedisProperties, clusterProperties)
        clientResources = createClientResources(springDataRedisProperties.host)
        val clientConfig = createLettuceClientConfig(
            clientResources,
            profileProperties.active,
            springDataRedisProperties.type
        )

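        return LettuceConnectionFactory(config, clientConfig).apply {
            // Set options before initialization. Spring invokes afterPropertiesSet()
            // on this bean itself, so no explicit call is needed here.
            validateConnection = false
            setShareNativeConnection(true)
        }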
    }

    /**
     * Properties class for Redis cluster configuration.
     *
     * @property nodes List of Redis nodes in format `host:port`
     */
    @Component
    internal class ClusterConfigurationProperties(
        springDataRedisProperties: SpringDataRedisProperties
    ) {
        /**
         * Initial collection of known cluster nodes in format `host:port`.
         */
        var nodes = listOf(
            "${springDataRedisProperties.host}:${springDataRedisProperties.port}",
        )
    }

    /**
     * Creates appropriate Redis configuration based on active profile.
     *
     * @param properties Redis connection properties
     * @param clusterProperties Cluster node configuration
     * @return [RedisConfiguration] for either clustered or standalone deployment
     */
    private fun createRedisConfiguration(
        properties: SpringDataRedisProperties,
        clusterProperties: ClusterConfigurationProperties
    ): RedisConfiguration = when {
        profileProperties.active == ProfileTypes.PRODUCTION.type &&
                properties.type == RedisConfigTypes.CLUSTERED.type -> {

            // Redis Cluster for production
            RedisClusterConfiguration(clusterProperties.nodes).apply {
                password = RedisPassword.of(properties.password)
            }
        }
        else -> {

            // Redis Standalone for non-production
            RedisStandaloneConfiguration().apply {
                hostName = properties.host
                port = properties.port
                password = RedisPassword.of(properties.password)
            }
        }
    }

    /**
     * Creates client resources with optimized thread pools and DNS resolution.
     *
     * @param host Redis host for DNS resolution
     * @return Configured [ClientResources]
     */
    private fun createClientResources(host: String) = DefaultClientResources.builder()
        .ioThreadPoolSize(Runtime.getRuntime().availableProcessors() * IO_THREAD_POOL_MULTIPLIER)
        .computationThreadPoolSize(Runtime.getRuntime().availableProcessors())
        .socketAddressResolver(createCachingDnsResolver(host))
        .commandLatencyRecorder(DefaultCommandLatencyCollector.disabled())
        .commandLatencyPublisherOptions { Duration.ofMinutes(COMMAND_LATENCY_PUBLISH_MINUTES) }
        .build()

    /**
     * Creates Lettuce client configuration with pooling and security settings.
     *
     * Configures:
     * - Read preferences (replica preferred)
     * - Command timeouts
     * - Connection pooling
     * - SSL (for production)
     *
     * @param clientResources Configured client resources for connection management
     * @param activeProfile Current deployment profile
     * @param redisConfigType Redis deployment type (clustered or standalone)
     * @return Configured [LettucePoolingClientConfiguration]
     */
    private fun createLettuceClientConfig(
        clientResources: ClientResources,
        activeProfile: String?,
        redisConfigType: String?
    ): LettucePoolingClientConfiguration {
        val clusterClientOptions = createClusterClientOptions(activeProfile, redisConfigType)

        return LettucePoolingClientConfiguration.builder()
            .readFrom(REPLICA_PREFERRED)
            .commandTimeout(Duration.ofSeconds(DEFAULT_COMMAND_TIMEOUT_SECONDS))
            .clientResources(clientResources)
            .clientOptions(clusterClientOptions)
            .poolConfig(buildLettucePoolConfig())
            .shutdownTimeout(Duration.ofSeconds(SHUTDOWN_TIMEOUT_SECONDS))
            .shutdownQuietPeriod(Duration.ofSeconds(SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS))
            // conditionally use sslOptions if profileProperties.active is 'prod'
            .apply {
                if (activeProfile == ProfileTypes.PRODUCTION.type) {
                    useSsl()
                }
            }
            .build()
    }

    /**
     * Creates cluster client options with comprehensive connection settings.
     *
     * Configures:
     * - Auto-reconnect behavior
     * - Connection validation
     * - Timeout settings
     * - Socket options
     * - Topology refresh
     * - Buffer and queue sizes
     * - SSL (for production)
     *
     * @param activeProfile Current deployment profile
     * @param redisConfigType Redis deployment type (clustered or standalone)
     * @return Configured [ClusterClientOptions]
     */
    private fun createClusterClientOptions(activeProfile: String?, redisConfigType: String?): ClusterClientOptions {
        val builder = ClusterClientOptions.builder()
            .autoReconnect(true)
            .pingBeforeActivateConnection(true)
            .timeoutOptions(createTimeoutOptions())
            .socketOptions(createSocketOptions())
            .topologyRefreshOptions(createTopologyRefreshOptions())
            .validateClusterNodeMembership(true)
            .disconnectedBehavior(DEFAULT_DISCONNECTED_BEHAVIOR)
            .decodeBufferPolicy(DecodeBufferPolicies.ratio(DECODE_BUFFER_POLICY_RATIO))
            .requestQueueSize(REQUEST_QUEUE_SIZE)
            .maxRedirects(DEFAULT_MAX_REDIRECTS)
            .suspendReconnectOnProtocolFailure(DEFAULT_SUSPEND_RECONNECT_PROTO_FAIL)
            .publishOnScheduler(true)
            .protocolVersion(ProtocolVersion.RESP3)

        // conditionally use sslOptions if profileProperties.active is 'prod'
        if (activeProfile == ProfileTypes.PRODUCTION.type) {
            builder.sslOptions(createSslOptions())
        }

        // conditionally use connected nodes if redisConfigType is 'clustered'
        if (redisConfigType == RedisConfigTypes.CLUSTERED.type) {
            builder.nodeFilter { node ->
                node.isConnected
            }
        }

        return builder.build()
    }

    /**
     * Creates socket options for Redis connections.
     *
     * Configures:
     * - Keep-alive settings
     * - TCP no-delay
     * - Connection timeouts
     *
     * @return Configured [SocketOptions]
     */
    private fun createSocketOptions() = SocketOptions.builder()
        .keepAlive(SocketOptions.DEFAULT_SO_KEEPALIVE)
        .tcpNoDelay(SocketOptions.DEFAULT_SO_NO_DELAY)
        .connectTimeout(Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_SECONDS))
        .build()

    /**
     * Creates timeout options for Redis commands.
     *
     * Configures:
     * - Fixed timeout duration
     * - Command timeout behavior
     *
     * @return Configured [TimeoutOptions]
     */
    private fun createTimeoutOptions() = TimeoutOptions.builder()
        .fixedTimeout(Duration.ofSeconds(DEFAULT_COMMAND_TIMEOUT_SECONDS))
        .timeoutCommands(true)
        .build()

    /**
     * Creates topology refresh options for Redis cluster.
     *
     * Configures:
     * - Periodic refresh intervals
     * - Dynamic refresh sources
     * - Stale connection handling
     * - Adaptive refresh triggers
     *
     * @return Configured [ClusterTopologyRefreshOptions]
     */
    private fun createTopologyRefreshOptions() = ClusterTopologyRefreshOptions.builder()
        .enablePeriodicRefresh(Duration.ofSeconds(TOPOLOGY_REFRESH_PERIOD_SECONDS))
        .dynamicRefreshSources(true)
        .closeStaleConnections(true)
        .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(ADAPTIVE_REFRESH_TIMEOUT_SECONDS))
        .enableAllAdaptiveRefreshTriggers()
        .build()

    /**
     * Creates connection pool configuration.
     *
     * Configures:
     * - Maximum total/idle connections
     * - Connection wait times
     * - Eviction policies
     * - Connection testing
     * - Pool behavior (LIFO/FIFO)
     *
     * @return Configured [GenericObjectPoolConfig]
     */
    private fun buildLettucePoolConfig() = GenericObjectPoolConfig<Any>().apply {
        maxTotal = MAX_TOTAL_CONNECTIONS
        maxIdle = MAX_IDLE_CONNECTIONS
        minIdle = MIN_IDLE_CONNECTIONS
        setMaxWait(Duration.ofSeconds(MAX_WAIT_SECONDS))
        timeBetweenEvictionRuns = Duration.ofSeconds(EVICTION_RUN_PERIOD_SECONDS)
        minEvictableIdleTime = Duration.ofMinutes(MIN_EVICTABLE_IDLE_MINUTES)
        testOnBorrow = true
        testWhileIdle = true
        testOnReturn = true
        blockWhenExhausted = true
        lifo = true
        jmxEnabled = false
        fairness = true
        evictionPolicyClassName = "org.apache.commons.pool2.impl.DefaultEvictionPolicy"
        numTestsPerEvictionRun = NUM_TESTS_PER_EVICTION_RUN
    }

    /**
     * Creates SSL options for secure Redis connections.
     *
     * Configures JDK-based SSL provider for Redis connections
     * in production environments.
     *
     * @return Configured [SslOptions]
     */
    private fun createSslOptions(): SslOptions = SslOptions.builder()
        .jdkSslProvider()
        .build()

    /**
     * Creates DNS resolver with caching capabilities.
     *
     * Implements:
     * - DNS resolution caching
     * - Hostname-to-IP mapping
     * - Fallback handling for resolution failures
     *
     * @param host Redis host to resolve
     * @return Configured [MappingSocketAddressResolver]
     */
    private fun createCachingDnsResolver(host: String): MappingSocketAddressResolver {
        val dnsCache = ConcurrentHashMap<String, Array<InetAddress>>()

        val mappingFunction: (HostAndPort) -> HostAndPort = { hostAndPort ->
            val addresses = dnsCache.computeIfAbsent(host) {
                try {
                    DnsResolvers.JVM_DEFAULT.resolve(host)
                } catch (e: UnknownHostException) {
                    emptyArray()
                }
            }

            val cacheIP = addresses.firstOrNull()?.hostAddress
            if (hostAndPort.hostText == cacheIP) {
                HostAndPort.of(host, hostAndPort.port)
            } else {
                hostAndPort
            }
        }

        return MappingSocketAddressResolver.create(DnsResolvers.JVM_DEFAULT, mappingFunction)
    }

}
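
The KDoc on `clientResources` mentions a `cleanup` method that does not appear in the class as posted. One possible alternative, sketched under assumptions, is to expose `ClientResources` as its own bean so that Spring shuts it down when the context closes, after the beans that depend on it have been destroyed. The class name `RedisClientResourcesConfig` and bean name `redisClientResources` are hypothetical, the DNS resolver and latency settings are omitted for brevity, and the sketch assumes the kotlin-spring compiler plugin opens `@Configuration` classes as the original configuration appears to rely on.

```kotlin
import io.lettuce.core.resource.ClientResources
import io.lettuce.core.resource.DefaultClientResources
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

// Hypothetical configuration class, shown separately from RedisConnectionFactoryConfig.
@Configuration
class RedisClientResourcesConfig {

    // destroyMethod = "shutdown" makes Spring call ClientResources.shutdown()
    // on context close, once dependent beans have been destroyed.
    @Bean(destroyMethod = "shutdown")
    fun redisClientResources(): ClientResources =
        DefaultClientResources.builder()
            .ioThreadPoolSize(Runtime.getRuntime().availableProcessors() * 2)
            .computationThreadPoolSize(Runtime.getRuntime().availableProcessors())
            .build()
}
```

With this in place, `reactiveRedisConnectionFactory` could take `clientResources: ClientResources` as a bean method parameter instead of assigning a `lateinit` field, which also makes the destruction order explicit to the container.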
