Skip to content

DATAREDIS-756 - Order results of partitioned multi-key commands by positional keys. #303

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
<version>2.1.0.DATAREDIS-756-SNAPSHOT</version>

<name>Spring Data Redis</name>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
*/
package org.springframework.data.redis.connection;

import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.task.AsyncTaskExecutor;
Expand Down Expand Up @@ -202,7 +208,6 @@ public <S, T> MultiNodeResult<T> executeCommandAsyncOnNodes(ClusterCommandCallba

Map<NodeExecution, Future<NodeResult<T>>> futures = new LinkedHashMap<>();
for (RedisClusterNode node : resolvedRedisClusterNodes) {

futures.put(new NodeExecution(node), executor.submit(() -> executeCommandOnSingleNode(callback, node)));
}

Expand All @@ -225,23 +230,30 @@ private <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResu
if (!entry.getValue().isDone() && !entry.getValue().isCancelled()) {
done = false;
} else {

NodeExecution execution = entry.getKey();
try {

String futureId = ObjectUtils.getIdentityHexString(entry.getValue());
if (!saveGuard.contains(futureId)) {
result.add(entry.getValue().get());

if (execution.isPositional()) {
result.add(execution.getPositionalKey(), entry.getValue().get());
} else {
result.add(entry.getValue().get());
}
saveGuard.add(futureId);
}
} catch (ExecutionException e) {

RuntimeException ex = convertToDataAccessException((Exception) e.getCause());
exceptions.put(entry.getKey().getNode(), ex != null ? ex : e.getCause());
exceptions.put(execution.getNode(), ex != null ? ex : e.getCause());
} catch (InterruptedException e) {

Thread.currentThread().interrupt();

RuntimeException ex = convertToDataAccessException((Exception) e.getCause());
exceptions.put(entry.getKey().getNode(), ex != null ? ex : e.getCause());
exceptions.put(execution.getNode(), ex != null ? ex : e.getCause());
break;
}
}
Expand Down Expand Up @@ -271,29 +283,22 @@ private <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResu
public <S, T> MultiNodeResult<T> executeMultiKeyCommand(MultiKeyClusterCommandCallback<S, T> cmd,
Iterable<byte[]> keys) {

Map<RedisClusterNode, Set<byte[]>> nodeKeyMap = new HashMap<>();
Map<RedisClusterNode, PositionalKeys> nodeKeyMap = new HashMap<>();

int index = 0;
for (byte[] key : keys) {
for (RedisClusterNode node : getClusterTopology().getKeyServingNodes(key)) {

if (nodeKeyMap.containsKey(node)) {
nodeKeyMap.get(node).add(key);
} else {
Set<byte[]> keySet = new LinkedHashSet<>();
keySet.add(key);
nodeKeyMap.put(node, keySet);
}
nodeKeyMap.computeIfAbsent(node, val -> PositionalKeys.empty()).append(PositionalKey.of(key, index++));
}
}

Map<NodeExecution, Future<NodeResult<T>>> futures = new LinkedHashMap<>();

for (Entry<RedisClusterNode, Set<byte[]>> entry : nodeKeyMap.entrySet()) {
for (Entry<RedisClusterNode, PositionalKeys> entry : nodeKeyMap.entrySet()) {

if (entry.getKey().isMaster()) {
for (byte[] key : entry.getValue()) {
for (PositionalKey key : entry.getValue()) {
futures.put(new NodeExecution(entry.getKey(), key),
executor.submit(() -> executeMultiKeyCommandOnSingleNode(cmd, entry.getKey(), key)));
executor.submit(() -> executeMultiKeyCommandOnSingleNode(cmd, entry.getKey(), key.getBytes())));
}
}
}
Expand Down Expand Up @@ -326,6 +331,7 @@ private ClusterTopology getClusterTopology() {
return this.topologyProvider.getTopology();
}

@Nullable
private DataAccessException convertToDataAccessException(Exception e) {
return exceptionTranslationStrategy.translate(e);
}
Expand Down Expand Up @@ -384,17 +390,22 @@ public interface MultiKeyClusterCommandCallback<T, S> {
* keys, involved.
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 1.7
*/
private static class NodeExecution {

private RedisClusterNode node;
private Object[] args;
private final RedisClusterNode node;
private final @Nullable PositionalKey positionalKey;

NodeExecution(RedisClusterNode node) {
this(node, null);
}

NodeExecution(RedisClusterNode node, Object... args) {
NodeExecution(RedisClusterNode node, @Nullable PositionalKey positionalKey) {

this.node = node;
this.args = args;
this.positionalKey = positionalKey;
}

/**
Expand All @@ -404,45 +415,18 @@ RedisClusterNode getNode() {
return node;
}

/*
* (non-Javadoc)
* @see java.lang.Object#hashCode()
/**
* Get the {@link PositionalKey} of this execution.
*
* @since 2.0.3
*/
@Override
public int hashCode() {

int result = ObjectUtils.nullSafeHashCode(node);
return result + ObjectUtils.nullSafeHashCode(args);
PositionalKey getPositionalKey() {
return positionalKey;
}

/*
* (non-Javadoc)
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object obj) {

if (this == obj) {
return true;
}

if (obj == null) {
return false;
}

if (!(obj instanceof NodeExecution)) {
return false;
}

NodeExecution that = (NodeExecution) obj;

if (!ObjectUtils.nullSafeEquals(this.node, that.node)) {
return false;
}

return ObjectUtils.nullSafeEquals(this.args, that.args);
boolean isPositional() {
return positionalKey != null;
}

}

/**
Expand Down Expand Up @@ -515,17 +499,25 @@ public byte[] getKey() {
* {@link MultiNodeResult} holds all {@link NodeResult} of a command executed on multiple {@link RedisClusterNode}.
*
* @author Christoph Strobl
* @author Mark Paluch
* @param <T>
* @since 1.7
*/
public static class MultiNodeResult<T> {

List<NodeResult<T>> nodeResults = new ArrayList<>();
Map<PositionalKey, NodeResult<T>> positionalResults = new LinkedHashMap<>();

private void add(NodeResult<T> result) {
nodeResults.add(result);
}

private void add(PositionalKey key, NodeResult<T> result) {

positionalResults.put(key, result);
add(result);
}

/**
* @return never {@literal null}.
*/
Expand All @@ -551,15 +543,23 @@ public List<T> resultsAsList() {
*/
public List<T> resultsAsListSortBy(byte[]... keys) {

ArrayList<NodeResult<T>> clone = new ArrayList<>(nodeResults);
clone.sort(new ResultByReferenceKeyPositionComparator(keys));
if (positionalResults.isEmpty()) {

List<NodeResult<T>> clone = new ArrayList<>(nodeResults);
clone.sort(new ResultByReferenceKeyPositionComparator(keys));

return toList(clone);
return toList(clone);
}

Map<PositionalKey, NodeResult<T>> result = new TreeMap<>(new ResultByKeyPositionComparator(keys));
result.putAll(positionalResults);

return result.values().stream().map(tNodeResult -> tNodeResult.value).collect(Collectors.toList());
}

/**
* @param returnValue can be {@literal null}.
* @return can be {@litearl null}.
* @return can be {@literal null}.
*/
@Nullable
public T getFirstNonNullNotEmptyOrDefault(@Nullable T returnValue) {
Expand Down Expand Up @@ -598,7 +598,7 @@ private List<T> toList(Collection<NodeResult<T>> source) {
*/
private static class ResultByReferenceKeyPositionComparator implements Comparator<NodeResult<?>> {

List<ByteArrayWrapper> reference;
private final List<ByteArrayWrapper> reference;

ResultByReferenceKeyPositionComparator(byte[]... keys) {
reference = new ArrayList<>(new ByteArraySet(Arrays.asList(keys)));
Expand All @@ -609,5 +609,117 @@ public int compare(NodeResult<?> o1, NodeResult<?> o2) {
return Integer.compare(reference.indexOf(o1.key), reference.indexOf(o2.key));
}
}

/**
* {@link Comparator} for sorting {@link PositionalKey} by external {@link PositionalKeys}.
*
* @author Mark Paluch
* @author Christoph Strobl
* @since 2.0.3
*/
private static class ResultByKeyPositionComparator implements Comparator<PositionalKey> {

private final PositionalKeys reference;

ResultByKeyPositionComparator(byte[]... keys) {
reference = PositionalKeys.of(keys);
}

@Override
public int compare(PositionalKey o1, PositionalKey o2) {
return Integer.compare(reference.indexOf(o1), reference.indexOf(o2));
}
}
}

/**
* Value object representing a Redis key at a particular command position.
*
* @author Mark Paluch
* @author Christoph Strobl
* @since 2.0.3
*/
@Getter
@EqualsAndHashCode
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
private static class PositionalKey {

private final ByteArrayWrapper key;
private final int position;

static PositionalKey of(byte[] key, int index) {
return new PositionalKey(new ByteArrayWrapper(key), index);
}

/**
* @return binary key.
*/
byte[] getBytes() {
return key.getArray();
}
}

/**
* Mutable data structure to represent multiple {@link PositionalKey}s.
*
* @author Mark Paluch
* @author Christoph Strobl
* @since 2.0.3
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
private static class PositionalKeys implements Iterable<PositionalKey> {

private final List<PositionalKey> keys;

/**
* Create an empty {@link PositionalKeys}.
*/
static PositionalKeys empty() {
return new PositionalKeys(new ArrayList<>());
}

/**
* Create an {@link PositionalKeys} from {@code keys}.
*/
static PositionalKeys of(byte[]... keys) {

List<PositionalKey> result = new ArrayList<>(keys.length);

for (int i = 0; i < keys.length; i++) {
result.add(PositionalKey.of(keys[i], i));
}

return new PositionalKeys(result);
}

/**
* Create an {@link PositionalKeys} from {@link PositionalKey}s.
*/
static PositionalKeys of(PositionalKey... keys) {

PositionalKeys result = PositionalKeys.empty();
result.append(keys);

return result;
}

/**
* Append {@link PositionalKey}s to this object.
*/
void append(PositionalKey... keys) {
this.keys.addAll(Arrays.asList(keys));
}

/**
* @return index of the {@link PositionalKey}.
*/
int indexOf(PositionalKey key) {
return keys.indexOf(key);
}

@Override
public Iterator<PositionalKey> iterator() {
return keys.iterator();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,15 @@ public interface ClusterConnectionTests {
// DATAREDIS-315
void mGetShouldReturnCorrectlyWhenKeysDoNotMapToSameSlot();

// DATAREDIS-756
void mGetShouldReturnMultipleSameKeysWhenKeysDoNotMapToSameSlot();

// DATAREDIS-315
void mGetShouldReturnCorrectlyWhenKeysMapToSameSlot();

// DATAREDIS-756
void mGetShouldReturnMultipleSameKeysWhenKeysMapToSameSlot();

// DATAREDIS-315
void mSetNXShouldReturnFalseIfNotAllKeysSet();

Expand Down
Loading