@@ -58,13 +58,14 @@ public OAuth2Options(Mode mode)
58
58
public int TokenExpiresInSeconds => 60 ;
59
59
}
60
60
61
- public class TestOAuth2
61
+ public class TestOAuth2 : IAsyncLifetime
62
62
{
63
63
private const string Exchange = "test_direct" ;
64
64
65
65
private readonly AutoResetEvent _doneEvent = new AutoResetEvent ( false ) ;
66
66
private readonly ITestOutputHelper _testOutputHelper ;
67
- private readonly IConnection _connection ;
67
+ private readonly IConnectionFactory _connectionFactory ;
68
+ private IConnection _connection ;
68
69
private readonly int _tokenExpiresInSeconds ;
69
70
70
71
public TestOAuth2 ( ITestOutputHelper testOutputHelper )
@@ -75,61 +76,76 @@ public TestOAuth2(ITestOutputHelper testOutputHelper)
75
76
Mode mode = ( Mode ) Enum . Parse ( typeof ( Mode ) , modeStr . ToLowerInvariant ( ) ) ;
76
77
var options = new OAuth2Options ( mode ) ;
77
78
78
- var connectionFactory = new ConnectionFactory
79
+ _connectionFactory = new ConnectionFactory
79
80
{
80
81
AutomaticRecoveryEnabled = true ,
82
+ DispatchConsumersAsync = true ,
81
83
CredentialsProvider = GetCredentialsProvider ( options ) ,
82
84
CredentialsRefresher = GetCredentialsRefresher ( ) ,
83
85
ClientProvidedName = nameof ( TestOAuth2 )
84
86
} ;
85
87
86
- _connection = connectionFactory . CreateConnection ( ) ;
87
88
_tokenExpiresInSeconds = options . TokenExpiresInSeconds ;
88
89
}
89
90
91
+ public async Task InitializeAsync ( )
92
+ {
93
+ _connection = await _connectionFactory . CreateConnectionAsync ( ) ;
94
+ }
95
+
96
+ public async Task DisposeAsync ( )
97
+ {
98
+ await _connection . CloseAsync ( ) ;
99
+ _connection . Dispose ( ) ;
100
+ }
101
+
90
102
[ Fact ]
91
103
public async void IntegrationTest ( )
92
104
{
93
- using ( _connection )
105
+ using ( IChannel publishChannel = await DeclarePublisherAsync ( ) )
106
+ using ( IChannel consumeChannel = await DeclareConsumerAsync ( ) )
94
107
{
95
- using ( IChannel publisher = declarePublisher ( ) )
96
- using ( IChannel subscriber = await declareConsumer ( ) )
97
- {
98
- await Publish ( publisher ) ;
99
- Consume ( subscriber ) ;
108
+ await PublishAsync ( publishChannel ) ;
109
+ Consume ( consumeChannel ) ;
100
110
101
- if ( _tokenExpiresInSeconds > 0 )
111
+ if ( _tokenExpiresInSeconds > 0 )
112
+ {
113
+ for ( int i = 0 ; i < 4 ; i ++ )
102
114
{
103
- for ( int i = 0 ; i < 4 ; i ++ )
104
- {
105
- _testOutputHelper . WriteLine ( "Wait until Token expires. Attempt #" + ( i + 1 ) ) ;
115
+ _testOutputHelper . WriteLine ( "Wait until Token expires. Attempt #" + ( i + 1 ) ) ;
106
116
107
- await Task . Delay ( TimeSpan . FromSeconds ( _tokenExpiresInSeconds + 10 ) ) ;
108
- _testOutputHelper . WriteLine ( "Resuming .." ) ;
117
+ await Task . Delay ( TimeSpan . FromSeconds ( _tokenExpiresInSeconds + 10 ) ) ;
118
+ _testOutputHelper . WriteLine ( "Resuming .." ) ;
109
119
110
- await Publish ( publisher ) ;
111
- _doneEvent . Reset ( ) ;
120
+ await PublishAsync ( publishChannel ) ;
121
+ _doneEvent . Reset ( ) ;
112
122
113
- Consume ( subscriber ) ;
114
- }
115
- }
116
- else
117
- {
118
- throw new InvalidOperationException ( ) ;
123
+ Consume ( consumeChannel ) ;
119
124
}
120
125
}
126
+ else
127
+ {
128
+ Assert . Fail ( "_tokenExpiresInSeconds is NOT greater than 0" ) ;
129
+ }
121
130
}
122
131
}
123
132
124
- private IChannel declarePublisher ( )
133
+ [ Fact ]
134
+ public async void SecondConnectionCrashes_GH1429 ( )
135
+ {
136
+ // https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1429
137
+ using IConnection secondConnection = await _connectionFactory . CreateConnectionAsync ( ) ;
138
+ }
139
+
140
+ private async Task < IChannel > DeclarePublisherAsync ( )
125
141
{
126
- IChannel publisher = _connection . CreateChannel ( ) ;
127
- publisher . ConfirmSelect ( ) ;
128
- publisher . ExchangeDeclare ( "test_direct" , ExchangeType . Direct , true , false ) ;
142
+ IChannel publisher = await _connection . CreateChannelAsync ( ) ;
143
+ await publisher . ConfirmSelectAsync ( ) ;
144
+ await publisher . ExchangeDeclareAsync ( "test_direct" , ExchangeType . Direct , true , false ) ;
129
145
return publisher ;
130
146
}
131
147
132
- private async Task Publish ( IChannel publisher )
148
+ private async Task PublishAsync ( IChannel publisher )
133
149
{
134
150
const string message = "Hello World!" ;
135
151
@@ -146,7 +162,7 @@ private async Task Publish(IChannel publisher)
146
162
_testOutputHelper . WriteLine ( "Confirmed Sent message" ) ;
147
163
}
148
164
149
- private async ValueTask < IChannel > declareConsumer ( )
165
+ private async ValueTask < IChannel > DeclareConsumerAsync ( )
150
166
{
151
167
IChannel subscriber = _connection . CreateChannel ( ) ;
152
168
await subscriber . QueueDeclareAsync ( queue : "testqueue" , passive : false , true , false , false , arguments : null ) ;
0 commit comments