Skip to content

Commit

Permalink
Add java-storage implementation for compose object methods. (#1084)
Browse files Browse the repository at this point in the history
* Add java-storage implementation for compose object methods.
  • Loading branch information
shilpi23pandey committed Dec 19, 2023
1 parent 831bc38 commit ca808bf
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.google.cloud.hadoop.gcsio;

import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.decodeMetadata;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.encodeMetadata;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
Expand All @@ -38,6 +40,7 @@
import com.google.cloud.hadoop.util.GrpcErrorTypeExtractor;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BlobWriteSessionConfig;
import com.google.cloud.storage.BlobWriteSessionConfigs;
import com.google.cloud.storage.Bucket;
Expand All @@ -52,8 +55,10 @@
import com.google.cloud.storage.Storage.BlobField;
import com.google.cloud.storage.Storage.BlobGetOption;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.BucketField;
import com.google.cloud.storage.Storage.BucketListOption;
import com.google.cloud.storage.Storage.ComposeRequest;
import com.google.cloud.storage.StorageClass;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
Expand All @@ -62,6 +67,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import com.google.common.io.BaseEncoding;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.ClientInterceptor;
Expand All @@ -73,6 +79,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -487,6 +494,51 @@ public void close() {
}
}

/**
* See {@link GoogleCloudStorage#composeObjects(List, StorageResourceId, CreateObjectOptions)}}
* for details about expected behavior.
*/
@Override
public GoogleCloudStorageItemInfo composeObjects(
List<StorageResourceId> sources, StorageResourceId destination, CreateObjectOptions options)
throws IOException {
logger.atFiner().log("composeObjects(%s, %s, %s)", sources, destination, options);
for (StorageResourceId inputId : sources) {
if (!destination.getBucketName().equals(inputId.getBucketName())) {
throw new IOException(
String.format(
"Bucket doesn't match for source '%s' and destination '%s'!",
inputId, destination));
}
}
ComposeRequest request =
ComposeRequest.newBuilder()
.addSource(
sources.stream().map(StorageResourceId::getObjectName).collect(Collectors.toList()))
.setTarget(
BlobInfo.newBuilder(destination.getBucketName(), destination.getObjectName())
.setContentType(options.getContentType())
.setContentEncoding(options.getContentEncoding())
.setMetadata(encodeMetadata(options.getMetadata()))
.build())
.setTargetOptions(
BlobTargetOption.generationMatch(
destination.hasGenerationId()
? destination.getGenerationId()
: getWriteGeneration(destination, true)))
.build();

Blob composedBlob;
try {
composedBlob = storage.compose(request);
} catch (StorageException e) {
throw new IOException(e);
}
GoogleCloudStorageItemInfo compositeInfo = createItemInfoForBlob(destination, composedBlob);
logger.atFiner().log("composeObjects() done, returning: %s", compositeInfo);
return compositeInfo;
}

/**
* Gets the object generation for a write operation
*
Expand Down Expand Up @@ -609,6 +661,57 @@ private static ExecutorSupplier getPCUExecutorSupplier(ExecutorService pCUExecut
: ExecutorSupplier.useExecutor(pCUExecutorService);
}

/** Helper for converting a StorageResourceId + Blob into a GoogleCloudStorageItemInfo. */
private static GoogleCloudStorageItemInfo createItemInfoForBlob(
StorageResourceId resourceId, Blob blob) {
checkArgument(resourceId != null, "resourceId must not be null");
checkArgument(blob != null, "object must not be null");
checkArgument(
resourceId.isStorageObject(),
"resourceId must be a StorageObject. resourceId: %s",
resourceId);
checkArgument(
resourceId.getBucketName().equals(blob.getBucket()),
"resourceId.getBucketName() must equal object.getBucket(): '%s' vs '%s'",
resourceId.getBucketName(),
blob.getBucket());
checkArgument(
resourceId.getObjectName().equals(blob.getName()),
"resourceId.getObjectName() must equal object.getName(): '%s' vs '%s'",
resourceId.getObjectName(),
blob.getName());

Map<String, byte[]> decodedMetadata =
blob.getMetadata() == null ? null : decodeMetadata(blob.getMetadata());

byte[] md5Hash = null;
byte[] crc32c = null;

if (!isNullOrEmpty(blob.getCrc32c())) {
crc32c = BaseEncoding.base64().decode(blob.getCrc32c());
}

if (!isNullOrEmpty(blob.getMd5())) {
md5Hash = BaseEncoding.base64().decode(blob.getMd5());
}

return GoogleCloudStorageItemInfo.createObject(
resourceId,
blob.getCreateTimeOffsetDateTime() == null
? 0
: blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli(),
blob.getUpdateTimeOffsetDateTime() == null
? 0
: blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli(),
blob.getSize() == null ? 0 : blob.getSize(),
blob.getContentType(),
blob.getContentEncoding(),
decodedMetadata,
blob.getGeneration() == null ? 0 : blob.getGeneration(),
blob.getMetageneration() == null ? 0 : blob.getMetageneration(),
new VerificationAttributes(md5Hash, crc32c));
}

public static Builder builder() {
return new AutoBuilder_GoogleCloudStorageClientImpl_Builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1086,10 +1086,12 @@ public void composeObject_withoutLimit() throws Exception {

gcs.compose(testBucket, ImmutableList.of(testDir + "f1", testDir + "f2"), testDir + "f3", null);

assertThat(gcsRequestsTracker.getAllRequestStrings())
.containsExactly(
getRequestString(testBucket, testDir + "f3"),
composeRequestString(testBucket, testDir + "f3", /* generationId= */ 1));
if (isTracingSupported) {
assertThat(gcsRequestsTracker.getAllRequestStrings())
.containsExactly(
getRequestString(testBucket, testDir + "f3"),
composeRequestString(testBucket, testDir + "f3", /* generationId= */ 1));
}

List<String> listedObjects = getObjectNames(gcs.listObjectInfo(testBucket, testDir));
assertThat(listedObjects).containsExactly(testDir + "f1", testDir + "f2", testDir + "f3");
Expand Down

0 comments on commit ca808bf

Please sign in to comment.