Skip to content

Commit 2215d20

Browse files
committed
* Fix RPC examples
1 parent b2eb6c5 commit 2215d20

File tree

2 files changed

+17
-11
lines changed

2 files changed

+17
-11
lines changed

dotnet/RPCClient/RPCClient.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,18 @@ public async Task<string> CallAsync(string message,
5858
throw new InvalidOperationException();
5959
}
6060

61-
var props = new BasicProperties();
62-
var correlationId = Guid.NewGuid().ToString();
63-
props.CorrelationId = correlationId;
64-
props.ReplyTo = _replyQueueName;
65-
var messageBytes = Encoding.UTF8.GetBytes(message);
66-
var tcs = new TaskCompletionSource<string>();
61+
string correlationId = Guid.NewGuid().ToString();
62+
var props = new BasicProperties
63+
{
64+
CorrelationId = correlationId,
65+
ReplyTo = _replyQueueName
66+
};
67+
68+
var tcs = new TaskCompletionSource<string>(
69+
TaskCreationOptions.RunContinuationsAsynchronously);
6770
_callbackMapper.TryAdd(correlationId, tcs);
6871

72+
var messageBytes = Encoding.UTF8.GetBytes(message);
6973
await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: QUEUE_NAME,
7074
mandatory: true, basicProperties: props, body: messageBytes);
7175

dotnet/RPCServer/RPCServer.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,22 @@
22
using RabbitMQ.Client.Events;
33
using System.Text;
44

5+
const string QUEUE_NAME = "rpc_queue";
6+
57
var factory = new ConnectionFactory { HostName = "localhost" };
68
using var connection = await factory.CreateConnectionAsync();
79
using var channel = await connection.CreateChannelAsync();
810

9-
await channel.QueueDeclareAsync(queue: "rpc_queue", durable: false, exclusive: false,
11+
await channel.QueueDeclareAsync(queue: QUEUE_NAME, durable: false, exclusive: false,
1012
autoDelete: false, arguments: null);
1113

1214
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);
1315

1416
var consumer = new AsyncEventingBasicConsumer(channel);
15-
await channel.BasicConsumeAsync("rpc_queue", false, consumer);
16-
Console.WriteLine(" [x] Awaiting RPC requests");
17-
1817
consumer.Received += async (object sender, BasicDeliverEventArgs ea) =>
1918
{
20-
IChannel ch = (IChannel)sender;
19+
AsyncEventingBasicConsumer cons = (AsyncEventingBasicConsumer)sender;
20+
IChannel ch = cons.Channel;
2121
string response = string.Empty;
2222

2323
byte[] body = ea.Body.ToArray();
@@ -48,6 +48,8 @@ await ch.BasicPublishAsync(exchange: string.Empty, routingKey: props.ReplyTo!,
4848
}
4949
};
5050

51+
await channel.BasicConsumeAsync(QUEUE_NAME, false, consumer);
52+
Console.WriteLine(" [x] Awaiting RPC requests");
5153
Console.WriteLine(" Press [enter] to exit.");
5254
Console.ReadLine();
5355

0 commit comments

Comments
 (0)