Skip to content

Commit

Permalink
Add java-storage implementation of createEmptyObjects. (
Browse files Browse the repository at this point in the history
…#1094)

* Add java-storage implementation of createEmptyObjects.

* Increase code coverage.
  • Loading branch information
shilpi23pandey committed Jan 9, 2024
1 parent d677c34 commit 4aafbba
Show file tree
Hide file tree
Showing 8 changed files with 420 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package com.google.cloud.hadoop.gcsio;

import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.EMPTY_OBJECT_CREATE_OPTIONS;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.decodeMetadata;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.sleeper;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.validateCopyArguments;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -28,6 +30,8 @@

import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.gax.paging.Page;
import com.google.auth.Credentials;
import com.google.auto.value.AutoBuilder;
Expand Down Expand Up @@ -69,6 +73,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.flogger.GoogleLogger;
import com.google.common.io.BaseEncoding;
Expand All @@ -80,6 +85,7 @@
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -235,6 +241,211 @@ public void createBucket(String bucketName, CreateBucketOptions options) throws
}
}

/**
 * Creates an empty object at {@code resourceId} using {@code EMPTY_OBJECT_CREATE_OPTIONS}.
 *
 * <p>See {@link GoogleCloudStorage#createEmptyObject(StorageResourceId)} for details about
 * expected behavior.
 *
 * @param resourceId identifier of the object to create; must name a storage object
 * @throws IllegalArgumentException if {@code resourceId} does not name a storage object
 * @throws IOException if the delegated creation fails
 */
@Override
public void createEmptyObject(StorageResourceId resourceId) throws IOException {
  logger.atFiner().log("createEmptyObject(%s)", resourceId);

  // Reject bucket-only / root ids up front, before delegating to the options-aware overload.
  boolean namesObject = resourceId.isStorageObject();
  checkArgument(namesObject, "Expected full StorageObject id, got %s", resourceId);

  createEmptyObject(resourceId, EMPTY_OBJECT_CREATE_OPTIONS);
}

/**
 * Creates an empty object at {@code resourceId} with the supplied {@code options}.
 *
 * <p>A {@link StorageException} raised by the create call is swallowed (and logged) when
 * {@code canIgnoreExceptionForEmptyObject} confirms it is safe to ignore; otherwise it is
 * translated into a {@link FileAlreadyExistsException} for {@code ALREADY_EXISTS} errors or a
 * plain {@link IOException} for everything else.
 *
 * <p>See {@link GoogleCloudStorage#createEmptyObject(StorageResourceId, CreateObjectOptions)} for
 * details about expected behavior.
 *
 * @param resourceId identifier of the object to create; must name a storage object
 * @param options creation options forwarded to the underlying create call
 * @throws IllegalArgumentException if {@code resourceId} does not name a storage object
 * @throws IOException if creation fails and the failure cannot be ignored
 */
@Override
public void createEmptyObject(StorageResourceId resourceId, CreateObjectOptions options)
    throws IOException {
  checkArgument(
      resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId);

  try {
    createEmptyObjectInternal(resourceId, options);
  } catch (StorageException createFailure) {
    if (!canIgnoreExceptionForEmptyObject(createFailure, resourceId, options)) {
      // Not ignorable: surface the failure using the most specific exception type available.
      if (errorExtractor.getErrorType(createFailure) == ErrorType.ALREADY_EXISTS) {
        FileAlreadyExistsException alreadyExists =
            new FileAlreadyExistsException(
                String.format("Object '%s' already exists.", resourceId));
        alreadyExists.initCause(createFailure);
        throw alreadyExists;
      }
      throw new IOException(createFailure);
    }

    // Ignorable: keep a short note at INFO and the full cause at FINE.
    logger.atInfo().log(
        "Ignoring exception of type %s; verified object already exists with desired state.",
        createFailure.getClass().getSimpleName());
    logger.atFine().withCause(createFailure).log("Ignored exception while creating empty object");
  }
}

/**
 * Creates empty objects for every id in {@code resourceIds} using
 * {@code EMPTY_OBJECT_CREATE_OPTIONS}.
 *
 * <p>See {@link GoogleCloudStorage#createEmptyObjects(List)} for details about expected behavior.
 *
 * @param resourceIds identifiers of the objects to create
 * @throws IOException if the delegated creation fails
 */
@Override
public void createEmptyObjects(List<StorageResourceId> resourceIds) throws IOException {
  CreateObjectOptions defaultOptions = EMPTY_OBJECT_CREATE_OPTIONS;
  createEmptyObjects(resourceIds, defaultOptions);
}

/**
 * Creates empty objects for every id in {@code resourceIds} with the supplied {@code options}.
 *
 * <p>An empty list is a no-op and a single-element list is routed through
 * {@code createEmptyObject}. Larger lists are validated and then submitted, one task per id, to a
 * {@code BatchExecutor} constructed with {@code storageOptions.getBatchThreads()}. Failures are
 * collected per object and reported together as one composite exception after the executor has
 * been shut down.
 *
 * <p>See {@link GoogleCloudStorage#createEmptyObjects(List, CreateObjectOptions)} for details
 * about expected behavior.
 *
 * @param resourceIds identifiers of the objects to create; each must name a storage object
 * @param options creation options applied to every object
 * @throws IllegalArgumentException if any id does not name a storage object
 * @throws IOException if one or more objects could not be created
 */
@Override
public void createEmptyObjects(List<StorageResourceId> resourceIds, CreateObjectOptions options)
    throws IOException {
  logger.atFiner().log("createEmptyObjects(%s)", resourceIds);

  // Nothing requested, nothing to do.
  if (resourceIds.isEmpty()) {
    return;
  }

  // A lone item skips the executor entirely to avoid batching overhead.
  if (resourceIds.size() == 1) {
    createEmptyObject(Iterables.getOnlyElement(resourceIds), options);
    return;
  }

  // Every id must be a full object id before any work is queued.
  for (StorageResourceId candidate : resourceIds) {
    checkArgument(
        candidate.isStorageObject(),
        "Expected full StorageObject names only, got: '%s'",
        candidate);
  }

  // Per-object failures accumulate here and are merged into one composite exception below.
  Set<IOException> innerExceptions = newConcurrentHashSet();
  BatchExecutor executor = new BatchExecutor(storageOptions.getBatchThreads());

  try {
    for (StorageResourceId target : resourceIds) {
      executor.queue(
          () -> {
            try {
              createEmptyObjectInternal(target, options);
              logger.atFiner().log("Successfully inserted %s", target);
            } catch (StorageException createFailure) {
              boolean ignorable;
              try {
                ignorable = canIgnoreExceptionForEmptyObject(createFailure, target, options);
              } catch (Exception recheckFailure) {
                // Catch Exception (not just StorageException) so any failure of the re-check is
                // wrapped and surfaced through innerExceptions instead of escaping the task.
                innerExceptions.add(
                    new IOException(
                        "Error re-fetching after rate-limit error: " + target, recheckFailure));
                ignorable = false;
              }

              if (!ignorable) {
                innerExceptions.add(new IOException("Error inserting " + target, createFailure));
                return null;
              }

              logger.atInfo().log(
                  "Ignoring exception of type %s; verified object already exists with desired"
                      + " state.",
                  createFailure.getClass().getSimpleName());
              logger.atFine().withCause(createFailure).log(
                  "Ignored exception while creating empty object");
            } catch (Exception unexpected) {
              innerExceptions.add(new IOException("Error inserting " + target, unexpected));
            }
            return null;
          },
          null);
    }
  } finally {
    executor.shutdown();
  }

  if (!innerExceptions.isEmpty()) {
    throw GoogleCloudStorageExceptions.createCompositeException(innerExceptions);
  }
}

/**
 * Issues the actual {@code storage.create} call for an empty object.
 *
 * <p>Target options are assembled as follows: gzip content is always disabled; a generation-match
 * precondition is added when {@code resourceId} carries a generation id, otherwise a
 * does-not-exist precondition is added for directories or when overwriting is not allowed; an
 * encryption key is added when {@code storageOptions} provides one.
 *
 * @param resourceId identifier supplying the bucket name, object name and optional generation id
 * @param createObjectOptions source of metadata, content encoding, content type and the
 *     overwrite flag
 */
private void createEmptyObjectInternal(
    StorageResourceId resourceId, CreateObjectOptions createObjectOptions) {
  Map<String, String> encodedMetadata = encodeMetadata(createObjectOptions.getMetadata());

  List<BlobTargetOption> targetOptions = new ArrayList<>();
  targetOptions.add(BlobTargetOption.disableGzipContent());

  if (resourceId.hasGenerationId()) {
    targetOptions.add(BlobTargetOption.generationMatch(resourceId.getGenerationId()));
  } else {
    boolean mustNotExist = resourceId.isDirectory() || !createObjectOptions.isOverwriteExisting();
    if (mustNotExist) {
      targetOptions.add(BlobTargetOption.doesNotExist());
    }
  }

  if (storageOptions.getEncryptionKey() != null) {
    targetOptions.add(
        BlobTargetOption.encryptionKey(storageOptions.getEncryptionKey().value()));
  }

  BlobId blobId = BlobId.of(resourceId.getBucketName(), resourceId.getObjectName());
  BlobInfo emptyBlob =
      BlobInfo.newBuilder(blobId)
          .setMetadata(encodedMetadata)
          .setContentEncoding(createObjectOptions.getContentEncoding())
          .setContentType(createObjectOptions.getContentType())
          .build();

  storage.create(emptyBlob, targetOptions.toArray(BlobTargetOption[]::new));
}

/**
 * Helper to check whether an empty object already exists with the expected metadata specified in
 * {@code options}, to be used to determine whether it's safe to ignore an exception that was
 * thrown when trying to create the object, {@code exceptionOnCreate}.
 *
 * <p>The check only runs when the error type of {@code exceptionOnCreate} is
 * {@code RESOURCE_EXHAUSTED}, {@code INTERNAL}, or, for directory ids only,
 * {@code FAILED_PRECONDITION}; any other error type yields {@code false} without a lookup. When
 * it runs, the object is looked up via {@code getItemInfo}, and the lookup is repeated with
 * exponential backoff until the object exists or the backoff is exhausted.
 *
 * @param exceptionOnCreate the exception thrown by the create attempt
 * @param resourceId identifier of the object whose creation failed
 * @param options creation options; supplies the metadata to compare and the flag that decides
 *     whether metadata has to match
 * @return {@code true} if the object exists with size 0 and, when
 *     {@code options.isEnsureEmptyObjectsMetadataMatch()} is set, its metadata equals
 *     {@code options.getMetadata()}; {@code false} otherwise
 * @throws IOException declared by this method; presumably raised by {@code getItemInfo} -- TODO
 *     confirm
 */
private boolean canIgnoreExceptionForEmptyObject(
StorageException exceptionOnCreate, StorageResourceId resourceId, CreateObjectOptions options)
throws IOException {
ErrorType errorType = errorExtractor.getErrorType(exceptionOnCreate);
if (errorType == ErrorType.RESOURCE_EXHAUSTED
|| errorType == ErrorType.INTERNAL
|| (resourceId.isDirectory() && errorType == ErrorType.FAILED_PRECONDITION)) {
GoogleCloudStorageItemInfo existingInfo;
Duration maxWaitTime = storageOptions.getMaxWaitTimeForEmptyObjectCreation();

// A positive max wait enables retries: 100 ms initial interval, growing by 1.5x up to 500 ms,
// with 15% randomization, until the max wait has elapsed. A zero or negative max wait uses
// STOP_BACKOFF, so the object is looked up exactly once.
// NOTE(review): toIntExact is presumably Math.toIntExact, which would throw
// ArithmeticException for a max wait whose millisecond value exceeds the int range -- confirm.
BackOff backOff =
!maxWaitTime.isZero() && !maxWaitTime.isNegative()
? new ExponentialBackOff.Builder()
.setMaxElapsedTimeMillis(toIntExact(maxWaitTime.toMillis()))
.setMaxIntervalMillis(500)
.setInitialIntervalMillis(100)
.setMultiplier(1.5)
.setRandomizationFactor(0.15)
.build()
: BackOff.STOP_BACKOFF;
// Starts at 0 so the first lookup happens immediately, without sleeping.
long nextSleep = 0L;
do {
if (nextSleep > 0) {
try {
sleeper.sleep(nextSleep);
} catch (InterruptedException e) {
// We caught an InterruptedException, we should set the interrupted bit on this thread.
// Marking nextSleep as STOP still allows one final lookup below, then ends the loop.
Thread.currentThread().interrupt();
nextSleep = BackOff.STOP;
}
}
existingInfo = getItemInfo(resourceId);
// Preserve a STOP set by the interrupt handler; otherwise ask the backoff for the next delay.
nextSleep = nextSleep == BackOff.STOP ? BackOff.STOP : backOff.nextBackOffMillis();
} while (!existingInfo.exists() && nextSleep != BackOff.STOP);

// Compare existence, size, and metadata; for 429 errors creating an empty object,
// we don't care about metaGeneration/contentGeneration as long as the metadata
// matches, since we don't know for sure whether our low-level request succeeded
// first or some other client succeeded first.
if (existingInfo.exists() && existingInfo.getSize() == 0) {
if (options.isEnsureEmptyObjectsMetadataMatch()) {
return existingInfo.metadataEquals(options.getMetadata());
}
return true;
}
}
// Either the error type is not one that is re-checked, or no matching empty object was found.
return false;
}

/**
* See {@link GoogleCloudStorage#copy(String, List, String, List)} for details about expected
* behavior.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public class GoogleCloudStorageImpl implements GoogleCloudStorage {

// Field name constant with the value "userProject"; its use is not visible in this hunk.
private static final String USER_PROJECT_FIELD_NAME = "userProject";

// Options for creating empty objects: DEFAULT_OVERWRITE with the "ensure empty objects metadata
// match" flag turned off.
// NOTE(review): this is a diff rendering without +/- markers. The `private static final` line
// is presumably the removed declaration and the package-private `static final` line its
// replacement, widened so the constant can be statically imported from another class in this
// package -- confirm against the actual file.
private static final CreateObjectOptions EMPTY_OBJECT_CREATE_OPTIONS =
static final CreateObjectOptions EMPTY_OBJECT_CREATE_OPTIONS =
CreateObjectOptions.DEFAULT_OVERWRITE.toBuilder()
.setEnsureEmptyObjectsMetadataMatch(false)
.build();
Expand Down Expand Up @@ -252,7 +252,7 @@ public Boolean load(String bucketName) {
// Storage options held by this instance.
private final GoogleCloudStorageOptions storageOptions;

// Object to use to perform sleep operations
// NOTE(review): this is a diff rendering without +/- markers. The `private final` line is
// presumably the removed declaration and the package-private `static final` line its
// replacement, made static and non-private so it can be statically imported from another class
// in this package -- confirm against the actual file.
private final Sleeper sleeper = Sleeper.DEFAULT;
static final Sleeper sleeper = Sleeper.DEFAULT;

// BackOff objects are per-request, use this to make new ones.
private final BackOffFactory backOffFactory = BackOffFactory.DEFAULT;
Expand Down
Loading

0 comments on commit 4aafbba

Please sign in to comment.