Skip to content

Commit 711a2fa

Browse files
authored
Implement OIDC auth for async (#1131)
JAVA-4981
1 parent b03bfbc commit 711a2fa

File tree

16 files changed

+1252
-97
lines changed

16 files changed

+1252
-97
lines changed

driver-core/src/main/com/mongodb/assertions/Assertions.java

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.mongodb.assertions;
1919

20-
import com.mongodb.internal.async.SingleResultCallback;
2120
import com.mongodb.lang.Nullable;
2221

2322
import java.util.Collection;
@@ -50,25 +49,6 @@ public static <T> T notNull(final String name, final T value) {
5049
return value;
5150
}
5251

53-
/**
54-
* Throw IllegalArgumentException if the value is null.
55-
*
56-
* @param name the parameter name
57-
* @param value the value that should not be null
58-
* @param callback the callback that also is passed the exception if the value is null
59-
* @param <T> the value type
60-
* @return the value
61-
* @throws java.lang.IllegalArgumentException if value is null
62-
*/
63-
public static <T> T notNull(final String name, final T value, final SingleResultCallback<?> callback) {
64-
if (value == null) {
65-
IllegalArgumentException exception = new IllegalArgumentException(name + " can not be null");
66-
callback.onResult(null, exception);
67-
throw exception;
68-
}
69-
return value;
70-
}
71-
7252
/**
7353
* Throw IllegalStateException if the condition if false.
7454
*
@@ -82,22 +62,6 @@ public static void isTrue(final String name, final boolean condition) {
8262
}
8363
}
8464

85-
/**
86-
* Throw IllegalStateException if the condition if false.
87-
*
88-
* @param name the name of the state that is being checked
89-
* @param condition the condition about the parameter to check
90-
* @param callback the callback that also is passed the exception if the condition is not true
91-
* @throws java.lang.IllegalStateException if the condition is false
92-
*/
93-
public static void isTrue(final String name, final boolean condition, final SingleResultCallback<?> callback) {
94-
if (!condition) {
95-
IllegalStateException exception = new IllegalStateException("state should be: " + name);
96-
callback.onResult(null, exception);
97-
throw exception;
98-
}
99-
}
100-
10165
/**
10266
* Throw IllegalArgumentException if the condition if false.
10367
*

driver-core/src/main/com/mongodb/internal/Locks.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package com.mongodb.internal;
1818

1919
import com.mongodb.MongoInterruptedException;
20+
import com.mongodb.internal.async.AsyncRunnable;
21+
import com.mongodb.internal.async.SingleResultCallback;
2022

2123
import java.util.concurrent.locks.Lock;
2224
import java.util.concurrent.locks.StampedLock;
@@ -33,7 +35,23 @@ public static void withLock(final Lock lock, final Runnable action) {
3335
});
3436
}
3537

36-
public static <V> V withLock(final StampedLock lock, final Supplier<V> supplier) {
38+
public static void withLockAsync(final StampedLock lock, final AsyncRunnable runnable,
39+
final SingleResultCallback<Void> callback) {
40+
long stamp;
41+
try {
42+
stamp = lock.writeLockInterruptibly();
43+
} catch (InterruptedException e) {
44+
Thread.currentThread().interrupt();
45+
callback.onResult(null, new MongoInterruptedException("Interrupted waiting for lock", e));
46+
return;
47+
}
48+
49+
runnable.thenAlwaysRunAndFinish(() -> {
50+
lock.unlockWrite(stamp);
51+
}, callback);
52+
}
53+
54+
public static void withLock(final StampedLock lock, final Runnable runnable) {
3755
long stamp;
3856
try {
3957
stamp = lock.writeLockInterruptibly();
@@ -42,7 +60,7 @@ public static <V> V withLock(final StampedLock lock, final Supplier<V> supplier)
4260
throw new MongoInterruptedException("Interrupted waiting for lock", e);
4361
}
4462
try {
45-
return supplier.get();
63+
runnable.run();
4664
} finally {
4765
lock.unlockWrite(stamp);
4866
}
@@ -55,15 +73,15 @@ public static <V> V withLock(final Lock lock, final Supplier<V> supplier) {
5573
public static <V, E extends Exception> V checkedWithLock(final Lock lock, final CheckedSupplier<V, E> supplier) throws E {
5674
try {
5775
lock.lockInterruptibly();
58-
try {
59-
return supplier.get();
60-
} finally {
61-
lock.unlock();
62-
}
6376
} catch (InterruptedException e) {
6477
Thread.currentThread().interrupt();
6578
throw new MongoInterruptedException("Interrupted waiting for lock", e);
6679
}
80+
try {
81+
return supplier.get();
82+
} finally {
83+
lock.unlock();
84+
}
6785
}
6886

6987
private Locks() {
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.async;
18+
19+
/**
20+
* See tests for usage (AsyncFunctionsTest).
21+
* <p>
22+
* This class is not part of the public API and may be removed or changed at any time
23+
*/
24+
@FunctionalInterface
25+
public interface AsyncConsumer<T> extends AsyncFunction<T, Void> {
26+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.async;
18+
19+
import com.mongodb.lang.Nullable;
20+
21+
/**
22+
* See tests for usage (AsyncFunctionsTest).
23+
* <p>
24+
* This class is not part of the public API and may be removed or changed at any time
25+
*/
26+
@FunctionalInterface
27+
public interface AsyncFunction<T, R> {
28+
/**
29+
* This should not be called externally, but should be implemented as a
30+
* lambda. To "finish" an async chain, use one of the "finish" methods.
31+
*/
32+
void unsafeFinish(@Nullable T value, SingleResultCallback<R> callback);
33+
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.async;
18+
19+
import com.mongodb.internal.async.function.RetryState;
20+
import com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier;
21+
22+
import java.util.function.Predicate;
23+
import java.util.function.Supplier;
24+
25+
/**
26+
* See tests for usage (AsyncFunctionsTest).
27+
* <p>
28+
* This class is not part of the public API and may be removed or changed at any time
29+
*/
30+
@FunctionalInterface
31+
public interface AsyncRunnable extends AsyncSupplier<Void>, AsyncConsumer<Void> {
32+
33+
static AsyncRunnable beginAsync() {
34+
return (c) -> c.onResult(null, null);
35+
}
36+
37+
/**
38+
* Must be invoked at end of async chain
39+
* @param runnable the sync code to invoke (under non-exceptional flow)
40+
* prior to the callback
41+
* @param callback the callback provided by the method the chain is used in
42+
*/
43+
default void thenRunAndFinish(final Runnable runnable, final SingleResultCallback<Void> callback) {
44+
this.finish((r, e) -> {
45+
if (e != null) {
46+
callback.onResult(null, e);
47+
return;
48+
}
49+
try {
50+
runnable.run();
51+
} catch (Throwable t) {
52+
callback.onResult(null, t);
53+
return;
54+
}
55+
callback.onResult(null, null);
56+
});
57+
}
58+
59+
/**
60+
* See {@link #thenRunAndFinish(Runnable, SingleResultCallback)}, but the runnable
61+
* will always be executed, including on the exceptional path.
62+
* @param runnable the runnable
63+
* @param callback the callback
64+
*/
65+
default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultCallback<Void> callback) {
66+
this.finish((r, e) -> {
67+
try {
68+
runnable.run();
69+
} catch (Throwable t) {
70+
if (e != null) {
71+
t.addSuppressed(e);
72+
}
73+
callback.onResult(null, t);
74+
return;
75+
}
76+
callback.onResult(null, e);
77+
});
78+
}
79+
80+
/**
81+
* @param runnable The async runnable to run after this runnable
82+
* @return the composition of this runnable and the runnable, a runnable
83+
*/
84+
default AsyncRunnable thenRun(final AsyncRunnable runnable) {
85+
return (c) -> {
86+
this.unsafeFinish((r, e) -> {
87+
if (e == null) {
88+
runnable.unsafeFinish(c);
89+
} else {
90+
c.onResult(null, e);
91+
}
92+
});
93+
};
94+
}
95+
96+
/**
97+
* @param condition the condition to check
98+
* @param runnable The async runnable to run after this runnable,
99+
* if and only if the condition is met
100+
* @return the composition of this runnable and the runnable, a runnable
101+
*/
102+
default AsyncRunnable thenRunIf(final Supplier<Boolean> condition, final AsyncRunnable runnable) {
103+
return (callback) -> {
104+
this.unsafeFinish((r, e) -> {
105+
if (e != null) {
106+
callback.onResult(null, e);
107+
return;
108+
}
109+
boolean matched;
110+
try {
111+
matched = condition.get();
112+
} catch (Throwable t) {
113+
callback.onResult(null, t);
114+
return;
115+
}
116+
if (matched) {
117+
runnable.unsafeFinish(callback);
118+
} else {
119+
callback.onResult(null, null);
120+
}
121+
});
122+
};
123+
}
124+
125+
/**
126+
* @param supplier The supplier to supply using after this runnable
127+
* @return the composition of this runnable and the supplier, a supplier
128+
* @param <R> The return type of the resulting supplier
129+
*/
130+
default <R> AsyncSupplier<R> thenSupply(final AsyncSupplier<R> supplier) {
131+
return (c) -> {
132+
this.unsafeFinish((r, e) -> {
133+
if (e == null) {
134+
supplier.unsafeFinish(c);
135+
} else {
136+
c.onResult(null, e);
137+
}
138+
});
139+
};
140+
}
141+
142+
/**
143+
* @param runnable the runnable to loop
144+
* @param shouldRetry condition under which to retry
145+
* @return the composition of this, and the looping branch
146+
* @see RetryingAsyncCallbackSupplier
147+
*/
148+
default AsyncRunnable thenRunRetryingWhile(
149+
final AsyncRunnable runnable, final Predicate<Throwable> shouldRetry) {
150+
return thenRun(callback -> {
151+
new RetryingAsyncCallbackSupplier<Void>(
152+
new RetryState(),
153+
(rs, lastAttemptFailure) -> shouldRetry.test(lastAttemptFailure),
154+
cb -> runnable.finish(cb) // finish is required here, to handle exceptions
155+
).get(callback);
156+
});
157+
}
158+
}

0 commit comments

Comments
 (0)