Skip to content

Add DateTimeOffset test #307

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ public void Dispose()
}
catch (Exception e)
{
_logger.LogError(e, "Error during disposing of consumer: {SubscriberId}.", _subscriberId);
_logger.LogError(e, "Error during disposing of consumer: {SubscriberId}", _subscriberId);
}
finally
{
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RawProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ public void Dispose()
}
catch (Exception e)
{
_logger.LogError(e, "Error during disposing Consumer: {PublisherId}.", _publisherId);
_logger.LogError(e, "Error during disposing Consumer: {PublisherId}", _publisherId);
}

GC.SuppressFinalize(this);
Expand Down
84 changes: 60 additions & 24 deletions Tests/RawConsumerSystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,23 @@ public async void ConsumerStoreOffset()
var config = new StreamSystemConfig();
var system = await StreamSystem.Create(config);
await system.CreateStream(new StreamSpec(stream));
const int numberOfMessages = 10;
await SystemUtils.PublishMessages(system, stream, numberOfMessages, testOutputHelper);
const int NumberOfMessages = 10;
await SystemUtils.PublishMessages(system, stream, NumberOfMessages, testOutputHelper);
var count = 0;
var consumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
{
Reference = "consumer_offset",
OffsetSpec = new OffsetTypeFirst(),
MessageHandler = async (consumer, ctx, message) =>
MessageHandler = async (consumer, ctx, _) =>
{
testOutputHelper.WriteLine($"ConsumerStoreOffset receiving.. {count}");
count++;
if (count == numberOfMessages)
if (count == NumberOfMessages)
{
await consumer.StoreOffset(ctx.Offset);
testOutputHelper.WriteLine($"ConsumerStoreOffset done: {count}");
testPassed.SetResult(numberOfMessages);
testPassed.SetResult(NumberOfMessages);
}

await Task.CompletedTask;
Expand All @@ -122,7 +122,7 @@ public async void ConsumerStoreOffset()
var client = await Client.Create(clientParameters);
var offset = await client.QueryOffset("consumer_offset", stream);
// The offset must be numberOfMessages less one
Assert.Equal(offset.Offset, Convert.ToUInt64(numberOfMessages - 1));
Assert.Equal(offset.Offset, Convert.ToUInt64(NumberOfMessages - 1));
await consumer.Close();
await system.DeleteStream(stream);
await system.Close();
Expand Down Expand Up @@ -456,33 +456,33 @@ public async void ConsumerQueryOffset()
var config = new StreamSystemConfig();
var system = await StreamSystem.Create(config);
await system.CreateStream(new StreamSpec(stream));
const int numberOfMessages = 10;
const int numberOfMessagesToStore = 4;
await SystemUtils.PublishMessages(system, stream, numberOfMessages, testOutputHelper);
const int NumberOfMessages = 10;
const int NumberOfMessagesToStore = 4;
await SystemUtils.PublishMessages(system, stream, NumberOfMessages, testOutputHelper);
var count = 0;
const string reference = "consumer_offset";
const string Reference = "consumer_offset";
var rawConsumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
{
Crc32 = _crc32,
Reference = reference,
Reference = Reference,
OffsetSpec = new OffsetTypeOffset(),
MessageHandler = async (consumer, ctx, message) =>
{
testOutputHelper.WriteLine($"ConsumerStoreOffset receiving.. {count}");
count++;
if (count == numberOfMessagesToStore)
{
// store the the offset after numberOfMessagesToStore messages
// so when we query the offset we should (must) have the same
// values
await consumer.StoreOffset(ctx.Offset);
testOutputHelper.WriteLine($"ConsumerStoreOffset done: {count}");
}

if (count == numberOfMessages)
switch (count)
{
testPassed.SetResult(numberOfMessages);
case NumberOfMessagesToStore:
// store the the offset after numberOfMessagesToStore messages
// so when we query the offset we should (must) have the same
// values
await consumer.StoreOffset(ctx.Offset);
testOutputHelper.WriteLine($"ConsumerStoreOffset done: {count}");
break;
case NumberOfMessages:
testPassed.SetResult(NumberOfMessages);
break;
}

await Task.CompletedTask;
Expand All @@ -494,8 +494,8 @@ public async void ConsumerQueryOffset()
// it may need some time to store the offset
SystemUtils.Wait();
// numberOfMessagesToStore index 0
Assert.Equal((ulong)(numberOfMessagesToStore - 1),
await system.QueryOffset(reference, stream));
Assert.Equal((ulong)(NumberOfMessagesToStore - 1),
await system.QueryOffset(Reference, stream));

// this has to raise OffsetNotFoundException in case the offset
// does not exist like in this case.
Expand Down Expand Up @@ -696,5 +696,41 @@ public async void ProducerConsumerMixingDifferentSendTypesCompressAndStandard()
await system.DeleteStream(stream);
await system.Close();
}

[Fact]
public async void ShouldConsumeFromDateTimeOffset()
{
// validate the consumer can start from a specific time
// this test is not deterministic because it depends on the
// time the test is executed.
// but at least we can validate the consumer can start from a specific time less 100 ms
// and it has to receive all the messages
// not 100% perfect but it is better than nothing

SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
var before = DateTimeOffset.Now.AddMilliseconds(-100);
await SystemUtils.PublishMessages(system, stream, 100, testOutputHelper);
var testPassed = new TaskCompletionSource<bool>();

var consumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
{
Reference = "consumer",
OffsetSpec = new OffsetTypeTimestamp(before),
MessageHandler = async (_, ctx, _) =>
{
Assert.True(ctx.Timestamp >= before.Offset);
if (ctx.Offset == 99)
{
testPassed.SetResult(true);
}

await Task.CompletedTask;
}
});
new Utils<bool>(testOutputHelper).WaitUntilTaskCompletes(testPassed);
await consumer.Close().ConfigureAwait(false);
await SystemUtils.CleanUpStreamSystem(system, stream);
}
}
}
4 changes: 2 additions & 2 deletions Tests/ReliableTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public void MessageConfirmationShouldHaveTheSameMessages()
var message = new Message(Encoding.UTF8.GetBytes($"hello"));
confirmationPipe.AddUnConfirmedMessage(1, message);
confirmationPipe.AddUnConfirmedMessage(2, new List<Message>() { message });
confirmationPipe.RemoveUnConfirmedMessage(ConfirmationStatus.Confirmed, 1, null);
confirmationPipe.RemoveUnConfirmedMessage(ConfirmationStatus.Confirmed, 2, null);
confirmationPipe.RemoveUnConfirmedMessage(ConfirmationStatus.Confirmed, 1, null).ConfigureAwait(false);
confirmationPipe.RemoveUnConfirmedMessage(ConfirmationStatus.Confirmed, 2, null).ConfigureAwait(false);
new Utils<List<MessagesConfirmation>>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask);
Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[0].Status);
Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[1].Status);
Expand Down