Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crt/aws-c-http
2 changes: 1 addition & 1 deletion crt/aws-lc
2 changes: 1 addition & 1 deletion crt/s2n
Submodule s2n updated from f5e5e8 to 3276a0
15 changes: 15 additions & 0 deletions src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,20 @@ public Mqtt5ClientOperationStatistics getOperationStatistics() {
return mqtt5ClientInternalGetOperationStatistics(getNativeHandle());
}

/**
* Sends a PUBACK packet for a QoS 1 PUBLISH that was previously acquired for manual control.
*
* <p>To use manual PUBACK control, call {@link PublishReturn#acquirePubackControl()} within
* the {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback of a QoS 1 PUBLISH to
* obtain a {@link Mqtt5PubackControlHandle}. Then call this method to send the PUBACK.</p>
*
* @param pubackControlHandle An opaque handle obtained from {@link PublishReturn#acquirePubackControl()}.
* @throws CrtRuntimeException If the native client returns an error when invoking the PUBACK.
*/
public void invokePuback(Mqtt5PubackControlHandle pubackControlHandle) throws CrtRuntimeException {
mqtt5ClientInternalInvokePuback(getNativeHandle(), pubackControlHandle.getControlId());
}

/**
* Returns the connectivity state for the Mqtt5Client.
* @return True if the client is connected, false otherwise
Expand Down Expand Up @@ -277,4 +291,5 @@ private static native long mqtt5ClientNew(
private static native void mqtt5ClientInternalUnsubscribe(long client, UnsubscribePacket unsubscribe_options, CompletableFuture<UnsubAckPacket> unsubscribe_suback);
private static native void mqtt5ClientInternalWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, Throwable throwable, long nativeUserData) throws CrtRuntimeException;
private static native Mqtt5ClientOperationStatistics mqtt5ClientInternalGetOperationStatistics(long client);
private static native void mqtt5ClientInternalInvokePuback(long client, long controlId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
package software.amazon.awssdk.crt.mqtt5;

/**
* An opaque handle representing manual control over a QoS 1 PUBACK for a received PUBLISH packet.
*
* <p>This class cannot be instantiated directly. Instances are only created by the CRT library.</p>
*/
public class Mqtt5PubackControlHandle {

private final long controlId;

/**
* Creates a new Mqtt5PubackControlHandle. Only called from native/JNI code.
*
* @param controlId The native puback control ID returned by aws_mqtt5_client_acquire_puback.
*/
Mqtt5PubackControlHandle(long controlId) {
this.controlId = controlId;
}

/**
* Returns the native puback control ID. Used internally by JNI.
*
* @return The native puback control ID.
*/
long getControlId() {
return controlId;
}
}
58 changes: 54 additions & 4 deletions src/main/java/software/amazon/awssdk/crt/mqtt5/PublishReturn.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@
public class PublishReturn {
private PublishPacket publishPacket;

/**
* Single-element long array holding the native manual PUBACK control context pointer.
* Element [0] is the pointer value, valid only during the
* {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback.
* Native code sets [0] to 0 after the callback returns (via SetLongArrayRegion,
* requiring no extra JNI method ID).
*/
private final long[] nativeContextPtrHolder;

/**
* Returns the PublishPacket returned from the server or Null if none was returned.
* @return The PublishPacket returned from the server.
Expand All @@ -22,12 +31,53 @@ public PublishPacket getPublishPacket() {
return publishPacket;
}

/**
* Acquires manual control over the PUBACK for this QoS 1 PUBLISH message, preventing the
* client from automatically sending a PUBACK. The returned handle can be passed to
* {@link Mqtt5Client#invokePuback(Mqtt5PubackControlHandle)} at a later time to send the
* PUBACK to the broker.
*
* <p><b>Important:</b> This method must be called within the
* {@link Mqtt5ClientOptions.PublishEvents#onMessageReceived} callback. Calling it after the
* callback returns will throw an {@link IllegalStateException}.</p>
*
* <p>This method may only be called once per received PUBLISH. Subsequent calls will throw
* an {@link IllegalStateException}.</p>
*
* <p>If this method is not called, the client will automatically send a PUBACK for QoS 1
* messages when the callback returns.</p>
*
* @return A {@link Mqtt5PubackControlHandle} that can be used to manually send the PUBACK.
* @throws IllegalStateException if called outside the onMessageReceived callback or called more than once.
*/
public synchronized Mqtt5PubackControlHandle acquirePubackControl() {
if (nativeContextPtrHolder == null || nativeContextPtrHolder[0] == 0) {
throw new IllegalStateException(
"acquirePubackControl() must be called within the onMessageReceived callback and may only be called once.");
}
long controlId = mqtt5AcquirePubackControl(nativeContextPtrHolder[0]);
/* We set the array element to 0 so it can't be double-called */
nativeContextPtrHolder[0] = 0;
return new Mqtt5PubackControlHandle(controlId);
}

/**
* This is only called in JNI to make a new PublishReturn with a PUBLISH packet.
* @param newPublishPacket The PubAckPacket data for QoS 1 packets. Can be null if result is non QoS 1.
* @return A newly created PublishResult
* The nativeContextPtrHolder is a single-element long array; native code sets [0] to 0
* after the onMessageReceived callback returns to prevent use-after-free.
*
* @param newPublishPacket The PublishPacket data received from the server.
* @param nativeContextPtrHolder Single-element long[] holding the native PUBACK control context pointer.
*/
private PublishReturn(PublishPacket newPublishPacket) {
private PublishReturn(PublishPacket newPublishPacket, long[] nativeContextPtrHolder) {
this.publishPacket = newPublishPacket;
this.nativeContextPtrHolder = nativeContextPtrHolder;
}
}

/**
* Calls the native aws_mqtt5_client_acquire_puback function.
* @param nativeContextPtr Pointer to the native manual PUBACK control context.
* @return The native puback control ID.
*/
private static native long mqtt5AcquirePubackControl(long nativeContextPtr);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1549,7 +1549,8 @@
{
"name": "<init>",
"parameterTypes": [
"software.amazon.awssdk.crt.mqtt5.packets.PublishPacket"
"software.amazon.awssdk.crt.mqtt5.packets.PublishPacket",
"long[]"
]
}
]
Expand Down
9 changes: 7 additions & 2 deletions src/native/java_class_ids.c
Original file line number Diff line number Diff line change
Expand Up @@ -2248,12 +2248,17 @@ static void s_cache_mqtt5_publish_return(JNIEnv *env) {
AWS_FATAL_ASSERT(cls);
mqtt5_publish_return_properties.return_class = (*env)->NewGlobalRef(env, cls);
AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_class);
// Functions
/*
* Constructor: PublishReturn(PublishPacket, long[])
* The long[] is a single-element array holding the native context pointer.
* Native code sets [0] to 0 via SetLongArrayRegion after the callback returns,
* requiring no extra JNI method ID.
*/
mqtt5_publish_return_properties.return_constructor_id = (*env)->GetMethodID(
env,
mqtt5_publish_return_properties.return_class,
"<init>",
"(Lsoftware/amazon/awssdk/crt/mqtt5/packets/PublishPacket;)V");
"(Lsoftware/amazon/awssdk/crt/mqtt5/packets/PublishPacket;[J)V");
AWS_FATAL_ASSERT(mqtt5_publish_return_properties.return_constructor_id);
}

Expand Down
2 changes: 1 addition & 1 deletion src/native/java_class_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ extern struct java_aws_mqtt5_publish_result_properties mqtt5_publish_result_prop
/* mqtt5.PublishReturn */
struct java_aws_mqtt5_publish_return_properties {
jclass return_class;
jmethodID return_constructor_id;
jmethodID return_constructor_id; /* (PublishPacket, long[]) - long[0] holds native context ptr */
};
extern struct java_aws_mqtt5_publish_return_properties mqtt5_publish_return_properties;

Expand Down
127 changes: 125 additions & 2 deletions src/native/mqtt5_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ struct aws_mqtt5_client_publish_return_data {
jobject jni_publish_future;
};

/* Context for manual PUBACK control. Valid only during the publish received callback. */
struct manual_puback_control_context {
struct aws_mqtt5_client *client;
const struct aws_mqtt5_packet_publish_view *publish_packet;
};

struct aws_mqtt5_client_subscribe_return_data {
struct aws_mqtt5_client_java_jni *java_client;
jobject jni_subscribe_future;
Expand Down Expand Up @@ -538,6 +544,9 @@ static void s_aws_mqtt5_client_java_publish_received(
/* One reference is needed for the PublishReturn */
references_needed += 1;

/* One reference is needed for the long[] context pointer holder array */
references_needed += 1;

/* A Publish packet will need 5 references at minimum */
references_needed += 5;
/* Optionals */
Expand Down Expand Up @@ -577,12 +586,38 @@ static void s_aws_mqtt5_client_java_publish_received(
goto clean_up;
}

/* Make the PublishReturn struct that will hold all of the data that is passed to Java */
/* Create manual PUBACK control context (valid only during this callback)
* We clean this before clean_up since we don't want to create this early and we can be sure to
* hit the aws_mem_release AFTER we return from the publish events callback.
*/
struct manual_puback_control_context *control_context =
aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct manual_puback_control_context));

/*
* Create a single-element long[] array to hold the native context pointer.
* This allows native code to invalidate the pointer after the callback returns
* using SetLongArrayRegion(array, 0, 1, &zero) without using an extra JNI method ID.
*/
jlongArray context_ptr_holder = NULL;
control_context->client = java_client->client;
control_context->publish_packet = publish;

context_ptr_holder = (*env)->NewLongArray(env, 1);
/* If allocation failed, Java will throw IllegalStateException on acquirePubackControl(). */
if (context_ptr_holder != NULL) {
jlong context_ptr_value = (jlong)(uintptr_t)control_context;
/* Assigning the pointer to the allocated manual_puback_control_context to the single-element of the array */
(*env)->SetLongArrayRegion(env, context_ptr_holder, 0, 1, &context_ptr_value);
}

/* Make the PublishReturn struct that will hold all of the data that is passed to Java.
* The constructor takes (PublishPacket, long[]) where long[0] is the native context pointer. */
publish_packet_return_data = (*env)->NewObject(
env,
mqtt5_publish_return_properties.return_class,
mqtt5_publish_return_properties.return_constructor_id,
publish_packet_data);
publish_packet_data,
context_ptr_holder);
aws_jni_check_and_clear_exception(env); /* To hide JNI warning */

if (java_client->jni_publish_events) {
Expand All @@ -594,6 +629,22 @@ static void s_aws_mqtt5_client_java_publish_received(
publish_packet_return_data);
aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
}

/*
* Invalidate the context pointer in the long[] array now that the callback has returned.
* This prevents use-after-free if acquirePubackControl() is called after the callback.
* SetLongArrayRegion requires no extra JNI method ID.
*/
if (context_ptr_holder != NULL) {
jlong zero = 0;
(*env)->SetLongArrayRegion(env, context_ptr_holder, 0, 1, &zero);
}

/* Free the context. The publish_packet pointer is no longer valid after this callback returns */
if (control_context != NULL) {
aws_mem_release(aws_jni_get_allocator(), control_context);
}

goto clean_up;

clean_up:
Expand Down Expand Up @@ -2169,6 +2220,78 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5Cl
}
}

/*******************************************************************************
* Manual PUBACK Control Functions
******************************************************************************/

/**
* Called from PublishReturn.mqtt5AcquirePubackControl(long nativeContextPtr).
* Calls aws_mqtt5_client_acquire_puback to take manual control of the PUBACK
* for the received PUBLISH packet. Returns the puback_control_id as a jlong.
*
* This must be called within the onMessageReceived callback while the
* native context pointer is still valid.
*/
JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt5_PublishReturn_mqtt5AcquirePubackControl(
JNIEnv *env,
jclass jni_class,
jlong native_context_ptr) {
(void)jni_class;
aws_cache_jni_ids(env);

if (native_context_ptr == 0) {
aws_jni_throw_runtime_exception(
env,
"PublishReturn.acquirePubackControl: context is no longer valid. "
"acquirePubackControl() must be called within the onMessageReceived callback.");
return 0;
}

struct manual_puback_control_context *context =
(struct manual_puback_control_context *)(uintptr_t)native_context_ptr;

if (!context->client || !context->publish_packet) {
aws_jni_throw_runtime_exception(
env, "PublishReturn.acquirePubackControl: invalid native PUBACK control context");
return 0;
}

uint64_t control_id = aws_mqtt5_client_acquire_puback(context->client, context->publish_packet);
return (jlong)control_id;
}

/**
* Called from Mqtt5Client.mqtt5ClientInternalInvokePuback(long client, long controlId).
* Calls aws_mqtt5_client_invoke_puback to send the PUBACK for a previously acquired
* manual PUBACK control handle.
*/
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalInvokePuback(
JNIEnv *env,
jclass jni_class,
jlong jni_client,
jlong control_id) {
(void)jni_class;
aws_cache_jni_ids(env);

struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)jni_client;
if (!java_client) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "Mqtt5Client.invokePuback: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT);
return;
}
if (!java_client->client) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "Mqtt5Client.invokePuback: Invalid/null native client", AWS_ERROR_INVALID_ARGUMENT);
return;
}

int result = aws_mqtt5_client_invoke_puback(java_client->client, (uint64_t)control_id, NULL);
if (result != AWS_OP_SUCCESS) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "Mqtt5Client.invokePuback: aws_mqtt5_client_invoke_puback failed!", aws_last_error());
}
}

#if UINTPTR_MAX == 0xffffffff
# if defined(_MSC_VER)
# pragma warning(pop)
Expand Down
Loading