Skip to content

Commit

Permalink
Add java storage implementation for delete objects method (#1079)
Browse files Browse the repository at this point in the history
* Add java-storage implementation for delete objects API.

* Use a direct executor for 0 thread count configuration.
  • Loading branch information
shilpi23pandey committed Dec 11, 2023
1 parent 5eec76a commit 84652b2
Show file tree
Hide file tree
Showing 6 changed files with 541 additions and 170 deletions.
125 changes: 125 additions & 0 deletions gcsio/src/main/java/com/google/cloud/hadoop/gcsio/BatchExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.hadoop.gcsio;

import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemImpl.getFromFuture;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;

import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * BatchExecutor provides a means to manually batch requests using a thread pool. Execution is
 * performed by the underlying {@link #requestsExecutor} ExecutorService.
 *
 * <p>Expected usage is to create a new BatchExecutor instance per client operation that represents
 * a logical grouping of requests (delete, copy, get).
 *
 * <p>An instance of this class can not be used again after {@link #shutdown()} has been called.
 */
class BatchExecutor {

  private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

  private final ExecutorService requestsExecutor;

  // Futures of every task submitted through queue(); drained by awaitRequestsCompletion().
  private final Queue<Future<Void>> responseFutures = new ConcurrentLinkedQueue<>();

  /**
   * Creates a new batch executor.
   *
   * @param numThreads number of worker threads; {@code 0} selects a direct executor service
   *     instead of a thread pool
   */
  public BatchExecutor(int numThreads) {
    this.requestsExecutor =
        numThreads == 0 ? newDirectExecutorService() : newRequestExecutor(numThreads);
  }

  /**
   * Builds a fixed-size pool of daemon threads backed by a bounded queue of {@code numThreads *
   * 20} slots. Once the queue is full, {@link ThreadPoolExecutor.CallerRunsPolicy} makes the
   * submitting thread run the task itself rather than rejecting it.
   */
  private static ExecutorService newRequestExecutor(int numThreads) {
    ThreadPoolExecutor executor =
        new ThreadPoolExecutor(
            /* corePoolSize= */ numThreads,
            /* maximumPoolSize= */ numThreads,
            /* keepAliveTime= */ 10L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(numThreads * 20),
            new ThreadFactoryBuilder()
                .setNameFormat("gcs-grpc-manual-batching-pool-%d")
                .setDaemon(true)
                .build());
    // Let idle core threads exit after the keep-alive time so an unused pool holds no threads.
    executor.allowCoreThreadTimeOut(true);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return executor;
  }

  /**
   * Adds a task to the execution queue.
   *
   * @param task the work to run
   * @param callback receives the task's result, or the {@link Throwable} raised while running the
   *     task or while delivering its result
   * @throws IllegalStateException if the executor has already been shut down
   */
  public <T> void queue(Callable<T> task, FutureCallback<T> callback) {
    checkState(
        !requestsExecutor.isShutdown() && !requestsExecutor.isTerminated(),
        "requestExecutor should not be terminated to queue request");

    responseFutures.add(
        requestsExecutor.submit(
            () -> {
              execute(task, callback);
              return null;
            }));
  }

  /** Runs {@code task} and reports its outcome to {@code callback}. */
  private static <T> void execute(Callable<T> task, FutureCallback<T> callback) {
    try {
      T result = task.call();
      callback.onSuccess(result);
    } catch (Throwable throwable) {
      // Also reached when onSuccess itself throws, so a callback may see both notifications.
      callback.onFailure(throwable);
    }
  }

  /**
   * Awaits until all tasks are terminated and then shuts down the executor.
   *
   * <p>The executor is shut down even when waiting for the queued tasks fails, so that a failed
   * batch does not leave its thread pool behind. In that case any tasks that do not finish within
   * the one second termination grace period are cancelled via {@code shutdownNow()}.
   *
   * @throws IOException if waiting for a queued task fails
   */
  public void shutdown() throws IOException {
    try {
      // Must be inside the try block: if this throws, the finally block still has to release the
      // executor.
      awaitRequestsCompletion();
      checkState(responseFutures.isEmpty(), "responseFutures should be empty after flush");
    } finally {
      requestsExecutor.shutdown();
      try {
        if (!requestsExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
          logger.atWarning().log("Forcibly shutting down grpc manual batching thread pool.");
          requestsExecutor.shutdownNow();
        }
      } catch (InterruptedException e) {
        // Preserve the interrupt for the caller before forcing the pool down.
        Thread.currentThread().interrupt();
        logger.atFine().withCause(e).log(
            "Failed to await termination: forcibly shutting down grpc manual batching thread pool.");
        requestsExecutor.shutdownNow();
      }
    }
  }

  /**
   * Awaits until all sent requests are completed. Should be serialized, i.e. called from one
   * thread at a time: the emptiness check and the removal are separate steps.
   */
  private void awaitRequestsCompletion() throws IOException {
    // Re-check on every iteration: a running task's callback may queue follow-up tasks.
    while (!responseFutures.isEmpty()) {
      getFromFuture(responseFutures.remove());
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PartFileCleanupType;
import com.google.cloud.hadoop.util.ErrorTypeExtractor;
import com.google.cloud.hadoop.util.ErrorTypeExtractor.ErrorType;
import com.google.cloud.hadoop.util.GcsClientStatisticInterface;
import com.google.cloud.hadoop.util.GrpcErrorTypeExtractor;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobWriteSessionConfig;
import com.google.cloud.storage.BlobWriteSessionConfigs;
import com.google.cloud.storage.Bucket;
Expand All @@ -45,6 +48,9 @@
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobField;
import com.google.cloud.storage.Storage.BlobGetOption;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.Storage.BucketField;
import com.google.cloud.storage.Storage.BucketListOption;
import com.google.cloud.storage.StorageClass;
Expand All @@ -55,6 +61,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.ClientInterceptor;
import java.io.IOException;
Expand All @@ -65,6 +72,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
Expand All @@ -78,8 +87,12 @@
*/
@VisibleForTesting
public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage {

private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

// Maximum number of times to retry deletes in the case of precondition failures.
private static final int MAXIMUM_PRECONDITION_FAILURES_IN_DELETE = 4;

private final GoogleCloudStorageOptions storageOptions;
private final Storage storage;

Expand Down Expand Up @@ -261,6 +274,136 @@ private static GoogleCloudStorageItemInfo createItemInfoForBucket(
bucket.getStorageClass().name());
}

/** See {@link GoogleCloudStorage#deleteObjects(List)} for details about the expected behavior. */
@Override
public void deleteObjects(List<StorageResourceId> fullObjectNames) throws IOException {
  logger.atFiner().log("deleteObjects(%s)", fullObjectNames);

  // Nothing to delete, so no executor needs to be created.
  if (fullObjectNames.isEmpty()) {
    return;
  }

  // Reject the whole request up front unless every id names a storage object.
  fullObjectNames.forEach(
      candidate ->
          checkArgument(
              candidate.isStorageObject(),
              "Expected full StorageObject names only, got: %s",
              candidate));

  // Failures are collected by the callbacks and reported together once the batch has finished.
  ConcurrentHashMap.KeySetView<IOException, Boolean> failures = ConcurrentHashMap.newKeySet();
  BatchExecutor deleteExecutor = new BatchExecutor(storageOptions.getBatchThreads());

  try {
    for (StorageResourceId target : fullObjectNames) {
      queueSingleObjectDelete(target, failures, deleteExecutor, /* attempt= */ 0);
    }
  } finally {
    // shutdown() waits for the queued deletes before releasing the executor.
    deleteExecutor.shutdown();
  }

  if (failures.isEmpty()) {
    return;
  }
  throw GoogleCloudStorageExceptions.createCompositeException(failures);
}

/**
 * Queues the deletion of a single object on {@code batchExecutor}.
 *
 * <p>When {@code resourceId} carries a generation id, a delete conditioned on that generation is
 * queued directly. Otherwise the object's current generation is fetched first and the conditional
 * delete is queued from the lookup callback. Failures are added to {@code innerExceptions}.
 */
private void queueSingleObjectDelete(
    StorageResourceId resourceId,
    KeySetView<IOException, Boolean> innerExceptions,
    BatchExecutor batchExecutor,
    int attempt) {
  String bucket = resourceId.getBucketName();
  String object = resourceId.getObjectName();

  if (!resourceId.hasGenerationId()) {
    // We first need to get the current object version to issue a safe delete for only the latest
    // version of the object.
    FutureCallback<Blob> lookupCallback =
        new FutureCallback<Blob>() {
          @Override
          public void onSuccess(Blob blob) {
            if (blob == null) {
              // A null blob denotes that the item cannot be found. Treat it the same as a
              // not-found in the delete case: the user wanted the object gone, and it is.
              logger.atFiner().log("deleteObjects(%s): get not found.", resourceId);
              return;
            }
            long generation = checkNotNull(blob.getGeneration(), "generation can not be null");
            batchExecutor.queue(
                () ->
                    storage.delete(
                        BlobId.of(bucket, object), BlobSourceOption.generationMatch(generation)),
                getObjectDeletionCallback(
                    resourceId, innerExceptions, batchExecutor, attempt, generation));
          }

          @Override
          public void onFailure(Throwable throwable) {
            innerExceptions.add(
                new IOException(
                    String.format("Error deleting %s, stage 1", resourceId), throwable));
          }
        };
    batchExecutor.queue(
        () -> storage.get(BlobId.of(bucket, object), BlobGetOption.fields(BlobField.GENERATION)),
        lookupCallback);
    return;
  }

  // The caller pinned a generation: delete exactly that generation, no lookup needed.
  FutureCallback<Boolean> deletionCallback =
      getObjectDeletionCallback(
          resourceId, innerExceptions, batchExecutor, attempt, resourceId.getGenerationId());
  batchExecutor.queue(
      () ->
          storage.delete(
              BlobId.of(bucket, object),
              BlobSourceOption.generationMatch(resourceId.getGenerationId())),
      deletionCallback);
}

/**
 * Builds the callback that handles the outcome of a generation-conditioned delete.
 *
 * <p>A successful call is only logged, whether or not the object was actually found. A failed
 * precondition is retried by re-queueing the whole delete, at most {@code
 * MAXIMUM_PRECONDITION_FAILURES_IN_DELETE} times; any other failure, or a failed precondition
 * after the retries are used up, is recorded in {@code innerExceptions}.
 *
 * @param resourceId the object being deleted
 * @param innerExceptions collector for failures
 * @param batchExecutor executor on which retries are queued
 * @param attempt zero-based count of retries already performed for this object
 * @param generation the generation the delete was conditioned on; used in log and error messages
 */
private FutureCallback<Boolean> getObjectDeletionCallback(
    StorageResourceId resourceId,
    ConcurrentHashMap.KeySetView<IOException, Boolean> innerExceptions,
    BatchExecutor batchExecutor,
    int attempt,
    long generation) {
  return new FutureCallback<>() {
    @Override
    public void onSuccess(Boolean result) {
      if (!result) {
        // Ignore item-not-found scenario. We do not have to delete what we cannot find.
        // This situation typically shows up when we make a request to delete something and the
        // server receives the request, but we get a retry-able error before we get a response.
        // During a retry, we no longer find the item because the server had deleted it already.
        logger.atFiner().log("Delete object %s not found.", resourceId);
      } else {
        logger.atFiner().log("Successfully deleted %s at generation %s", resourceId, generation);
      }
    }

    @Override
    public void onFailure(Throwable throwable) {
      // The first delete is queued with attempt == 0, so a strict comparison is needed to retry
      // exactly MAXIMUM_PRECONDITION_FAILURES_IN_DELETE times; "<=" allowed one extra retry.
      if (throwable instanceof Exception
          && errorExtractor.getErrorType((Exception) throwable) == ErrorType.FAILED_PRECONDITION
          && attempt < MAXIMUM_PRECONDITION_FAILURES_IN_DELETE) {
        logger.atInfo().log(
            "Precondition not met while deleting '%s' at generation %s. Attempt %s."
                + " Retrying:%s",
            resourceId, generation, attempt, throwable);
        queueSingleObjectDelete(resourceId, innerExceptions, batchExecutor, attempt + 1);
      } else {
        innerExceptions.add(
            new IOException(
                String.format(
                    "Error deleting '%s', stage 2 with generation %s", resourceId, generation),
                throwable));
      }
    }
  };
}

@Override
public SeekableByteChannel open(
StorageResourceId resourceId, GoogleCloudStorageReadOptions readOptions) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.hadoop.gcsio;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;

import com.google.common.util.concurrent.FutureCallback;
import java.io.IOException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Unit tests for {@link BatchExecutor}. */
@RunWith(JUnit4.class)
public class BatchExecutorTest {

  private BatchExecutor batchExecutor;

  // Outcome recorded by the callback. It is written on an executor thread and read on the test
  // thread, hence volatile.
  private volatile Boolean callbackResult;
  private volatile Throwable callbackFailure;

  @Before
  public void setUp() {
    batchExecutor = new BatchExecutor(10);
  }

  @Test
  public void queue_succeeds() throws IOException {
    batchExecutor.queue(() -> true, /* callback= */ recordingCallback());

    // shutdown() waits for the queued task, so the callback has run by the time it returns.
    batchExecutor.shutdown();

    // Assert on the test thread: an assertion failing inside the callback would be caught by
    // BatchExecutor and handed to onFailure instead of failing the test.
    assertThat(callbackFailure).isNull();
    assertThat(callbackResult).isTrue();
  }

  @Test
  public void queue_throwsException_afterShutdownCalled() throws IOException {
    batchExecutor.shutdown();

    IllegalStateException e =
        assertThrows(IllegalStateException.class, () -> batchExecutor.queue(() -> null, null));

    assertThat(e)
        .hasMessageThat()
        .startsWith("requestExecutor should not be terminated to queue request");
  }

  /** Returns a callback that stores whatever outcome it is notified of in this test's fields. */
  private FutureCallback<Boolean> recordingCallback() {
    return new FutureCallback<>() {
      @Override
      public void onSuccess(Boolean result) {
        callbackResult = result;
      }

      @Override
      public void onFailure(Throwable throwable) {
        callbackFailure = throwable;
      }
    };
  }
}
Loading

0 comments on commit 84652b2

Please sign in to comment.