parallel composite upload support #1067
/gcbrun
Codecov Report
Additional details and impacted files
@@ Coverage Diff @@
## master #1067 +/- ##
============================================
- Coverage 81.71% 81.68% -0.03%
- Complexity 1669 1675 +6
============================================
Files 96 96
Lines 7275 7318 +43
Branches 890 890
============================================
+ Hits 5945 5978 +33
- Misses 969 975 +6
- Partials 361 365 +4
gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java
 * alignment with configuration of java-storage client
 * https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy#com_google_cloud_storage_ParallelCompositeUploadBlobWriteSessionConfig_BufferAllocationStrategy_fixedPool_int_int_
 */
public static final HadoopConfigurationProperty<Integer> GCS_PCU_BUFFER_COUNT =
Should Configuration.md be updated as well?
We will add it to the configuration file once the benchmarking numbers are available.
gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java
@@ -588,6 +632,10 @@ private static AsyncWriteChannelOptions getWriteChannelOptions(Configuration con
.setUploadType(GCS_CLIENT_UPLOAD_TYPE.get(config, config::getEnum))
.setTemporaryPaths(
ImmutableSet.copyOf(GCS_WRITE_TEMPORARY_FILES_PATH.getStringCollection(config)))
.setPCUBufferCapacity(GCS_PCU_BUFFER_COUNT.get(config, config::getInt))
Please add unit tests.
sure
...a/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageClientImplIntegrationTest.java
.setWriteChannelOptions(
pcuDefaultOptions.toBuilder()
.setPartFilePrefix(partFilePrefix)
.setPartFileCleanupType(PartFileCleanupType.NEVER)
Is ON_SUCCESS not tested?
add test for it.
gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java
writeChannel.write(ByteBuffer.wrap(bytesToWrite));
// part files are getting uploaded in async thread
// wait for it to complete before listing files
Thread.sleep(5000);
Please change this.
updated
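The thread does not show what the fixed `Thread.sleep(5000)` was replaced with. One common alternative, sketched here purely as an illustration, is to poll for the expected state with a deadline so the test finishes as soon as the asynchronous part uploads are visible. The class `PollingWait` and the method `awaitCondition` are hypothetical names, not part of the connector or of java-storage, and the counter in `main` is only a stand-in for "list the part files and check the count".

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: polls a condition until it holds or a deadline passes. */
public class PollingWait {

  /**
   * Returns true as soon as the condition holds, or false if the timeout
   * elapses (or the thread is interrupted) first.
   */
  public static boolean awaitCondition(
      BooleanSupplier condition, Duration timeout, Duration pollInterval) {
    long deadlineNanos = System.nanoTime() + timeout.toNanos();
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadlineNanos >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollInterval.toMillis());
      } catch (InterruptedException e) {
        // Preserve the interrupt flag and give up waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  public static void main(String[] args) {
    // Stand-in for "list part files and compare the count"; true on the third poll.
    AtomicInteger polls = new AtomicInteger();
    boolean met =
        awaitCondition(
            () -> polls.incrementAndGet() >= 3, Duration.ofSeconds(5), Duration.ofMillis(10));
    System.out.println("condition met: " + met);
  }
}
```

In a test, the lambda would wrap whatever listing call the test already makes, and the assertion would fail if `awaitCondition` returns false.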
/gcbrun
return PartCleanupStrategy.never();
case ON_SUCCESS:
return PartCleanupStrategy.onlyOnSuccess();
default:
https://google.github.io/styleguide/javaguide.html#s4.8.4-switch
It is recommended to list the enum values explicitly rather than relying on a default case. This is more future-proof.
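A minimal sketch of what the suggestion could look like, with every enum value listed and no `default` group. The enum here is a local stand-in: `NEVER` and `ON_SUCCESS` appear in the diff, while `ALWAYS` is an assumption, and plain strings stand in for the `PartCleanupStrategy` objects so the example runs without java-storage on the classpath.

```java
public class CleanupSwitch {

  /** Stand-in for the connector's enum; ALWAYS is assumed, not taken from the diff. */
  public enum PartFileCleanupType {
    ALWAYS,
    NEVER,
    ON_SUCCESS
  }

  /** Maps each value explicitly; the strings stand in for strategy objects. */
  public static String toStrategyName(PartFileCleanupType type) {
    switch (type) {
      case ALWAYS:
        return "always";
      case NEVER:
        return "never";
      case ON_SUCCESS:
        return "onlyOnSuccess";
    }
    // Reached only if a new enum value is added without a matching case.
    throw new IllegalStateException("Unhandled cleanup type: " + type);
  }

  public static void main(String[] args) {
    for (PartFileCleanupType type : PartFileCleanupType.values()) {
      System.out.println(type + " -> " + toStrategyName(type));
    }
  }
}
```

With no `default` group, compilers and static analysis tools that check enum switches for completeness can flag a newly added value that has no case, instead of letting it fall silently into a catch-all branch.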
@@ -251,12 +257,43 @@ private static BlobWriteSessionConfig getSessionConfig(AsyncWriteChannelOptions
writeOptions.getTemporaryPaths().stream()
.map(x -> Paths.get(x))
.collect(ImmutableSet.toImmutableSet()));
case PARALLEL_COMPOSITE_UPLOAD:
ExecutorService pcuThreadPool =
This is a bit weird. If that is the case, why can't it create it?
@@ -251,12 +257,43 @@ private static BlobWriteSessionConfig getSessionConfig(AsyncWriteChannelOptions
writeOptions.getTemporaryPaths().stream()
.map(x -> Paths.get(x))
.collect(ImmutableSet.toImmutableSet()));
case PARALLEL_COMPOSITE_UPLOAD:
ExecutorService pcuThreadPool =
In either case, please add a comment. It is not obvious.
BufferAllocationStrategy.fixedPool(
writeOptions.getPCUBufferCount(), writeOptions.getPCUBufferCapacity()))
.withPartCleanupStrategy(getPartCleanupStrategy(writeOptions.getPartFileCleanupType()))
.withExecutorSupplier(ExecutorSupplier.useExecutor(pcuThreadPool))
Why not go with the default option?
}

private static final int PARALLEL_COMPOSITE_UPLOAD_BUFFER_COUNT = 1;
Please add a comment with TODO.
.build())
.build();

gcs = getGCSImpl(storageOptions);
Just because the old test does not close it does not mean we should keep repeating the mistake, I guess.
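As a rough illustration of the point about closing, the sketch below guarantees that a client created for a test is closed even if the test body throws. `FakeStorageClient` and `runAndClose` are hypothetical stand-ins written for this example; they are not the connector's `GoogleCloudStorage` type or its test helpers.

```java
public class CloseAfterTest {

  /** Hypothetical stand-in for a storage client that holds resources. */
  public static final class FakeStorageClient implements AutoCloseable {
    private boolean closed;

    public String write(String objectName) {
      if (closed) {
        throw new IllegalStateException("client is closed");
      }
      return "wrote " + objectName;
    }

    @Override
    public void close() {
      closed = true;
    }

    public boolean isClosed() {
      return closed;
    }
  }

  /** Runs a small test body and closes the client afterwards, even on failure. */
  public static FakeStorageClient runAndClose(String objectName) {
    FakeStorageClient client = new FakeStorageClient();
    try (FakeStorageClient c = client) {
      c.write(objectName);
    }
    return client;
  }

  public static void main(String[] args) {
    FakeStorageClient client = runAndClose("part-file-0");
    System.out.println("closed after test: " + client.isClosed());
  }
}
```

In a JUnit test class, the same effect is usually achieved by closing the field in a tear-down method rather than wrapping each test body.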
private static GoogleCloudStorage helperGcs;

private GoogleCloudStorage gcs;

private int partFileCount = 2;
final
updated.
private static GoogleCloudStorage helperGcs;

private GoogleCloudStorage gcs;

private int partFileCount = 2;
private int bufferCapacity = partFileCount * ONE_MiB;
final
updated.
This PR adds support for the new write APIs introduced in java-storage in order to leverage the Parallel Composite Upload (PCU) feature.