-
Notifications
You must be signed in to change notification settings - Fork 231
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add java storage implementation for delete objects method #1079
Add java storage implementation for delete objects method #1079
Conversation
4eea78c
to
d2b9944
Compare
/gcbrun |
ec960d2
to
4f4a0d2
Compare
/gcbrun |
for (StorageResourceId object : fullObjectNames) { | ||
queueSingleObjectDelete(object, innerExceptions, executor, 0); | ||
} | ||
executor.shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please do this in finally block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
public void onFailure(Throwable throwable) { | ||
innerExceptions.add( | ||
new IOException( | ||
String.format("Error deleting %s, stage 1", resourceId), throwable)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stage1 does not make sense. Please make the explicit and say something like "getting object".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is copied from the existing implementation.
hadoop-connectors/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl.java
Line 851 in 48891bf
String.format("Error deleting '%s', stage 1", resourceId), cause)); |
// version of the object. | ||
grpcManualBatchExecutor.queue( | ||
() -> | ||
storage.get( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On which thread we will be getting the callback? Is there any advantage of making this async. i.e. why not make this a blocking call?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The callback will be called on the same thread. The callable(get request here) will be marked as complete only after the callback is executed https://github.com/GoogleCloudDataproc/hadoop-connectors/pull/1079/files#diff-c13e75f742b44b8dfd6bcfe5b7014bc938cf8fd87790f3359e7afbca76bfad21R76. The callback queues a delete request which can then be async.
queueSingleObjectDelete( | ||
resourceId, innerExceptions, grpcManualBatchExecutor, attempt + 1); | ||
} else { | ||
innerExceptions.add( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We usually expect the inner exceptions order to match the operations order. Is it guaranteed here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The inner exceptions will match the operations order.
gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java
Outdated
Show resolved
Hide resolved
* | ||
* <p>Instance of this class can not be used again after {@link #shutdown()} method has been called. | ||
*/ | ||
public class GrpcManualBatchExecutor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let us not make this public
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
private final ExecutorService requestsExecutor; | ||
|
||
private final Queue<Future<Void>> responseFutures = new ConcurrentLinkedQueue<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason why normal queue is not sufficient here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to ensure thread safety, this collection will be shared across threads. We do have cases where a task running on a thread will try to queue another task(get object metadata will queue delete on success) so there may be cases where two threads will try to modify this collection.
/** Awaits until all sent requests are completed. Should be serialized */ | ||
private void awaitRequestsCompletion() throws IOException { | ||
while (!responseFutures.isEmpty()) { | ||
getFromFuture(responseFutures.remove()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have to have catch block here? i.e. can getFromFuture throw exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getFromFuture
Line 736 in 48891bf
static <T> T getFromFuture(Future<T> future) throws IOException { |
@@ -60,6 +60,8 @@ public abstract class GoogleCloudStorageFileSystemNewIntegrationTestBase { | |||
|
|||
protected GoogleCloudStorageFileSystem gcsFs; | |||
|
|||
protected static boolean isTracingSupported = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add a comment explaining why we needed this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
* | ||
* <p>Instance of this class can not be used again after {@link #shutdown()} method has been called. | ||
*/ | ||
public class GrpcManualBatchExecutor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let us give this a more generic name. Let us get rid of "Grpc" and "Manual"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
4f4a0d2
to
9f427c3
Compare
/gcbrun |
9f427c3
to
5188dac
Compare
/gcbrun |
Storage gRPC API doesnt support batch requests so
BatchExecutor
emulates batching at the connector client to have parity with the HTTP implementation.