Skip to content

Commit

Permalink
Add java-storage implementation for copy objects method. (#1087)
Browse files Browse the repository at this point in the history
  • Loading branch information
shilpi23pandey committed Dec 20, 2023
1 parent d83b07b commit c208df4
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,19 @@ public <T> void queue(Callable<T> task, FutureCallback<T> callback) {
}));
}

private static <T> void execute(Callable<T> task, FutureCallback<T> callback) {
private static <T> void execute(Callable<T> task, FutureCallback<T> callback) throws Exception {
try {
T result = task.call();
callback.onSuccess(result);
if (callback != null) {
callback.onSuccess(result);
}
} catch (Throwable throwable) {
callback.onFailure(throwable);
if (callback != null) {
callback.onFailure(throwable);
} else {
// Re-throw the exception.
throw throwable;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.decodeMetadata;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.encodeMetadata;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.validateCopyArguments;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -47,6 +49,7 @@
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleAction;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleCondition;
import com.google.cloud.storage.CopyWriter;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.ExecutorSupplier;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy;
Expand All @@ -59,6 +62,7 @@
import com.google.cloud.storage.Storage.BucketField;
import com.google.cloud.storage.Storage.BucketListOption;
import com.google.cloud.storage.Storage.ComposeRequest;
import com.google.cloud.storage.Storage.CopyRequest;
import com.google.cloud.storage.StorageClass;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
Expand Down Expand Up @@ -215,6 +219,104 @@ public void createBucket(String bucketName, CreateBucketOptions options) throws
}
}

/**
* See {@link GoogleCloudStorage#copy(String, List, String, List)} for details about expected
* behavior.
*/
@Override
public void copy(Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap)
throws IOException {

validateCopyArguments(sourceToDestinationObjectsMap, this);

if (sourceToDestinationObjectsMap.isEmpty()) {
return;
}

// Gather FileNotFoundExceptions for individual objects,
// but only throw a single combined exception at the end.
ConcurrentHashMap.KeySetView<IOException, Boolean> innerExceptions =
ConcurrentHashMap.newKeySet();

BatchExecutor executor = new BatchExecutor(storageOptions.getBatchThreads());

try {
for (Map.Entry<StorageResourceId, StorageResourceId> entry :
sourceToDestinationObjectsMap.entrySet()) {
StorageResourceId srcObject = entry.getKey();
StorageResourceId dstObject = entry.getValue();
copyInternal(
executor,
innerExceptions,
srcObject.getBucketName(),
srcObject.getObjectName(),
dstObject.getGenerationId(),
dstObject.getBucketName(),
dstObject.getObjectName());
}
} finally {
executor.shutdown();
}

if (!innerExceptions.isEmpty()) {
throw GoogleCloudStorageExceptions.createCompositeException(innerExceptions);
}
}

private void copyInternal(
BatchExecutor executor,
ConcurrentHashMap.KeySetView<IOException, Boolean> innerExceptions,
String srcBucketName,
String srcObjectName,
long dstContentGeneration,
String dstBucketName,
String dstObjectName) {
CopyRequest.Builder copyRequestBuilder =
CopyRequest.newBuilder().setSource(BlobId.of(srcBucketName, srcObjectName));
if (dstContentGeneration != StorageResourceId.UNKNOWN_GENERATION_ID) {
copyRequestBuilder.setTarget(
BlobId.of(dstBucketName, dstObjectName),
BlobTargetOption.generationMatch(dstContentGeneration));
} else {
copyRequestBuilder.setTarget(BlobId.of(dstBucketName, dstObjectName));
}

if (storageOptions.getMaxRewriteChunkSize() > 0) {
copyRequestBuilder.setMegabytesCopiedPerChunk(
// Convert raw byte size into Mib.
storageOptions.getMaxRewriteChunkSize() / (1024 * 1024));
}
executor.queue(
() -> {
try {
String srcString = StringPaths.fromComponents(srcBucketName, srcObjectName);
String dstString = StringPaths.fromComponents(dstBucketName, dstObjectName);

CopyWriter copyWriter = storage.copy(copyRequestBuilder.build());
while (!copyWriter.isDone()) {
copyWriter.copyChunk();
logger.atFinest().log(
"Copy (%s to %s) did not complete. Resuming...", srcString, dstString);
}
logger.atFiner().log("Successfully copied %s to %s", srcString, dstString);
} catch (StorageException e) {
if (errorExtractor.getErrorType(e) == ErrorType.NOT_FOUND) {
innerExceptions.add(
createFileNotFoundException(srcBucketName, srcObjectName, new IOException(e)));
} else {
innerExceptions.add(
new IOException(
String.format(
"Error copying '%s'",
StringPaths.fromComponents(srcBucketName, srcObjectName)),
e));
}
}
return null;
},
null);
}

/** See {@link GoogleCloudStorage#listBucketNames()} for details about expected behavior. */
@Override
public List<String> listBucketNames() throws IOException {
Expand Down

0 comments on commit c208df4

Please sign in to comment.