Skip to content

Force localhost connections when client connects to localhost #305

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ test: build
rabbitmq-server:
docker run -it --rm --name rabbitmq-stream-docker \
-p 5552:5552 -p 5672:5672 -p 15672:15672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost" \
--pull always \
pivotalrabbitmq/rabbitmq-stream

Expand Down
56 changes: 45 additions & 11 deletions RabbitMQ.Stream.Client/StreamSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ public async Task<IProducer> CreateRawSuperStreamProducer(
IDictionary<string, StreamInfo> streamInfos = new Dictionary<string, StreamInfo>();
foreach (var partitionsStream in partitions.Streams)
{
var metaDataResponse = await _client.QueryMetadata(new[] { partitionsStream }).ConfigureAwait(false);
streamInfos[partitionsStream] = metaDataResponse.StreamInfos[partitionsStream];
streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false);
}

var r = RawSuperStreamProducer.Create(rawSuperStreamProducerConfig,
Expand Down Expand Up @@ -217,8 +216,7 @@ public async Task<IConsumer> CreateSuperStreamConsumer(
IDictionary<string, StreamInfo> streamInfos = new Dictionary<string, StreamInfo>();
foreach (var partitionsStream in partitions.Streams)
{
var metaDataResponse = await _client.QueryMetadata(new[] { partitionsStream }).ConfigureAwait(false);
streamInfos[partitionsStream] = metaDataResponse.StreamInfos[partitionsStream];
streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false);
}

var s = RawSuperStreamConsumer.Create(rawSuperStreamConsumerConfig,
Expand All @@ -239,10 +237,7 @@ public async Task<IProducer> CreateRawProducer(RawProducerConfig rawProducerConf
throw new CreateProducerException("Batch Size must be bigger than 0");
}

await MayBeReconnectLocator().ConfigureAwait(false);
var meta = await _client.QueryMetadata(new[] { rawProducerConfig.Stream }).ConfigureAwait(false);

var metaStreamInfo = meta.StreamInfos[rawProducerConfig.Stream];
var metaStreamInfo = await StreamInfo(rawProducerConfig.Stream).ConfigureAwait(false);
if (metaStreamInfo.ResponseCode != ResponseCode.Ok)
{
throw new CreateProducerException($"producer could not be created code: {metaStreamInfo.ResponseCode}");
Expand All @@ -268,6 +263,47 @@ public async Task<IProducer> CreateRawProducer(RawProducerConfig rawProducerConf
}
}

private async Task<StreamInfo> StreamInfo(string streamName)
{
// force localhost connection for single node clusters and when address resolver is not provided
// when theres 1 endpoint and an address resolver, there could be a cluster behind a load balancer
var forceLocalHost = false;
var localPort = 0;
if (_clientParameters.Endpoints.Count == 1 &&
_clientParameters.AddressResolver is null)
{
var clientParametersEndpoint = _clientParameters.Endpoints[0];
switch (clientParametersEndpoint)
{
case DnsEndPoint { Host: "localhost" } dnsEndPoint:
forceLocalHost = true;
localPort = dnsEndPoint.Port;
break;
case IPEndPoint ipEndPoint when Equals(ipEndPoint.Address, IPAddress.Loopback):
forceLocalHost = true;
localPort = ipEndPoint.Port;
break;
}
}

StreamInfo metaStreamInfo;
if (forceLocalHost)
{
// craft the metadata response to force using localhost
var leader = new Broker("localhost", (uint)localPort);
metaStreamInfo = new StreamInfo(streamName, ResponseCode.Ok, leader,
new List<Broker>(1) { leader });
}
else
{
await MayBeReconnectLocator().ConfigureAwait(false);
var meta = await _client.QueryMetadata(new[] { streamName }).ConfigureAwait(false);
metaStreamInfo = meta.StreamInfos[streamName];
}

return metaStreamInfo;
}

public async Task CreateStream(StreamSpec spec)
{
var response = await _client.CreateStream(spec.Name, spec.Args).ConfigureAwait(false);
Expand Down Expand Up @@ -350,9 +386,7 @@ public async Task<StreamStats> StreamStats(string stream)
public async Task<IConsumer> CreateRawConsumer(RawConsumerConfig rawConsumerConfig,
ILogger logger = null)
{
await MayBeReconnectLocator().ConfigureAwait(false);
var meta = await _client.QueryMetadata(new[] { rawConsumerConfig.Stream }).ConfigureAwait(false);
var metaStreamInfo = meta.StreamInfos[rawConsumerConfig.Stream];
var metaStreamInfo = await StreamInfo(rawConsumerConfig.Stream).ConfigureAwait(false);
if (metaStreamInfo.ResponseCode != ResponseCode.Ok)
{
throw new CreateConsumerException($"consumer could not be created code: {metaStreamInfo.ResponseCode}");
Expand Down
19 changes: 16 additions & 3 deletions Tests/FilterTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ public class FilterTest
{
// When the Filter is set also Values must be set and PostFilter must be set
// Values must be a list of string and must contain at least one element
[Fact]
[SkippableFact]
public async void ValidateFilter()
{
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
if (!AvailableFeaturesSingleton.Instance.PublishFilter)
{
throw new SkipException("broker does not support filter");
}

await Assert.ThrowsAsync<ArgumentException>(() => Consumer.Create(
new ConsumerConfig(system, stream) { Filter = new ConsumerFilter() }
Expand Down Expand Up @@ -54,10 +58,14 @@ await Assert.ThrowsAsync<ArgumentException>(() => Consumer.Create(
// We send 100 messages with two different states (Alabama and New York)
// By using the filter we should be able to consume only the messages from Alabama
// and the server has to send only one chunk with all the messages from Alabama
[Fact]
[SkippableFact]
public async void FilterShouldReturnOnlyOneChunk()
{
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
if (!AvailableFeaturesSingleton.Instance.PublishFilter)
{
throw new SkipException("broker does not support filter");
}

var producer = await Producer.Create(
new ProducerConfig(system, stream)
Expand Down Expand Up @@ -171,10 +179,15 @@ async Task SendTo(string state)
// For the producer side we have the ConfirmationHandler the messages with errors
// will be reported as not confirmed and the user can handle them.
// for the consumer the messages will be skipped and logged with the standard logger
[Fact]
[SkippableFact]
public async void ErrorFiltersFunctionWontDeliverTheMessage()
{
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
if (!AvailableFeaturesSingleton.Instance.PublishFilter)
{
throw new SkipException("broker does not support filter");
}

var messagesConfirmed = 0;
var messagesError = 0;
var testPassed = new TaskCompletionSource<int>();
Expand Down
17 changes: 12 additions & 5 deletions Tests/RawProducerSystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public async void CreateRawProducer()
var config = new StreamSystemConfig();
var system = await StreamSystem.Create(config);
await system.CreateStream(new StreamSpec(stream));
var rawProducer = await system.CreateRawProducer(

var createProducerTask = system.CreateRawProducer(
new RawProducerConfig(stream)
{
Reference = "producer",
Expand All @@ -44,6 +45,15 @@ public async void CreateRawProducer()
}
});

if (await Task.WhenAny(createProducerTask, Task.Delay(5000)) != createProducerTask)
{
// timeout to avoid infinite await
Assert.Fail("timeout awaiting for CreateRawProducer");
return;
}

var rawProducer = await createProducerTask;

var readonlySequence = "apple".AsReadonlySequence();
var message = new Message(new Data(readonlySequence));
await rawProducer.Send(1, message);
Expand Down Expand Up @@ -378,10 +388,7 @@ public async Task ProducerSendsArrays255Bytes(ReadOnlySequence<byte> @event)
RawProducerConfig(stream)
{
Reference = "producer",
ConfirmHandler = _ =>
{
testPassed.SetResult(true);
}
ConfirmHandler = _ => { testPassed.SetResult(true); }
}
);

Expand Down
33 changes: 30 additions & 3 deletions Tests/SuperStreamConsumerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,16 @@ public async void NumberOfMessagesConsumedShouldBeEqualsToPublished()
var consumedMessages = 0;
const int NumberOfMessages = 20;
var system = await StreamSystem.Create(new StreamSystemConfig());
await SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, NumberOfMessages, "", _testOutputHelper);
// Publish to super stream hands sometimes, for unknow reason
var publishTask = SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, NumberOfMessages,
"", _testOutputHelper);
if (await Task.WhenAny(publishTask, Task.Delay(10000)) != publishTask)
{
Assert.Fail("timed out awaiting to publish messages to super stream");
}

await publishTask;

var clientProvidedName = Guid.NewGuid().ToString();

var consumer = await system.CreateSuperStreamConsumer(
Expand Down Expand Up @@ -200,7 +209,15 @@ public async void MoreConsumersNumberOfMessagesConsumedShouldBeEqualsToPublished
var listConsumed = new ConcurrentBag<string>();
const int NumberOfMessages = 20;
var system = await StreamSystem.Create(new StreamSystemConfig());
await SystemUtils.PublishMessagesSuperStream(system, "invoices", NumberOfMessages, "", _testOutputHelper);
var publishToSuperStreamTask =
SystemUtils.PublishMessagesSuperStream(system, "invoices", NumberOfMessages, "", _testOutputHelper);
if (await Task.WhenAny(publishToSuperStreamTask, Task.Delay(10000)) != publishToSuperStreamTask)
{
Assert.Fail("timeout waiting to publish messages");
}

// We re-await the task so that any exceptions/cancellation is rethrown.
await publishToSuperStreamTask;
var clientProvidedName = Guid.NewGuid().ToString();
var consumers = new Dictionary<string, IConsumer>();

Expand Down Expand Up @@ -253,7 +270,17 @@ public async void ReliableConsumerNumberOfMessagesConsumedShouldBeEqualsToPublis
{
SystemUtils.ResetSuperStreams();
var system = await StreamSystem.Create(new StreamSystemConfig());
await SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, 20, "", _testOutputHelper);
_testOutputHelper.WriteLine("awaiting publish to super stream");
var publishTask =
SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, 20, "", _testOutputHelper);
if (await Task.WhenAny(publishTask, Task.Delay(10000)) != publishTask)
{
Assert.Fail("timed out awaiting to publish messages to super stream");
}

// re-await in case any cancellation or exception happen, it can throw
await publishTask;

var listConsumed = new ConcurrentBag<string>();
var testPassed = new TaskCompletionSource<bool>();
var consumer = await Consumer.Create(new ConsumerConfig(system, SystemUtils.InvoicesExchange)
Expand Down