Skip to content

Commit dd9c52b

Browse files
committed
Implement OIDC auth for async (#1131)
JAVA-4981
1 parent 60d2f06 commit dd9c52b

File tree

17 files changed

+1248
-94
lines changed

17 files changed

+1248
-94
lines changed

driver-core/src/main/com/mongodb/assertions/Assertions.java

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.mongodb.assertions;
1919

20-
import com.mongodb.internal.async.SingleResultCallback;
2120
import com.mongodb.lang.Nullable;
2221

2322
import java.util.Collection;
@@ -78,25 +77,6 @@ public static <T> Iterable<T> notNullElements(final String name, final Iterable<
7877
return values;
7978
}
8079

81-
/**
82-
* Throw IllegalArgumentException if the value is null.
83-
*
84-
* @param name the parameter name
85-
* @param value the value that should not be null
86-
* @param callback the callback that also is passed the exception if the value is null
87-
* @param <T> the value type
88-
* @return the value
89-
* @throws java.lang.IllegalArgumentException if value is null
90-
*/
91-
public static <T> T notNull(final String name, final T value, final SingleResultCallback<?> callback) {
92-
if (value == null) {
93-
IllegalArgumentException exception = new IllegalArgumentException(name + " can not be null");
94-
callback.onResult(null, exception);
95-
throw exception;
96-
}
97-
return value;
98-
}
99-
10080
/**
10181
* Throw IllegalStateException if the condition if false.
10282
*
@@ -110,22 +90,6 @@ public static void isTrue(final String name, final boolean condition) {
11090
}
11191
}
11292

113-
/**
114-
* Throw IllegalStateException if the condition if false.
115-
*
116-
* @param name the name of the state that is being checked
117-
* @param condition the condition about the parameter to check
118-
* @param callback the callback that also is passed the exception if the condition is not true
119-
* @throws java.lang.IllegalStateException if the condition is false
120-
*/
121-
public static void isTrue(final String name, final boolean condition, final SingleResultCallback<?> callback) {
122-
if (!condition) {
123-
IllegalStateException exception = new IllegalStateException("state should be: " + name);
124-
callback.onResult(null, exception);
125-
throw exception;
126-
}
127-
}
128-
12993
/**
13094
* Throw IllegalArgumentException if the condition if false.
13195
*

driver-core/src/main/com/mongodb/internal/Locks.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package com.mongodb.internal;
1818

1919
import com.mongodb.MongoInterruptedException;
20+
import com.mongodb.internal.async.AsyncRunnable;
21+
import com.mongodb.internal.async.SingleResultCallback;
2022

2123
import java.util.concurrent.locks.Lock;
2224
import java.util.concurrent.locks.ReentrantLock;
@@ -36,7 +38,23 @@ public static void withLock(final Lock lock, final Runnable action) {
3638
});
3739
}
3840

39-
public static <V> V withLock(final StampedLock lock, final Supplier<V> supplier) {
41+
public static void withLockAsync(final StampedLock lock, final AsyncRunnable runnable,
42+
final SingleResultCallback<Void> callback) {
43+
long stamp;
44+
try {
45+
stamp = lock.writeLockInterruptibly();
46+
} catch (InterruptedException e) {
47+
Thread.currentThread().interrupt();
48+
callback.onResult(null, new MongoInterruptedException("Interrupted waiting for lock", e));
49+
return;
50+
}
51+
52+
runnable.thenAlwaysRunAndFinish(() -> {
53+
lock.unlockWrite(stamp);
54+
}, callback);
55+
}
56+
57+
public static void withLock(final StampedLock lock, final Runnable runnable) {
4058
long stamp;
4159
try {
4260
stamp = lock.writeLockInterruptibly();
@@ -45,7 +63,7 @@ public static <V> V withLock(final StampedLock lock, final Supplier<V> supplier)
4563
throw new MongoInterruptedException("Interrupted waiting for lock", e);
4664
}
4765
try {
48-
return supplier.get();
66+
runnable.run();
4967
} finally {
5068
lock.unlockWrite(stamp);
5169
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.async;
18+
19+
/**
20+
* See tests for usage (AsyncFunctionsTest).
21+
* <p>
22+
* This class is not part of the public API and may be removed or changed at any time
23+
*/
24+
@FunctionalInterface
25+
public interface AsyncConsumer<T> extends AsyncFunction<T, Void> {
26+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.async;
18+
19+
import com.mongodb.lang.Nullable;
20+
21+
/**
22+
* See tests for usage (AsyncFunctionsTest).
23+
* <p>
24+
* This class is not part of the public API and may be removed or changed at any time
25+
*/
26+
@FunctionalInterface
27+
public interface AsyncFunction<T, R> {
28+
/**
29+
* This should not be called externally, but should be implemented as a
30+
* lambda. To "finish" an async chain, use one of the "finish" methods.
31+
*/
32+
void unsafeFinish(@Nullable T value, SingleResultCallback<R> callback);
33+
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.async;
18+
19+
import com.mongodb.internal.async.function.RetryState;
20+
import com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier;
21+
22+
import java.util.function.Predicate;
23+
import java.util.function.Supplier;
24+
25+
/**
26+
* See tests for usage (AsyncFunctionsTest).
27+
* <p>
28+
* This class is not part of the public API and may be removed or changed at any time
29+
*/
30+
@FunctionalInterface
31+
public interface AsyncRunnable extends AsyncSupplier<Void>, AsyncConsumer<Void> {
32+
33+
static AsyncRunnable beginAsync() {
34+
return (c) -> c.onResult(null, null);
35+
}
36+
37+
/**
38+
* Must be invoked at end of async chain
39+
* @param runnable the sync code to invoke (under non-exceptional flow)
40+
* prior to the callback
41+
* @param callback the callback provided by the method the chain is used in
42+
*/
43+
default void thenRunAndFinish(final Runnable runnable, final SingleResultCallback<Void> callback) {
44+
this.finish((r, e) -> {
45+
if (e != null) {
46+
callback.onResult(null, e);
47+
return;
48+
}
49+
try {
50+
runnable.run();
51+
} catch (Throwable t) {
52+
callback.onResult(null, t);
53+
return;
54+
}
55+
callback.onResult(null, null);
56+
});
57+
}
58+
59+
/**
60+
* See {@link #thenRunAndFinish(Runnable, SingleResultCallback)}, but the runnable
61+
* will always be executed, including on the exceptional path.
62+
* @param runnable the runnable
63+
* @param callback the callback
64+
*/
65+
default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultCallback<Void> callback) {
66+
this.finish((r, e) -> {
67+
try {
68+
runnable.run();
69+
} catch (Throwable t) {
70+
if (e != null) {
71+
t.addSuppressed(e);
72+
}
73+
callback.onResult(null, t);
74+
return;
75+
}
76+
callback.onResult(null, e);
77+
});
78+
}
79+
80+
/**
81+
* @param runnable The async runnable to run after this runnable
82+
* @return the composition of this runnable and the runnable, a runnable
83+
*/
84+
default AsyncRunnable thenRun(final AsyncRunnable runnable) {
85+
return (c) -> {
86+
this.unsafeFinish((r, e) -> {
87+
if (e == null) {
88+
runnable.unsafeFinish(c);
89+
} else {
90+
c.onResult(null, e);
91+
}
92+
});
93+
};
94+
}
95+
96+
/**
97+
* @param condition the condition to check
98+
* @param runnable The async runnable to run after this runnable,
99+
* if and only if the condition is met
100+
* @return the composition of this runnable and the runnable, a runnable
101+
*/
102+
default AsyncRunnable thenRunIf(final Supplier<Boolean> condition, final AsyncRunnable runnable) {
103+
return (callback) -> {
104+
this.unsafeFinish((r, e) -> {
105+
if (e != null) {
106+
callback.onResult(null, e);
107+
return;
108+
}
109+
boolean matched;
110+
try {
111+
matched = condition.get();
112+
} catch (Throwable t) {
113+
callback.onResult(null, t);
114+
return;
115+
}
116+
if (matched) {
117+
runnable.unsafeFinish(callback);
118+
} else {
119+
callback.onResult(null, null);
120+
}
121+
});
122+
};
123+
}
124+
125+
/**
126+
* @param supplier The supplier to supply using after this runnable
127+
* @return the composition of this runnable and the supplier, a supplier
128+
* @param <R> The return type of the resulting supplier
129+
*/
130+
default <R> AsyncSupplier<R> thenSupply(final AsyncSupplier<R> supplier) {
131+
return (c) -> {
132+
this.unsafeFinish((r, e) -> {
133+
if (e == null) {
134+
supplier.unsafeFinish(c);
135+
} else {
136+
c.onResult(null, e);
137+
}
138+
});
139+
};
140+
}
141+
142+
/**
143+
* @param runnable the runnable to loop
144+
* @param shouldRetry condition under which to retry
145+
* @return the composition of this, and the looping branch
146+
* @see RetryingAsyncCallbackSupplier
147+
*/
148+
default AsyncRunnable thenRunRetryingWhile(
149+
final AsyncRunnable runnable, final Predicate<Throwable> shouldRetry) {
150+
return thenRun(callback -> {
151+
new RetryingAsyncCallbackSupplier<Void>(
152+
new RetryState(),
153+
(rs, lastAttemptFailure) -> shouldRetry.test(lastAttemptFailure),
154+
cb -> runnable.finish(cb) // finish is required here, to handle exceptions
155+
).get(callback);
156+
});
157+
}
158+
}

0 commit comments

Comments
 (0)