Remove piped write when using Java Storage Client (#1133)
arunkumarchacko committed Apr 15, 2024
1 parent af94c95 commit 2610f60
Showing 5 changed files with 27 additions and 220 deletions.
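
At a glance: this commit drops the pipe-plus-background-thread upload path inherited from AbstractGoogleAsyncWriteChannel, and the write channel now delegates each write() directly to the channel opened from a java-storage BlobWriteSession. A minimal sketch of that direct write path, assuming a gRPC-transport java-storage client and placeholder bucket/object names (illustrative, not the connector's exact wiring):

import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BlobWriteSession;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

public final class DirectWriteSketch {
  public static void main(String[] args) throws Exception {
    // Assumption: a gRPC-transport java-storage client; bucket/object names
    // are placeholders.
    Storage storage = StorageOptions.grpc().build().getService();
    BlobWriteSession session =
        storage.blobWriteSession(BlobInfo.newBuilder("my-bucket", "my-object").build());
    try (WritableByteChannel channel = session.open()) {
      // After this commit, the connector's write channel forwards each
      // write() here directly -- no pipe, no background upload thread.
      channel.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
    } // closing the channel finalizes the GCS object
  }
}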
@@ -195,11 +195,8 @@ public WritableByteChannel create(StorageResourceId resourceId, CreateObjectOptions
getWriteGeneration(resourceId, options.isOverwriteExisting()));
}

GoogleCloudStorageClientWriteChannel channel =
new GoogleCloudStorageClientWriteChannel(
storage, storageOptions, resourceIdWithGeneration, options, backgroundTasksThreadPool);
channel.initialize();
return channel;
return new GoogleCloudStorageClientWriteChannel(
storage, storageOptions, resourceIdWithGeneration, options);
}
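
Callers are unaffected by the constructor change above: the factory still returns a plain WritableByteChannel. A hedged usage sketch (assuming it sits alongside GoogleCloudStorage and StorageResourceId in com.google.cloud.hadoop.gcsio; names are placeholders):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

class CreateUsageSketch {
  // Hedged usage sketch: only the channel's internals differ after this commit.
  static void writeSmallObject(GoogleCloudStorage gcs) throws IOException {
    StorageResourceId id = new StorageResourceId("my-bucket", "my-object"); // placeholders
    try (WritableByteChannel channel = gcs.create(id)) {
      channel.write(ByteBuffer.wrap(new byte[] {1, 2, 3})); // delegates straight to BlobWriteSession
    } // close() finalizes the object
  }
}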

@@ -17,9 +17,7 @@
package com.google.cloud.hadoop.gcsio;

import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.encodeMetadata;
import static com.google.storage.v2.ServiceConstants.Values.MAX_WRITE_CHUNK_BYTES;

import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
@@ -28,40 +26,29 @@
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.StorageException;
import com.google.common.flogger.GoogleLogger;
import com.google.common.io.ByteStreams;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nonnull;

/** Implements WritableByteChannel to provide write access to GCS via the java-storage client. */
class GoogleCloudStorageClientWriteChannel extends AbstractGoogleAsyncWriteChannel<Boolean> {
class GoogleCloudStorageClientWriteChannel implements WritableByteChannel {

private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private final StorageResourceId resourceId;
private WritableByteChannel writableByteChannel;
private final BlobWriteSession blobWriteSession;
private boolean uploadSucceeded = false;
// TODO: not supported as of now
// private final String requesterPaysProject;

public GoogleCloudStorageClientWriteChannel(
Storage storage,
GoogleCloudStorageOptions storageOptions,
StorageResourceId resourceId,
CreateObjectOptions createOptions,
ExecutorService uploadThreadPool)
CreateObjectOptions createOptions)
throws IOException {
super(uploadThreadPool, storageOptions.getWriteChannelOptions());
this.resourceId = resourceId;
this.blobWriteSession = getBlobWriteSession(storage, resourceId, createOptions, storageOptions);
BlobWriteSession blobWriteSession =
getBlobWriteSession(storage, resourceId, createOptions, storageOptions);
try {
this.writableByteChannel = blobWriteSession.open();
} catch (StorageException e) {
@@ -70,18 +57,6 @@ public GoogleCloudStorageClientWriteChannel(
}
}

@Override
public void startUpload(InputStream pipeSource) throws IOException {
// Given that the two ends of the pipe must operate asynchronously relative
// to each other, we need to start the upload operation on a separate thread.
try {
uploadOperation = threadPool.submit(new UploadOperation(pipeSource, this.resourceId));
} catch (Exception e) {
GoogleCloudStorageEventBus.postOnException();
throw new RuntimeException(String.format("Failed to start upload for '%s'", resourceId), e);
}
}

private static BlobInfo getBlobInfo(
StorageResourceId resourceId, CreateObjectOptions createOptions) {
BlobInfo blobInfo =
@@ -107,61 +82,6 @@ private static BlobWriteSession getBlobWriteSession(
generateWriteOptions(createOptions, storageOptions));
}

private class UploadOperation implements Callable<Boolean> {

// Read end of the pipe.
private final InputStream pipeSource;
private final StorageResourceId resourceId;
private final int MAX_BYTES_PER_MESSAGE = MAX_WRITE_CHUNK_BYTES.getNumber();

UploadOperation(@Nonnull InputStream pipeSource, @Nonnull StorageResourceId resourceId) {
this.resourceId = resourceId;
this.pipeSource = pipeSource;
}

@Override
public Boolean call() throws Exception {
// Try-with-resources will close this end of the pipe so that
// the writer at the other end will not hang indefinitely.
logger.atFiner().log("Starting upload for resource %s", resourceId);
try (pipeSource) {
boolean lastChunk = false;
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_BYTES_PER_MESSAGE);
while (!lastChunk) {
int remainingCapacity = byteBuffer.remaining();
ByteString data =
ByteString.readFrom(
ByteStreams.limit(pipeSource, remainingCapacity), remainingCapacity);
if (data.size() < remainingCapacity) {
lastChunk = true;
}
byteBuffer.put(data.toByteArray());
// switch to read mode
byteBuffer.flip();
// this could result in a partial write
writeInternal(byteBuffer);
if (!lastChunk) {
// compact buffer for further writing
byteBuffer.compact();
}
}
// the last chunk could be partially written;
// upload all remaining bytes of the last chunk
if (lastChunk && byteBuffer.hasRemaining()) {
while (byteBuffer.hasRemaining()) {
writeInternal(byteBuffer);
}
}
logger.atFiner().log("Uploaded all chunks for resource %s", resourceId);
return true;
} catch (Exception e) {
GoogleCloudStorageEventBus.postOnException();
throw new IOException(
String.format("Error occurred while uploading resource %s", resourceId), e);
}
}
}

private static BlobWriteOption[] generateWriteOptions(
CreateObjectOptions createOptions, GoogleCloudStorageOptions storageOptions) {
List<BlobWriteOption> blobWriteOptions = new ArrayList<>();
@@ -181,13 +101,18 @@ private static BlobWriteOption[] generateWriteOptions(
return blobWriteOptions.toArray(new BlobWriteOption[blobWriteOptions.size()]);
}

@Override
public boolean isOpen() {
return writableByteChannel != null && writableByteChannel.isOpen();
}

@Override
public void close() throws IOException {
try {
if (!isOpen()) {
return;
}
super.close();

// WriteChannel close is overloaded with two responsibilities:
// 1. closing the channel resource
// 2. finalizing the GCS object
@@ -196,31 +121,23 @@ public void close() throws IOException {
writableByteChannel.close();
} catch (Exception e) {
GoogleCloudStorageEventBus.postOnException();
throw new IOException(String.format("Upload failed for '%s'", resourceId), e);
throw new IOException(
String.format("Upload failed for '%s'. reason=%s", resourceId, e.getMessage()), e);
} finally {
writableByteChannel = null;
}
}

@Override
public void handleResponse(Boolean response) {
this.uploadSucceeded = response;
}

@Override
protected String getResourceString() {
return resourceId.toString();
}

public boolean isUploadSuccessful() {
return uploadSucceeded;
}

private int writeInternal(ByteBuffer byteBuffer) throws IOException {
int bytesWritten = writableByteChannel.write(byteBuffer);
logger.atFinest().log(
"%d bytes were written out of provided buffer of capacity %d",
bytesWritten, byteBuffer.limit());
return bytesWritten;
}

@Override
public int write(ByteBuffer src) throws IOException {
return writeInternal(src);
}
}
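
For contrast with the deleted UploadOperation above, the removed path pushed caller writes through a pipe to a dedicated upload thread. A simplified, self-contained illustration of that pattern (System.out stands in for the GCS channel; the chunk size is illustrative, not MAX_WRITE_CHUNK_BYTES):

import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public final class PipedWriteSketch {
  public static void main(String[] args) throws Exception {
    PipedInputStream pipeSource = new PipedInputStream();
    PipedOutputStream pipeSink = new PipedOutputStream(pipeSource);
    // Stand-in destination; the deleted code wrote to BlobWriteSession's channel.
    WritableByteChannel destination = Channels.newChannel(System.out);
    ExecutorService uploadThreadPool = Executors.newSingleThreadExecutor();
    Future<?> uploadOperation =
        uploadThreadPool.submit(
            () -> {
              byte[] chunk = new byte[64 * 1024]; // chunk size is illustrative
              try (pipeSource) {
                int read;
                while ((read = pipeSource.read(chunk)) != -1) {
                  destination.write(ByteBuffer.wrap(chunk, 0, read));
                }
              }
              return null;
            });
    pipeSink.write("hello".getBytes()); // caller-side write enters the pipe
    pipeSink.close(); // end-of-stream lets the upload thread finish
    uploadOperation.get(); // close() had to block on the background upload
    uploadThreadPool.shutdown();
  }
}

Dropping the pipe removes a buffer copy and a producer/consumer handoff per write; the trade-off is that write() now blocks on the storage channel directly.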
@@ -16,13 +16,8 @@

package com.google.cloud.hadoop.gcsio;

import static com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.jsonErrorResponse;
import static com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.mockTransport;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static com.google.storage.v2.ServiceConstants.Values.MAX_WRITE_CHUNK_BYTES;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -38,23 +33,16 @@
import com.google.cloud.hadoop.util.RetryHttpInitializer;
import com.google.cloud.hadoop.util.RetryHttpInitializerOptions;
import com.google.cloud.hadoop.util.testing.FakeCredentials;
import com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.ErrorResponses;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BlobWriteSession;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
@@ -111,121 +99,32 @@ public static void cleanUp() {
@Test
public void writeMultipleChunksSuccess() throws IOException {
int numberOfChunks = 10;
writeChannel.initialize();
ByteString data =
GoogleCloudStorageTestHelper.createTestData(
MAX_WRITE_CHUNK_BYTES.getNumber() * numberOfChunks);
writeChannel.write(data.asReadOnlyByteBuffer());
writeChannel.close();
// Fake writer only writes half the buffer at a time
verify(fakeWriteChannel, times(numberOfChunks * 2)).write(any());
verify(fakeWriteChannel, times(1)).write(any());
verify(fakeWriteChannel, times(1)).close();
verifyBlobInfoProperties(blobInfoCapture, resourceId);
verifyBlobWriteOptionProperties(blobWriteOptionsCapture);
assertThat(writeChannel.isUploadSuccessful()).isTrue();
}
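
The changed verification above follows from the new write path: without the pipe, the connector no longer re-chunks the caller's buffer, so the fake channel sees it in a single write() call. A minimal Mockito sketch of asserting that style of one-to-one delegation (all names here are illustrative stand-ins, not the test's helpers):

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

class SingleWriteDelegationSketch {
  // Minimal stand-in for the post-change channel: write() delegates directly.
  record DelegatingChannel(WritableByteChannel inner) implements WritableByteChannel {
    @Override public int write(ByteBuffer src) throws IOException { return inner.write(src); }
    @Override public boolean isOpen() { return inner.isOpen(); }
    @Override public void close() throws IOException { inner.close(); }
  }

  static void check() throws Exception {
    WritableByteChannel inner = mock(WritableByteChannel.class);
    when(inner.write(any(ByteBuffer.class)))
        .thenAnswer(
            invocation -> {
              ByteBuffer src = invocation.getArgument(0);
              int n = src.remaining();
              src.position(src.limit()); // pretend the full buffer was written
              return n;
            });
    WritableByteChannel channel = new DelegatingChannel(inner);
    channel.write(ByteBuffer.allocate(1024)); // one caller buffer...
    channel.close();
    verify(inner, times(1)).write(any()); // ...one delegated write, no re-chunking
    verify(inner, times(1)).close();
  }
}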

@Test
public void writeSingleChunkSuccess() throws IOException {
int numberOfChunks = 1;
writeChannel.initialize();
// (chunkSize/2) < data.size < chunkSize
ByteString data =
GoogleCloudStorageTestHelper.createTestData(
MAX_WRITE_CHUNK_BYTES.getNumber() * numberOfChunks - 1);
writeChannel.write(data.asReadOnlyByteBuffer());
writeChannel.close();
// Fake writer only writes half the buffer at a time
verify(fakeWriteChannel, times(numberOfChunks * 2)).write(any());
verify(fakeWriteChannel, times(1)).close();
verifyBlobInfoProperties(blobInfoCapture, resourceId);
verifyBlobWriteOptionProperties(blobWriteOptionsCapture);
assertThat(writeChannel.isUploadSuccessful()).isTrue();
}

@Test
public void writeMultipleChunksFailure() throws IOException {
fakeWriteChannel = spy(new FakeWriteChannel(true));
ArgumentCaptor<BlobInfo> blobInfoCapture = ArgumentCaptor.forClass(BlobInfo.class);
ArgumentCaptor<BlobWriteOption> blobWriteOptionsCapture =
ArgumentCaptor.forClass(BlobWriteOption.class);
when(mockedStorage.blobWriteSession(
blobInfoCapture.capture(), blobWriteOptionsCapture.capture()))
.thenReturn(mockBlobWriteSession);
when(mockBlobWriteSession.open()).thenReturn(fakeWriteChannel);
writeChannel = getJavaStorageChannel();
writeChannel.initialize();
ByteString data =
GoogleCloudStorageTestHelper.createTestData(MAX_WRITE_CHUNK_BYTES.getNumber() * 10);
assertThrows(IOException.class, () -> writeChannel.write(data.asReadOnlyByteBuffer()));
verify(fakeWriteChannel, times(1)).write(any());
verify(fakeWriteChannel, times(1)).close();
verifyBlobInfoProperties(blobInfoCapture, resourceId);
verifyBlobWriteOptionProperties(blobWriteOptionsCapture);
assertThrows(IOException.class, writeChannel::close);
assertThat(writeChannel.isUploadSuccessful()).isFalse();
}

/**
* Tests that when the parent thread waiting for the write to finish via the close call is
* interrupted, the in-flight write is cancelled and interrupted as well.
*/
@Test
public void testCreateObjectApiInterruptedException() throws Exception {

Storage mockedJavaClientStorage = mock(Storage.class);
BlobWriteSession mockBlobWriteSession = mock(BlobWriteSession.class);
// Set up the mocked write channel to block forever.
CountDownLatch waitForEverLatch = new CountDownLatch(1);
CountDownLatch writeStartedLatch = new CountDownLatch(2);
CountDownLatch threadsDoneLatch = new CountDownLatch(2);
// Mock getItemInfo call
MockHttpTransport transport = mockTransport(jsonErrorResponse(ErrorResponses.NOT_FOUND));
when(mockedJavaClientStorage.blobWriteSession(any(), any())).thenReturn(mockBlobWriteSession);
when(mockBlobWriteSession.open())
.thenReturn(
new FakeWriteChannel() {
@Override
public int write(ByteBuffer src) {
try {
writeStartedLatch.countDown();
waitForEverLatch.await();
} catch (InterruptedException e) {
// Expected test behavior. Do nothing.
} finally {
threadsDoneLatch.countDown();
}
fail("Unexpected to get here.");
return 0;
}
});

GoogleCloudStorage gcs = mockGcsJavaStorage(transport, mockedJavaClientStorage);

WritableByteChannel writeChannel = gcs.create(new StorageResourceId(BUCKET_NAME, OBJECT_NAME));
assertThat(writeChannel.isOpen()).isTrue();

ExecutorService executorService = Executors.newCachedThreadPool();
Future<?> callForClose =
executorService.submit(
() -> {
writeStartedLatch.countDown();
try {
IOException ioe = assertThrows(IOException.class, writeChannel::close);
assertThat(ioe).isInstanceOf(ClosedByInterruptException.class);
} finally {
threadsDoneLatch.countDown();
}
});
// Wait for the write to start, then cancel the thread calling close, and finally
// wait for the two threads to finish.
assertWithMessage("Neither thread started.")
.that(writeStartedLatch.await(5000, TimeUnit.MILLISECONDS))
.isTrue();
// callForClose is blocked waiting on write() (which waits forever) to finish; interrupt it.
callForClose.cancel(/* interrupt= */ true);
assertWithMessage("Failed to wait for tasks to get interrupted.")
.that(threadsDoneLatch.await(5000, TimeUnit.MILLISECONDS))
.isTrue();
}

private GoogleCloudStorageClientWriteChannel getJavaStorageChannel() throws IOException {
Expand All @@ -241,8 +140,7 @@ private GoogleCloudStorageClientWriteChannel getJavaStorageChannel() throws IOEx
.setContentEncoding(CONTENT_ENCODING)
.setMetadata(GoogleCloudStorageTestHelper.getDecodedMetadata(metadata))
.setKmsKeyName(KMS_KEY)
.build(),
EXECUTOR_SERVICE);
.build());
}

private static void verifyBlobInfoProperties(
