Skip to content

Commit 4662572

Browse files
authored
fix: update Grpc Write implementation to allow specifying expected md5 (#1815)
Remove Hasher.Constant. StartResumableWriteRequest has been updated to allow specifying `object_checksums` when creating the session. Add several new positive and negative integration test for md5 verification
1 parent 2eec791 commit 4662572

File tree

11 files changed

+157
-76
lines changed

11 files changed

+157
-76
lines changed

google-cloud-storage/clirr-ignored-differences.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,9 @@
22
<!-- see https://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
33
<differences>
44

5+
<difference>
6+
<differenceType>8001</differenceType>
7+
<className>com/google/cloud/storage/Hasher$ConstantConcatValueHasher</className>
8+
</difference>
9+
510
</differences>

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.storage.v2.StartResumableWriteResponse;
2626
import com.google.storage.v2.WriteObjectRequest;
2727
import com.google.storage.v2.WriteObjectResponse;
28+
import java.util.function.Function;
2829

2930
final class GapicUploadSessionBuilder {
3031

@@ -49,8 +50,16 @@ ApiFuture<ResumableWrite> resumableWrite(
4950
if (writeObjectRequest.hasCommonObjectRequestParams()) {
5051
b.setCommonObjectRequestParams(writeObjectRequest.getCommonObjectRequestParams());
5152
}
53+
if (writeObjectRequest.hasObjectChecksums()) {
54+
b.setObjectChecksums(writeObjectRequest.getObjectChecksums());
55+
}
5256
StartResumableWriteRequest req = b.build();
57+
Function<String, WriteObjectRequest> f =
58+
uploadId ->
59+
writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build();
5360
return ApiFutures.transform(
54-
x.futureCall(req), (resp) -> new ResumableWrite(req, resp), MoreExecutors.directExecutor());
61+
x.futureCall(req),
62+
(resp) -> new ResumableWrite(req, resp, f),
63+
MoreExecutors.directExecutor());
5564
}
5665
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
import com.google.storage.v2.LockBucketRetentionPolicyRequest;
9696
import com.google.storage.v2.Object;
9797
import com.google.storage.v2.ObjectAccessControl;
98+
import com.google.storage.v2.ObjectChecksums;
9899
import com.google.storage.v2.ProjectName;
99100
import com.google.storage.v2.ReadObjectRequest;
100101
import com.google.storage.v2.RewriteObjectRequest;
@@ -221,6 +222,7 @@ public Blob create(
221222
GrpcCallContext grpcCallContext =
222223
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
223224
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
225+
Hasher hasher = getHasherForRequest(req, Hasher.enabled());
224226
return Retrying.run(
225227
getOptions(),
226228
retryAlgorithmManager.getFor(req),
@@ -231,7 +233,7 @@ public Blob create(
231233
.byteChannel(
232234
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext))
233235
.setByteStringStrategy(ByteStringStrategy.noCopy())
234-
.setHasher(Hasher.enabled())
236+
.setHasher(hasher)
235237
.direct()
236238
.unbuffered()
237239
.setRequest(req)
@@ -273,10 +275,7 @@ public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOp
273275
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
274276
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
275277

276-
Hasher hasher = Hasher.enabled();
277-
if (req.hasObjectChecksums() && req.getObjectChecksums().hasCrc32C()) {
278-
hasher = Hasher.constant(req.getObjectChecksums().getCrc32C());
279-
}
278+
Hasher hasher = getHasherForRequest(req, Hasher.enabled());
280279
GapicWritableByteChannelSessionBuilder channelSessionBuilder =
281280
ResumableMedia.gapic()
282281
.write()
@@ -346,10 +345,7 @@ public Blob createFrom(
346345

347346
ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req);
348347

349-
Hasher hasher = Hasher.enabled();
350-
if (req.hasObjectChecksums() && req.getObjectChecksums().hasCrc32C()) {
351-
hasher = Hasher.constant(req.getObjectChecksums().getCrc32C());
352-
}
348+
Hasher hasher = getHasherForRequest(req, Hasher.enabled());
353349
BufferedWritableByteChannelSession<WriteObjectResponse> session =
354350
ResumableMedia.gapic()
355351
.write()
@@ -736,10 +732,7 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options
736732
GrpcCallContext grpcCallContext =
737733
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
738734
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
739-
Hasher hasher = Hasher.noop();
740-
if (req.hasObjectChecksums() && req.getObjectChecksums().hasCrc32C()) {
741-
hasher = Hasher.constant(req.getObjectChecksums().getCrc32C());
742-
}
735+
Hasher hasher = getHasherForRequest(req, Hasher.enabled());
743736
return new GrpcBlobWriteChannel(
744737
storageClient.writeObjectCallable(),
745738
getOptions(),
@@ -1789,4 +1782,17 @@ private Object updateObject(UpdateObjectRequest req) {
17891782
() -> storageClient.updateObjectCallable().call(req, grpcCallContext),
17901783
Decoder.identity());
17911784
}
1785+
1786+
private static Hasher getHasherForRequest(WriteObjectRequest req, Hasher defaultHasher) {
1787+
if (!req.hasObjectChecksums()) {
1788+
return defaultHasher;
1789+
} else {
1790+
ObjectChecksums checksums = req.getObjectChecksums();
1791+
if (!checksums.hasCrc32C() && checksums.getMd5Hash().isEmpty()) {
1792+
return defaultHasher;
1793+
} else {
1794+
return Hasher.noop();
1795+
}
1796+
}
1797+
}
17921798
}

google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,6 @@ static Hasher enabled() {
4747
return GuavaHasher.INSTANCE;
4848
}
4949

50-
/**
51-
* Create a Hasher which will always yield the specified value when {@link
52-
* #nullSafeConcat(Crc32cLengthKnown, Crc32cLengthKnown)} is invoked.
53-
*/
54-
// Not perfect, and not a great approach for a public API. However, this is the most pragmatic way
55-
// right now to wire an externally defined value all the way down to the last write message of a
56-
// resumable upload session.
57-
static Hasher constant(int crc32c) {
58-
return new ConstantConcatValueHasher(Crc32cValue.of(crc32c, -1));
59-
}
60-
6150
@Immutable
6251
class NoOpHasher implements Hasher {
6352
private static final NoOpHasher INSTANCE = new NoOpHasher();
@@ -112,26 +101,4 @@ public Crc32cLengthKnown nullSafeConcat(Crc32cLengthKnown r1, Crc32cLengthKnown
112101
}
113102
}
114103
}
115-
116-
@Immutable
117-
class ConstantConcatValueHasher implements Hasher {
118-
private final Crc32cLengthKnown value;
119-
120-
private ConstantConcatValueHasher(Crc32cLengthKnown value) {
121-
this.value = value;
122-
}
123-
124-
@Override
125-
public @Nullable Crc32cLengthKnown hash(ByteBuffer b) {
126-
return null;
127-
}
128-
129-
@Override
130-
public void validate(Crc32cValue<?> expected, Supplier<ByteBuffer> b) {}
131-
132-
@Override
133-
public @Nullable Crc32cLengthKnown nullSafeConcat(Crc32cLengthKnown r1, Crc32cLengthKnown r2) {
134-
return value;
135-
}
136-
}
137104
}

google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableWrite.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,13 @@ final class ResumableWrite implements WriteObjectRequestBuilderFactory {
3333

3434
private final WriteObjectRequest writeRequest;
3535

36-
public ResumableWrite(StartResumableWriteRequest req, StartResumableWriteResponse res) {
36+
public ResumableWrite(
37+
StartResumableWriteRequest req,
38+
StartResumableWriteResponse res,
39+
Function<String, WriteObjectRequest> f) {
3740
this.req = req;
3841
this.res = res;
39-
WriteObjectRequest.Builder b = WriteObjectRequest.newBuilder().setUploadId(res.getUploadId());
40-
if (req.hasCommonObjectRequestParams()) {
41-
b.setCommonObjectRequestParams(req.getCommonObjectRequestParams());
42-
}
43-
this.writeRequest = b.build();
42+
this.writeRequest = f.apply(res.getUploadId());
4443
}
4544

4645
public StartResumableWriteRequest getReq() {

google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -774,7 +774,7 @@ public static BlobWriteOption metagenerationNotMatch() {
774774
* @deprecated Please compute and use a crc32c checksum instead. {@link #crc32cMatch()}
775775
*/
776776
@Deprecated
777-
@TransportCompatibility(Transport.HTTP)
777+
@TransportCompatibility({Transport.HTTP, Transport.GRPC})
778778
public static BlobWriteOption md5Match() {
779779
return new BlobWriteOption(UnifiedOpts.md5MatchExtractor());
780780
}

google-cloud-storage/src/main/java/com/google/cloud/storage/UnifiedOpts.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,6 +1096,15 @@ public boolean equals(Object o) {
10961096
return Objects.equals(val, md5Match.val);
10971097
}
10981098

1099+
@Override
1100+
public Mapper<WriteObjectRequest.Builder> writeObject() {
1101+
return b -> {
1102+
b.getObjectChecksumsBuilder()
1103+
.setMd5Hash(ByteString.copyFrom(BaseEncoding.base64().decode(val)));
1104+
return b;
1105+
};
1106+
}
1107+
10991108
@Override
11001109
public int hashCode() {
11011110
return Objects.hash(val);

google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,28 @@ private static GrpcCallContext contextWithBucketName(String bucketName) {
8989
return ret;
9090
}
9191

92+
/**
93+
* Several fields of a WriteObjectRequest are only allowed on the "first" message sent to gcs,
94+
* this utility method centralizes the logic necessary to clear those fields for use by subsequent
95+
* messages.
96+
*/
97+
private static WriteObjectRequest possiblyPairDownRequest(
98+
WriteObjectRequest message, boolean firstMessageOfStream) {
99+
if (firstMessageOfStream && message.getWriteOffset() == 0) {
100+
return message;
101+
}
102+
103+
WriteObjectRequest.Builder b = message.toBuilder();
104+
if (!firstMessageOfStream) {
105+
b.clearUploadId();
106+
}
107+
108+
if (message.getWriteOffset() > 0) {
109+
b.clearWriteObjectSpec().clearObjectChecksums();
110+
}
111+
return b.build();
112+
}
113+
92114
@FunctionalInterface
93115
interface FlusherFactory {
94116
/**
@@ -144,9 +166,7 @@ public void flush(@NonNull List<WriteObjectRequest> segments) {
144166

145167
boolean first = true;
146168
for (WriteObjectRequest message : segments) {
147-
if (!first) {
148-
message = message.toBuilder().clearUploadId().clearWriteObjectSpec().build();
149-
}
169+
message = possiblyPairDownRequest(message, first);
150170

151171
write.onNext(message);
152172
first = false;
@@ -188,9 +208,7 @@ private FsyncOnClose(
188208
public void flush(@NonNull List<WriteObjectRequest> segments) {
189209
ensureOpen();
190210
for (WriteObjectRequest message : segments) {
191-
if (!first) {
192-
message = message.toBuilder().clearUploadId().clearWriteObjectSpec().build();
193-
}
211+
message = possiblyPairDownRequest(message, first);
194212

195213
stream.onNext(message);
196214
first = false;
@@ -201,9 +219,7 @@ public void flush(@NonNull List<WriteObjectRequest> segments) {
201219
public void close(@Nullable WriteObjectRequest message) {
202220
ensureOpen();
203221
if (message != null) {
204-
if (!first) {
205-
message = message.toBuilder().clearUploadId().clearWriteObjectSpec().build();
206-
}
222+
message = possiblyPairDownRequest(message, first);
207223
stream.onNext(message);
208224
}
209225
stream.onCompleted();

google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedWritableByteChannelTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public final class GapicUnbufferedWritableByteChannelTest {
105105
WriteObjectResponse.newBuilder().setResource(obj.toBuilder().setSize(40)).build();
106106

107107
private static final WriteObjectRequestBuilderFactory reqFactory =
108-
new ResumableWrite(startReq, startResp);
108+
new ResumableWrite(startReq, startResp, TestUtils.onlyUploadId());
109109

110110
@Test
111111
public void directUpload() throws IOException, InterruptedException, ExecutionException {

google-cloud-storage/src/test/java/com/google/cloud/storage/TestUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.google.protobuf.ByteString;
3838
import com.google.rpc.DebugInfo;
3939
import com.google.storage.v2.ChecksummedData;
40+
import com.google.storage.v2.WriteObjectRequest;
4041
import io.grpc.Status.Code;
4142
import io.grpc.StatusRuntimeException;
4243
import java.io.ByteArrayOutputStream;
@@ -49,6 +50,7 @@
4950
import java.util.stream.IntStream;
5051
import java.util.stream.Stream;
5152
import java.util.zip.GZIPOutputStream;
53+
import org.checkerframework.checker.nullness.qual.NonNull;
5254
import org.checkerframework.checker.nullness.qual.Nullable;
5355

5456
public final class TestUtils {
@@ -179,4 +181,13 @@ public boolean shouldRetry(Throwable previousThrowable, Object previousResponse)
179181
}
180182
}
181183
}
184+
185+
/**
186+
* Return a function which when provided an {@code uploadId} will create a {@link
187+
* WriteObjectRequest} with that {@code uploadId}
188+
*/
189+
@NonNull
190+
public static Function<String, WriteObjectRequest> onlyUploadId() {
191+
return uId -> WriteObjectRequest.newBuilder().setUploadId(uId).build();
192+
}
182193
}

0 commit comments

Comments
 (0)