optimize readVectored performance (#1165)
singhravidutt committed Jun 11, 2024
1 parent c2d4bfb commit 69afffc
Showing 17 changed files with 1,415 additions and 13 deletions.
2 changes: 2 additions & 0 deletions gcs/CHANGES.md
@@ -313,6 +313,8 @@
1. Change default value of `fs.gs.inputstream.min.range.request.size` property
from `524288` to `2097152`.

1. Add `readVectored` API implementation.

### 2.1.1 - 2020-03-11

1. Add upload cache to support high-level retries of failed uploads. Cache size
22 changes: 22 additions & 0 deletions gcs/CONFIGURATION.md
@@ -348,6 +348,28 @@ default service account impersonation.
be no-op if called more frequently than minimum sync interval and `hsync()`
will block until an end of a min sync interval.

#### Vectored read configuration

The following knobs configure the `readVectored` API (a usage sketch follows the list):

* `fs.gs.vectored.read.min.range.seek.size` (default: `4k`)

  If the next range (in the sorted list of range requests) starts within this
  many bytes, it is combined with the current range request when fetching data
  from the underlying channel. The result is split back into the original
  ranges once the data for the combined range request has been fetched.

* `fs.gs.vectored.read.merged.range.max.size` (default: `8m`)

  Controls the maximum length of content requested via a merged/combined range
  request. If merging ranges would produce content larger than this value, the
  ranges are not merged. Consider increasing this value if the range-request
  task queue is overloaded.

* `fs.gs.vectored.read.threads` (default: `16`)

  Controls how many range requests are processed in parallel. These threads
  are shared across all `readVectored` invocations. If the range-request task
  queue is overloaded, consider increasing this value.
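
The sketch below shows how these knobs might be set programmatically before issuing a vectored read through Hadoop's standard `readVectored` API (available from Hadoop 3.3.5). The bucket path and property values are illustrative assumptions, not recommendations:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VectoredReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Numeric byte values; the documented defaults are 4k and 8m.
    conf.set("fs.gs.vectored.read.min.range.seek.size", "4096");
    conf.set("fs.gs.vectored.read.merged.range.max.size", "8388608");
    conf.set("fs.gs.vectored.read.threads", "16");

    Path path = new Path("gs://example-bucket/example-object"); // hypothetical object
    try (FileSystem fs = path.getFileSystem(conf);
        FSDataInputStream in = fs.open(path)) {
      // Two nearby ranges; gaps under the min seek size may be fetched as one merged request.
      List<? extends FileRange> ranges =
          Arrays.asList(
              FileRange.createFileRange(0, 1024), FileRange.createFileRange(2048, 1024));
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer data = range.getData().get(); // each range's future completes independently
        System.out.printf("offset=%d bytes=%d%n", range.getOffset(), data.remaining());
      }
    }
  }
}
```

Because the gap between the two ranges above is smaller than `fs.gs.vectored.read.min.range.seek.size`, the connector may serve both from a single merged GCS read and split the result back apart afterwards.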

### HTTP transport configuration

* `fs.gs.application.name.suffix` (not set by default)
@@ -482,6 +482,7 @@ private InputStreamStatistics(@Nullable FileSystem.Statistics filesystemStatisti
StreamStatisticNames.STREAM_READ_TOTAL_BYTES)
.withDurationTracking(
GhfsStatistic.STREAM_READ_SEEK_OPERATIONS.getSymbol(),
GhfsStatistic.STREAM_READ_VECTORED_OPERATIONS.getSymbol(),
GhfsStatistic.STREAM_READ_CLOSE_OPERATIONS.getSymbol(),
GhfsStatistic.STREAM_READ_OPERATIONS.getSymbol())
.build();
@@ -109,6 +109,29 @@ public enum GhfsStatistic {
STREAM_READ_OPERATIONS(
StreamStatisticNames.STREAM_READ_OPERATIONS, "Calls of read()", TYPE_DURATION),

STREAM_READ_VECTORED_OPERATIONS(
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
"Calls of readVectored()",
TYPE_DURATION),

STREAM_READ_VECTORED_EXTRA_READ_BYTES(
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
"Discarded read bytes during readVectored operation",
TYPE_COUNTER),

STREAM_READ_VECTORED_READ_RANGE_DURATION(
"stream_readVectored_range_duration", "Latency of individual FileRange", TYPE_DURATION),

STREAM_READ_VECTORED_READ_INCOMING_RANGES(
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
"Number of fileRanges requested per readVectored request",
TYPE_COUNTER),

STREAM_READ_VECTORED_READ_COMBINED_RANGES(
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
"Number of combined fileRanges requested per readVectored request",
TYPE_COUNTER),

STREAM_READ_OPERATIONS_INCOMPLETE(
StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE,
"Count of incomplete read() operations in an input stream",
@@ -66,6 +66,10 @@ void updateReadStreamStats(int len, long start) {
storageStatistics.streamReadBytes(len);
}

void updateVectoredReadStreamStats(long start) {
updateStats(start);
}

private static long toMillis(long nano) {
return nano / 1000_000;
}
@@ -18,13 +18,16 @@

import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.STREAM_READ_OPERATIONS;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.STREAM_READ_SEEK_OPERATIONS;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.STREAM_READ_VECTORED_OPERATIONS;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.max;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;

import com.google.cloud.hadoop.gcsio.FileInfo;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemOptions;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemOptions.ClientType;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus;
import com.google.cloud.hadoop.util.ITraceFactory;
import com.google.common.flogger.GoogleLogger;
@@ -33,9 +36,13 @@
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SeekableByteChannel;
import java.util.List;
import java.util.function.IntFunction;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatistics;
@@ -51,6 +58,9 @@ class GoogleHadoopFSInputStream extends FSInputStream implements IOStatisticsSou

// Path of the file to read.
private final URI gcsPath;
// FileInfo of gcsPath; pre-populated in some cases, i.e. when the java-storage client is used
// or failFastOnNotFound is enabled.
private final FileInfo fileInfo;
// All store IO access goes through this.
private final SeekableByteChannel channel;
// Number of bytes read through this channel.
@@ -72,18 +82,49 @@ class GoogleHadoopFSInputStream extends FSInputStream implements IOStatisticsSou

private final GhfsStreamStats streamStats;
private final GhfsStreamStats seekStreamStats;
private final GhfsStreamStats vectoredReadStats;

// Statistic tracker of the Input stream
private final GhfsInputStreamStatistics streamStatistics;
private final Supplier<VectoredIOImpl> vectoredIOSupplier;
private final GoogleCloudStorageFileSystem gcsFs;

static GoogleHadoopFSInputStream create(
GoogleHadoopFileSystem ghfs, URI gcsPath, FileSystem.Statistics statistics)
throws IOException {
logger.atFiner().log("create(gcsPath: %s)", gcsPath);
GoogleCloudStorageFileSystem gcsFs = ghfs.getGcsFs();
SeekableByteChannel channel =
gcsFs.open(gcsPath, gcsFs.getOptions().getCloudStorageOptions().getReadChannelOptions());
return new GoogleHadoopFSInputStream(ghfs, gcsPath, channel, statistics);
FileInfo fileInfo = null;
SeekableByteChannel channel;
// Resolve fileInfo here and use it for the read channel as well as for the readVectored API.
if (shouldPreFetchFileInfo(gcsFs.getOptions())) {
// Reuse the fileInfo fetched while creating the gcsio channel to avoid a duplicate metadata call.
fileInfo = gcsFs.getFileInfoObject(gcsPath);
channel =
gcsFs.open(fileInfo, gcsFs.getOptions().getCloudStorageOptions().getReadChannelOptions());
} else {
// Cases where fileInfo would not have been requested in the gcsio layer.
channel =
gcsFs.open(gcsPath, gcsFs.getOptions().getCloudStorageOptions().getReadChannelOptions());
}
return new GoogleHadoopFSInputStream(ghfs, gcsPath, fileInfo, channel, statistics);
}

private static boolean shouldPreFetchFileInfo(GoogleCloudStorageFileSystemOptions gcsFSOptions) {
// FileInfo is requested while opening the channel in the gcsio channel layer under the
// following conditions:
// 1. failFastOnNotFound is enabled
// 2. the java-storage library is in use (failFast is a no-op for the gRPC flow)
// Prefetch the fileInfo in the FSInputStream so that it can be reused across other read APIs,
// e.g. readVectored.
return gcsFSOptions.getClientType() == ClientType.STORAGE_CLIENT
|| gcsFSOptions
.getCloudStorageOptions()
.getReadChannelOptions()
.isFastFailOnNotFoundEnabled();
}

static GoogleHadoopFSInputStream create(
@@ -93,17 +134,20 @@ static GoogleHadoopFSInputStream create(
GoogleCloudStorageFileSystem gcsFs = ghfs.getGcsFs();
SeekableByteChannel channel =
gcsFs.open(fileInfo, gcsFs.getOptions().getCloudStorageOptions().getReadChannelOptions());
return new GoogleHadoopFSInputStream(ghfs, fileInfo.getPath(), channel, statistics);
return new GoogleHadoopFSInputStream(ghfs, fileInfo.getPath(), fileInfo, channel, statistics);
}

private GoogleHadoopFSInputStream(
GoogleHadoopFileSystem ghfs,
URI gcsPath,
FileInfo fileInfo,
SeekableByteChannel channel,
FileSystem.Statistics statistics) {
logger.atFiner().log("GoogleHadoopFSInputStream(gcsPath: %s)", gcsPath);
this.gcsPath = gcsPath;
this.channel = channel;
this.fileInfo = fileInfo;
this.gcsFs = ghfs.getGcsFs();
this.statistics = statistics;
this.storageStatistics = ghfs.getGlobalGcsStorageStatistics();

@@ -113,8 +157,34 @@ private GoogleHadoopFSInputStream(
new GhfsStreamStats(storageStatistics, GhfsStatistic.STREAM_READ_OPERATIONS, gcsPath);
this.seekStreamStats =
new GhfsStreamStats(storageStatistics, GhfsStatistic.STREAM_READ_SEEK_OPERATIONS, gcsPath);
this.vectoredReadStats =
new GhfsStreamStats(
storageStatistics, GhfsStatistic.STREAM_READ_VECTORED_OPERATIONS, gcsPath);

this.traceFactory = ghfs.getTraceFactory();
this.vectoredIOSupplier = ghfs.getVectoredIOSupplier();
}

/**
* {@inheritDoc} Vectored read implementation for GoogleHadoopFSInputStream.
*
* @param ranges the byte ranges to read.
* @param allocate the function to allocate ByteBuffer.
* @throws IOException if an I/O error occurs.
*/
@Override
public void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)
throws IOException {
trackDuration(
streamStatistics,
STREAM_READ_VECTORED_OPERATIONS.getSymbol(),
() -> {
long startTimeNs = System.nanoTime();
vectoredIOSupplier.get().readVectored(ranges, allocate, gcsFs, fileInfo, gcsPath);
statistics.incrementReadOps(1);
vectoredReadStats.updateVectoredReadStreamStats(startTimeNs);
return null;
});
}

@Override
@@ -214,14 +284,20 @@ public synchronized void close() throws IOException {
closed = true;
try {
logger.atFiner().log("close(): %s", gcsPath);
if (channel != null) {
logger.atFiner().log(
"Closing '%s' file with %d total bytes read", gcsPath, totalBytesRead);
channel.close();
try {
if (channel != null) {
logger.atFiner().log(
"Closing '%s' file with %d total bytes read", gcsPath, totalBytesRead);
channel.close();
}
} catch (Exception e) {
logger.atWarning().withCause(e).log(
"Error while closing underneath read channel resources for path: %s", gcsPath);
}
} finally {
streamStats.close();
seekStreamStats.close();
vectoredReadStats.close();
}
}
return null;
@@ -210,12 +210,16 @@ public class GoogleHadoopFileSystem extends FileSystem implements IOStatisticsSo
/** Underlying GCS file system object. */
private Supplier<GoogleCloudStorageFileSystem> gcsFsSupplier;

private Supplier<VectoredIOImpl> vectoredIOSupplier;

private boolean gcsFsInitialized = false;
private boolean vectoredIOInitialized = false;
/**
* Current working directory; overridden in initialize() if {@link
* GoogleHadoopFileSystemConfiguration#GCS_WORKING_DIRECTORY} is set.
*/
private Path workingDirectory;

/** The fixed reported permission of all files. */
private FsPermission reportedPermissions;

@@ -295,6 +299,8 @@ public void initialize(URI path, Configuration config) throws IOException {
GhfsStorageStatistics.NAME,
() -> new GhfsStorageStatistics(instrumentation.getIOStatistics()));

initializeVectoredIO(config, globalStorageStatistics, statistics);

initializeGcsFs(config);

this.traceFactory = TraceFactory.get(GCS_TRACE_LOG_ENABLE.get(config, config::getBoolean));
@@ -366,6 +372,31 @@ private synchronized void initializeGcsFs(Configuration config) throws IOExcepti
}
}

private synchronized void initializeVectoredIO(
Configuration config,
GhfsGlobalStorageStatistics globalStorageStatistics,
FileSystem.Statistics statistics)
throws IOException {
if (vectoredIOSupplier == null) {
vectoredIOSupplier =
Suppliers.memoize(
() -> {
try {
VectoredIOImpl vectoredIO =
new VectoredIOImpl(
GoogleHadoopFileSystemConfiguration.getVectoredReadOptionBuilder(config)
.build(),
globalStorageStatistics,
statistics);
vectoredIOInitialized = true;
return vectoredIO;
} catch (Exception e) {
throw new RuntimeException("Failure initializing vectoredIO", e);
}
});
}
}
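
A note on the lazy-initialization pattern above: `Suppliers.memoize` (Guava's `com.google.common.base.Suppliers`, assuming that is the import in use here) defers construction of `VectoredIOImpl` until the first `get()` and guarantees a single, thread-safe initialization, so filesystems that never issue a vectored read pay no setup cost. A minimal, self-contained sketch of the pattern with a hypothetical `ExpensiveResource`:

```java
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

class LazyInitDemo {
  static class ExpensiveResource {
    ExpensiveResource() {
      System.out.println("constructed once"); // runs only on the first get()
    }

    void doWork() {}
  }

  // Later get() calls reuse the same instance without re-running the constructor.
  private final Supplier<ExpensiveResource> resource =
      Suppliers.memoize(ExpensiveResource::new);

  void use() {
    resource.get().doWork();
  }
}
```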

private void initializeGcsFs(GoogleCloudStorageFileSystem gcsFs) {
gcsFsSupplier = Suppliers.ofInstance(gcsFs);
gcsFsInitialized = true;
@@ -1554,6 +1585,10 @@ public GoogleCloudStorageFileSystem getGcsFs() {
return gcsFsSupplier.get();
}

public Supplier<VectoredIOImpl> getVectoredIOSupplier() {
return vectoredIOSupplier;
}

/**
* Loads an {@link AccessTokenProvider} implementation retrieved from the provided {@code
* AbstractDelegationTokenBinding} if configured, otherwise it's null.
@@ -1645,6 +1680,20 @@ public void close() throws IOException {
}
}

if (vectoredIOSupplier != null) {
if (vectoredIOInitialized) {
try {
vectoredIOSupplier.get().close();
} catch (Exception e) {
logger.atWarning().withCause(e).log(
"Failed to close the underneath vectoredIO implementation");
} finally {
vectoredIOSupplier = null;
vectoredIOInitialized = false;
}
}
}

backgroundTasksThreadPool.shutdown();
backgroundTasksThreadPool = null;
}
@@ -364,6 +364,23 @@ public class GoogleHadoopFileSystemConfiguration {
"fs.gs.inputstream.min.range.request.size",
GoogleCloudStorageReadOptions.DEFAULT.getMinRangeRequestSize());

/** Minimum gap between ranges that is seeked over rather than merged during a vectored read. */
public static final HadoopConfigurationProperty<Integer> GCS_VECTORED_READ_RANGE_MIN_SEEK =
new HadoopConfigurationProperty<>(
"fs.gs.vectored.read.min.range.seek.size",
VectoredReadOptions.DEFAULT.getMinSeekVectoredReadSize());

/** Maximum size allowed for a merged range request. */
public static final HadoopConfigurationProperty<Integer> GCS_VECTORED_READ_MERGED_RANGE_MAX_SIZE =
new HadoopConfigurationProperty<>(
"fs.gs.vectored.read.merged.range.max.size",
VectoredReadOptions.DEFAULT.getMergeRangeMaxSize());

/** Maximum number of threads used to process individual FileRange requests. */
public static final HadoopConfigurationProperty<Integer> GCS_VECTORED_READ_THREADS =
new HadoopConfigurationProperty<>(
"fs.gs.vectored.read.threads", VectoredReadOptions.DEFAULT.getReadThreads());

/** Configuration key for enabling use of the gRPC API for read/write. */
public static final HadoopConfigurationProperty<Boolean> GCS_GRPC_ENABLE =
new HadoopConfigurationProperty<>(
@@ -538,6 +555,13 @@ static GoogleCloudStorageFileSystemOptions.Builder getGcsFsOptionsBuilder(Config
.setStatusParallelEnabled(GCS_STATUS_PARALLEL_ENABLE.get(config, config::getBoolean));
}

static VectoredReadOptions.Builder getVectoredReadOptionBuilder(Configuration config) {
return VectoredReadOptions.builder()
.setMinSeekVectoredReadSize(GCS_VECTORED_READ_RANGE_MIN_SEEK.get(config, config::getInt))
.setMergeRangeMaxSize(GCS_VECTORED_READ_MERGED_RANGE_MAX_SIZE.get(config, config::getInt))
.setReadThreads(GCS_VECTORED_READ_THREADS.get(config, config::getInt));
}

@VisibleForTesting
static GoogleCloudStorageOptions.Builder getGcsOptionsBuilder(Configuration config) {
String projectId = GCS_PROJECT_ID.get(config, config::get);