Skip to content

Commit

Permalink
Add java-storage implementation for update items method. (#1090)
Browse files Browse the repository at this point in the history
  • Loading branch information
shilpi23pandey committed Dec 20, 2023
1 parent ca808bf commit d83b07b
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.decodeMetadata;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.encodeMetadata;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static java.lang.Math.toIntExact;

import com.google.api.client.http.HttpRequestInitializer;
Expand Down Expand Up @@ -66,6 +66,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.flogger.GoogleLogger;
import com.google.common.io.BaseEncoding;
import com.google.common.util.concurrent.FutureCallback;
Expand All @@ -80,6 +81,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -115,6 +117,10 @@ public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage {
.setDaemon(true)
.build());

private static String encodeMetadataValues(byte[] bytes) {
return bytes == null ? null : BaseEncoding.base64().encode(bytes);
}

/**
* Having an instance of gscImpl to redirect calls to Json client while new client implementation
* is in WIP.
Expand Down Expand Up @@ -494,6 +500,98 @@ public void close() {
}
}

@Override
public List<GoogleCloudStorageItemInfo> updateItems(List<UpdatableItemInfo> itemInfoList)
throws IOException {
logger.atFiner().log("updateItems(%s)", itemInfoList);

if (itemInfoList.isEmpty()) {
return new ArrayList<>();
}

for (UpdatableItemInfo itemInfo : itemInfoList) {
checkArgument(
!itemInfo.getStorageResourceId().isBucket() && !itemInfo.getStorageResourceId().isRoot(),
"Buckets and GCS Root resources are not supported for updateItems");
}

Map<StorageResourceId, GoogleCloudStorageItemInfo> resultItemInfos = new ConcurrentHashMap<>();
Set<IOException> innerExceptions = newConcurrentHashSet();
BatchExecutor executor = new BatchExecutor(storageOptions.getBatchThreads());

try {
for (UpdatableItemInfo itemInfo : itemInfoList) {
StorageResourceId resourceId = itemInfo.getStorageResourceId();
String bucketName = resourceId.getBucketName();
String blobName = resourceId.getObjectName();

Map<String, byte[]> originalMetadata = itemInfo.getMetadata();
Map<String, String> rewrittenMetadata = encodeMetadata(originalMetadata);

BlobInfo blobUpdate =
BlobInfo.newBuilder(bucketName, blobName).setMetadata(rewrittenMetadata).build();

executor.queue(
() -> storage.update(blobUpdate),
new FutureCallback<>() {
@Override
public void onSuccess(Blob blob) {
logger.atFiner().log(
"updateItems: Successfully updated object '%s' for resourceId '%s'",
blob, resourceId);
resultItemInfos.put(resourceId, createItemInfoForBlob(resourceId, blob));
}

@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof Exception
&& errorExtractor.getErrorType((Exception) throwable) == ErrorType.NOT_FOUND) {
logger.atFiner().log(
"updateItems: object not found %s: %s", resourceId, throwable);
resultItemInfos.put(
resourceId, GoogleCloudStorageItemInfo.createNotFound(resourceId));
} else {
innerExceptions.add(
new IOException(
String.format("Error updating '%s' object", resourceId), throwable));
}
}
});
}
} finally {
executor.shutdown();
}

if (!innerExceptions.isEmpty()) {
throw GoogleCloudStorageExceptions.createCompositeException(innerExceptions);
}

// Assemble the return list in the same order as the input arguments.
List<GoogleCloudStorageItemInfo> sortedItemInfos = new ArrayList<>();
for (UpdatableItemInfo itemInfo : itemInfoList) {
checkState(
resultItemInfos.containsKey(itemInfo.getStorageResourceId()),
"Missing resourceId '%s' from map: %s",
itemInfo.getStorageResourceId(),
resultItemInfos);
sortedItemInfos.add(resultItemInfos.get(itemInfo.getStorageResourceId()));
}

// We expect the return list to be the same size, even if some entries were "not found".
checkState(
sortedItemInfos.size() == itemInfoList.size(),
"sortedItemInfos.size() (%s) != resourceIds.size() (%s). infos: %s, updateItemInfos: %s",
sortedItemInfos.size(),
itemInfoList.size(),
sortedItemInfos,
itemInfoList);
return sortedItemInfos;
}

private static Map<String, String> encodeMetadata(Map<String, byte[]> metadata) {
return Maps.transformValues(metadata, GoogleCloudStorageClientImpl::encodeMetadataValues);
}

/**
* See {@link GoogleCloudStorage#composeObjects(List, StorageResourceId, CreateObjectOptions)}}
* for details about expected behavior.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -928,8 +928,10 @@ public void updateItems_withoutLimits() throws Exception {
assertThat(getObjectNames(updatedObjects)).containsExactly(testDir + "f1");
assertThat(updatedObjects.get(0).getMetadata().keySet()).isEqualTo(updatedMetadata.keySet());

assertThat(gcsRequestsTracker.getAllRequestStrings())
.containsExactly(postRequestString(testBucket, testDir + "f1"));
if (isTracingSupported) {
assertThat(gcsRequestsTracker.getAllRequestStrings())
.containsExactly(postRequestString(testBucket, testDir + "f1"));
}
}

@Test
Expand All @@ -956,13 +958,15 @@ public void updateItems_withLimits_MultipleBatchGcsRequests() throws Exception {
.containsExactly(testDir + "f1", testDir + "f2", testDir + "f3");
assertThat(updatedObjects.get(0).getMetadata().keySet()).isEqualTo(updatedMetadata.keySet());

assertThat(gcsRequestsTracker.getAllRequestStrings())
.containsExactly(
batchRequestString(),
postRequestString(testBucket, testDir + "f1"),
postRequestString(testBucket, testDir + "f2"),
batchRequestString(),
postRequestString(testBucket, testDir + "f3"));
if (isTracingSupported) {
assertThat(gcsRequestsTracker.getAllRequestStrings())
.containsExactly(
batchRequestString(),
postRequestString(testBucket, testDir + "f1"),
postRequestString(testBucket, testDir + "f2"),
batchRequestString(),
postRequestString(testBucket, testDir + "f3"));
}
}

@Test
Expand Down

0 comments on commit d83b07b

Please sign in to comment.