Skip to content

Commit

Permalink
parallel composite upload support (#1067)
Browse files Browse the repository at this point in the history
  • Loading branch information
singhravidutt committed Oct 27, 2023
1 parent 48891bf commit e081924
Show file tree
Hide file tree
Showing 7 changed files with 397 additions and 19 deletions.
2 changes: 1 addition & 1 deletion gcs/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@

1. Upgrade Hadoop to 3.3.5.

1. Upgrade java-storage to 2.27.1
1. Upgrade java-storage to 2.28.0

1. Add support for `WORKLOAD_IDENTITY_FEDERATION_CREDENTIAL_CONFIG_FILE` authentication type that retrieves a refresh token using workload identity federation configuraiton defined in: `fs.gs.auth.workload.identity.federation.credential.config.file`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise;
import com.google.cloud.hadoop.gcsio.PerformanceCachingGoogleCloudStorageOptions;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PartFileCleanupType;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PipeType;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.UploadType;
import com.google.cloud.hadoop.util.RedactedString;
Expand Down Expand Up @@ -470,7 +471,7 @@ public class GoogleHadoopFileSystemConfiguration {
* effective only if fs.gs.client.type is set to STORAGE_CLIENT.
*/
public static final HadoopConfigurationProperty<UploadType> GCS_CLIENT_UPLOAD_TYPE =
new HadoopConfigurationProperty<>("fs.gs.client.upload.type", UploadType.DEFAULT);
new HadoopConfigurationProperty<>("fs.gs.client.upload.type", UploadType.CHUNK_UPLOAD);

/**
* Configuration key to configure the Path where uploads will be parked on disk. If not set then
Expand All @@ -481,6 +482,49 @@ public class GoogleHadoopFileSystemConfiguration {
GCS_WRITE_TEMPORARY_FILES_PATH =
new HadoopConfigurationProperty<>("fs.gs.write.temporary.dirs", ImmutableSet.of());

/**
* Configuration key to configure the Buffers for UploadType.PARALLEL_COMPOSITE_UPLOAD. It is in
* alignment with configuration of java-storage client
* https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy#com_google_cloud_storage_ParallelCompositeUploadBlobWriteSessionConfig_BufferAllocationStrategy_fixedPool_int_int_
*/
public static final HadoopConfigurationProperty<Integer> GCS_PCU_BUFFER_COUNT =
new HadoopConfigurationProperty<>(
"fs.gs.write.parallel.composite.upload.buffer.count",
AsyncWriteChannelOptions.DEFAULT.getPCUBufferCount());

/**
* Configuration key to configure the buffer capacity for UploadType.PARALLEL_COMPOSITE_UPLOAD. It
* is in alignment with configuration of java-storage client
* https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy#com_google_cloud_storage_ParallelCompositeUploadBlobWriteSessionConfig_BufferAllocationStrategy_fixedPool_int_int_
*/
public static final HadoopConfigurationProperty<Long> GCS_PCU_BUFFER_CAPACITY =
new HadoopConfigurationProperty<>(
"fs.gs.write.parallel.composite.upload.buffer.capacity",
(long) AsyncWriteChannelOptions.DEFAULT.getPCUBufferCapacity());

/**
* Configuration key to clean up strategy of part files created via
* UploadType.PARALLEL_COMPOSITE_UPLOAD. It is in alignment with configuration of java-storage
* client
* https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy
*/
public static final HadoopConfigurationProperty<PartFileCleanupType>
GCS_PCU_PART_FILE_CLEANUP_TYPE =
new HadoopConfigurationProperty<>(
"fs.gs.write.parallel.composite.upload.part.file.cleanup.type",
AsyncWriteChannelOptions.DEFAULT.getPartFileCleanupType());

/**
* Configuration key to set up the naming strategy of part files created via
* UploadType.PARALLEL_COMPOSITE_UPLOAD. It is in alignment with configuration of java-storage
* client
* https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy
*/
public static final HadoopConfigurationProperty<String> GCS_PCU_PART_FILE_NAME_PREFIX =
new HadoopConfigurationProperty<>(
"fs.gs.write.parallel.composite.upload.part.file.name.prefix",
AsyncWriteChannelOptions.DEFAULT.getPartFileNamePrefix());

static GoogleCloudStorageFileSystemOptions.Builder getGcsFsOptionsBuilder(Configuration config) {
return GoogleCloudStorageFileSystemOptions.builder()
.setBucketDeleteEnabled(GCE_BUCKET_DELETE_ENABLE.get(config, config::getBoolean))
Expand Down Expand Up @@ -588,6 +632,10 @@ private static AsyncWriteChannelOptions getWriteChannelOptions(Configuration con
.setUploadType(GCS_CLIENT_UPLOAD_TYPE.get(config, config::getEnum))
.setTemporaryPaths(
ImmutableSet.copyOf(GCS_WRITE_TEMPORARY_FILES_PATH.getStringCollection(config)))
.setPCUBufferCount(GCS_PCU_BUFFER_COUNT.get(config, config::getInt))
.setPCUBufferCapacity(toIntExact(GCS_PCU_BUFFER_CAPACITY.get(config, config::getLongBytes)))
.setPartFileCleanupType(GCS_PCU_PART_FILE_CLEANUP_TYPE.get(config, config::getEnum))
.setPartFileNamePrefix(GCS_PCU_PART_FILE_NAME_PREFIX.get(config, config::get))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions.MetricsSink;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise;
import com.google.cloud.hadoop.gcsio.PerformanceCachingGoogleCloudStorageOptions;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PartFileCleanupType;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PipeType;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.UploadType;
import com.google.cloud.hadoop.util.RequesterPaysOptions.RequesterPaysMode;
Expand Down Expand Up @@ -113,8 +114,14 @@ public class GoogleHadoopFileSystemConfigurationTest {
put("fs.gs.storage.service.path", "storage/v1/");
put("fs.gs.tracelog.enable", false);
put("fs.gs.working.dir", "/");
put("fs.gs.client.upload.type", UploadType.DEFAULT);
put("fs.gs.client.upload.type", UploadType.CHUNK_UPLOAD);
put("fs.gs.write.temporary.dirs", ImmutableSet.of());
put("fs.gs.write.parallel.composite.upload.buffer.count", 1);
put("fs.gs.write.parallel.composite.upload.buffer.capacity", 32 * 1024 * 1024L);
put(
"fs.gs.write.parallel.composite.upload.part.file.cleanup.type",
PartFileCleanupType.ALWAYS);
put("fs.gs.write.parallel.composite.upload.part.file.name.prefix", "");
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,20 @@
import com.google.cloud.NoCredentials;
import com.google.cloud.hadoop.util.AccessBoundary;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PartFileCleanupType;
import com.google.cloud.hadoop.util.ErrorTypeExtractor;
import com.google.cloud.hadoop.util.GcsClientStatisticInterface;
import com.google.cloud.hadoop.util.GrpcErrorTypeExtractor;
import com.google.cloud.storage.BlobWriteSessionConfig;
import com.google.cloud.storage.BlobWriteSessionConfigs;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.ExecutorSupplier;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
Expand Down Expand Up @@ -87,6 +93,7 @@ public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage {
@Nullable HttpRequestInitializer httpRequestInitializer,
@Nullable ImmutableList<ClientInterceptor> gRPCInterceptors,
@Nullable Function<List<AccessBoundary>, String> downscopedAccessTokenFn,
@Nullable ExecutorService pCUExecutorService,
@Nullable GcsClientStatisticInterface gcsClientStatisticInterface)
throws IOException {
super(
Expand All @@ -102,7 +109,7 @@ public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage {
this.storageOptions = options;
this.storage =
clientLibraryStorage == null
? createStorage(credentials, options, gRPCInterceptors)
? createStorage(credentials, options, gRPCInterceptors, pCUExecutorService)
: clientLibraryStorage;
}

Expand Down Expand Up @@ -209,7 +216,8 @@ private long getWriteGeneration(StorageResourceId resourceId, boolean overwrite)
private static Storage createStorage(
Credentials credentials,
GoogleCloudStorageOptions storageOptions,
List<ClientInterceptor> interceptors)
List<ClientInterceptor> interceptors,
ExecutorService pCUExecutorService)
throws IOException {
return StorageOptions.grpc()
.setAttemptDirectPath(storageOptions.isDirectPathPreferred())
Expand All @@ -227,15 +235,20 @@ private static Storage createStorage(
return ImmutableList.copyOf(list);
})
.setCredentials(credentials != null ? credentials : NoCredentials.getInstance())
.setBlobWriteSessionConfig(getSessionConfig(storageOptions.getWriteChannelOptions()))
.setBlobWriteSessionConfig(
getSessionConfig(storageOptions.getWriteChannelOptions(), pCUExecutorService))
.build()
.getService();
}

private static BlobWriteSessionConfig getSessionConfig(AsyncWriteChannelOptions writeOptions)
private static BlobWriteSessionConfig getSessionConfig(
AsyncWriteChannelOptions writeOptions, ExecutorService pCUExecutorService)
throws IOException {
logger.atFiner().log("Upload strategy in use: %s", writeOptions.getUploadType());
switch (writeOptions.getUploadType()) {
case CHUNK_UPLOAD:
return BlobWriteSessionConfigs.getDefault()
.withChunkSize(writeOptions.getUploadChunkSize());
case WRITE_TO_DISK_THEN_UPLOAD:
if (writeOptions.getTemporaryPaths() == null
|| writeOptions.getTemporaryPaths().isEmpty()) {
Expand All @@ -255,12 +268,47 @@ private static BlobWriteSessionConfig getSessionConfig(AsyncWriteChannelOptions
writeOptions.getTemporaryPaths().stream()
.map(x -> Paths.get(x))
.collect(ImmutableSet.toImmutableSet()));
case PARALLEL_COMPOSITE_UPLOAD:
return BlobWriteSessionConfigs.parallelCompositeUpload()
.withBufferAllocationStrategy(
BufferAllocationStrategy.fixedPool(
writeOptions.getPCUBufferCount(), writeOptions.getPCUBufferCapacity()))
.withPartCleanupStrategy(getPartCleanupStrategy(writeOptions.getPartFileCleanupType()))
.withExecutorSupplier(getPCUExecutorSupplier(pCUExecutorService))
.withPartNamingStrategy(getPartNamingStrategy(writeOptions.getPartFileNamePrefix()));
default:
return BlobWriteSessionConfigs.getDefault()
.withChunkSize(writeOptions.getUploadChunkSize());
throw new IllegalArgumentException(
String.format("Upload type:%s is not supported.", writeOptions.getUploadType()));
}
}

private static PartCleanupStrategy getPartCleanupStrategy(PartFileCleanupType cleanupType) {
switch (cleanupType) {
case NEVER:
return PartCleanupStrategy.never();
case ON_SUCCESS:
return PartCleanupStrategy.onlyOnSuccess();
case ALWAYS:
return PartCleanupStrategy.always();
default:
throw new IllegalArgumentException(
String.format("Cleanup type:%s is not handled.", cleanupType));
}
}

private static PartNamingStrategy getPartNamingStrategy(String partFilePrefix) {
if (Strings.isNullOrEmpty(partFilePrefix)) {
return PartNamingStrategy.noPrefix();
}
return PartNamingStrategy.prefix(partFilePrefix);
}

private static ExecutorSupplier getPCUExecutorSupplier(ExecutorService pCUExecutorService) {
return pCUExecutorService == null
? ExecutorSupplier.cachedPool()
: ExecutorSupplier.useExecutor(pCUExecutorService);
}

public static Builder builder() {
return new AutoBuilder_GoogleCloudStorageClientImpl_Builder();
}
Expand Down Expand Up @@ -290,6 +338,9 @@ public abstract Builder setGcsClientStatisticInterface(
@VisibleForTesting
public abstract Builder setClientLibraryStorage(@Nullable Storage clientLibraryStorage);

@VisibleForTesting
public abstract Builder setPCUExecutorService(@Nullable ExecutorService pCUExecutorService);

public abstract GoogleCloudStorageClientImpl build() throws IOException;
}
}
Loading

0 comments on commit e081924

Please sign in to comment.