Skip to content

Commit

Permalink
Add java-storage implementation for get buckets and object methods. (#…
Browse files Browse the repository at this point in the history
…1089)

* Add java-storage implementation for get buckets and object methods.

* Address review comments.

* Sort blob field constants.
  • Loading branch information
shilpi23pandey committed Jan 7, 2024
1 parent 5396469 commit 000e849
Show file tree
Hide file tree
Showing 11 changed files with 636 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemImpl;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemOptions;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemOptions.ClientType;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;
import com.google.cloud.hadoop.gcsio.MethodOutcome;
import com.google.cloud.hadoop.gcsio.testing.InMemoryGoogleCloudStorage;
Expand Down Expand Up @@ -1650,7 +1651,14 @@ public void testInvalidCredentialsFromAccessTokenProvider() throws Exception {

IOException thrown = assertThrows(IOException.class, () -> ghfs.exists(new Path("gs://")));

assertThat(thrown).hasCauseThat().hasMessageThat().contains("Invalid Credentials");
if (storageClientType == ClientType.STORAGE_CLIENT) {
assertThat(thrown)
.hasCauseThat()
.hasMessageThat()
.contains("invalid authentication credentials");
} else {
assertThat(thrown).hasCauseThat().hasMessageThat().contains("Invalid Credentials");
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,22 @@ public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage {
// Error extractor used to map API exceptions to meaningful ErrorTypes.
private static final ErrorTypeExtractor errorExtractor = GrpcErrorTypeExtractor.INSTANCE;

// Blob fields that are used to populate GoogleCloudStorageItemInfo. Object fetches pass this list
// as the field selection, so a field missing here is not requested from the service.
static final List<BlobField> BLOB_FIELDS =
ImmutableList.of(
BlobField.BUCKET,
BlobField.CONTENT_ENCODING,
BlobField.CONTENT_TYPE,
BlobField.CRC32C,
BlobField.GENERATION,
BlobField.METADATA,
BlobField.MD5HASH,
BlobField.METAGENERATION,
BlobField.NAME,
BlobField.SIZE,
BlobField.TIME_CREATED,
BlobField.UPDATED);

// Thread-pool used for background tasks.
private ExecutorService backgroundTasksThreadPool =
Executors.newCachedThreadPool(
Expand Down Expand Up @@ -585,6 +601,202 @@ public void deleteBuckets(List<String> bucketNames) throws IOException {
}
}

/**
 * Fetches item info for every given resource id and returns the results in the same order as
 * the input list. Root ids are answered locally; bucket and object ids are queued on a batch
 * executor. If any queued fetch reported a failure, a composite exception is thrown instead.
 *
 * <p>See {@link GoogleCloudStorage#getItemInfos(List)} for details about expected behavior.
 */
@Override
public List<GoogleCloudStorageItemInfo> getItemInfos(List<StorageResourceId> resourceIds)
    throws IOException {
  logger.atFiner().log("getItemInfos(%s)", resourceIds);

  if (resourceIds.isEmpty()) {
    return new ArrayList<>();
  }

  // Both collections are written from callback threads, hence the concurrent implementations.
  Map<StorageResourceId, GoogleCloudStorageItemInfo> fetchedInfos =
      new ConcurrentHashMap<>(resourceIds.size());
  Set<IOException> failures = newConcurrentHashSet();
  BatchExecutor batch = new BatchExecutor(storageOptions.getBatchThreads());
  try {
    for (StorageResourceId id : resourceIds) {
      if (id.isRoot()) {
        // Root needs no remote call.
        fetchedInfos.put(id, GoogleCloudStorageItemInfo.ROOT_INFO);
        continue;
      }
      if (id.isBucket()) {
        batch.queue(
            () -> getBucket(id.getBucketName()), getBucketCallback(id, failures, fetchedInfos));
        continue;
      }
      if (id.isStorageObject()) {
        batch.queue(() -> getBlob(id), getBlobCallback(id, failures, fetchedInfos));
      }
    }
  } finally {
    // NOTE(review): the reads below presumably rely on shutdown() waiting for queued tasks to
    // finish -- confirm against BatchExecutor.
    batch.shutdown();
  }

  if (!failures.isEmpty()) {
    throw GoogleCloudStorageExceptions.createCompositeException(failures);
  }

  // Rebuild the result list following the order of the input arguments.
  List<GoogleCloudStorageItemInfo> orderedInfos = new ArrayList<>();
  for (StorageResourceId id : resourceIds) {
    // ConcurrentHashMap never stores null values, so a null lookup means the key is absent.
    GoogleCloudStorageItemInfo info = fetchedInfos.get(id);
    checkState(
        info != null, "Somehow missing resourceId '%s' from map: %s", id, fetchedInfos);
    orderedInfos.add(info);
  }

  // The result must match the input size, even if some entries were "not found".
  checkState(
      orderedInfos.size() == resourceIds.size(),
      "sortedItemInfos.size() (%s) != resourceIds.size() (%s). infos: %s, ids: %s",
      orderedInfos.size(),
      resourceIds.size(),
      orderedInfos,
      resourceIds);
  return orderedInfos;
}

/**
 * Builds the callback that records the outcome of a bucket fetch for {@code resourceId}.
 *
 * <p>On success the resulting item info (or a "not found" placeholder when the bucket is null)
 * is stored in {@code itemInfos}; on failure an {@link IOException} wrapping the cause is added
 * to {@code innerExceptions}.
 */
private FutureCallback<Bucket> getBucketCallback(
    StorageResourceId resourceId,
    Set<IOException> innerExceptions,
    Map<StorageResourceId, GoogleCloudStorageItemInfo> itemInfos) {
  FutureCallback<Bucket> callback =
      new FutureCallback<>() {
        @Override
        public void onSuccess(@Nullable Bucket bucket) {
          if (bucket == null) {
            logger.atFiner().log(
                "getItemInfos: bucket '%s' not found", resourceId.getBucketName());
            itemInfos.put(resourceId, GoogleCloudStorageItemInfo.createNotFound(resourceId));
            return;
          }
          logger.atFiner().log(
              "getItemInfos: Successfully fetched bucket: %s for resourceId: %s",
              bucket, resourceId);
          itemInfos.put(resourceId, createItemInfoForBucket(resourceId, bucket));
        }

        @Override
        public void onFailure(Throwable throwable) {
          String message = String.format("Error getting %s bucket", resourceId.getBucketName());
          innerExceptions.add(new IOException(message, throwable));
        }
      };
  return callback;
}

/**
 * Builds the callback that records the outcome of an object fetch for {@code resourceId}.
 *
 * <p>On success the resulting item info (or a "not found" placeholder when the blob is null) is
 * stored in {@code itemInfos}; on failure an {@link IOException} wrapping the cause is added to
 * {@code innerExceptions}.
 */
private FutureCallback<Blob> getBlobCallback(
    StorageResourceId resourceId,
    Set<IOException> innerExceptions,
    Map<StorageResourceId, GoogleCloudStorageItemInfo> itemInfos) {
  FutureCallback<Blob> callback =
      new FutureCallback<>() {
        @Override
        public void onSuccess(@Nullable Blob blob) {
          if (blob == null) {
            logger.atFiner().log("getItemInfos: object '%s' not found", resourceId);
            itemInfos.put(resourceId, GoogleCloudStorageItemInfo.createNotFound(resourceId));
            return;
          }
          logger.atFiner().log(
              "getItemInfos: Successfully fetched object '%s' for resourceId '%s'",
              blob, resourceId);
          itemInfos.put(resourceId, createItemInfoForBlob(resourceId, blob));
        }

        @Override
        public void onFailure(Throwable throwable) {
          String message = String.format("Error getting %s object", resourceId);
          innerExceptions.add(new IOException(message, throwable));
        }
      };
  return callback;
}

/**
 * Returns the item info for a single resource: the shared root info for the root id, bucket
 * info for a bucket id, and object info otherwise. A "not found" info is returned when the
 * lookup yields nothing.
 *
 * <p>See {@link GoogleCloudStorage#getItemInfo(StorageResourceId)} for details about expected
 * behavior.
 */
@Override
public GoogleCloudStorageItemInfo getItemInfo(StorageResourceId resourceId) throws IOException {
  logger.atFiner().log("getItemInfo(%s)", resourceId);

  // Root is answered without any remote call.
  if (resourceId.isRoot()) {
    return GoogleCloudStorageItemInfo.ROOT_INFO;
  }

  GoogleCloudStorageItemInfo found = null;
  if (resourceId.isBucket()) {
    Bucket fetchedBucket = getBucket(resourceId.getBucketName());
    if (fetchedBucket == null) {
      logger.atFiner().log("getBucket(%s): not found", resourceId.getBucketName());
    } else {
      found = createItemInfoForBucket(resourceId, fetchedBucket);
    }
  } else {
    Blob fetchedBlob = getBlob(resourceId);
    if (fetchedBlob == null) {
      logger.atFiner().log("getObject(%s): not found", resourceId);
    } else {
      found = createItemInfoForBlob(resourceId, fetchedBlob);
    }
  }

  // Fall back to a "not found" placeholder so callers never receive null.
  GoogleCloudStorageItemInfo result =
      found != null ? found : GoogleCloudStorageItemInfo.createNotFound(resourceId);
  logger.atFiner().log("getItemInfo: %s", result);
  return result;
}

/**
 * Fetches the bucket with the given name.
 *
 * @param bucketName name of the bucket to get; must not be null or empty
 * @return the bucket with the given name, or null if the bucket was not found
 * @throws IOException if the lookup fails for any reason other than "not found"
 */
@Nullable
private Bucket getBucket(String bucketName) throws IOException {
  logger.atFiner().log("getBucket(%s)", bucketName);
  checkArgument(!isNullOrEmpty(bucketName), "bucketName must not be null or empty");
  Bucket fetched;
  try {
    fetched = storage.get(bucketName);
  } catch (StorageException storageError) {
    boolean notFound = errorExtractor.getErrorType(storageError) == ErrorType.NOT_FOUND;
    if (!notFound) {
      throw new IOException("Error accessing Bucket " + bucketName, storageError);
    }
    // A missing bucket is reported as null rather than as an error.
    return null;
  }
  return fetched;
}

/**
 * Gets the object with the given resourceId.
 *
 * <p>Only the fields listed in {@link #BLOB_FIELDS} are requested. A "not found" error raised
 * by the storage client is mapped to {@code null}, matching how bucket lookups in this class
 * treat {@code NOT_FOUND} and the contract documented below.
 *
 * @param resourceId identifies a StorageObject
 * @return the object with the given name or null if object not found
 * @throws IOException if the object exists but cannot be accessed
 */
@Nullable
Blob getBlob(StorageResourceId resourceId) throws IOException {
  checkArgument(
      resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId);
  try {
    return storage.get(
        BlobId.of(resourceId.getBucketName(), resourceId.getObjectName()),
        BlobGetOption.fields(BLOB_FIELDS.toArray(new BlobField[0])));
  } catch (StorageException e) {
    // Honor the documented contract: "not found" is a null result, not an error.
    if (errorExtractor.getErrorType(e) == ErrorType.NOT_FOUND) {
      return null;
    }
    throw new IOException("Error accessing " + resourceId, e);
  }
}

@Override
public SeekableByteChannel open(
StorageResourceId resourceId, GoogleCloudStorageReadOptions readOptions) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.storage.v2.CreateBucketRequest;
import com.google.storage.v2.DeleteBucketRequest;
import com.google.storage.v2.DeleteObjectRequest;
import com.google.storage.v2.GetBucketRequest;
import com.google.storage.v2.GetObjectRequest;
import com.google.storage.v2.ListBucketsRequest;
import com.google.storage.v2.ListBucketsResponse;
Expand Down Expand Up @@ -145,6 +146,26 @@ public void listBuckets(
}
}

/**
 * Mock handler for GetBucket: replies with the next queued response. A queued {@code Bucket} is
 * returned to the caller (and the request is recorded), a queued {@code Exception} is reported
 * as an error, and anything else is reported as an {@link IllegalArgumentException}.
 */
@Override
public void getBucket(GetBucketRequest request, StreamObserver<Bucket> responseObserver) {
  java.lang.Object queued = responses.poll();

  if (queued instanceof Bucket) {
    // Requests are only recorded when a successful response is served.
    requests.add(request);
    responseObserver.onNext((Bucket) queued);
    responseObserver.onCompleted();
    return;
  }

  if (queued instanceof Exception) {
    responseObserver.onError((Exception) queued);
    return;
  }

  String actualType = queued == null ? "null" : queued.getClass().getName();
  responseObserver.onError(
      new IllegalArgumentException(
          String.format(
              "Unrecognized response type %s for method GetBucket, expected %s or %s",
              actualType, Bucket.class.getName(), Exception.class.getName())));
}

@Override
public void deleteObject(DeleteObjectRequest request, StreamObserver<Empty> responseObserver) {
java.lang.Object response = responses.poll();
Expand Down
Loading

0 comments on commit 000e849

Please sign in to comment.