Add java storage implementation for delete objects method #1079

@shilpi23pandey shilpi23pandey commented Dec 4, 2023

The Storage gRPC API does not support batch requests, so BatchExecutor emulates batching in the connector client to keep parity with the HTTP implementation.
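For readers unfamiliar with the idea, here is a minimal sketch of client-side batch emulation using only JDK classes. `SketchBatchExecutor` and its method shapes are hypothetical and are not the connector's actual `BatchExecutor`; the sketch only assumes what the description states, namely that requests are queued individually and awaited together.

```java
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/** Hypothetical, JDK-only stand-in for a client-side batch emulator. */
class SketchBatchExecutor {
  private final ExecutorService requestsExecutor;
  // Thread-safe because callbacks running on worker threads may queue more work.
  private final Queue<Future<Void>> responseFutures = new ConcurrentLinkedQueue<>();

  SketchBatchExecutor(int threads) {
    this.requestsExecutor = Executors.newFixedThreadPool(threads);
  }

  /** Queues a task; its callback runs on the same worker thread, inside the same future. */
  <T> void queue(Callable<T> task, Consumer<T> onSuccess, Consumer<Throwable> onFailure) {
    responseFutures.add(
        requestsExecutor.submit(
            () -> {
              T result;
              try {
                result = task.call();
              } catch (Throwable t) {
                onFailure.accept(t);
                return null;
              }
              onSuccess.accept(result);
              return null;
            }));
  }

  /** Waits for every queued task, including tasks queued by callbacks, then shuts down. */
  void shutdown() throws IOException {
    try {
      while (!responseFutures.isEmpty()) {
        try {
          responseFutures.remove().get();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while waiting for batch", e);
        } catch (ExecutionException e) {
          throw new IOException("Batch request failed", e.getCause());
        }
      }
    } finally {
      requestsExecutor.shutdown();
    }
  }
}
```

Because a callback finishes before its enclosing future completes, any follow-up request it queues is already in the queue by the time `shutdown()` checks whether the queue is empty.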

@shilpi23pandey shilpi23pandey marked this pull request as draft December 4, 2023 18:52
@shilpi23pandey shilpi23pandey changed the title Move to java storage objects Dec 4, 2023
@arunkumarchacko
Contributor

/gcbrun

@shilpi23pandey shilpi23pandey force-pushed the move-to-java-storage-objects branch 3 times, most recently from ec960d2 to 4f4a0d2 Compare December 6, 2023 06:14
@arunkumarchacko
Contributor

/gcbrun

@shilpi23pandey shilpi23pandey marked this pull request as ready for review December 6, 2023 07:12
for (StorageResourceId object : fullObjectNames) {
  queueSingleObjectDelete(object, innerExceptions, executor, 0);
}
executor.shutdown();
Contributor

Please do this in a finally block.

Contributor Author

Done
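To illustrate the reviewer's request, here is a small sketch of shutting an executor down in a `finally` block so that it is released even when queueing fails partway through. The class, method, and the empty-name check are hypothetical and use a plain JDK `ExecutorService`; the merged change may look different.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;

class ShutdownInFinallySketch {
  /** Queues one placeholder delete per name and always shuts the executor down. */
  static void queueDeletes(ExecutorService executor, List<String> names, List<String> deleted) {
    try {
      for (String name : names) {
        if (name.isEmpty()) {
          // Hypothetical failure while queueing, to exercise the exceptional path.
          throw new IllegalArgumentException("Empty object name");
        }
        executor.submit(
            () -> {
              deleted.add(name); // placeholder for a real delete request
            });
      }
    } finally {
      // Runs on both the normal and the exceptional path.
      // Tasks that were already submitted are still allowed to finish.
      executor.shutdown();
    }
  }
}
```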

public void onFailure(Throwable throwable) {
  innerExceptions.add(
      new IOException(
          String.format("Error deleting %s, stage 1", resourceId), throwable));
Contributor

"Stage 1" does not make sense. Please make the message explicit and say something like "getting object".

Contributor Author

This is copied from the existing implementation:

String.format("Error deleting '%s', stage 1", resourceId), cause));

Keeping this as is for the sake of parity.

// version of the object.
grpcManualBatchExecutor.queue(
    () ->
        storage.get(
Contributor

On which thread will we get the callback? Is there any advantage to making this async, i.e. why not make this a blocking call?

Contributor Author

The callback is called on the same thread that ran the request. The callable (the get request here) is marked as complete only after the callback has executed: https://github.com/GoogleCloudDataproc/hadoop-connectors/pull/1079/files#diff-c13e75f742b44b8dfd6bcfe5b7014bc938cf8fd87790f3359e7afbca76bfad21R76. The callback queues a delete request, which can then run asynchronously.
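The threading behavior described in this reply can be sketched as follows. This is an illustration under assumptions, not the code at the linked line: `CallbackThreadSketch` and its methods are hypothetical, and the only property shown is that a callback invoked inside the submitted task runs on that task's worker thread before the future completes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Supplier;

class CallbackThreadSketch {
  /** Runs the task and then its callback as one submitted unit of work. */
  static Future<?> submitWithCallback(
      ExecutorService executor, Supplier<String> task, Consumer<String> callback) {
    return executor.submit(
        () -> {
          String result = task.get();
          callback.accept(result); // still inside the submitted task
        });
  }

  /** Returns the thread names seen by the task (index 0) and by the callback (index 1). */
  static String[] recordThreads() throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    String[] names = new String[2];
    try {
      Future<?> done =
          submitWithCallback(
              executor,
              () -> {
                names[0] = Thread.currentThread().getName();
                return "metadata";
              },
              result -> names[1] = Thread.currentThread().getName());
      done.get(); // returns only after the callback has finished
    } finally {
      executor.shutdown();
    }
    return names;
  }
}
```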

  queueSingleObjectDelete(
      resourceId, innerExceptions, grpcManualBatchExecutor, attempt + 1);
} else {
  innerExceptions.add(
Contributor

We usually expect the inner exceptions order to match the operations order. Is it guaranteed here?

Contributor Author

The inner exceptions will match the operations order.

*
* <p>Instance of this class can not be used again after {@link #shutdown()} method has been called.
*/
public class GrpcManualBatchExecutor {
Contributor

Let us not make this public.

Contributor Author

Done


private final ExecutorService requestsExecutor;

private final Queue<Future<Void>> responseFutures = new ConcurrentLinkedQueue<>();
Contributor

Is there any reason why a normal queue is not sufficient here?

Contributor Author

This is to ensure thread safety: the collection is shared across threads. A task running on one thread can queue another task (getting object metadata queues a delete on success), so two threads may try to modify this collection at the same time.
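A short sketch of the concern, assuming nothing beyond the JDK: several threads add to one shared `ConcurrentLinkedQueue` and no element is lost. `SharedQueueSketch` is a hypothetical name; with a non-thread-safe queue such as `ArrayDeque`, the same unsynchronized access would have no such guarantee.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class SharedQueueSketch {
  /** Adds perThread items from each of the given number of threads; returns the final size. */
  static int concurrentAdds(int threads, int perThread) throws InterruptedException {
    Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      workers[t] =
          new Thread(
              () -> {
                for (int i = 0; i < perThread; i++) {
                  queue.add(i); // safe without external locking
                }
              });
      workers[t].start();
    }
    for (Thread worker : workers) {
      worker.join(); // join makes all adds visible to this thread
    }
    return queue.size();
  }
}
```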

/** Awaits until all sent requests are completed. Should be serialized */
private void awaitRequestsCompletion() throws IOException {
  while (!responseFutures.isEmpty()) {
    getFromFuture(responseFutures.remove());
Contributor

Do we need a catch block here, i.e. can getFromFuture throw an exception?

Contributor Author

getFromFuture, declared as

static <T> T getFromFuture(Future<T> future) throws IOException {

handles the exceptions and wraps them in an IOException.
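For context, a helper with the signature quoted above could look roughly like the sketch below. Only the signature comes from the thread; the body, the messages, and the name `getFromFutureSketch` are assumptions, and the connector's real `getFromFuture` may handle the cases differently.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureUnwrapSketch {
  /** Waits for the future and converts any failure into an IOException. */
  static <T> T getFromFutureSketch(Future<T> future) throws IOException {
    try {
      return future.get();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while waiting for the request", e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause; // already the right type, rethrow as is
      }
      throw new IOException("Request failed", cause);
    }
  }
}
```

With a helper of this shape, the caller only has to declare `throws IOException`, which is why no extra catch block would be needed in the awaiting loop.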

@@ -60,6 +60,8 @@ public abstract class GoogleCloudStorageFileSystemNewIntegrationTestBase {

protected GoogleCloudStorageFileSystem gcsFs;

protected static boolean isTracingSupported = true;
Contributor

Please add a comment explaining why we need this.

Contributor Author

Done

*
* <p>Instance of this class can not be used again after {@link #shutdown()} method has been called.
*/
public class GrpcManualBatchExecutor {
Contributor

Let us give this a more generic name and get rid of "Grpc" and "Manual".

Contributor Author

Done

@arunkumarchacko
Contributor

/gcbrun

@shilpi23pandey
Contributor Author

/gcbrun

@shilpi23pandey shilpi23pandey merged commit 84652b2 into GoogleCloudDataproc:master Dec 11, 2023
4 checks passed
@shilpi23pandey shilpi23pandey deleted the move-to-java-storage-objects branch December 11, 2023 09:22