Skip to content

Commit

Permalink
Use rename LRO on HN buckets (#1140) (#1192)
Browse files Browse the repository at this point in the history
* Use rename LRO on HN buckets (#1140)

* Disable Cloud build for test
  • Loading branch information
arunkumarchacko committed Jun 21, 2024
1 parent ac86720 commit fc94f28
Show file tree
Hide file tree
Showing 16 changed files with 255 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cloudbuild/cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ steps:
- 'VCS_COMMIT_ID=$COMMIT_SHA'
- 'VCS_TAG=$TAG_NAME'
- 'CI_BUILD_ID=$BUILD_ID'
- 'GCS_TEST_DIRECT_PATH_PREFERRED=true'
- 'GCS_TEST_DIRECT_PATH_PREFERRED=false'

# Tests take on average 25 minutes to run
timeout: 2400s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ public class GoogleHadoopFileSystemConfiguration {
public static final HadoopConfigurationProperty<Long> BLOCK_SIZE =
new HadoopConfigurationProperty<>("fs.gs.block.size", 64 * 1024 * 1024L);

/**
* Configuration key for enabling hierarchical namespace buckets.
*
* <p>If this is enabled, rename folder operation on a Hierarchical namespace enabled bucket will
* be performed by calling the rename API.
*/
public static final HadoopConfigurationProperty<Boolean> GCS_HIERARCHICAL_NAMESPACE_ENABLE =
new HadoopConfigurationProperty<>("fs.gs.hierarchical.namespace.folders.enable", false);

/** Configuration key for Delegation Token binding class. Default value: none */
public static final HadoopConfigurationProperty<String> DELEGATION_TOKEN_BINDING_CLASS =
new HadoopConfigurationProperty<>("fs.gs.delegation.token.binding");
Expand Down Expand Up @@ -581,6 +590,7 @@ static GoogleCloudStorageOptions.Builder getGcsOptionsBuilder(Configuration conf
.setEncryptionKey(GCS_ENCRYPTION_KEY.getPassword(config))
.setEncryptionKeyHash(GCS_ENCRYPTION_KEY_HASH.getPassword(config))
.setGrpcEnabled(GCS_GRPC_ENABLE.get(config, config::getBoolean))
.setHnBucketRenameEnabled(GCS_HIERARCHICAL_NAMESPACE_ENABLE.get(config, config::getBoolean))
.setGrpcMessageTimeoutCheckInterval(GCS_GRPC_CHECK_INTERVAL_TIMEOUT.getTimeDuration(config))
.setGrpcServerAddress(GCS_GRPC_SERVER_ADDRESS.get(config, config::get))
.setHttpRequestConnectTimeout(GCS_HTTP_CONNECT_TIMEOUT.getTimeDuration(config))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class GoogleHadoopFileSystemConfigurationTest {
put("fs.gs.grpc.trafficdirector.enable", true);
put("fs.gs.grpc.write.buffered.requests", 20);
put("fs.gs.grpc.write.message.timeout", 3_000L);
put("fs.gs.hierarchical.namespace.folders.enable", false);
put("fs.gs.grpc.write.timeout", 600_000L);
put("fs.gs.http.connect-timeout", 5_000L);
put("fs.gs.http.max.retry", 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,8 @@
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.GlobAlgorithm;
import com.google.cloud.hadoop.fs.gcs.auth.AbstractDelegationTokenBinding;
import com.google.cloud.hadoop.fs.gcs.auth.TestDelegationTokenBindingImpl;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemImpl;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemOptions;
import com.google.cloud.hadoop.gcsio.*;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemOptions.ClientType;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;
import com.google.cloud.hadoop.gcsio.MethodOutcome;
import com.google.cloud.hadoop.gcsio.testing.InMemoryGoogleCloudStorage;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
Expand Down Expand Up @@ -162,6 +158,43 @@ public MethodOutcome renameFileIntoRootOutcome() {
});
}

@Test
public void testRenameHnBucket() throws Exception {
String bucketName = this.gcsiHelper.getUniqueBucketName("hn");
GoogleHadoopFileSystem googleHadoopFileSystem = new GoogleHadoopFileSystem();

URI initUri = new URI("gs://" + bucketName);
Configuration config = loadConfig();
config.setBoolean("fs.gs.hierarchical.namespace.folders.enable", true);
googleHadoopFileSystem.initialize(initUri, config);

GoogleCloudStorage theGcs = googleHadoopFileSystem.getGcsFs().getGcs();
theGcs.createBucket(
bucketName, CreateBucketOptions.builder().setHierarchicalNamespaceEnabled(true).build());

try {
GoogleCloudStorageFileSystemIntegrationHelper helper =
new HadoopFileSystemIntegrationHelper(googleHadoopFileSystem);

renameHelper(
new HdfsBehavior() {
/**
* Returns the MethodOutcome of trying to rename an existing file into the root
* directory.
*/
@Override
public MethodOutcome renameFileIntoRootOutcome() {
return new MethodOutcome(MethodOutcome.Type.RETURNS_TRUE);
}
},
bucketName,
bucketName,
helper);
} finally {
googleHadoopFileSystem.delete(new Path(initUri));
}
}

@Test
public void testInitializePath_success() throws Exception {
List<String> validPaths = Arrays.asList("gs://foo", "gs://foo/bar");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,4 +383,7 @@ public boolean isOpen() {
};
}
}

@Override
public void testRenameHnBucket() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ protected static Configuration loadConfig(Configuration config, ClientType stora
Configuration newConfig = new Configuration(config);
newConfig.setBoolean("fs.gs.implicit.dir.repair.enable", true);
newConfig.setBoolean("fs.gs.bucket.delete.enable", true);
newConfig.setBoolean("fs.gs.hierarchical.namespace.folders.enable", true);
newConfig.setEnum("fs.gs.client.type", storageClientType);
return newConfig;
}
Expand Down
4 changes: 4 additions & 0 deletions gcsio/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage-control</artifactId>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-storage-v2</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class CreateBucketOptions {
public static final CreateBucketOptions DEFAULT = builder().build();

public static Builder builder() {
return new AutoValue_CreateBucketOptions.Builder();
return new AutoValue_CreateBucketOptions.Builder().setHierarchicalNamespaceEnabled(false);
}

public abstract Builder toBuilder();
Expand All @@ -41,13 +41,17 @@ public static Builder builder() {
@Nullable
public abstract String getStorageClass();

public abstract boolean getHierarchicalNamespaceEnabled();

/** Returns the bucket retention period. */
@Nullable
public abstract Duration getTtl();

/** Builder for {@link CreateBucketOptions} */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setHierarchicalNamespaceEnabled(boolean enabled);

public abstract Builder setLocation(String location);

public abstract Builder setStorageClass(String storageClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.flogger.GoogleLogger;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.List;
Expand Down Expand Up @@ -145,12 +146,22 @@ public void copy(Map<StorageResourceId, StorageResourceId> sourceToDestinationOb
delegate.copy(sourceToDestinationObjectsMap);
}

@Override
public boolean isHnBucket(URI src) throws IOException {
return delegate.isHnBucket(src);
}

@Override
public List<String> listBucketNames() throws IOException {
logger.atFiner().log("%s.listBucketNames()", delegateClassName);
return delegate.listBucketNames();
}

@Override
public void renameHnFolder(URI src, URI dst) throws IOException {
delegate.renameHnFolder(src, dst);
}

@Override
public List<GoogleCloudStorageItemInfo> listBucketInfo() throws IOException {
logger.atFiner().log("%s.listBucketInfo()", delegateClassName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkArgument;

import java.io.IOException;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
Expand Down Expand Up @@ -270,9 +271,29 @@ default void copy(Map<StorageResourceId, StorageResourceId> sourceToDestinationO
copy(srcBucketName, srcObjectNames, dstBucketName, dstObjectNames);
}

/**
* Checks if {@code resourceId} belongs to a Hierarchical namespace enabled bucket. This takes a
* path and not the bucket name since the caller may not have permission to query the bucket.
*
* @param path Path for which the check need to be performed
* @return
* @throws IOException
*/
boolean isHnBucket(URI path) throws IOException;

/** Gets a list of names of buckets in this project. */
List<String> listBucketNames() throws IOException;

/**
* Renames {@code src} to {@code dst} using the rename LRO API. This should be called only on an
* Hierarchical namespace enabled bucket.
*
* @param src source path
* @param dst destination path
* @throws IOException
*/
void renameHnFolder(URI src, URI dst) throws IOException;

/**
* Gets a list of GoogleCloudStorageItemInfo for all buckets of this project. This is no more
* expensive than calling listBucketNames(), since the list API for buckets already retrieves all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,11 @@ private void renameDirectoryInternal(FileInfo srcInfo, URI dst) throws IOExcepti
checkArgument(dst.toString().endsWith(PATH_DELIMITER), "'%s' should be a directory", dst);

URI src = srcInfo.getPath();
if (this.options.getCloudStorageOptions().isHnBucketRenameEnabled()
&& this.gcs.isHnBucket(src)) {
this.gcs.renameHnFolder(src, dst);
return;
}

// Mapping from each src to its respective dst.
// Sort src items so that parent directories appear before their children.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.google.api.client.util.Data;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.StorageRequest;
import com.google.api.services.storage.model.Bucket;
Expand All @@ -66,6 +67,7 @@
import com.google.cloud.hadoop.util.RetryHttpInitializer;
import com.google.cloud.hadoop.util.TraceOperation;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Expand All @@ -75,9 +77,11 @@
import com.google.common.flogger.GoogleLogger;
import com.google.common.io.BaseEncoding;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.storage.control.v2.*;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
Expand Down Expand Up @@ -123,6 +127,9 @@ public class GoogleCloudStorageImpl implements GoogleCloudStorage {
.setEnsureEmptyObjectsMetadataMatch(false)
.build();

private static Cache<String, Boolean> cache =
CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build();

// Object field that are used in GoogleCloudStorageItemInfo
static final String OBJECT_FIELDS =
String.join(
Expand All @@ -142,6 +149,11 @@ public class GoogleCloudStorageImpl implements GoogleCloudStorage {

private final MetricsRecorder metricsRecorder;

private final Credentials credential;

// Lazily created since HN folders may not be enabled.
private StorageControlClient storageControlClient;

// A function to encode metadata map values
static String encodeMetadataValues(byte[] bytes) {
return bytes == null ? Data.NULL_STRING : BaseEncoding.base64().encode(bytes);
Expand Down Expand Up @@ -336,7 +348,7 @@ public Boolean load(String bucketName) {
.setServicePath(options.getStorageServicePath())
.setApplicationName(options.getAppName())
.build();

this.credential = credentials;
this.storageRequestFactory = new StorageRequestFactory(storage);

this.metricsRecorder =
Expand Down Expand Up @@ -450,8 +462,19 @@ public void createBucket(String bucketName, CreateBucketOptions options) throws
Bucket bucket =
new Bucket()
.setName(bucketName)
.setHierarchicalNamespace(
new Bucket.HierarchicalNamespace()
.setEnabled(options.getHierarchicalNamespaceEnabled()))
.setLocation(options.getLocation())
.setStorageClass(options.getStorageClass());

if (options.getHierarchicalNamespaceEnabled()) {
bucket.setIamConfiguration(
new Bucket.IamConfiguration()
.setUniformBucketLevelAccess(
new Bucket.IamConfiguration.UniformBucketLevelAccess().setEnabled(true)));
}

if (options.getTtl() != null) {
Bucket.Lifecycle.Rule lifecycleRule =
new Bucket.Lifecycle.Rule()
Expand Down Expand Up @@ -1972,6 +1995,11 @@ public void close() {
backgroundTasksThreadPool = null;
manualBatchingThreadPool = null;
}

if (this.storageControlClient != null) {
this.storageControlClient.close();
this.storageControlClient = null;
}
}

/**
Expand Down Expand Up @@ -2116,6 +2144,68 @@ private boolean canIgnoreExceptionForEmptyObject(
return false;
}

@Override
public boolean isHnBucket(URI src) throws IOException {
String bucketName = src.getAuthority();
Boolean isEnabled = cache.getIfPresent(bucketName);
if (isEnabled != null) {
return isEnabled;
}

String prefix = src.getPath().substring(1);

StorageControlClient storageControlClient = lazyGetStorageControlClient();
GetStorageLayoutRequest request =
GetStorageLayoutRequest.newBuilder()
.setPrefix(prefix)
.setName(StorageLayoutName.format("_", bucketName))
.build();

try (ITraceOperation to = TraceOperation.addToExistingTrace("getStorageLayout.HN")) {
StorageLayout storageLayout = storageControlClient.getStorageLayout(request);
boolean result =
storageLayout.hasHierarchicalNamespace()
&& storageLayout.getHierarchicalNamespace().getEnabled();

logger.atInfo().log("Checking if %s is HN enabled returned %s", src, result);

cache.put(bucketName, result);

return result;
}
}

private StorageControlClient lazyGetStorageControlClient() throws IOException {
if (this.storageControlClient == null) {
this.storageControlClient =
StorageControlClient.create(
StorageControlSettings.newBuilder()
.setCredentialsProvider(FixedCredentialsProvider.create(this.credential))
.build());
}

return this.storageControlClient;
}

@Override
public void renameHnFolder(URI src, URI dst) throws IOException {
String bucketName = src.getAuthority();
String srcFolder = FolderName.of("_", bucketName, src.getPath().substring(1)).toString();
RenameFolderRequest request =
RenameFolderRequest.newBuilder()
.setDestinationFolderId(dst.getPath().substring(1))
.setName(srcFolder)
.build();

try (ITraceOperation to = TraceOperation.addToExistingTrace("renameHnFolder")) {
logger.atFine().log("Renaming HN folder (%s -> %s)", src, dst);
this.storageControlClient.renameFolderOperationCallable().call(request);
} catch (Throwable t) {
logger.atSevere().withCause(t).log("Renaming %s to %s failed", src, dst);
throw t;
}
}

@Override
public void compose(
String bucketName, List<String> sources, String destination, String contentType)
Expand Down
Loading

0 comments on commit fc94f28

Please sign in to comment.