|
17 | 17 | package com.mongodb.reactivestreams.client.internal.vault;
|
18 | 18 |
|
19 | 19 | import com.mongodb.ClientEncryptionSettings;
|
| 20 | +import com.mongodb.MongoClientSettings; |
| 21 | +import com.mongodb.MongoConfigurationException; |
20 | 22 | import com.mongodb.MongoNamespace;
|
| 23 | +import com.mongodb.MongoUpdatedEncryptedFieldsException; |
21 | 24 | import com.mongodb.ReadConcern;
|
22 | 25 | import com.mongodb.WriteConcern;
|
| 26 | +import com.mongodb.client.model.CreateCollectionOptions; |
| 27 | +import com.mongodb.client.model.CreateEncryptedCollectionParams; |
23 | 28 | import com.mongodb.client.model.Filters;
|
24 | 29 | import com.mongodb.client.model.UpdateOneModel;
|
25 | 30 | import com.mongodb.client.model.Updates;
|
|
32 | 37 | import com.mongodb.reactivestreams.client.MongoClient;
|
33 | 38 | import com.mongodb.reactivestreams.client.MongoClients;
|
34 | 39 | import com.mongodb.reactivestreams.client.MongoCollection;
|
| 40 | +import com.mongodb.reactivestreams.client.MongoDatabase; |
35 | 41 | import com.mongodb.reactivestreams.client.internal.crypt.Crypt;
|
36 | 42 | import com.mongodb.reactivestreams.client.internal.crypt.Crypts;
|
37 | 43 | import com.mongodb.reactivestreams.client.vault.ClientEncryption;
|
38 | 44 | import org.bson.BsonArray;
|
39 | 45 | import org.bson.BsonBinary;
|
40 | 46 | import org.bson.BsonDocument;
|
| 47 | +import org.bson.BsonNull; |
41 | 48 | import org.bson.BsonString;
|
42 | 49 | import org.bson.BsonValue;
|
| 50 | +import org.bson.codecs.configuration.CodecRegistry; |
43 | 51 | import org.bson.conversions.Bson;
|
44 | 52 | import org.reactivestreams.Publisher;
|
| 53 | +import reactor.core.publisher.Flux; |
45 | 54 | import reactor.core.publisher.Mono;
|
46 | 55 |
|
47 | 56 | import java.util.List;
|
| 57 | +import java.util.Objects; |
| 58 | +import java.util.concurrent.atomic.AtomicBoolean; |
48 | 59 | import java.util.stream.Collectors;
|
49 | 60 |
|
| 61 | +import static com.mongodb.assertions.Assertions.notNull; |
50 | 62 | import static com.mongodb.internal.capi.MongoCryptHelper.validateRewrapManyDataKeyOptions;
|
| 63 | +import static java.lang.String.format; |
51 | 64 | import static java.util.Arrays.asList;
|
52 | 65 | import static java.util.Collections.singletonList;
|
53 | 66 |
|
@@ -183,6 +196,79 @@ public Publisher<RewrapManyDataKeyResult> rewrapManyDataKey(final Bson filter, f
|
183 | 196 | }));
|
184 | 197 | }
|
185 | 198 |
|
| 199 | + @Override |
| 200 | + public Publisher<BsonDocument> createEncryptedCollection(final MongoDatabase database, final String collectionName, |
| 201 | + final CreateCollectionOptions createCollectionOptions, final CreateEncryptedCollectionParams createEncryptedCollectionParams) { |
| 202 | + notNull("collectionName", collectionName); |
| 203 | + notNull("createCollectionOptions", createCollectionOptions); |
| 204 | + notNull("createEncryptedCollectionParams", createEncryptedCollectionParams); |
| 205 | + MongoNamespace namespace = new MongoNamespace(database.getName(), collectionName); |
| 206 | + Bson rawEncryptedFields = createCollectionOptions.getEncryptedFields(); |
| 207 | + if (rawEncryptedFields == null) { |
| 208 | + throw new MongoConfigurationException(format("`encryptedFields` is not configured for the collection %s.", namespace)); |
| 209 | + } |
| 210 | + CodecRegistry codecRegistry = options.getKeyVaultMongoClientSettings() == null |
| 211 | + ? MongoClientSettings.getDefaultCodecRegistry() |
| 212 | + : options.getKeyVaultMongoClientSettings().getCodecRegistry(); |
| 213 | + BsonDocument encryptedFields = rawEncryptedFields.toBsonDocument(BsonDocument.class, codecRegistry); |
| 214 | + BsonValue fields = encryptedFields.get("fields"); |
| 215 | + if (fields != null && fields.isArray()) { |
| 216 | + String kmsProvider = createEncryptedCollectionParams.getKmsProvider(); |
| 217 | + DataKeyOptions dataKeyOptions = new DataKeyOptions(); |
| 218 | + BsonDocument masterKey = createEncryptedCollectionParams.getMasterKey(); |
| 219 | + if (masterKey != null) { |
| 220 | + dataKeyOptions.masterKey(masterKey); |
| 221 | + } |
| 222 | + String keyIdBsonKey = "keyId"; |
| 223 | + return Mono.defer(() -> { |
| 224 | + // `Mono.defer` results in `maybeUpdatedEncryptedFields` and `dataKeyMightBeCreated` (mutable state) |
| 225 | + // being created once per `Subscriber`, which allows the produced `Mono` to support multiple `Subscribers`. |
| 226 | + BsonDocument maybeUpdatedEncryptedFields = encryptedFields.clone(); |
| 227 | + AtomicBoolean dataKeyMightBeCreated = new AtomicBoolean(); |
| 228 | + Iterable<Mono<BsonDocument>> publishersOfUpdatedFields = () -> maybeUpdatedEncryptedFields.get("fields").asArray() |
| 229 | + .stream() |
| 230 | + .filter(BsonValue::isDocument) |
| 231 | + .map(BsonValue::asDocument) |
| 232 | + .filter(field -> field.containsKey(keyIdBsonKey)) |
| 233 | + .filter(field -> Objects.equals(field.get(keyIdBsonKey), BsonNull.VALUE)) |
| 234 | + // here we rely on the `createDataKey` publisher being cold, i.e., doing nothing until it is subscribed to |
| 235 | + .map(field -> Mono.fromDirect(createDataKey(kmsProvider, dataKeyOptions)) |
| 236 | + // This is the closest we can do with reactive streams to setting the `dataKeyMightBeCreated` flag |
| 237 | + // immediately before calling `createDataKey`. |
| 238 | + .doOnSubscribe(subscription -> dataKeyMightBeCreated.set(true)) |
| 239 | + .doOnNext(dataKeyId -> field.put(keyIdBsonKey, dataKeyId)) |
| 240 | + .map(dataKeyId -> field) |
| 241 | + ) |
| 242 | + .iterator(); |
| 243 | + // `Flux.concat` ensures that data keys are created / fields are updated sequentially one by one |
| 244 | + Flux<BsonDocument> publisherOfUpdatedFields = Flux.concat(publishersOfUpdatedFields); |
| 245 | + return publisherOfUpdatedFields |
| 246 | + // All write actions in `doOnNext` above happen-before the completion (`onComplete`/`onError`) signals |
| 247 | + // for this publisher, because all signals are serial. `thenEmpty` further guarantees that the completion signal |
| 248 | + // for this publisher happens-before the `onSubscribe` signal for the publisher passed to it |
| 249 | + // (the next publisher, which creates a collection). |
| 250 | + // `defer` defers calling `createCollection` until the next publisher is subscribed to. |
| 251 | + // Therefore, all write actions in `doOnNext` above happen-before the invocation of `createCollection`, |
| 252 | + // which means `createCollection` is guaranteed to observe all those write actions, i.e., |
| 253 | + // it is guaranteed to observe the updated document via the `maybeUpdatedEncryptedFields` reference. |
| 254 | + // |
| 255 | + // Similarly, the `Subscriber` of the returned `Publisher` is guaranteed to observe all those write actions |
| 256 | + // via the `maybeUpdatedEncryptedFields` reference, which is emitted as a result of `thenReturn`. |
| 257 | + .thenEmpty(Mono.defer(() -> Mono.fromDirect(database.createCollection(collectionName, |
| 258 | + new CreateCollectionOptions(createCollectionOptions).encryptedFields(maybeUpdatedEncryptedFields)))) |
| 259 | + ) |
| 260 | + .onErrorMap(e -> dataKeyMightBeCreated.get(), e -> |
| 261 | + new MongoUpdatedEncryptedFieldsException(maybeUpdatedEncryptedFields, |
| 262 | + format("Failed to create %s.", namespace), e) |
| 263 | + ) |
| 264 | + .thenReturn(maybeUpdatedEncryptedFields); |
| 265 | + }); |
| 266 | + } else { |
| 267 | + return Mono.fromDirect(database.createCollection(collectionName, createCollectionOptions)) |
| 268 | + .thenReturn(encryptedFields); |
| 269 | + } |
| 270 | + } |
| 271 | + |
186 | 272 | @Override
|
187 | 273 | public void close() {
|
188 | 274 | keyVaultClient.close();
|
|
0 commit comments