From 9dfeeb4a9e1d3ed340a4b808974eba90b165b47f Mon Sep 17 00:00:00 2001 From: Alex Weibel Date: Wed, 5 Jun 2019 16:58:35 -0700 Subject: [PATCH] Add Http Request and Response APIs (#57) * Add Http Request and Response APIs --- codebuild/common-linux.sh | 2 + codebuild/linux-clang3-x64.yml | 1 + codebuild/linux-clang6-x64.yml | 1 + codebuild/linux-gcc-4x-x64.yml | 1 + codebuild/linux-gcc-4x-x86.yml | 1 + codebuild/linux-gcc-5x-x64.yml | 1 + codebuild/linux-gcc-6x-x64.yml | 1 + codebuild/linux-gcc-7x-x64.yml | 1 + .../awssdk/crt/http/CrtHttpStreamHandler.java | 96 ++++ .../awssdk/crt/http/HttpConnection.java | 106 +++- .../amazon/awssdk/crt/http/HttpHeader.java | 53 ++ .../amazon/awssdk/crt/http/HttpRequest.java | 48 ++ .../amazon/awssdk/crt/http/HttpStream.java | 56 ++ src/native/crt.c | 122 +++- src/native/crt.h | 45 ++ src/native/http_connection.c | 37 +- src/native/http_connection.h | 32 ++ src/native/http_request_response.c | 537 ++++++++++++++++++ .../crt/test/HttpRequestResponseTest.java | 318 +++++++++++ 19 files changed, 1428 insertions(+), 31 deletions(-) create mode 100644 src/main/java/software/amazon/awssdk/crt/http/CrtHttpStreamHandler.java create mode 100644 src/main/java/software/amazon/awssdk/crt/http/HttpHeader.java create mode 100644 src/main/java/software/amazon/awssdk/crt/http/HttpRequest.java create mode 100644 src/main/java/software/amazon/awssdk/crt/http/HttpStream.java create mode 100644 src/native/http_connection.h create mode 100644 src/native/http_request_response.c create mode 100644 src/test/java/software/amazon/awssdk/crt/test/HttpRequestResponseTest.java diff --git a/codebuild/common-linux.sh b/codebuild/common-linux.sh index 887a10f3f..2a2a84399 100755 --- a/codebuild/common-linux.sh +++ b/codebuild/common-linux.sh @@ -9,4 +9,6 @@ ENDPOINT=$(aws secretsmanager get-secret-value --secret-id "unit-test/endpoint" # build java package cd $CODEBUILD_SRC_DIR + +ulimit -c unlimited mvn -B test -DredirectTestOutputToFile=true -DreuseForks=false -Dendpoint=$ENDPOINT -Dcertificate=/tmp/certificate.pem -Dprivatekey=/tmp/privatekey.pem -Drootca=/tmp/AmazonRootCA1.pem diff --git a/codebuild/linux-clang3-x64.yml b/codebuild/linux-clang3-x64.yml index ce2f0d48e..4e3275ecf 100644 --- a/codebuild/linux-clang3-x64.yml +++ b/codebuild/linux-clang3-x64.yml @@ -21,3 +21,4 @@ artifacts: files: - 'target/surefire-reports/**' - 'hs_err_pid*' + - 'core*' diff --git a/codebuild/linux-clang6-x64.yml b/codebuild/linux-clang6-x64.yml index 3a8e2dfc2..1c8431c7a 100644 --- a/codebuild/linux-clang6-x64.yml +++ b/codebuild/linux-clang6-x64.yml @@ -27,3 +27,4 @@ artifacts: files: - 'target/surefire-reports/**' - 'hs_err_pid*' + - 'core*' diff --git a/codebuild/linux-gcc-4x-x64.yml b/codebuild/linux-gcc-4x-x64.yml index b028723d4..920cd551c 100644 --- a/codebuild/linux-gcc-4x-x64.yml +++ b/codebuild/linux-gcc-4x-x64.yml @@ -21,3 +21,4 @@ artifacts: files: - 'target/surefire-reports/**' - 'hs_err_pid*' + - 'core*' diff --git a/codebuild/linux-gcc-4x-x86.yml b/codebuild/linux-gcc-4x-x86.yml index cde786264..32cd2670a 100644 --- a/codebuild/linux-gcc-4x-x86.yml +++ b/codebuild/linux-gcc-4x-x86.yml @@ -23,3 +23,4 @@ artifacts: files: - 'target/surefire-reports/**' - 'hs_err_pid*' + - 'core*' diff --git a/codebuild/linux-gcc-5x-x64.yml b/codebuild/linux-gcc-5x-x64.yml index b117bc4ed..1848ed30f 100644 --- a/codebuild/linux-gcc-5x-x64.yml +++ b/codebuild/linux-gcc-5x-x64.yml @@ -22,3 +22,4 @@ artifacts: files: - 'target/surefire-reports/**' - 'hs_err_pid*' + - 'core*' diff --git a/codebuild/linux-gcc-6x-x64.yml b/codebuild/linux-gcc-6x-x64.yml index a0253d73b..1fbfb6878 100644 --- a/codebuild/linux-gcc-6x-x64.yml +++ b/codebuild/linux-gcc-6x-x64.yml @@ -22,3 +22,4 @@ artifacts: files: - 'target/surefire-reports/**' - 'hs_err_pid*' + - 'core*' diff --git a/codebuild/linux-gcc-7x-x64.yml b/codebuild/linux-gcc-7x-x64.yml index ada253aa0..a11864061 100644 --- a/codebuild/linux-gcc-7x-x64.yml +++ b/codebuild/linux-gcc-7x-x64.yml @@ -23,3 +23,4 @@ artifacts: files: - 'target/surefire-reports/**' - 'hs_err_pid*' + - 'core*' diff --git a/src/main/java/software/amazon/awssdk/crt/http/CrtHttpStreamHandler.java b/src/main/java/software/amazon/awssdk/crt/http/CrtHttpStreamHandler.java new file mode 100644 index 000000000..4cd94cf03 --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/http/CrtHttpStreamHandler.java @@ -0,0 +1,96 @@ +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.crt.http; + +import java.nio.ByteBuffer; + +/** + * Interface that Native code knows how to call when handling Http Request/Responses + * + * Maps 1-1 to the Native Http API here: https://github.com/awslabs/aws-c-http/blob/master/include/aws/http/request_response.h + */ +public interface CrtHttpStreamHandler { + + /** + * Called from Native when new Http Headers have been received. + * Note that this function may be called multiple times as HTTP headers are received. + * + * @param stream The HttpStream object + * @param responseStatusCode The HTTP Response Status Code + * @param nextHeaders The headers received in the latest IO event. + */ + void onResponseHeaders(HttpStream stream, int responseStatusCode, HttpHeader[] nextHeaders); + + /** + * Called from Native once all HTTP Headers are processed. Will not be called if there are no Http Headers in the + * response. Guaranteed to be called exactly once if there is at least 1 Header. + * + * @param stream The HttpStream object + * @param hasBody True if the HTTP Response had a Body, false otherwise. + */ + default void onResponseHeadersDone(HttpStream stream, boolean hasBody) { + /* Optional Callback, do nothing by default */ + } + + /** + * Called when new Body bytes have been received. + * Note that this function may be called multiple times as bodyBytes are received. + * + * Do NOT keep a reference to this ByteBuffer past the lifetime of this function call. The CommonRuntime reserves + * the right to use DirectByteBuffers pointing to memory that only lives as long as the function call. + * + * Sliding Window: + * The Native HttpConnection EventLoop will keep sending data until the end of the sliding Window is reached. + * The user application is responsible for setting the initial Window size appropriately when creating the + * HttpConnection, and for incrementing the sliding window appropriately throughout the lifetime of the HttpStream. + * + * For more info, see: + * - https://en.wikipedia.org/wiki/Sliding_window_protocol + * + * @param bodyBytesIn The HTTP Body Bytes received in the last IO Event. The user MUST either copy all bytes from + * this Buffer, since there will not be another chance to read this data. + * @return The number of bytes to move the sliding window by. Repeatedly returning zero will eventually cause the + * sliding window to fill up and data to stop flowing until the user slides the window back open. + */ + default int onResponseBody(HttpStream stream, ByteBuffer bodyBytesIn) { + /* Optional Callback, ignore incoming response body by default unless user wants to capture it. */ + return bodyBytesIn.remaining(); + } + + /** + * Called from Native when the Response has completed. + * @param stream + * @param errorCode + */ + void onResponseComplete(HttpStream stream, int errorCode); + + /** + * Called from Native when the Http Request has a Body (Eg PUT/POST requests). + * Note that this function may be called many times as Native sends the Request Body. + * + * Do NOT keep a reference to this ByteBuffer past the lifetime of this function call. The CommonRuntime reserves + * the right to use DirectByteBuffers pointing to memory that only lives as long as the function call. + * + * @param stream The HttpStream for this Request/Response Pair + * @param bodyBytesOut The Buffer to write the Request Body Bytes to. + * @return True if Request body is complete, false otherwise. + */ + default boolean sendRequestBody(HttpStream stream, ByteBuffer bodyBytesOut) { + /* Optional Callback, return empty request body by default unless user wants to return one. */ + return true; + } + +} \ No newline at end of file diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpConnection.java b/src/main/java/software/amazon/awssdk/crt/http/HttpConnection.java index e59f86284..f93d4a18d 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/HttpConnection.java +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpConnection.java @@ -15,7 +15,6 @@ package software.amazon.awssdk.crt.http; -import software.amazon.awssdk.crt.AsyncCallback; import software.amazon.awssdk.crt.CrtResource; import software.amazon.awssdk.crt.CrtRuntimeException; import software.amazon.awssdk.crt.io.ClientBootstrap; @@ -24,6 +23,7 @@ import java.net.URI; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static software.amazon.awssdk.crt.CRT.AWS_CRT_SUCCESS; @@ -40,10 +40,12 @@ public class HttpConnection extends CrtResource { private static final String HTTPS = "https"; private static final int DEFAULT_HTTP_PORT = 80; private static final int DEFAULT_HTTPS_PORT = 443; + private static final int DEFAULT_MAX_WINDOW_SIZE = Integer.MAX_VALUE; private final ClientBootstrap clientBootstrap; private final SocketOptions socketOptions; private final TlsContext tlsContext; + private final int windowSize; private final URI uri; private final int port; private final boolean useTls; @@ -62,7 +64,25 @@ public class HttpConnection extends CrtResource { */ public static CompletableFuture createConnection(URI uri, ClientBootstrap bootstrap, SocketOptions socketOptions, TlsContext tlsContext) throws CrtRuntimeException { - HttpConnection conn = new HttpConnection(uri, bootstrap, socketOptions, tlsContext); + HttpConnection conn = new HttpConnection(uri, bootstrap, socketOptions, tlsContext, DEFAULT_MAX_WINDOW_SIZE); + return conn.connect(); + } + + /** + * Creates a new CompletableFuture for a new HttpConnection. + * @param uri Must be non-null and contain a hostname + * @param bootstrap The ClientBootstrap to use for the Connection + * @param socketOptions The SocketOptions to use for the Connection + * @param tlsContext The TlsContext to use for the Connection + * @param windowSize The Initial Window size for requests made on this connection + * @return CompletableFuture indicating when the connection has completed + * @throws CrtRuntimeException if Native threw a CrtRuntimeException + */ + public static CompletableFuture createConnection(URI uri, ClientBootstrap bootstrap, + SocketOptions socketOptions, + TlsContext tlsContext, + int windowSize) throws CrtRuntimeException { + HttpConnection conn = new HttpConnection(uri, bootstrap, socketOptions, tlsContext, windowSize); return conn.connect(); } @@ -73,7 +93,7 @@ public static CompletableFuture createConnection(URI uri, Client * @param socketOptions The SocketOptions to use for the Connection * @param tlsContext The TlsContext to use for the Connection */ - private HttpConnection(URI uri, ClientBootstrap bootstrap, SocketOptions socketOptions, TlsContext tlsContext) { + private HttpConnection(URI uri, ClientBootstrap bootstrap, SocketOptions socketOptions, TlsContext tlsContext, int windowSize) { if (uri == null) { throw new IllegalArgumentException("URI must not be null"); } if (uri.getScheme() == null) { throw new IllegalArgumentException("URI does not have a Scheme"); } if (!HTTP.equals(uri.getScheme()) && !HTTPS.equals(uri.getScheme())) { throw new IllegalArgumentException("URI has unknown Scheme"); } @@ -81,6 +101,7 @@ private HttpConnection(URI uri, ClientBootstrap bootstrap, SocketOptions socketO if (bootstrap == null || bootstrap.isNull()) { throw new IllegalArgumentException("ClientBootstrap must not be null"); } if (socketOptions == null || socketOptions.isNull()) { throw new IllegalArgumentException("SocketOptions must not be null"); } if (HTTPS.equals(uri.getScheme()) && tlsContext == null) { throw new IllegalArgumentException("TlsContext must not be null if https is used"); } + if (windowSize <= 0) { throw new IllegalArgumentException("Window Size must be greater than zero.");} int port = uri.getPort(); @@ -100,6 +121,7 @@ private HttpConnection(URI uri, ClientBootstrap bootstrap, SocketOptions socketO this.clientBootstrap = bootstrap; this.socketOptions = socketOptions; this.tlsContext = tlsContext; + this.windowSize = windowSize; this.connectedFuture = new CompletableFuture<>(); this.shutdownFuture = new CompletableFuture<>(); } @@ -117,18 +139,81 @@ private CompletableFuture connect() throws CrtRuntimeException { clientBootstrap.native_ptr(), socketOptions.native_ptr(), useTls ? tlsContext.native_ptr() : 0, + windowSize, uri.getHost(), port)); return connectedFuture; } + /** + * Schedules an HttpRequest on the Native EventLoop for this HttpConnection. + * + * @param request The Request to make to the Server. + * @param streamHandler The Stream Handler to be called from the Native EventLoop + * @throws CrtRuntimeException + * @return The HttpStream that represents this Request/Response Pair. It can be closed at any time during the + * request/response, but must be closed by the user thread making this request when it's done. + */ + public HttpStream makeRequest(HttpRequest request, CrtHttpStreamHandler streamHandler) throws CrtRuntimeException { + if (isShutdownComplete() || isNull()) { + throw new IllegalStateException("HttpConnection has been shut down, can't make requests on it."); + } + + HttpStream stream = httpConnectionMakeRequest(native_ptr(), + request.getMethod(), + request.getEncodedPath(), + request.getHeaders(), + streamHandler); + + if (stream == null || stream.isNull()) { + throw new IllegalStateException("HttpStream is null"); + } + + return stream; + } + /** * Closes and frees this HttpConnection and any native sub-resources associated with this connection */ @Override public void close() { + if (didConnectSuccessfully() && !isShutdownComplete()) { + /** + * We have to wait for the connection to finish shutting down to avoid race conditions between + * shutdown tasks and memory release tasks. + * + * The httpConnectionShutdown() call schedules shutdown tasks on the Native EventLoop that may send + * HTTP/TLS/TCP shutdown messages to peers if necessary and will eventually cause internal connection + * memory to stop being accessed. + * + * The httpConnectionRelease() call will begin releasing internal connection memory. If the shutdown isn't + * complete before httpConnectionRelease(), it can lead to the shutdown tasks accessing memory that's been + * released, resulting in Segfaults. + */ + try { + // Give Shutdown 10 seconds to complete, otherwise throw a Timeout Exception + this.shutdown().get(10, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + if (!isNull()) { + try { + /** + * FIXME: The above shutdown().get() should be enough to avoid race conditions, but aws-c-http has a + * bug in the way it orders it's shutdown callbacks. Add an artificial sleep here to avoid Race + * Condition with the EventLoop when shutting down the TLS Connection. + * + * Tracking Issue: https://github.com/awslabs/aws-c-http/issues/66 + * TraceLog: https://gist.github.com/alexw91/e6205fd38ecc530a55b956c98ca189dc + */ + Thread.sleep(1000); + } catch (Exception e) { + throw new RuntimeException(e); + } + httpConnectionRelease(release()); } @@ -164,6 +249,15 @@ public CompletableFuture getShutdownFuture() { return shutdownFuture; } + private boolean didConnectSuccessfully() { + return connectedFuture.isDone() && !connectedFuture.isCompletedExceptionally(); + } + + + private boolean isShutdownComplete() { + return shutdownFuture.isDone(); + } + /** * Schedules a task on the Native EventLoop to shut down the current connection * @return When this future completes, the shutdown is complete @@ -188,10 +282,16 @@ private static native long httpConnectionNew(HttpConnection thisObj, long client_bootstrap, long socketOptions, long tlsContext, + int windowSize, String endpoint, int port) throws CrtRuntimeException; private static native void httpConnectionShutdown(long connection) throws CrtRuntimeException; private static native void httpConnectionRelease(long connection); + private static native HttpStream httpConnectionMakeRequest(long connection, + String method, + String uri, + HttpHeader[] headers, + CrtHttpStreamHandler crtHttpStreamHandler) throws CrtRuntimeException; } diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpHeader.java b/src/main/java/software/amazon/awssdk/crt/http/HttpHeader.java new file mode 100644 index 000000000..878a0881f --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpHeader.java @@ -0,0 +1,53 @@ +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.crt.http; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +public class HttpHeader { + private final static Charset UTF8 = StandardCharsets.UTF_8; + private byte[] name; /* Not final, Native will manually set name after calling empty Constructor. */ + private byte[] value; /* Not final, Native will manually set value after calling empty Constructor. */ + + /** Called by Native to create a new HttpHeader. This is so that Native doesn't have to worry about UTF8 + * encoding/decoding issues. The user thread will deal with them when they call getName() or getValue() **/ + private HttpHeader() {} + + public HttpHeader(String name, String value){ + this.name = name.getBytes(UTF8); + this.value = value.getBytes(UTF8); + } + + public String getName() { + if (name == null) { + return ""; + } + return new String(name, UTF8); + } + + public String getValue() { + if (value == null) { + return ""; + } + return new String(value, UTF8); + } + + @Override + public String toString() { + return getName() + ":" + getValue(); + } +} \ No newline at end of file diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpRequest.java b/src/main/java/software/amazon/awssdk/crt/http/HttpRequest.java new file mode 100644 index 000000000..6e099a5e8 --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpRequest.java @@ -0,0 +1,48 @@ +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.crt.http; + +/** + * Represents a single Client Request to be sent on a HTTP connection + */ +public class HttpRequest { + private final String method; + private final String encodedPath; + private final HttpHeader[] headers; + + public HttpRequest(String method, String encodedPath) { + this(method, encodedPath, new HttpHeader[]{}); + } + + public HttpRequest(String method, String encodedPath, HttpHeader[] headers) { + if (headers == null) { throw new IllegalArgumentException("Headers can be empty, but can't be null"); } + this.method = method; + this.encodedPath = encodedPath; + this.headers = headers; + } + + public String getMethod() { + return method; + } + + public String getEncodedPath() { + return encodedPath; + } + + public HttpHeader[] getHeaders() { + return headers; + } +} diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpStream.java b/src/main/java/software/amazon/awssdk/crt/http/HttpStream.java new file mode 100644 index 000000000..b33c2a4d0 --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpStream.java @@ -0,0 +1,56 @@ +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.crt.http; + +import software.amazon.awssdk.crt.CrtResource; + +/** + * An HttpStream represents a single Http Request/Response pair within a HttpConnection, and wraps the native resources + * from the aws-c-http library. + * + * Can be used to update the Window size, or to abort the stream early in the middle of sending/receiving Http Bodies. + */ +public class HttpStream extends CrtResource { + + /* Native code can call this constructor */ + protected HttpStream(long ptr) { + acquire(ptr); + } + + @Override + public void close() { + httpStreamRelease(release()); + } + + /** + * Opens the Sliding Read/Write Window by the number of bytes passed as an argument for this HttpStream. + * + * This function should only be called if the user application previously returned less than the length of the input + * ByteBuffer from a onResponseBody() call in a CrtHttpStreamHandler, and should be <= to the total number of + * un-acked bytes. + * + * @param windowSize How many bytes to increment the sliding window by. + */ + public void incrementWindow(int windowSize) { + if (windowSize < 0) { + throw new IllegalArgumentException("windowSize must be >= 0. Actual value: " + windowSize); + } + httpStreamIncrementWindow(native_ptr(), windowSize); + } + + private static native void httpStreamRelease(long http_stream); + private static native void httpStreamIncrementWindow(long http_stream, int window_size); +} \ No newline at end of file diff --git a/src/native/crt.c b/src/native/crt.c index 371bf0bcb..9609a3ba6 100644 --- a/src/native/crt.c +++ b/src/native/crt.c @@ -44,6 +44,120 @@ void aws_jni_throw_runtime_exception(JNIEnv *env, const char *msg, ...) { (*env)->ThrowNew(env, runtime_exception, exception); } +/* methods of Java's ByteBuffer Class */ +static struct { + jclass cls; + jmethodID get_capacity; /* The total number of bytes in the internal byte array. Stays constant. */ + jmethodID get_limit; /* The max allowed read/write position of the Buffer. limit must be <= capacity. */ + jmethodID set_limit; + jmethodID get_position; /* The current read/write position of the Buffer. position must be <= limit */ + jmethodID set_position; + jmethodID get_remaining; /* Remaining number of bytes before the limit is reached. Equal to (limit - position). */ + jmethodID wrap; /* Creates a new ByteBuffer Object from a Java byte[]. */ +} s_java_byte_buffer = {0}; + +void s_cache_java_byte_buffer(JNIEnv *env) { + jclass cls = (*env)->FindClass(env, "java/nio/ByteBuffer"); + assert(cls); + + // FindClass() returns local JNI references that become eligible for GC once this native method returns to Java. + // Call NewGlobalRef() so that this class reference doesn't get Garbage collected. + s_java_byte_buffer.cls = (*env)->NewGlobalRef(env, cls); + + s_java_byte_buffer.get_capacity = (*env)->GetMethodID(env, cls, "capacity", "()I"); + assert(s_java_byte_buffer.get_capacity); + + s_java_byte_buffer.get_limit = (*env)->GetMethodID(env, cls, "limit", "()I"); + assert(s_java_byte_buffer.get_limit); + + s_java_byte_buffer.set_limit = (*env)->GetMethodID(env, cls, "limit", "(I)Ljava/nio/Buffer;"); + assert(s_java_byte_buffer.set_limit); + + s_java_byte_buffer.get_position = (*env)->GetMethodID(env, cls, "position", "()I"); + assert(s_java_byte_buffer.get_position); + + s_java_byte_buffer.set_position = (*env)->GetMethodID(env, cls, "position", "(I)Ljava/nio/Buffer;"); + assert(s_java_byte_buffer.set_position); + + s_java_byte_buffer.get_remaining = (*env)->GetMethodID(env, cls, "remaining", "()I"); + assert(s_java_byte_buffer.get_remaining); + + s_java_byte_buffer.wrap = (*env)->GetStaticMethodID(env, cls, "wrap", "([B)Ljava/nio/ByteBuffer;"); + assert(s_java_byte_buffer.wrap); +} + +jbyteArray aws_java_byte_array_new(JNIEnv *env, size_t size) { + jbyteArray jArray = (*env)->NewByteArray(env, (jsize)size); + return jArray; +} + +void aws_copy_java_byte_array_to_native_array(JNIEnv *env, jbyteArray src, uint8_t *dst, size_t amount) { + (*env)->GetByteArrayRegion(env, src, 0, amount, (jbyte *)dst); +} + +void aws_copy_native_array_to_java_byte_array(JNIEnv *env, jbyteArray dst, uint8_t *src, size_t amount) { + (*env)->SetByteArrayRegion(env, dst, 0, amount, (jbyte *)src); +} + +jobject aws_java_byte_array_to_java_byte_buffer(JNIEnv *env, jbyteArray jArray) { + jobject jByteBuffer = (*env)->CallStaticObjectMethod(env, s_java_byte_buffer.cls, s_java_byte_buffer.wrap, jArray); + return jByteBuffer; +} + +/** + * Converts a Java byte[] to a Native aws_byte_cursor + */ +struct aws_byte_cursor aws_jni_byte_cursor_from_jbyteArray(JNIEnv *env, jbyteArray array) { + + jboolean isCopy; + jbyte *data = (*env)->GetByteArrayElements(env, array, &isCopy); + jsize len = (*env)->GetArrayLength(env, array); + return aws_byte_cursor_from_array((const uint8_t *)data, (size_t)len); +} + +/** + * Converts a Native aws_byte_cursor to a Java byte[] + */ +jbyteArray aws_jni_byte_array_from_cursor(JNIEnv *env, const struct aws_byte_cursor *native_data) { + jbyteArray jArray = aws_java_byte_array_new(env, native_data->len); + aws_copy_native_array_to_java_byte_array(env, jArray, native_data->ptr, native_data->len); + return jArray; +} + +/** + * Converts a Native aws_byte_cursor to a Java ByteBuffer Object + */ +jobject aws_jni_byte_buffer_copy_from_cursor(JNIEnv *env, const struct aws_byte_cursor *native_data) { + assert(env); + jbyteArray jArray = aws_jni_byte_array_from_cursor(env, native_data); + jobject jByteBuffer = aws_java_byte_array_to_java_byte_buffer(env, jArray); + + return jByteBuffer; +} + +/** + * Converts a Native aws_byte_cursor to a Java DirectByteBuffer + */ +jobject aws_jni_direct_byte_buffer_from_byte_buf(JNIEnv *env, const struct aws_byte_buf *dst) { + jobject jByteBuf = (*env)->NewDirectByteBuffer(env, (void *)dst->buffer, (jlong)dst->capacity); + + // Set the Buffer Limit (the max allowed element to read/write) + (*env)->CallObjectMethod(env, jByteBuf, s_java_byte_buffer.set_limit, (jint)dst->capacity); + + // Set the Buffer Position (the next element to read/write) + (*env)->CallObjectMethod(env, jByteBuf, s_java_byte_buffer.set_position, (jint)dst->len); + + return jByteBuf; +} + +/** + * Returns the read/write position of a Java ByteBuffer + */ +int aws_jni_byte_buffer_get_position(JNIEnv *env, jobject java_byte_buffer) { + jint position = (*env)->CallIntMethod(env, java_byte_buffer, s_java_byte_buffer.get_position); + return (int)position; +} + struct aws_byte_cursor aws_jni_byte_cursor_from_jstring(JNIEnv *env, jstring str) { return aws_byte_cursor_from_array( (*env)->GetStringUTFChars(env, str, NULL), (size_t)(*env)->GetStringUTFLength(env, str)); @@ -87,11 +201,18 @@ static void s_cache_jni_classes(JNIEnv *env) { extern void s_cache_message_handler(JNIEnv *); extern void s_cache_mqtt_exception(JNIEnv *); extern void s_cache_http_connection(JNIEnv *); + extern void s_cache_crt_http_stream_handler(JNIEnv *); + extern void s_cache_http_header(JNIEnv *); + extern void s_cache_http_stream(JNIEnv *); + s_cache_java_byte_buffer(env); s_cache_mqtt_connection(env); s_cache_async_callback(env); s_cache_message_handler(env); s_cache_mqtt_exception(env); s_cache_http_connection(env); + s_cache_crt_http_stream_handler(env); + s_cache_http_header(env); + s_cache_http_stream(env); } #if defined(_MSC_VER) # pragma warning(pop) @@ -122,7 +243,6 @@ void JNICALL Java_software_amazon_awssdk_crt_CRT_awsCrtInit(JNIEnv *env, jclass // aws_jni_throw_runtime_exception(env, "Failed to initialize logging"); // return; // } - // aws_logger_set(&s_logger); s_cache_jni_classes(env); diff --git a/src/native/crt.h b/src/native/crt.h index 7e0e7995c..6c4254003 100644 --- a/src/native/crt.h +++ b/src/native/crt.h @@ -30,6 +30,51 @@ struct aws_allocator *aws_jni_get_allocator(); ******************************************************************************/ void aws_jni_throw_runtime_exception(JNIEnv *env, const char *msg, ...); +/******************************************************************************* + * aws_java_byte_array_new - Creates a new Java byte[] + ******************************************************************************/ +jbyteArray aws_java_byte_array_new(JNIEnv *env, size_t size); + +/******************************************************************************* + * aws_copy_java_byte_array_to_native_array - Copies from a Java byte[] to a Native byte array + ******************************************************************************/ +void aws_copy_java_byte_array_to_native_array(JNIEnv *env, jbyteArray src, uint8_t *dst, size_t amount); + +/******************************************************************************* + * aws_copy_java_byte_array_to_native_array - Copies from a Native byte array to a Java byte[] + ******************************************************************************/ +void aws_copy_native_array_to_java_byte_array(JNIEnv *env, jbyteArray dst, uint8_t *src, size_t amount); + +/******************************************************************************* + * aws_java_byte_array_to_java_byte_buffer - Creates a Java ByteBuffer Object from a Java byte[] + ******************************************************************************/ +jobject aws_java_byte_array_to_java_byte_buffer(JNIEnv *env, jbyteArray jArray); + +/******************************************************************************* + * aws_jni_byte_cursor_from_jbyteArray - Creates an aws_byte_cursor from a jbyteArray. + ******************************************************************************/ +struct aws_byte_cursor aws_jni_byte_cursor_from_jbyteArray(JNIEnv *env, jbyteArray array); + +/******************************************************************************* + * aws_jni_byte_cursor_from_jbyteArray - Creates an aws_byte_cursor from a jbyteArray. + ******************************************************************************/ +jbyteArray aws_jni_byte_array_from_cursor(JNIEnv *env, const struct aws_byte_cursor *native_data); + +/******************************************************************************* + * jni_byte_buffer_copy_from_cursor - Creates a Java ByteBuffer from a native aws_byte_cursor + ******************************************************************************/ +jobject aws_jni_byte_buffer_copy_from_cursor(JNIEnv *env, const struct aws_byte_cursor *native_data); + +/******************************************************************************* + * aws_jni_direct_byte_buffer_from_byte_buf - Creates a Java DirectByteBuffer from a native aws_byte_buf + ******************************************************************************/ +jobject aws_jni_direct_byte_buffer_from_byte_buf(JNIEnv *env, const struct aws_byte_buf *dst); + +/******************************************************************************* + * aws_jni_byte_buffer_get_position - Gets the Read/Write Position of a ByteBuffer + ******************************************************************************/ +int aws_jni_byte_buffer_get_position(JNIEnv *env, jobject java_byte_buffer); + /******************************************************************************* * aws_jni_byte_cursor_from_jstring - Creates an aws_byte_cursor from the UTF-8 * characters extracted from the supplied jstring. The string value is null-terminated. diff --git a/src/native/http_connection.c b/src/native/http_connection.c index ab6254654..91178c2b5 100644 --- a/src/native/http_connection.c +++ b/src/native/http_connection.c @@ -35,25 +35,12 @@ #include #include +#include "http_connection.h" + /******************************************************************************* * JNI class field/method maps ******************************************************************************/ -/* methods of HttpConnection.AsyncCallback */ -static struct { - jmethodID on_success; - jmethodID on_failure; -} s_async_callback = {0}; - -void s_cache_http_async_callback(JNIEnv *env) { - jclass cls = (*env)->FindClass(env, "software/amazon/awssdk/crt/AsyncCallback"); - assert(cls); - s_async_callback.on_success = (*env)->GetMethodID(env, cls, "onSuccess", "()V"); - assert(s_async_callback.on_success); - s_async_callback.on_failure = (*env)->GetMethodID(env, cls, "onFailure", "(Ljava/lang/Throwable;)V"); - assert(s_async_callback.on_failure); -} - /* methods of HttpConnection */ static struct { jmethodID on_connection_complete; @@ -70,18 +57,6 @@ void s_cache_http_connection(JNIEnv *env) { assert(s_http_connection.on_connection_shutdown); } -/******************************************************************************* - * http_jni_connection - represents an aws_http_connection to Java - ******************************************************************************/ -struct http_jni_connection { - struct aws_http_connection *native_http_conn; - struct aws_socket_options *socket_options; - struct aws_tls_connection_options *tls_options; - - JavaVM *jvm; - jobject java_http_conn; /* The Java HttpConnection instance */ -}; - /* on 32-bit platforms, casting pointers to longs throws a warning we don't need */ #if UINTPTR_MAX == 0xffffffff # if defined(_MSC_VER) @@ -132,6 +107,7 @@ JNIEXPORT long JNICALL Java_software_amazon_awssdk_crt_http_HttpConnection_httpC jlong jni_client_bootstrap, jlong jni_socket_options, jlong jni_tls_ctx, + jint jni_window_size, jstring jni_endpoint, jint jni_port) { @@ -157,6 +133,11 @@ JNIEXPORT long JNICALL Java_software_amazon_awssdk_crt_http_HttpConnection_httpC return (jlong)NULL; } + if (jni_window_size <= 0) { + aws_jni_throw_runtime_exception(env, "Window Size must be > 0"); + return (jlong)NULL; + } + uint16_t port = (uint16_t)jni_port; int use_tls = (jni_tls_ctx != 0); @@ -175,6 +156,7 @@ JNIEXPORT long JNICALL Java_software_amazon_awssdk_crt_http_HttpConnection_httpC goto error_cleanup; } + AWS_ZERO_STRUCT(*http_jni_conn); // Create a new reference to the HttpConnection Object. http_jni_conn->java_http_conn = (*env)->NewGlobalRef(env, http_conn_jobject); @@ -191,6 +173,7 @@ JNIEXPORT long JNICALL Java_software_amazon_awssdk_crt_http_HttpConnection_httpC http_options.port = port; http_options.socket_options = socket_options; http_options.tls_options = NULL; + http_options.initial_window_size = (size_t)jni_window_size; http_options.user_data = http_jni_conn; http_options.on_setup = s_on_http_conn_setup; http_options.on_shutdown = s_on_http_conn_shutdown; diff --git a/src/native/http_connection.h b/src/native/http_connection.h new file mode 100644 index 000000000..82531056d --- /dev/null +++ b/src/native/http_connection.h @@ -0,0 +1,32 @@ +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#include +#include + +#include +#include + +/******************************************************************************* + * http_jni_connection - represents an aws_http_connection to Java + ******************************************************************************/ +struct http_jni_connection { + struct aws_http_connection *native_http_conn; + struct aws_socket_options *socket_options; + struct aws_tls_connection_options *tls_options; + + JavaVM *jvm; + jobject java_http_conn; /* The Java HttpConnection instance */ +}; diff --git a/src/native/http_request_response.c b/src/native/http_request_response.c new file mode 100644 index 000000000..c4bb3fd1d --- /dev/null +++ b/src/native/http_request_response.c @@ -0,0 +1,537 @@ +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "http_connection.h" + +/* on 32-bit platforms, casting pointers to longs throws a warning we don't need */ +#if UINTPTR_MAX == 0xffffffff +# if defined(_MSC_VER) +# pragma warning(push) +# pragma warning(disable : 4305) /* 'type cast': truncation from 'jlong' to 'jni_tls_ctx_options *' */ +# else +# pragma GCC diagnostic push +# pragma GCC diagnostic ignored "-Wpointer-to-int-cast" +# pragma GCC diagnostic ignored "-Wint-to-pointer-cast" +# endif +#endif + +static struct { + jclass header_class; + jmethodID constructor; + jfieldID name; + jfieldID value; +} s_http_header; + +void s_cache_http_header(JNIEnv *env) { + jclass cls = (*env)->FindClass(env, "software/amazon/awssdk/crt/http/HttpHeader"); + assert(cls); + s_http_header.header_class = cls; + + s_http_header.constructor = (*env)->GetMethodID(env, cls, "", "()V"); + assert(s_http_header.constructor); + + s_http_header.name = (*env)->GetFieldID(env, cls, "name", "[B"); + assert(s_http_header.name); + + s_http_header.value = (*env)->GetFieldID(env, cls, "value", "[B"); + assert(s_http_header.value); + + // FindClass() returns local JNI references that become eligible for GC once this native method returns to Java. + // Call NewGlobalRef() so that this class reference doesn't get Garbage collected. + s_http_header.header_class = (*env)->NewGlobalRef(env, s_http_header.header_class); +} + +static struct { + jclass stream_class; + jmethodID constructor; + jmethodID close; +} s_http_stream_handler; + +void s_cache_http_stream(JNIEnv *env) { + jclass cls = (*env)->FindClass(env, "software/amazon/awssdk/crt/http/HttpStream"); + assert(cls); + s_http_stream_handler.stream_class = cls; + + // FindClass() returns local JNI references that become eligible for GC once this native method returns to Java. + // Call NewGlobalRef() so that this class reference doesn't get Garbage collected. + s_http_stream_handler.stream_class = (*env)->NewGlobalRef(env, s_http_stream_handler.stream_class); + + s_http_stream_handler.constructor = (*env)->GetMethodID(env, cls, "", "(J)V"); + assert(s_http_stream_handler.constructor); + + s_http_stream_handler.close = (*env)->GetMethodID(env, cls, "close", "()V"); + assert(s_http_stream_handler.close); +} + +static jobject s_java_http_stream_from_native_new(JNIEnv *env, struct aws_http_stream *stream) { + jlong jni_native_ptr = (jlong)stream; + assert(jni_native_ptr); + jobject jHttpStream = + (*env)->NewObject(env, s_http_stream_handler.stream_class, s_http_stream_handler.constructor, jni_native_ptr); + + if ((*env)->ExceptionCheck(env) || jHttpStream == NULL) { + // Close the Connection if the Java Callback throws an Exception + aws_http_connection_close(aws_http_stream_get_connection(stream)); + return NULL; + } + + return jHttpStream; +} + +static void s_java_http_stream_from_native_delete(JNIEnv *env, jobject jHttpStream) { + // Delete our reference to the HttpStream Object from the JVM. + (*env)->DeleteGlobalRef(env, jHttpStream); +} + +/******************************************************************************* + * http_stream_callback_data - carries around data needed by the various http request + * callbacks. + ******************************************************************************/ +struct http_stream_callback_data { + struct aws_mutex lock; + struct http_jni_connection *connection; + jobject java_crt_http_callback_handler; + jobject java_http_stream; +}; + +static struct http_stream_callback_data *http_stream_callback_alloc( + struct http_jni_connection *connection, + jobject java_callback_handler) { + + struct aws_allocator *allocator = aws_jni_get_allocator(); + struct http_stream_callback_data *callback = aws_mem_acquire(allocator, sizeof(struct http_stream_callback_data)); + AWS_ZERO_STRUCT(*callback); + + if (!callback) { + /* caller will throw when they get a null */ + return NULL; + } + + JNIEnv *env = aws_jni_get_thread_env(connection->jvm); + callback->connection = connection; + + // We need to call NewGlobalRef() on jobjects that we want to last after this native method returns to Java. + // Otherwise Java's GC may free the jobject when Native still has a reference to it. + callback->java_crt_http_callback_handler = (*env)->NewGlobalRef(env, java_callback_handler); + + return callback; +} + +static void http_stream_callback_release(JNIEnv *env, struct http_stream_callback_data *callback) { + + s_java_http_stream_from_native_delete(env, callback->java_http_stream); + // Delete our reference to the Java JniHttpCallbackHandler Object from the JVM. + (*env)->DeleteGlobalRef(env, callback->java_crt_http_callback_handler); + + struct aws_allocator *allocator = aws_jni_get_allocator(); + aws_mem_release(allocator, callback); +} + +/* CrtHttpStreamHandler Java Methods */ +static struct { + jmethodID onResponseHeaders; + jmethodID onResponseHeadersDone; + jmethodID onResponseBody; + jmethodID onResponseComplete; + jmethodID sendOutgoingBody; +} s_crt_http_stream_handler; + +void s_cache_crt_http_stream_handler(JNIEnv *env) { + jclass cls = (*env)->FindClass(env, "software/amazon/awssdk/crt/http/CrtHttpStreamHandler"); + assert(cls); + s_crt_http_stream_handler.onResponseHeaders = (*env)->GetMethodID( + env, + cls, + "onResponseHeaders", + "(Lsoftware/amazon/awssdk/crt/http/HttpStream;I[Lsoftware/amazon/awssdk/crt/http/HttpHeader;)V"); + + assert(s_crt_http_stream_handler.onResponseHeaders); + + s_crt_http_stream_handler.onResponseHeadersDone = + (*env)->GetMethodID(env, cls, "onResponseHeadersDone", "(Lsoftware/amazon/awssdk/crt/http/HttpStream;Z)V"); + assert(s_crt_http_stream_handler.onResponseHeadersDone); + + s_crt_http_stream_handler.onResponseBody = (*env)->GetMethodID( + env, cls, "onResponseBody", "(Lsoftware/amazon/awssdk/crt/http/HttpStream;Ljava/nio/ByteBuffer;)I"); + assert(s_crt_http_stream_handler.onResponseBody); + + s_crt_http_stream_handler.onResponseComplete = + (*env)->GetMethodID(env, cls, "onResponseComplete", "(Lsoftware/amazon/awssdk/crt/http/HttpStream;I)V"); + assert(s_crt_http_stream_handler.onResponseComplete); + + s_crt_http_stream_handler.sendOutgoingBody = (*env)->GetMethodID( + env, cls, "sendRequestBody", "(Lsoftware/amazon/awssdk/crt/http/HttpStream;Ljava/nio/ByteBuffer;)Z"); + assert(s_crt_http_stream_handler.sendOutgoingBody); +} + +static jobjectArray s_java_headers_array_from_native( + struct http_stream_callback_data *callback, + const struct aws_http_header *header_array, + size_t num_headers) { + + JNIEnv *env = aws_jni_get_thread_env(callback->connection->jvm); + + assert(s_http_header.header_class); + assert(s_http_header.constructor); + assert(s_http_header.name); + assert(s_http_header.value); + + jobjectArray jArray = (*env)->NewObjectArray(env, (jsize)num_headers, s_http_header.header_class, NULL); + + for (int i = 0; i < num_headers; i++) { + jobject jHeader = (*env)->NewObject(env, s_http_header.header_class, s_http_header.constructor); + + jbyteArray actual_name = aws_jni_byte_array_from_cursor(env, &(header_array[i].name)); + jbyteArray actual_value = aws_jni_byte_array_from_cursor(env, &(header_array[i].value)); + + // Overwrite with actual values + (*env)->SetObjectField(env, jHeader, s_http_header.name, actual_name); + (*env)->SetObjectField(env, jHeader, s_http_header.value, actual_value); + (*env)->SetObjectArrayElement(env, jArray, i, jHeader); + } + + return jArray; +} + +static void s_on_incoming_headers_fn( + struct aws_http_stream *stream, + const struct aws_http_header *header_array, + size_t num_headers, + void *user_data) { + + struct http_stream_callback_data *callback = (struct http_stream_callback_data *)user_data; + // Other threads might edit the callback struct, so ensure that we gain a lock on it + aws_mutex_lock(&callback->lock); + + JNIEnv *env = aws_jni_get_thread_env(callback->connection->jvm); + jobjectArray jHeaders = s_java_headers_array_from_native(user_data, header_array, num_headers); + + int resp_status = -1; + int err_code = aws_http_stream_get_incoming_response_status(stream, &resp_status); + + if (err_code != AWS_OP_SUCCESS) { + // Close the connection if we can't get the response status + aws_mutex_unlock(&callback->lock); + aws_http_connection_close(aws_http_stream_get_connection(stream)); + return; + } + + (*env)->CallVoidMethod( + env, + callback->java_crt_http_callback_handler, + s_crt_http_stream_handler.onResponseHeaders, + callback->java_http_stream, + resp_status, + jHeaders); + + aws_mutex_unlock(&callback->lock); + + if ((*env)->ExceptionCheck(env)) { + // Close the Connection if the Java Callback throws an Exception + aws_http_connection_close(aws_http_stream_get_connection(stream)); + return; + } +} + +static void s_on_incoming_header_block_done_fn(struct aws_http_stream *stream, bool has_body, void *user_data) { + struct http_stream_callback_data *callback = (struct http_stream_callback_data *)user_data; + + // Other threads might edit the callback struct, so ensure that we gain a lock on it + aws_mutex_lock(&callback->lock); + + JNIEnv *env = aws_jni_get_thread_env(callback->connection->jvm); + + jboolean jHasBody = has_body; + (*env)->CallVoidMethod( + env, + callback->java_crt_http_callback_handler, + s_crt_http_stream_handler.onResponseHeadersDone, + callback->java_http_stream, + jHasBody); + + aws_mutex_unlock(&callback->lock); + + if ((*env)->ExceptionCheck(env)) { + // Close the Connection if the Java Callback throws an Exception + aws_http_connection_close(aws_http_stream_get_connection(stream)); + return; + } +} + +static void s_on_incoming_body_fn( + struct aws_http_stream *stream, + const struct aws_byte_cursor *data, + /* NOLINTNEXTLINE(readability-non-const-parameter) */ + size_t *out_window_update_size, + void *user_data) { + + struct http_stream_callback_data *callback = (struct http_stream_callback_data *)user_data; + + // Other threads might edit the callback struct, so ensure that we gain a lock on it + aws_mutex_lock(&callback->lock); + + JNIEnv *env = aws_jni_get_thread_env(callback->connection->jvm); + + jobject jByteBuffer = aws_jni_byte_buffer_copy_from_cursor(env, data); + jint window_increment = (*env)->CallIntMethod( + env, + callback->java_crt_http_callback_handler, + s_crt_http_stream_handler.onResponseBody, + callback->java_http_stream, + jByteBuffer); + + aws_mutex_unlock(&callback->lock); + + if (window_increment < 0 || *out_window_update_size < window_increment) { + aws_jni_throw_runtime_exception(env, "WindowUpdate is OutOfBounds."); + return; + } + + if ((*env)->ExceptionCheck(env)) { + // Close the Connection if the Java Callback throws an Exception + aws_http_connection_close(aws_http_stream_get_connection(stream)); + return; + } + + // We can check the ByteBuffer read position to verify that the userThread actually read all the data they claimed + // to be able to read. + int amt_read = aws_jni_byte_buffer_get_position(env, jByteBuffer); + (void)amt_read; + assert(amt_read == data->len); + + *out_window_update_size = (size_t)window_increment; +} + +static void s_on_stream_complete_fn(struct aws_http_stream *stream, int error_code, void *user_data) { + + struct http_stream_callback_data *callback = (struct http_stream_callback_data *)user_data; + + // Other threads might edit the callback struct, so ensure that we gain a lock on it + aws_mutex_lock(&callback->lock); + + JNIEnv *env = aws_jni_get_thread_env(callback->connection->jvm); + + jint jErrorCode = error_code; + (*env)->CallVoidMethod( + env, + callback->java_crt_http_callback_handler, + s_crt_http_stream_handler.onResponseComplete, + callback->java_http_stream, + jErrorCode); + + aws_mutex_unlock(&callback->lock); + + if ((*env)->ExceptionCheck(env)) { + // Close the Connection if the Java Callback throws an Exception + aws_http_connection_close(aws_http_stream_get_connection(stream)); + return; + } + + http_stream_callback_release(env, callback); +} + +enum aws_http_outgoing_body_state s_stream_outgoing_body_fn( + struct aws_http_stream *stream, + struct aws_byte_buf *dst, + void *user_data) { + + struct http_stream_callback_data *callback = (struct http_stream_callback_data *)user_data; + + // Other threads might edit the callback struct, so ensure that we gain a lock on it + aws_mutex_lock(&callback->lock); + + JNIEnv *env = aws_jni_get_thread_env(callback->connection->jvm); + + uint8_t *out = &(dst->buffer[dst->len]); + size_t out_remaining = dst->capacity - dst->len; + + jbyteArray jByteArray = aws_java_byte_array_new(env, out_remaining); + jobject jByteBuffer = aws_java_byte_array_to_java_byte_buffer(env, jByteArray); + + jByteBuffer = (*env)->NewGlobalRef(env, jByteBuffer); + + jboolean isDone = (*env)->CallBooleanMethod( + env, + callback->java_crt_http_callback_handler, + s_crt_http_stream_handler.sendOutgoingBody, + callback->java_http_stream, + jByteBuffer); + + aws_mutex_unlock(&callback->lock); + + if ((*env)->ExceptionCheck(env)) { + // Close the Connection if the Java Callback throws an Exception + (*env)->DeleteGlobalRef(env, jByteBuffer); + aws_http_connection_close(aws_http_stream_get_connection(stream)); + return AWS_HTTP_OUTGOING_BODY_IN_PROGRESS; + } + + int amt_written = aws_jni_byte_buffer_get_position(env, jByteBuffer); + assert(amt_written <= out_remaining); + + aws_copy_java_byte_array_to_native_array(env, jByteArray, out, amt_written); + dst->len += amt_written; + + (*env)->DeleteGlobalRef(env, jByteBuffer); + + if (isDone) { + return AWS_HTTP_OUTGOING_BODY_DONE; + } + + return AWS_HTTP_OUTGOING_BODY_IN_PROGRESS; +} + +JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_http_HttpConnection_httpConnectionMakeRequest( + JNIEnv *env, + jclass jni_class, + jlong jni_connection, + jstring jni_method, + jstring jni_uri, + jobjectArray jni_headers, + jobject jni_crt_http_callback_handler) { + + struct http_jni_connection *http_jni_conn = (struct http_jni_connection *)jni_connection; + + if (!http_jni_conn) { + aws_jni_throw_runtime_exception(env, "HttpConnection.ExecuteRequest: Invalid jni_connection"); + return (jobject)NULL; + } + + if (!jni_crt_http_callback_handler) { + aws_jni_throw_runtime_exception(env, "HttpConnection.ExecuteRequest: Invalid jni_callback_handler"); + return (jobject)NULL; + } + + struct http_stream_callback_data *callback_data = + http_stream_callback_alloc(http_jni_conn, jni_crt_http_callback_handler); + + if (!callback_data) { + aws_jni_throw_runtime_exception( + env, "HttpConnection.ExecuteRequest: Unable to allocate http_request_jni_async_callback"); + return (jobject)NULL; + } + + // There's a Data Race between this thread writing to callback_data->java_http_stream and the EventLoop thread + // reading callback_data->java_http_stream when calling the callbacks, add a lock so that both threads see a + // consistent state. + aws_mutex_init(&callback_data->lock); + aws_mutex_lock(&callback_data->lock); + + struct aws_byte_cursor method = aws_jni_byte_cursor_from_jstring(env, jni_method); + struct aws_byte_cursor uri = aws_jni_byte_cursor_from_jstring(env, jni_uri); + jsize num_headers = (*env)->GetArrayLength(env, jni_headers); + + struct aws_http_header headers[num_headers]; + AWS_ZERO_ARRAY(headers); + + assert(s_http_header.name); + assert(s_http_header.value); + + for (int i = 0; i < num_headers; i++) { + jobject jHeader = (*env)->GetObjectArrayElement(env, jni_headers, i); + jbyteArray jname = (*env)->GetObjectField(env, jHeader, s_http_header.name); + jbyteArray jvalue = (*env)->GetObjectField(env, jHeader, s_http_header.value); + + headers[i].name = aws_jni_byte_cursor_from_jbyteArray(env, jname); + headers[i].value = aws_jni_byte_cursor_from_jbyteArray(env, jvalue); + } + + struct aws_http_request_options request_options = AWS_HTTP_REQUEST_OPTIONS_INIT; + request_options.client_connection = http_jni_conn->native_http_conn; + request_options.method = method; + request_options.uri = uri; + request_options.header_array = headers; + request_options.num_headers = num_headers; + + // Set Callbacks + request_options.on_response_headers = s_on_incoming_headers_fn; + request_options.on_response_header_block_done = s_on_incoming_header_block_done_fn; + request_options.on_response_body = s_on_incoming_body_fn; + request_options.stream_outgoing_body = s_stream_outgoing_body_fn; + request_options.on_complete = s_on_stream_complete_fn; + request_options.user_data = callback_data; + + // This call schedules tasks on the Native Event loop thread to begin sending HttpRequest and receive the response. + struct aws_http_stream *req = aws_http_stream_new_client_request(&request_options); + + if (req == NULL) { + aws_jni_throw_runtime_exception(env, "HttpConnection.ExecuteRequest: Unable to Execute Request"); + return (jobject)NULL; + } + + jobject jHttpStream = s_java_http_stream_from_native_new(env, req); + + // Call NewGlobalRef() so that jHttpStream reference doesn't get Garbage collected can can be used from callbacks. + jHttpStream = (*env)->NewGlobalRef(env, jHttpStream); + + callback_data->java_http_stream = jHttpStream; + + // Now that callback_data->java_http_stream has been written, the EventLoop thread may begin using this callback. + aws_mutex_unlock(&callback_data->lock); + + return jHttpStream; +} + +JNIEXPORT void JNICALL + Java_software_amazon_awssdk_crt_http_HttpStream_httpStreamRelease(JNIEnv *env, jclass jni_class, jlong jni_stream) { + + struct aws_http_stream *stream = (struct aws_http_stream *)jni_stream; + + if (stream == NULL) { + aws_jni_throw_runtime_exception(env, "HttpStream is null."); + return; + } + + aws_http_stream_release(stream); +} + +JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_HttpStream_httpStreamIncrementWindow( + JNIEnv *env, + jclass jni_class, + jlong jni_stream, + jint window_update) { + + struct aws_http_stream *stream = (struct aws_http_stream *)jni_stream; + + if (stream == NULL) { + aws_jni_throw_runtime_exception(env, "HttpStream is null."); + return; + } + + if (window_update < 0) { + aws_jni_throw_runtime_exception(env, "Window Update is < 0"); + return; + } + + aws_http_stream_update_window(stream, window_update); +} + +#if UINTPTR_MAX == 0xffffffff +# if defined(_MSC_VER) +# pragma warning(pop) +# else +# pragma GCC diagnostic pop +# endif +#endif diff --git a/src/test/java/software/amazon/awssdk/crt/test/HttpRequestResponseTest.java b/src/test/java/software/amazon/awssdk/crt/test/HttpRequestResponseTest.java new file mode 100644 index 000000000..ada4c2b24 --- /dev/null +++ b/src/test/java/software/amazon/awssdk/crt/test/HttpRequestResponseTest.java @@ -0,0 +1,318 @@ +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.crt.test; + +import org.junit.Assert; +import org.junit.Test; +import software.amazon.awssdk.crt.CRT; +import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.http.CrtHttpStreamHandler; +import software.amazon.awssdk.crt.http.HttpConnection; +import software.amazon.awssdk.crt.http.HttpHeader; +import software.amazon.awssdk.crt.http.HttpRequest; +import software.amazon.awssdk.crt.http.HttpStream; +import software.amazon.awssdk.crt.io.ClientBootstrap; +import software.amazon.awssdk.crt.io.SocketOptions; +import software.amazon.awssdk.crt.io.TlsContext; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class HttpRequestResponseTest { + private final static Charset UTF8 = StandardCharsets.UTF_8; + private final String EMPTY_BODY = ""; + private final static String TEST_DOC_LINE = "This is a sample to prove that http downloads and uploads work. It doesn't really matter what's in here, we mainly just need to verify the downloads and uploads work."; + private final static int TEST_DOC_NUM_LINES = 86401; + private final static String TEST_DOC_SHA256 = "C7FDB5314B9742467B16BD5EA2F8012190B5E2C44A005F7984F89AAB58219534"; + + private class TestHttpResponse { + int statusCode = -1; + boolean hasBody = false; + List headers = new ArrayList<>(); + ByteBuffer bodyBuffer = ByteBuffer.wrap(new byte[16*1024*1024]); // Allow up to 16 MB Responses + int onCompleteErrorCode = -1; + + public String getBody() { + bodyBuffer.flip(); + return UTF8.decode(bodyBuffer).toString(); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("Status: " + statusCode); + int i = 0; + for (HttpHeader h: headers) { + builder.append("\nHeader[" + i + "]: " + h.toString()); + } + + builder.append("\nBody:\n"); + builder.append(getBody()); + + return builder.toString(); + } + } + + private void transferData(ByteBuffer in, ByteBuffer out) { + int amtToTransfer = Math.min(in.remaining(), out.remaining()); + + if (amtToTransfer > 0) { + out.put(in.array(), in.arrayOffset() + in.position(), amtToTransfer); + in.position(in.position() + amtToTransfer); + } + } + + public static String byteArrayToHex(byte[] input) { + StringBuilder output = new StringBuilder(input.length * 2); + for(byte b: input) + output.append(String.format("%02X", b)); + return output.toString(); + } + + private String calculateBodyHash(ByteBuffer bodyBuffer) throws NoSuchAlgorithmException { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + digest.update(bodyBuffer); + return byteArrayToHex(digest.digest()); + } + + public TestHttpResponse getResponse(URI uri, HttpRequest request, String reqBody) throws Exception { + boolean actuallyConnected = false; + + ClientBootstrap bootstrap = new ClientBootstrap(1); + SocketOptions sockOpts = new SocketOptions(); + TlsContext tlsContext = new TlsContext(); + + final ByteBuffer bodyBytesIn = ByteBuffer.wrap(reqBody.getBytes(UTF8)); + final CompletableFuture reqCompleted = new CompletableFuture<>(); + + final TestHttpResponse response = new TestHttpResponse(); + + HttpConnection conn = null; + HttpStream stream = null; + try { + conn = HttpConnection.createConnection(uri, bootstrap, sockOpts, tlsContext).get(); + actuallyConnected = true; + CrtHttpStreamHandler streamHandler = new CrtHttpStreamHandler() { + @Override + public void onResponseHeaders(HttpStream stream, int responseStatusCode, HttpHeader[] nextHeaders) { + response.statusCode = responseStatusCode; + response.headers.addAll(Arrays.asList(nextHeaders)); + } + + @Override + public void onResponseHeadersDone(HttpStream stream, boolean hasBody) { + response.hasBody = hasBody; + } + + @Override + public int onResponseBody(HttpStream stream, ByteBuffer bodyBytesIn) { + int start = bodyBytesIn.position(); + response.bodyBuffer.put(bodyBytesIn); + int amountRead = bodyBytesIn.position() - start; + + // Slide the window open by the number of bytes just read + return amountRead; + } + + @Override + public void onResponseComplete(HttpStream stream, int errorCode) { + response.onCompleteErrorCode = errorCode; + reqCompleted.complete(null); + } + + @Override + public boolean sendRequestBody(HttpStream stream, ByteBuffer bodyBytesOut) { + transferData(bodyBytesIn, bodyBytesOut); + + return bodyBytesIn.remaining() == 0; + } + }; + + stream = conn.makeRequest(request, streamHandler); + Assert.assertNotNull(stream); + // Give the request up to 60 seconds to complete, otherwise throw a TimeoutException + reqCompleted.get(60, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + if (stream != null) { + stream.close(); + } + if (conn != null) { + conn.close(); + } + + tlsContext.close(); + sockOpts.close(); + bootstrap.close(); + } + + Assert.assertTrue(actuallyConnected); + Assert.assertEquals(0, CrtResource.getAllocatedNativeResourceCount()); + + return response; + } + + private boolean shouldRetry(TestHttpResponse response) { + // Retry if we couldn't connect or if we got 503 response + if (response.onCompleteErrorCode != CRT.AWS_CRT_SUCCESS || response.statusCode == 503) { + return true; + } + return false; + } + + public TestHttpResponse testRequest(String method, String endpoint, String path, String requestBody, int expectedStatus) throws Exception { + URI uri = new URI(endpoint); + + HttpHeader[] requestHeaders = + new HttpHeader[]{ + new HttpHeader("Host", uri.getHost()), + new HttpHeader("Content-Length", Integer.toString(requestBody.getBytes(UTF8).length)) + }; + HttpRequest request = new HttpRequest(method, path, requestHeaders); + + TestHttpResponse response = null; + int numAttempts = 0; + do { + numAttempts++; + response = getResponse(uri, request, requestBody); + } while (shouldRetry(response) && numAttempts < 3); + + Assert.assertEquals("Expected and Actual Status Codes don't match", expectedStatus, response.statusCode); + + return response; + } + + @Test + public void testHttpDelete() throws Exception { + testRequest("DELETE", "https://httpbin.org", "/delete", EMPTY_BODY, 200); + testRequest("DELETE", "https://httpbin.org", "/get", EMPTY_BODY, 405); + testRequest("DELETE", "https://httpbin.org", "/post", EMPTY_BODY, 405); + testRequest("DELETE", "https://httpbin.org", "/put", EMPTY_BODY, 405); + } + + @Test + public void testHttpGet() throws Exception { + testRequest("GET", "https://httpbin.org", "/delete", EMPTY_BODY, 405); + testRequest("GET", "https://httpbin.org", "/get", EMPTY_BODY, 200); + testRequest("GET", "https://httpbin.org", "/post", EMPTY_BODY, 405); + testRequest("GET", "https://httpbin.org", "/put", EMPTY_BODY, 405); + } + + @Test + public void testHttpPost() throws Exception { + testRequest("POST", "https://httpbin.org", "/delete", EMPTY_BODY, 405); + testRequest("POST", "https://httpbin.org", "/get", EMPTY_BODY, 405); + testRequest("POST", "https://httpbin.org", "/post", EMPTY_BODY, 200); + testRequest("POST", "https://httpbin.org", "/put", EMPTY_BODY, 405); + } + + @Test + public void testHttpPut() throws Exception { + testRequest("PUT", "https://httpbin.org", "/delete", EMPTY_BODY, 405); + testRequest("PUT", "https://httpbin.org", "/get", EMPTY_BODY, 405); + testRequest("PUT", "https://httpbin.org", "/post", EMPTY_BODY, 405); + testRequest("PUT", "https://httpbin.org", "/put", EMPTY_BODY, 200); + } + + @Test + public void testHttpResponseStatusCodes() throws Exception { + testRequest("GET", "https://httpbin.org", "/status/200", EMPTY_BODY, 200); + testRequest("GET", "https://httpbin.org", "/status/300", EMPTY_BODY, 300); + testRequest("GET", "https://httpbin.org", "/status/400", EMPTY_BODY, 400); + testRequest("GET", "https://httpbin.org", "/status/500", EMPTY_BODY, 500); + } + + @Test + public void testHttpDownload() throws Exception { + TestHttpResponse response = testRequest("GET", "https://aws-crt-test-stuff.s3.amazonaws.com", "/http_test_doc.txt", EMPTY_BODY, 200); + + ByteBuffer body = response.bodyBuffer; + body.flip(); //Flip from Write mode to Read mode + + Assert.assertEquals(TEST_DOC_SHA256, calculateBodyHash(body)); + } + + /** + * Removes trailing commas, and trims quote characters from a string. + * + * @param input + * @return + */ + private String extractValueFromJson(String input) { + return input.trim() // Remove spaces from front and back + .replaceAll(",$", "") // Remove comma if it's the last character + .replaceAll("^\"|\"$", ""); // Remove quotes from front and back + } + + @Test + public void testHttpUpload() throws Exception { + String bodyToSend = TEST_DOC_LINE; + TestHttpResponse response = testRequest("PUT", "https://httpbin.org", "/anything", bodyToSend, 200); + + // Get the Body bytes that were echoed back to us + String body = response.getBody(); + + /** + * Example Json Response Body from httpbin.org: + * + * { + * "args": {}, + * "data": "This is a sample to prove that http downloads and uploads work. It doesn't really matter what's in here, we mainly just need to verify the downloads and uploads work.", + * "files": {}, + * "form": {}, + * "headers": { + * "Content-Length": "166", + * "Host": "httpbin.org" + * }, + * "json": null, + * "method": "PUT", + * "origin": "1.2.3.4, 5.6.7.8", + * "url": "https://httpbin.org/anything" + * } + * + */ + + String echoedBody = null; + for (String line: body.split("\n")) { + String[] keyAndValue = line.split(":", 2); + + // Found JSON Key/Value Pair + if (keyAndValue.length == 2) { + String key = extractValueFromJson(keyAndValue[0]); + String val = extractValueFromJson(keyAndValue[1]); + + // Found Echoed Body + if (key.equals("data")) { + echoedBody = extractValueFromJson(val); + } + } + } + + Assert.assertNotNull("Response Body did not contain \"data\" JSON key:\n" + body, echoedBody); + Assert.assertEquals(bodyToSend, echoedBody); + } + +}