Add support for operation tracing (#1075)
* Add support for operation tracing
arunkumarchacko committed Dec 14, 2023
1 parent df2b7f0 commit 831bc38
Showing 15 changed files with 1,368 additions and 268 deletions.
@@ -23,6 +23,7 @@
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_GLOB_ALGORITHM;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_LAZY_INITIALIZATION_ENABLE;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_OUTPUT_STREAM_SYNC_MIN_INTERVAL;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_TRACE_LOG_ENABLE;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_WORKING_DIRECTORY;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.PERMISSIONS_TO_REPORT;
import static com.google.cloud.hadoop.util.HadoopCredentialsConfiguration.CLOUD_PLATFORM_SCOPE;
@@ -53,7 +54,10 @@
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.HadoopCredentialsConfiguration;
import com.google.cloud.hadoop.util.HadoopCredentialsConfiguration.AccessTokenProviderCredentials;
import com.google.cloud.hadoop.util.ITraceFactory;
import com.google.cloud.hadoop.util.ITraceOperation;
import com.google.cloud.hadoop.util.PropertyUtil;
import com.google.cloud.hadoop.util.TraceFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ascii;
import com.google.common.base.Suppliers;
@@ -110,6 +114,7 @@
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.io.Text;
@@ -118,6 +123,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.functional.CallableRaisingIOE;

/**
* GoogleHadoopFileSystem is rooted in a single bucket at initialization time; in this case, Hadoop
@@ -210,6 +216,8 @@ public class GoogleHadoopFileSystem extends FileSystem implements IOStatisticsSource
/** The fixed reported permission of all files. */
private FsPermission reportedPermissions;

private ITraceFactory traceFactory = TraceFactory.get(/* isEnabled */ false);

/**
* Constructs an instance of GoogleHadoopFileSystem; the internal GoogleCloudStorageFileSystem
* will be set up with config settings when initialize() is called.
@@ -261,6 +269,8 @@ public void initialize(URI path, Configuration config) throws IOException {
GhfsStorageStatistics.NAME,
() -> new GhfsStorageStatistics(instrumentation.getIOStatistics()));
initializeGcsFs(config);

this.traceFactory = TraceFactory.get(GCS_TRACE_LOG_ENABLE.get(config, config::getBoolean));
}
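
Tracing is wired up at the end of initialize(), so it has to be switched on in the Configuration before the FileSystem instance is obtained. A minimal sketch of doing that from client code follows; the bucket and object names are hypothetical, and fs.gs.tracelog.enable is the key exercised by the integration test further down.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class TraceEnableSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Key backing GCS_TRACE_LOG_ENABLE; also set by the integration test below.
        conf.setBoolean("fs.gs.tracelog.enable", true);
        // Bucket and object names are hypothetical.
        FileSystem fs = FileSystem.get(URI.create("gs://example-bucket"), conf);
        // open() is now wrapped in a root trace in addition to duration tracking.
        fs.open(new Path("gs://example-bucket/data.txt"), 4096).close();
      }
    }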

private void initializeFsRoot() {
@@ -495,9 +505,10 @@ public String getScheme() {

@Override
public FSDataInputStream open(Path hadoopPath, int bufferSize) throws IOException {
return trackDuration(
return trackDurationWithTracing(
instrumentation,
GhfsStatistic.INVOCATION_OPEN.getSymbol(),
hadoopPath,
() -> {
checkArgument(hadoopPath != null, "hadoopPath must not be null");
checkOpen();
@@ -518,9 +529,10 @@ public FSDataOutputStream create(
long blockSize,
Progressable progress)
throws IOException {
return trackDuration(
return trackDurationWithTracing(
instrumentation,
GhfsStatistic.INVOCATION_CREATE.getSymbol(),
hadoopPath,
() -> {
checkArgument(hadoopPath != null, "hadoopPath must not be null");
checkArgument(replication > 0, "replication must be a positive integer: %s", replication);
@@ -562,9 +574,10 @@ public FSDataOutputStream createNonRecursive(
long blockSize,
Progressable progress)
throws IOException {
return trackDuration(
return trackDurationWithTracing(
instrumentation,
GhfsStatistic.INVOCATION_CREATE_NON_RECURSIVE.getSymbol(),
hadoopPath,
() -> {

// incrementStatistic(GhfsStatistic.INVOCATION_CREATE_NON_RECURSIVE);
@@ -590,9 +603,10 @@ public FSDataOutputStream createNonRecursive(

@Override
public boolean rename(Path src, Path dst) throws IOException {
return trackDuration(
return trackDurationWithTracing(
instrumentation,
GhfsStatistic.INVOCATION_RENAME.getSymbol(),
String.format("%s->%s", src, dst),
() -> {
checkArgument(src != null, "src must not be null");
checkArgument(dst != null, "dst must not be null");
@@ -619,11 +633,27 @@ public boolean rename(Path src, Path dst) throws IOException {
});
}

/**
* Tracks the duration of {@code operation} and also sets up operation tracing using {@code
* ThreadTrace}.
*/
private <B> B trackDurationWithTracing(
DurationTrackerFactory factory,
String statistic,
Object context,
CallableRaisingIOE<B> operation)
throws IOException {
try (ITraceOperation op = traceFactory.createRootWithLogging(statistic, context)) {
return trackDuration(factory, statistic, operation);
}
}
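
Because ITraceOperation is opened in a try-with-resources block, it evidently extends AutoCloseable, and the trace span covers exactly the wrapped operation, including the exceptional path. The following is a toy illustration of that shape, not the connector's actual TraceFactory/ITraceOperation implementation; all names here are invented.

    /** Toy stand-in for an AutoCloseable trace span; not the real ITraceOperation. */
    public class TraceSpanSketch {
      static final class LoggingSpan implements AutoCloseable {
        private final String name;
        private final long startNanos = System.nanoTime();

        LoggingSpan(String name) {
          this.name = name;
        }

        @Override
        public void close() {
          // Runs even when the traced body throws, so the span always ends.
          System.out.printf("%s finished in %d ms%n", name, (System.nanoTime() - startNanos) / 1_000_000);
        }
      }

      public static void main(String[] args) throws Exception {
        // Usage mirrors trackDurationWithTracing: the span brackets the operation body.
        try (LoggingSpan span = new LoggingSpan("rename")) {
          Thread.sleep(25); // stand-in for the real filesystem work
        }
      }
    }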

@Override
public boolean delete(Path hadoopPath, boolean recursive) throws IOException {
return trackDuration(
return trackDurationWithTracing(
instrumentation,
GhfsStatistic.INVOCATION_DELETE.getSymbol(),
hadoopPath,
() -> {
boolean response;
try {
@@ -825,9 +855,10 @@ public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) throws IOException {

@Override
public byte[] getXAttr(Path path, String name) throws IOException {
return trackDuration(
return trackDurationWithTracing(
instrumentation,
GhfsStatistic.INVOCATION_XATTR_GET_NAMED.getSymbol(),
String.format("%s:%s", path, name),
() -> {
checkNotNull(path, "path should not be null");
checkNotNull(name, "name should not be null");
@@ -847,9 +878,10 @@ public byte[] getXAttr(Path path, String name) throws IOException {

@Override
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
return trackDuration(
return trackDurationWithTracing(
instrumentation,
GhfsStatistic.INVOCATION_XATTR_GET_MAP.getSymbol(),
path,
() -> {
checkNotNull(path, "path should not be null");

@@ -869,9 +901,10 @@ public Map<String, byte[]> getXAttrs(Path path) throws IOException {

@Override
public Map<String, byte[]> getXAttrs(Path path, List<String> names) throws IOException {
return trackDuration(
return trackDurationWithTracing(
instrumentation,
GhfsStatistic.INVOCATION_XATTR_GET_NAMED_MAP.getSymbol(),
String.format("%s:%s", path, names == null ? -1 : names.size()),
() -> {
checkNotNull(path, "path should not be null");
checkNotNull(names, "names should not be null");
@@ -895,9 +928,10 @@ public Map<String, byte[]> getXAttrs(Path path, List<String> names) throws IOException {

@Override
public List<String> listXAttrs(Path path) throws IOException {
return trackDuration(
return trackDurationWithTracing(
instrumentation,
GhfsStatistic.INVOCATION_OP_XATTR_LIST.getSymbol(),
path,
() -> {
checkNotNull(path, "path should not be null");

@@ -1832,4 +1832,24 @@ public void testInitializeCompatibleWithHadoopCredentialProvider() throws Exception {
FileSystem.get(new URI(PUBLIC_BUCKET), config);
// Initialization successful with no exception thrown.
}

@Test
public void testThreadTraceEnabledRename() throws Exception {
Configuration config = ghfs.getConf();
config.set("fs.gs.tracelog.enable", "true");
ghfs.initialize(ghfs.getUri(), config);

Path testRoot = new Path(sharedBucketName1, "/directory1/");
ghfs.mkdirs(testRoot);
assertThat(ghfs.exists(testRoot)).isTrue();

Path source = new Path(sharedBucketName1, "/directory1/file1");
ghfs.mkdirs(source);
assertThat(ghfs.exists(source)).isTrue();

Path dest = new Path(sharedBucketName1, "/directory2/");
assertThat(ghfs.exists(dest)).isFalse();
ghfs.rename(testRoot, dest);
assertThat(ghfs.exists(dest)).isTrue();
}
}
36 changes: 27 additions & 9 deletions gcsio/src/main/java/com/google/cloud/hadoop/gcsio/BatchHelper.java
@@ -31,6 +31,9 @@
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.StorageRequest;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.ITraceOperation;
import com.google.cloud.hadoop.util.ThreadTrace;
import com.google.cloud.hadoop.util.TraceOperation;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
@@ -65,6 +68,8 @@ public class BatchHelper {

private static final ThreadFactory THREAD_FACTORY =
new ThreadFactoryBuilder().setNameFormat("gcsfs-batch-helper-%d").setDaemon(true).build();
private final String traceContext;
private final ThreadTrace rootTrace;

/**
* Since each BatchHelper instance should be tied to a particular related set of requests, use
@@ -73,25 +78,31 @@ public static class Factory {
public static class Factory {
public BatchHelper newBatchHelper(
HttpRequestInitializer requestInitializer, Storage gcs, int maxRequestsPerBatch) {
return new BatchHelper(requestInitializer, gcs, maxRequestsPerBatch, /* numThreads= */ 0);
return new BatchHelper(requestInitializer, gcs, maxRequestsPerBatch, /* numThreads= */ 0, "");
}

BatchHelper newBatchHelper(
HttpRequestInitializer requestInitializer,
Storage gcs,
int maxRequestsPerBatch,
int totalRequests,
int maxThreads) {
int maxThreads,
String traceContext) {
checkArgument(maxRequestsPerBatch > 0, "maxRequestsPerBatch should be greater than 0");
checkArgument(totalRequests > 0, "totalRequests should be greater than 0");
checkArgument(maxThreads >= 0, "maxThreads should be greater or equal to 0");
// Do not send batch request when performing operations on 1 object.
if (totalRequests == 1) {
return new BatchHelper(
requestInitializer, gcs, /* maxRequestsPerBatch= */ 1, /* numThreads= */ 0);
requestInitializer,
gcs,
/* maxRequestsPerBatch= */ 1,
/* numThreads= */ 0,
traceContext);
}
if (maxThreads == 0) {
return new BatchHelper(requestInitializer, gcs, maxRequestsPerBatch, maxThreads);
return new BatchHelper(
requestInitializer, gcs, maxRequestsPerBatch, maxThreads, traceContext);
}
// If maxRequestsPerBatch is too high to fill up all parallel batches (maxThreads)
// then reduce it to evenly distribute requests across the batches
@@ -101,7 +112,7 @@ BatchHelper newBatchHelper(
// in batches (requestsPerBatch) then reduce it to minimum required number of threads
int numThreads = toIntExact((long) ceil((double) totalRequests / requestsPerBatch));
numThreads = min(numThreads, maxThreads);
return new BatchHelper(requestInitializer, gcs, requestsPerBatch, numThreads);
return new BatchHelper(requestInitializer, gcs, requestsPerBatch, numThreads, traceContext);
}
}
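
To make the visible sizing step concrete (the preceding reduction of requestsPerBatch happens in a hunk not shown here, so these values are purely illustrative): with 10 total requests and 4 requests per batch, three batches are needed, so three threads suffice and the maxThreads cap does not bind.

    // Illustrative values only; this mirrors the two visible lines above.
    int totalRequests = 10, requestsPerBatch = 4, maxThreads = 8;
    int numThreads = (int) Math.ceil((double) totalRequests / requestsPerBatch); // ceil(10 / 4) = 3
    numThreads = Math.min(numThreads, maxThreads); // stays 3; the cap binds only when maxThreads < 3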

@@ -130,12 +141,15 @@ private BatchHelper(
HttpRequestInitializer requestInitializer,
Storage gcs,
long maxRequestsPerBatch,
int numThreads) {
int numThreads,
String traceContext) {
this.requestInitializer = requestInitializer;
this.gcs = gcs;
this.requestsExecutor =
numThreads == 0 ? newDirectExecutorService() : newRequestsExecutor(numThreads);
this.maxRequestsPerBatch = maxRequestsPerBatch;
this.traceContext = traceContext;
this.rootTrace = TraceOperation.current();
}
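
Note that the root trace is captured when the BatchHelper is constructed, i.e. on the caller's thread. Assuming the current trace is tracked per thread, batches later executed on pool threads (see flushPendingRequests below) would otherwise lose their association with the originating operation. A hedged sketch of the hand-off, using only the calls visible in this diff (the trace name is illustrative):

    // On the thread that constructs the BatchHelper (the submitting thread):
    ThreadTrace root = TraceOperation.current();

    // Later, on a pool thread, re-attach as a child so the batch is attributed
    // to the operation that created this helper.
    requestsExecutor.submit(
        () -> {
          try (ITraceOperation op = TraceOperation.getChildTrace(root, "delete(batchSize=10)")) {
            // ... batch.execute() ...
          }
          return null;
        });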

private static ExecutorService newRequestsExecutor(int numThreads) {
@@ -175,9 +189,10 @@ public <T> void queue(StorageRequest<T> req, JsonBatchCallback<T> callback) throws IOException {
}
}

private static <T> void execute(StorageRequest<T> req, JsonBatchCallback<T> callback)
private <T> void execute(StorageRequest<T> req, JsonBatchCallback<T> callback)
throws IOException {
try {
String traceName = this.traceContext + "(batchSize=1)";
try (ITraceOperation to = TraceOperation.getChildTrace(this.rootTrace, traceName)) {
T result = req.execute();
callback.onSuccess(result, req.getLastResponseHeaders());
} catch (IOException e) {
@@ -227,7 +242,10 @@ private void flushPendingRequests() throws IOException {
responseFutures.add(
requestsExecutor.submit(
() -> {
batch.execute();
String traceName = String.format("%s(batchSize=%s)", this.traceContext, batch.size());
try (ITraceOperation to = TraceOperation.getChildTrace(this.rootTrace, traceName)) {
batch.execute();
}
return null;
}));
}
@@ -33,7 +33,10 @@
import com.google.cloud.hadoop.util.AccessBoundary;
import com.google.cloud.hadoop.util.CheckedFunction;
import com.google.cloud.hadoop.util.GcsClientStatisticInterface;
import com.google.cloud.hadoop.util.ITraceOperation;
import com.google.cloud.hadoop.util.LazyExecutorService;
import com.google.cloud.hadoop.util.ThreadTrace;
import com.google.cloud.hadoop.util.TraceOperation;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -54,6 +57,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -495,8 +499,10 @@ public void rename(URI src, URI dst) throws IOException {
StorageResourceId.fromUriPath(
UriPaths.getParentPath(src), /* allowEmptyObjectName= */ true);
srcParentInfoFuture =
cachedExecutor.submit(
() -> getFileInfoInternal(srcParentId, /* inferImplicitDirectories= */ false));
runFuture(
cachedExecutor,
() -> getFileInfoInternal(srcParentId, /* inferImplicitDirectories= */ false),
"getParentFileInfo");
}

if (srcInfo.isDirectory()) {
@@ -918,7 +924,7 @@ public List<FileInfo> getFileInfos(List<URI> paths) throws IOException {
try {
List<Future<FileInfo>> infoFutures = new ArrayList<>(paths.size());
for (URI path : paths) {
infoFutures.add(fileInfoExecutor.submit(() -> getFileInfo(path)));
infoFutures.add(runFuture(fileInfoExecutor, () -> getFileInfo(path), "getFileInfo"));
}
fileInfoExecutor.shutdown();

@@ -996,6 +1002,16 @@ private void checkNoFilesConflictingWithDirs(StorageResourceId resourceId) throws IOException {
}
}

private <T> Future<T> runFuture(ExecutorService service, Callable<T> task, String name) {
ThreadTrace trace = TraceOperation.current();
return service.submit(
() -> {
try (ITraceOperation traceOperation = TraceOperation.getChildTrace(trace, name)) {
return task.call();
}
});
}

/**
* For objects whose name looks like a path (foo/bar/zoo), returns all directory paths.
*