Add the ClientEncryption.createEncryptedCollection helper method #1079

Merged
merged 7 commits into from
Feb 9, 2023

Conversation

stIncMale
Member

@stIncMale stIncMale commented Feb 4, 2023

Links to the relevant spec changes with their status are gathered and being kept updated in this comment. Note especially that some tests that you may want to see for this change are yet to be described in the spec.

JAVA-4679

@@ -183,6 +198,85 @@ public Publisher<RewrapManyDataKeyResult> rewrapManyDataKey(final Bson filter, f
}));
}

@Override
public Publisher<BsonDocument> createEncryptedCollection(final MongoDatabase database, final String collectionName,
Member Author

@stIncMale stIncMale Feb 4, 2023

@rozza, your knowledge of Project Reactor and reactive streams programming is needed to review this method.

The intended behavior of this method can be seen either in the synchronous implementation or in the spec: Create Encrypted Collection Helper.

The thing that made this implementation really difficult for me (besides the lack of knowledge) is:

  • if either createDataKey or createCollection fails, the method must attach to the onError signal the (possibly partially) updated encryptedFields that it would have produced in case of a success.

I achieved the needed behavior by capturing dataKeyMayBeCreated and actualEncryptedFields in the returned Publisher. As far as I understand, the Publisher is constructed such that there are no data races when updating the actualEncryptedFields document (updates are done to the unrelated field documents, which are different elements of a BSON array), and all reads of the updated actualEncryptedFields are ordered after (in the HB order) the updates due to the combination of then and defer methods.

Even if this implementation is correct (at the very least I doubt it is idiomatic), it can't work if there is more than one Subscriber (regardless of whether they exist concurrently with each other or not), which is why I had to make the result a one-shot publisher.
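The requirement described above can be modeled without Reactor. The following is a minimal plain-Java sketch, not the driver's implementation: `UpdatedFieldsException`, `fillKeyIds`, the `createCollection` callback, and the `Map`-based field representation are hypothetical stand-ins for `MongoUpdatedEncryptedFieldsException`, the helper's internals, and BSON documents. Only the `dataKeyMayBeCreated` flag is named after the PR. It shows a failure carrying the fields as updated so far.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

final class PartialFieldsSketch {
    /** Hypothetical stand-in for MongoUpdatedEncryptedFieldsException. */
    static final class UpdatedFieldsException extends RuntimeException {
        private final List<Map<String, Object>> fields;

        UpdatedFieldsException(List<Map<String, Object>> fields, Throwable cause) {
            super("Failed after some data keys may have been created.", cause);
            this.fields = fields;
        }

        List<Map<String, Object>> getFields() {
            return fields;
        }
    }

    /**
     * Copies the fields, replaces each explicitly null "keyId" with a new key (one at a time),
     * then runs createCollection. If anything fails after key creation was attempted,
     * the thrown exception carries the copy as updated so far.
     */
    static List<Map<String, Object>> fillKeyIds(List<Map<String, Object>> fields,
            Supplier<String> createDataKey, Runnable createCollection) {
        List<Map<String, Object>> updated = new ArrayList<>();
        for (Map<String, Object> field : fields) {
            updated.add(new LinkedHashMap<>(field));
        }
        boolean dataKeyMayBeCreated = false;
        try {
            for (Map<String, Object> field : updated) {
                if (field.containsKey("keyId") && field.get("keyId") == null) {
                    dataKeyMayBeCreated = true;
                    field.put("keyId", createDataKey.get());
                }
            }
            createCollection.run();
        } catch (RuntimeException e) {
            if (dataKeyMayBeCreated) {
                throw new UpdatedFieldsException(updated, e);
            }
            throw e; // nothing was attempted, so there is nothing extra to report
        }
        return updated;
    }
}
```

In this synchronous model the ordering between updates and reads is trivial. The difficulty discussed in the comment is getting the same guarantee from a chain of reactive operators.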

.map(dataKey -> new SimpleImmutableEntry<>(field, dataKey))
)
.iterator();
Flux<SimpleImmutableEntry<BsonDocument, BsonBinary>> publisherOfFieldsAndDataKeyIds = Flux.concat(publishersOfFieldsAndDataKeyIds)
Member Author

As a result of Flux.concat, all data keys are created sequentially. This way we stop as soon as there is an error, instead of possibly continuing to invoke createDataKey despite there already being an error.
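The stop-at-first-error property can be illustrated with a plain-Java analog. This sketch uses `CompletableFuture` from the JDK rather than Reactor's `Flux.concat`, and `SequentialCreationSketch` and `runSequentially` are hypothetical names; it only shows the idea that a task is started solely when every earlier task has succeeded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class SequentialCreationSketch {
    /**
     * Runs the tasks one after another. Each task is started only after all
     * previous tasks completed successfully; after a failure no further task starts.
     */
    static <T> CompletableFuture<List<T>> runSequentially(List<Supplier<CompletableFuture<T>>> tasks) {
        CompletableFuture<List<T>> result = CompletableFuture.completedFuture(new ArrayList<>());
        for (Supplier<CompletableFuture<T>> task : tasks) {
            // thenCompose skips its function if the previous stage failed
            result = result.thenCompose(collected ->
                    task.get().thenApply(value -> {
                        collected.add(value);
                        return collected;
                    }));
        }
        return result;
    }
}
```

Starting all tasks eagerly and merging the results would instead allow later key-creation calls to run even though an earlier one had already failed.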

Comment on lines 265 to 271
.doOnError(RuntimeException.class, e -> {
    if (dataKeyMayBeCreated.get()) {
        throw new MongoUpdatedEncryptedFieldsException(actualEncryptedFields, format("Failed to create %s.", namespace), e);
    } else {
        throw e;
    }
});
Member Author

@stIncMale stIncMale Feb 4, 2023

Exceptions from doOnError/defer are handled by the Project Reactor such that the reactive streams protocol is not violated. I am not sure what is a better / more idiomatic way of transforming the error.

P.S. While writing this comment, I tried to find a better way again and found Flux.handle, but it can't help because it is for handling emitted items, not errors.

Member Author

Note that doOnError suppresses the original e when we throw MongoUpdatedEncryptedFieldsException, instead of treating the thrown exception as a replacement for the original one. As a result, we have e as both the cause of MongoUpdatedEncryptedFieldsException and an element in the suppressed set, which causes ugly stack-traces and implies that we are abusing doOnError. However, it may be that the Reactor simply does not support what we need.
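The shape of the resulting exception can be reproduced in plain Java. This sketch does not use or reproduce Reactor's internals; `SuppressedCauseSketch` and `wrapAndSuppress` are hypothetical names, and the code only models the outcome described above, where the same exception is reachable both as the cause and as a suppressed exception.

```java
final class SuppressedCauseSketch {
    /** Wraps the original error and additionally records it as suppressed. */
    static RuntimeException wrapAndSuppress(RuntimeException original) {
        RuntimeException wrapper = new RuntimeException("Failed to create the collection.", original);
        wrapper.addSuppressed(original);
        return wrapper;
    }

    public static void main(String[] args) {
        // The printed stack trace mentions the original exception in two places.
        wrapAndSuppress(new IllegalStateException("boom")).printStackTrace();
    }
}
```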

Member Author

@rozza Thank you for pointing out the onErrorMap method, the emitted exceptions are nice now. There is no excuse for how I failed to find it myself.

Comment on lines 204 to 206
if (!fields.isArray()) {
throw new MongoConfigurationException(format("`encryptedFields` is incorrectly configured for the collection %s."
+ " `encryptedFields.fields` must be an array, but is %s type.", namespace, fields.getBsonType()));
Member Author

This exception is not according to the current spec, and comes from the following:

If there is a concern that we should not throw this exception until (if ever) it is approved as a spec change, I will undo.

Member

Ack

Member Author

I undid this part because I realized that the proposal in DRIVERS-2540 is not fully baked.

@stIncMale stIncMale requested review from rozza and katcharov February 4, 2023 02:13
Comment on lines 208 to 212
return OneShotPublisher.from(Mono.defer(() -> {
MongoNamespace namespace = new MongoNamespace(database.getName(), collectionName);
Bson rawEncryptedFields = createCollectionOptions.getEncryptedFields();
if (rawEncryptedFields == null) {
throw new MongoConfigurationException(format("`encryptedFields` is not configured for the collection %s.", namespace));
Member Author

@stIncMale stIncMale Feb 4, 2023

Instead of throwing here, it is likely more idiomatic to return Mono.error, given that we are in the Supplier<Mono> that is passed to Mono.defer. However, if we are to follow this approach consistently, the whole Supplier<Mono> passed to Mono.defer should be wrapped in a try statement that catches all non-fatal exceptions and returns Mono.error instead of throwing them.

But I see no reason to do that:

  1. it is one more thing we should always remember to do;
  2. it arguably makes the code less readable;
  3. why do that when the Project Reactor already wraps the Supplier<Mono> passed to Mono.defer in the try statement and signals onSubscribe followed by onError for us?

Member

I would do this work outside of the construction of the publisher. That way it follows the general validation, e.g., notNull checks, and reduces the code in the callable.

Member Author

I initially had it like that, but then noticed that the other MongoConfigurationException, which we throw if fields is not an array, should be signalled via the publisher: we need to encode the encryptedFields config to BsonDocument, and this no longer seems like just an argument check.

Reporting MongoConfigurationException from the same method in two different ways (throw or signal onError) seems weird, so I decided to also signal the first MongoConfigurationException that you commented on. Given this consideration, does the code seem better now?

Member Author

@stIncMale stIncMale Feb 9, 2023

Based on your experimental code here, I infer that you still think that failures to render Bson should be treated as fatal and not signalled via the returned publisher. So I did this work outside of the construction of the publisher as you originally requested.

Member

@rozza rozza left a comment

Added a few comments.

I think there may be a more "functional" way with the reactive streams approach. But I liked the procedural nature of it and how it follows the sync version 👍

dataKeyOptions.masterKey(masterKey);
}
String keyIdBsonKey = "keyId";
// any mutable non-thread-safe Boolean should do
Collaborator

I do not understand this comment. What is the non-thread-safe boolean?

Member Author

Updated. Do you find the updated text clearer?

@stIncMale stIncMale requested review from rozza and katcharov February 7, 2023 01:33
@stIncMale
Member Author

@rozza

I think there may be a more "functional" way with the reactive streams approach.

What is that approach? I am most of all interested in how to attach the data key IDs to MongoUpdatedEncryptedFieldsException with a functional approach?

JAVA-4679
Collaborator

@katcharov katcharov left a comment

lgtm

@rozza
Member

rozza commented Feb 7, 2023

@rozza

I think there may be a more "functional" way with the reactive streams approach.

What is that approach? I am most of all interested in how to attach the data key IDs to MongoUpdatedEncryptedFieldsException with a functional approach?

Me and my big mouth - turned out to be a fun exercise! Here is my attempt at a more functional approach:

    @Override
    public Publisher<BsonDocument> createEncryptedCollection(final MongoDatabase database, final String collectionName,
            final CreateCollectionOptions createCollectionOptions, final CreateEncryptedCollectionParams createEncryptedCollectionParams) {
        notNull("collectionName", collectionName);
        notNull("createCollectionOptions", createCollectionOptions);
        notNull("createEncryptedCollectionParams", createEncryptedCollectionParams);

        MongoNamespace namespace = new MongoNamespace(database.getName(), collectionName);
        Bson rawEncryptedFields = createCollectionOptions.getEncryptedFields();
        if (rawEncryptedFields == null) {
            throw new MongoConfigurationException(format("`encryptedFields` is not configured for the collection %s.", namespace));
        }
        CodecRegistry codecRegistry = options.getKeyVaultMongoClientSettings() == null
                ? MongoClientSettings.getDefaultCodecRegistry()
                : options.getKeyVaultMongoClientSettings().getCodecRegistry();

        BsonDocument encryptedFields = rawEncryptedFields.toBsonDocument(BsonDocument.class, codecRegistry);
        BsonValue fields = encryptedFields.get("fields");
        if (fields != null) {
            if (!fields.isArray()) {
                throw new MongoConfigurationException(format("`encryptedFields` is incorrectly configured for the collection %s."
                        + " `encryptedFields.fields` must be an array, but is %s type.", namespace, fields.getBsonType()));
            }

            String kmsProvider = createEncryptedCollectionParams.getKmsProvider();
            DataKeyOptions dataKeyOptions = new DataKeyOptions();
            BsonDocument masterKey = createEncryptedCollectionParams.getMasterKey();
            if (masterKey != null) {
                dataKeyOptions.masterKey(masterKey);
            }
            String keyIdBsonKey = "keyId";

            return Flux.concat(fields.asArray()
                            .stream()
                            .map(field -> {
                                Mono<BsonValue> newField = Mono.fromSupplier(() -> field);
                                if (field.isDocument()) {
                                    BsonDocument fieldDocument = field.asDocument();
                                    if (fieldDocument.get(keyIdBsonKey, BsonBoolean.FALSE).equals(BsonNull.VALUE)) {
                                        newField = Mono.fromDirect(createDataKey(kmsProvider, dataKeyOptions))
                                                .map(dataKeyId -> (BsonValue) fieldDocument.append(keyIdBsonKey, dataKeyId))
                                                .contextWrite(ctx -> ctx.put("createdDataKey", "true"));
                                    }
                                }
                                return newField;
                            })
                            .collect(Collectors.toList()))
                    .collectList()
                    .map(fieldsArray -> encryptedFields.clone().append("fields", new BsonArray(fieldsArray)))
                    .flatMap(actualEncryptedFields ->
                            Mono.deferContextual(ctx ->
                                    Mono.from(database.createCollection(collectionName,
                                                    createCollectionOptions.clone().encryptedFields(actualEncryptedFields)))
                                            .doOnError(t -> ctx.hasKey("createdDataKey"), e -> {
                                                throw new MongoUpdatedEncryptedFieldsException(actualEncryptedFields,
                                                        format("Failed to create %s.", namespace), e);
                                            })
                                            .thenReturn(actualEncryptedFields))
                    );
        }
        return Mono.from(database.createCollection(collectionName, createCollectionOptions))
                .thenReturn(encryptedFields);
    }
  • User input validation is done outside of the Publisher.
  • All side effects are kept within the bounds of the Publishers and propagated downstream for consumption by later Publishers.
  • I had to use the Context-Propagation to capture if I had called createDataKey for "Case 3: Invalid keyId". I had not used that before, but it is handy.
  • This should remove the need for using the oneShotPublisher - although, like 99% of our Publishers, it's nonsensical to subscribe multiple times.

@stIncMale
Member Author

stIncMale commented Feb 9, 2023

@rozza Thanks! I had to read up about Context.

All side effects are kept within the bounds of the Publishers and propagated downstream for consumption by later Publishers.
I had to use the Context-Propagation to capture if I had called createDataKey

Context can't be used to propagate information downstream because it only propagates information upstream (from a subscriber through the chain of operators to the source of the data). I read about this in the section Adding a Context to a Reactive Sequence.

I also experimented just in case, and as expected, the context entry with the createdDataKey key is not propagated downstream.


I additionally have doubts about this code:

...
.collectList()
.map(fieldsArray -> encryptedFields.clone().append("fields", new BsonArray(fieldsArray)))
.flatMap(actualEncryptedFields -> ...

My understanding is that the function provided to map is called only if collectList() emits a list, which happens only if there were no errors. On top of that, collectList is documented as "discards the elements in the List upon cancellation or error triggered by a data signal."


However, I realized that the publisher indeed does not have to be one-shot because the mutable state it captures is created once per Subscriber by the Supplier passed to Mono.defer.
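The difference between state captured once and state created per subscription can be sketched in plain Java. This is an analogy under stated assumptions, not Reactor code: `DeferredStateSketch`, `sharedState`, and `deferredState` are hypothetical names, and a `Supplier` invocation stands in for one subscription to a publisher built with `Mono.defer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class DeferredStateSketch {
    /** State captured once: every run mutates the same list, so runs interfere. */
    static Supplier<List<String>> sharedState() {
        List<String> state = new ArrayList<>();
        return () -> {
            state.add("keyId");
            return state;
        };
    }

    /** State created inside the supplier: every run gets a fresh list. */
    static Supplier<List<String>> deferredState() {
        return () -> {
            List<String> state = new ArrayList<>();
            state.add("keyId");
            return state;
        };
    }
}
```

With the second shape, repeated runs do not observe each other's updates, which is why a one-shot restriction is unnecessary when the mutable state is created inside the deferred supplier.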

I took your idea to use doOnError(Predicate, Consumer).

- treat errors while encoding `rawEncryptedFields` as fatal, i.e., do that outside of the publisher pipeline;
- remove the one-shot publisher as it's not needed;
- simplify the `doOnError` call;
- undo the change proposed by DRIVERS-2540.

JAVA-4679
@stIncMale stIncMale requested a review from rozza February 9, 2023 01:28
Member

@rozza rozza left a comment

LGTM!

I had a couple of thoughts regarding your comments - but they don't require any actioning:

Context can't be used to propagate information downstream because it only propagates information upstream

According to the context.write docs:

The Context is propagated from the bottom of the chain towards the top.

So I read that as in order of the chain (operations), the bottom being the publishersOfUpdatedFields in your code and the top being the final Mono.

This makes sense to me, as I used the context in order to get the invalidKeyId test to pass, which was failing when not using the context.

My understanding is that the function provided to map is called only if collectList() emits a list, which happens only if there were no errors. On top of that, collectList is documented as "discards the elements in the List upon cancellation or error triggered by a data signal."

One to check, if the fields list is empty, then collectList will either emit an empty list or just complete. The map stage becomes a noop either way and will complete without any changes.

The impact of an error or cancellation signals would be the same for either approach. An error in any stage will bubble up to the top and be caught by any error handlers. I think the same for cancellation signals as well.

@stIncMale
Member Author

@rozza, for the sake of knowledge sharing, I have to disagree:

The Context is propagated from the bottom of the chain towards the top.
So I read that as in order of the chain (operations), the bottom being the publishersOfUpdatedFields in your code and the top being the final Mono.

The following example from the docs:

String key = "message";
Mono<String> r = Mono.just("Hello")
    .contextWrite(ctx -> ctx.put(key, "World")) 
    .flatMap( s -> Mono.deferContextual(ctx ->
        Mono.just(s + " " + ctx.getOrDefault(key, "Stranger")))); 

StepVerifier.create(r)
            .expectNext("Hello Stranger") 
            .verifyComplete();

demonstrates what "from the bottom of the chain towards the top" means. If it worked according to your interpretation, the emitted string would have been "Hello World", but it is "Hello Stranger".


I used the context in order to get the invalidKeyId test to pass, which was failing when not using the context.

The invalidKeyId test neither triggers nor checks for MongoUpdatedEncryptedFieldsException, so even if we remove the doOnError call from either your or my implementation, it passes.


One to check, if the fields list is empty, then collectList will either emit an empty list or just complete. The map stage becomes a noop either way and will complete without any changes.

The impact of an error or cancellation signals would be the same for either approach. An error in any stage will bubble up to the top and be caught by any error handlers. I think the same for cancellation signals as well.

As I mentioned in the PR description, not all needed tests are done in the spec, the spec change in DRIVERS-2538 is still not done. However, we can experiment by temporarily changing the code outside of the createEncryptedCollection method.

  1. If we modify ClientEncryptionImpl.createDataKey such that it throws an exception, createEncryptedCollection must emit MongoUpdatedEncryptedFieldsException. The implementation proposed in Add the ClientEncryption.createEncryptedCollection helper method #1079 (comment) does not do that, while the implementation in the PR does.

  2. If we modify ClientEncryptionImpl.createDataKey such that it throws an exception the second time it's called, and modify the insertEncryptedValue such that it has two elements in fields with keyId: null, then not only must createEncryptedCollection emit MongoUpdatedEncryptedFieldsException, but MongoUpdatedEncryptedFieldsException.getEncryptedFields must also report the key ID of the key that was created successfully. Again, the proposed implementation does not do that. Moreover, the function passed into flatMap is never called in this scenario, i.e., ctx never comes into play.

    I made these modifications in the JAVA-4679_experiments branch (commit stIncMale@8ba7938) in case you want to try yourself.

@stIncMale stIncMale merged commit 59585de into mongodb:master Feb 9, 2023
@stIncMale stIncMale deleted the JAVA-4679 branch February 9, 2023 16:22
@stIncMale stIncMale self-assigned this Feb 19, 2023