diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp index 6323ca35a..5b7d07591 100644 --- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp +++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp @@ -42,6 +42,12 @@ class TypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport bool deserializeROSmessage( eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override; + bool get_key_hash_from_ros_message( + void * ros_message, + eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, + bool force_md5, + const void * impl) const override; + TypeSupport(); protected: @@ -49,6 +55,7 @@ class TypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport private: const message_type_support_callbacks_t * members_; + const message_type_support_key_callbacks_t * key_callbacks_; bool has_data_; }; diff --git a/rmw_fastrtps_cpp/src/publisher.cpp b/rmw_fastrtps_cpp/src/publisher.cpp index c16db932d..82ec2ebeb 100644 --- a/rmw_fastrtps_cpp/src/publisher.cpp +++ b/rmw_fastrtps_cpp/src/publisher.cpp @@ -264,6 +264,15 @@ rmw_fastrtps_cpp::create_publisher( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter with a mask enabling publication_matched calls for the listener info->data_writer_ = publisher->create_datawriter( info->topic_, diff --git a/rmw_fastrtps_cpp/src/rmw_client.cpp b/rmw_fastrtps_cpp/src/rmw_client.cpp index d94ae4859..4382bc2d6 100644 --- a/rmw_fastrtps_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_cpp/src/rmw_client.cpp @@ -323,6 +323,15 @@ rmw_create_client( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (response_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + // Creates DataReader info->response_reader_ = subscriber->create_datareader( response_topic_desc, @@ -381,6 +390,15 @@ rmw_create_client( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (request_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter with a mask enabling publication_matched calls for the listener info->request_writer_ = publisher->create_datawriter( info->request_topic_, diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index 118b4e2f9..5a299c4fd 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -322,6 +322,15 @@ rmw_create_service( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (request_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + // Creates DataReader info->request_reader_ = subscriber->create_datareader( request_topic_desc, @@ -384,6 +393,15 @@ rmw_create_service( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (response_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter with a mask enabling publication_matched calls for the listener info->response_writer_ = publisher->create_datawriter( info->response_topic_, diff --git a/rmw_fastrtps_cpp/src/subscription.cpp b/rmw_fastrtps_cpp/src/subscription.cpp index 2375f45cb..576e3f8fb 100644 --- a/rmw_fastrtps_cpp/src/subscription.cpp +++ b/rmw_fastrtps_cpp/src/subscription.cpp @@ -392,6 +392,15 @@ __create_dynamic_subscription( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + info->datareader_qos_ = reader_qos; // create_datareader @@ -659,6 +668,15 @@ __create_subscription( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + info->datareader_qos_ = reader_qos; // create_datareader diff --git a/rmw_fastrtps_cpp/src/type_support_common.cpp b/rmw_fastrtps_cpp/src/type_support_common.cpp index eb3d38232..3c4cd3c0f 100644 --- a/rmw_fastrtps_cpp/src/type_support_common.cpp +++ b/rmw_fastrtps_cpp/src/type_support_common.cpp @@ -28,6 +28,11 @@ TypeSupport::TypeSupport() m_isGetKeyDefined = false; max_size_bound_ = false; is_plain_ = false; + key_is_unbounded_ = false; + key_max_serialized_size_ = 0; + members_ = nullptr; + key_callbacks_ = nullptr; + has_data_ = false; } void TypeSupport::set_members(const message_type_support_callbacks_t * members) @@ -57,6 +62,16 @@ void TypeSupport::set_members(const message_type_support_callbacks_t * members) m_typeSize = 4 + data_size; // Account for RTPS submessage alignment m_typeSize = (m_typeSize + 3) & ~3; + + if (nullptr != members->key_callbacks) { + key_callbacks_ = members->key_callbacks; + m_isGetKeyDefined = true; + + key_max_serialized_size_ = key_callbacks_->max_serialized_size_key(key_is_unbounded_); + if (!key_is_unbounded_) { + key_buffer_.reserve(key_max_serialized_size_); + } + } } size_t TypeSupport::getEstimatedSerializedSize(const void * ros_message, const void * impl) const @@ -129,6 +144,56 @@ bool TypeSupport::deserializeROSmessage( return true; } +bool TypeSupport::get_key_hash_from_ros_message( + void * ros_message, + eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, + bool force_md5, + const void * impl) const +{ + assert(ros_message); + assert(ihandle); + (void)impl; + + // retrieve estimated serialized size in case key is unbounded + if (key_is_unbounded_) { + key_max_serialized_size_ = (std::max) ( + key_max_serialized_size_, + key_callbacks_->get_serialized_size_key(ros_message)); + key_buffer_.reserve(key_max_serialized_size_); + } + + eprosima::fastcdr::FastBuffer fast_buffer( + reinterpret_cast(key_buffer_.data()), + key_max_serialized_size_); + + eprosima::fastcdr::Cdr ser( + fast_buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::CdrVersion::XCDRv1); + + key_callbacks_->cdr_serialize_key(ros_message, ser); + + const size_t max_serialized_key_length = 16; + + auto ser_length = ser.get_serialized_data_length(); + + // check for md5 + if (force_md5 || key_max_serialized_size_ > max_serialized_key_length) { + md5_.init(); + md5_.update(key_buffer_.data(), static_cast(ser_length)); + md5_.finalize(); + + for (uint8_t i = 0; i < max_serialized_key_length; ++i) { + ihandle->value[i] = md5_.digest[i]; + } + } else { + memset(ihandle->value, 0, max_serialized_key_length); + for (uint8_t i = 0; i < ser_length; ++i) { + ihandle->value[i] = key_buffer_[i]; + } + } + + return true; +} + MessageTypeSupport::MessageTypeSupport(const message_type_support_callbacks_t * members) { assert(members); diff --git a/rmw_fastrtps_dynamic_cpp/src/MessageTypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/src/MessageTypeSupport_impl.hpp index c56945ae1..0f03983f4 100644 --- a/rmw_fastrtps_dynamic_cpp/src/MessageTypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/src/MessageTypeSupport_impl.hpp @@ -60,6 +60,13 @@ MessageTypeSupport::MessageTypeSupport( } else { this->m_typeSize++; } + + if (this->members_->has_any_key_member_) { + this->key_max_serialized_size_ = this->calculateMaxSerializedKeySize(members); + this->m_isGetKeyDefined = true; + this->key_buffer_.reserve(this->key_max_serialized_size_); + } + // Account for RTPS submessage alignment this->m_typeSize = (this->m_typeSize + 3) & ~3; } diff --git a/rmw_fastrtps_dynamic_cpp/src/ServiceTypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/src/ServiceTypeSupport_impl.hpp index b7152480a..6635e3f4e 100644 --- a/rmw_fastrtps_dynamic_cpp/src/ServiceTypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/src/ServiceTypeSupport_impl.hpp @@ -59,6 +59,13 @@ RequestTypeSupport::RequestTypeSupport( } else { this->m_typeSize++; } + + if (this->members_->has_any_key_member_) { + this->key_max_serialized_size_ = this->calculateMaxSerializedKeySize(this->members_); + this->m_isGetKeyDefined = true; + this->key_buffer_.reserve(this->key_max_serialized_size_); + } + // Account for RTPS submessage alignment this->m_typeSize = (this->m_typeSize + 3) & ~3; } @@ -92,6 +99,13 @@ ResponseTypeSupport::ResponseTypeSupport } else { this->m_typeSize++; } + + if (this->members_->has_any_key_member_) { + this->key_max_serialized_size_ = this->calculateMaxSerializedKeySize(this->members_); + this->m_isGetKeyDefined = true; + this->key_buffer_.reserve(this->key_max_serialized_size_); + } + // Account for RTPS submessage alignment this->m_typeSize = (this->m_typeSize + 3) & ~3; } diff --git a/rmw_fastrtps_dynamic_cpp/src/TypeSupport.hpp b/rmw_fastrtps_dynamic_cpp/src/TypeSupport.hpp index 627e4d5c2..b9e1153b6 100644 --- a/rmw_fastrtps_dynamic_cpp/src/TypeSupport.hpp +++ b/rmw_fastrtps_dynamic_cpp/src/TypeSupport.hpp @@ -138,6 +138,10 @@ class TypeSupportProxy : public rmw_fastrtps_shared_cpp::TypeSupport bool deserializeROSmessage( eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override; + + bool get_key_hash_from_ros_message( + void * ros_message, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, bool force_md5, + const void * impl) const override; }; class BaseTypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport @@ -170,28 +174,68 @@ class TypeSupport : public BaseTypeSupport bool deserializeROSmessage( eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override; + bool get_key_hash_from_ros_message( + void * ros_message, + eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, + bool force_md5, + const void * impl) const override; + protected: explicit TypeSupport(const void * ros_type_support); size_t calculateMaxSerializedSize(const MembersType * members, size_t current_alignment); + size_t calculateMaxSerializedKeySize(const MembersType * members); const MembersType * members_; private: + size_t calculateMaxSerializedSize( + const MembersType * members, + size_t current_alignment, + bool compute_key, + bool & is_key_unbounded); + size_t getEstimatedSerializedSize( const MembersType * members, const void * ros_message, size_t current_alignment) const; + size_t getEstimatedSerializedKeySize( + const MembersType * members, + const void * ros_message) const; + + size_t getEstimatedSerializedSize( + const MembersType * members, + const void * ros_message, + size_t current_alignment, + bool compute_key) const; + bool serializeROSmessage( eprosima::fastcdr::Cdr & ser, const MembersType * members, const void * ros_message) const; + bool serializeKeyROSmessage( + eprosima::fastcdr::Cdr & ser, + const MembersType * members, + const void * ros_message) const; + + bool serializeROSmessage( + eprosima::fastcdr::Cdr & ser, + const MembersType * members, + const void * ros_message, + bool compute_key) const; + bool deserializeROSmessage( eprosima::fastcdr::Cdr & deser, const MembersType * members, void * ros_message) const; + + bool get_key_hash_from_ros_message( + const MembersType * members, + void * ros_message, + eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, + bool force_md5) const; }; } // namespace rmw_fastrtps_dynamic_cpp diff --git a/rmw_fastrtps_dynamic_cpp/src/TypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/src/TypeSupport_impl.hpp index 142648fbf..9aee286b5 100644 --- a/rmw_fastrtps_dynamic_cpp/src/TypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/src/TypeSupport_impl.hpp @@ -65,6 +65,7 @@ TypeSupport::TypeSupport(const void * ros_type_support) m_isGetKeyDefined = false; max_size_bound_ = false; is_plain_ = false; + key_is_unbounded_ = false; } // C++ specialization @@ -196,12 +197,36 @@ bool TypeSupport::serializeROSmessage( eprosima::fastcdr::Cdr & ser, const MembersType * members, const void * ros_message) const +{ + return serializeROSmessage(ser, members, ros_message, false); +} + +template +bool TypeSupport::serializeKeyROSmessage( + eprosima::fastcdr::Cdr & ser, + const MembersType * members, + const void * ros_message) const +{ + return serializeROSmessage(ser, members, ros_message, true); +} + +template +bool TypeSupport::serializeROSmessage( + eprosima::fastcdr::Cdr & ser, + const MembersType * members, + const void * ros_message, + bool compute_key) const { assert(members); assert(ros_message); for (uint32_t i = 0; i < members->member_count_; ++i) { const auto member = members->members_ + i; + + if (compute_key && !member->is_key_ && members->has_any_key_member_) { + continue; + } + void * field = const_cast(static_cast(ros_message)) + member->offset_; switch (member->type_id_) { case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BOOL: @@ -255,7 +280,7 @@ bool TypeSupport::serializeROSmessage( { auto sub_members = static_cast(member->members_->data); if (!member->is_array_) { - serializeROSmessage(ser, sub_members, field); + serializeROSmessage(ser, sub_members, field, compute_key); } else { size_t array_size = 0; @@ -277,7 +302,9 @@ bool TypeSupport::serializeROSmessage( return false; } for (size_t index = 0; index < array_size; ++index) { - serializeROSmessage(ser, sub_members, member->get_function(field, index)); + serializeROSmessage( + ser, sub_members, member->get_function(field, index), + compute_key); } } } @@ -290,6 +317,57 @@ bool TypeSupport::serializeROSmessage( return true; } +template +bool TypeSupport::get_key_hash_from_ros_message( + const MembersType * members, + void * ros_message, + eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, + bool force_md5) const +{ + assert(members); + assert(ros_message); + assert(ihandle); + + // get estimated serialized size in case key is unbounded + if (this->key_is_unbounded_) { + this->key_max_serialized_size_ = + (std::max) (this->key_max_serialized_size_, + this->getEstimatedSerializedKeySize(members, ros_message)); + key_buffer_.reserve(this->key_max_serialized_size_); + } + + eprosima::fastcdr::FastBuffer buffer( + reinterpret_cast(this->key_buffer_.data()), + this->key_max_serialized_size_); + + eprosima::fastcdr::Cdr ser( + buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::CdrVersion::XCDRv1); + + // serialize + serializeKeyROSmessage(ser, members_, ros_message); + + // check for md5 + if (force_md5 || this->key_max_serialized_size_ > 16) { + md5_.init(); + + md5_.update( + this->key_buffer_.data(), + static_cast(ser.get_serialized_data_length())); + + md5_.finalize(); + + for (uint8_t i = 0; i < 16; ++i) { + ihandle->value[i] = md5_.digest[i]; + } + } else { + for (uint8_t i = 0; i < 16; ++i) { + ihandle->value[i] = this->key_buffer_[i]; + } + } + + return true; +} + // C++ specialization template size_t next_field_align( @@ -459,6 +537,24 @@ size_t TypeSupport::getEstimatedSerializedSize( const MembersType * members, const void * ros_message, size_t current_alignment) const +{ + return getEstimatedSerializedSize(members, ros_message, current_alignment, false); +} + +template +size_t TypeSupport::getEstimatedSerializedKeySize( + const MembersType * members, + const void * ros_message) const +{ + return getEstimatedSerializedSize(members, ros_message, 0, true); +} + +template +size_t TypeSupport::getEstimatedSerializedSize( + const MembersType * members, + const void * ros_message, + size_t current_alignment, + bool compute_key) const { assert(members); assert(ros_message); @@ -468,6 +564,11 @@ size_t TypeSupport::getEstimatedSerializedSize( for (uint32_t i = 0; i < members->member_count_; ++i) { const auto member = members->members_ + i; void * field = const_cast(static_cast(ros_message)) + member->offset_; + + if (compute_key && !member->is_key_ && members->has_any_key_member_) { + continue; + } + switch (member->type_id_) { case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BOOL: current_alignment = next_field_align(member, field, current_alignment); @@ -514,7 +615,9 @@ size_t TypeSupport::getEstimatedSerializedSize( { auto sub_members = static_cast(member->members_->data); if (!member->is_array_) { - current_alignment += getEstimatedSerializedSize(sub_members, field, current_alignment); + current_alignment += getEstimatedSerializedSize( + sub_members, field, current_alignment, + compute_key); } else { size_t array_size = 0; @@ -539,7 +642,8 @@ size_t TypeSupport::getEstimatedSerializedSize( current_alignment += getEstimatedSerializedSize( sub_members, member->get_function(field, index), - current_alignment); + current_alignment, + compute_key); } } } @@ -801,6 +905,24 @@ bool TypeSupport::deserializeROSmessage( template size_t TypeSupport::calculateMaxSerializedSize( const MembersType * members, size_t current_alignment) +{ + bool is_key_unbounded{false}; // unused + return calculateMaxSerializedSize(members, current_alignment, false, is_key_unbounded); +} + +template +size_t TypeSupport::calculateMaxSerializedKeySize( + const MembersType * members) +{ + return calculateMaxSerializedSize(members, 0, true, this->key_is_unbounded_); +} + +template +size_t TypeSupport::calculateMaxSerializedSize( + const MembersType * members, + size_t current_alignment, + bool compute_key, + bool & is_key_unbounded) { assert(members); @@ -813,6 +935,11 @@ size_t TypeSupport::calculateMaxSerializedSize( const auto * member = members->members_ + i; size_t array_size = 1; + + if (compute_key && !member->is_key_ && members->has_any_key_member_) { + continue; + } + if (member->is_array_) { array_size = member->array_size_; @@ -864,6 +991,11 @@ size_t TypeSupport::calculateMaxSerializedSize( { this->max_size_bound_ = false; this->is_plain_ = false; + + if (compute_key) { + is_key_unbounded = true; + } + size_t character_size = (member->type_id_ == rosidl_typesupport_introspection_cpp::ROS_TYPE_WSTRING) ? 4 : 1; size_t extra_char = @@ -879,7 +1011,9 @@ size_t TypeSupport::calculateMaxSerializedSize( { auto sub_members = static_cast(member->members_->data); for (size_t index = 0; index < array_size; ++index) { - size_t curr = calculateMaxSerializedSize(sub_members, current_alignment); + size_t curr = calculateMaxSerializedSize( + sub_members, current_alignment, compute_key, + is_key_unbounded); current_alignment += curr; last_member_size += curr; } @@ -979,6 +1113,25 @@ bool TypeSupport::deserializeROSmessage( return true; } +template +bool TypeSupport::get_key_hash_from_ros_message( + void * ros_message, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, bool force_md5, + const void * impl) const +{ + assert(ros_message); + assert(ihandle); + assert(members_); + + bool ret = false; + + (void)impl; + if (members_->member_count_ != 0) { + ret = TypeSupport::get_key_hash_from_ros_message(members_, ros_message, ihandle, force_md5); + } + + return ret; +} + } // namespace rmw_fastrtps_dynamic_cpp #endif // TYPESUPPORT_IMPL_HPP_ diff --git a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp index 6d597abe7..d737fc8e3 100644 --- a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp @@ -272,6 +272,15 @@ rmw_fastrtps_dynamic_cpp::create_publisher( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter (with publisher name to not change name policy) info->data_writer_ = publisher->create_datawriter( info->topic_, diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp index d8626b54c..5c4423da3 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp @@ -354,6 +354,15 @@ rmw_create_client( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (response_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + // Creates DataReader info->response_reader_ = subscriber->create_datareader( response_topic_desc, @@ -412,6 +421,15 @@ rmw_create_client( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (request_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter info->request_writer_ = publisher->create_datawriter( info->request_topic_, diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp index 4e1fe8341..47f6bd70f 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp @@ -353,6 +353,15 @@ rmw_create_service( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (request_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + // Creates DataReader info->request_reader_ = subscriber->create_datareader( request_topic_desc, @@ -415,6 +424,15 @@ rmw_create_service( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (response_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter info->response_writer_ = publisher->create_datawriter( info->response_topic_, diff --git a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp index c3d3433e3..23587d73b 100644 --- a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp @@ -276,6 +276,15 @@ create_subscription( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + eprosima::fastdds::dds::DataReaderQos original_qos = reader_qos; switch (subscription_options->require_unique_network_flow_endpoints) { default: diff --git a/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp b/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp index f4924c7a4..bce368920 100644 --- a/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp @@ -23,6 +23,8 @@ TypeSupportProxy::TypeSupportProxy(rmw_fastrtps_shared_cpp::TypeSupport * inner_ m_typeSize = inner_type->m_typeSize; is_plain_ = inner_type->is_plain(); max_size_bound_ = inner_type->is_bounded(); + m_isGetKeyDefined = inner_type->m_isGetKeyDefined; + key_is_unbounded_ = inner_type->is_key_unbounded(); } size_t TypeSupportProxy::getEstimatedSerializedSize( @@ -46,4 +48,12 @@ bool TypeSupportProxy::deserializeROSmessage( return type_impl->deserializeROSmessage(deser, ros_message, impl); } +bool TypeSupportProxy::get_key_hash_from_ros_message( + void * ros_message, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, bool force_md5, + const void * impl) const +{ + auto type_impl = static_cast(impl); + return type_impl->get_key_hash_from_ros_message(ros_message, ihandle, force_md5, impl); +} + } // namespace rmw_fastrtps_dynamic_cpp diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp index df370b0f5..5af5e9e29 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp @@ -17,6 +17,7 @@ #include #include +#include #include "fastdds/dds/topic/TopicDataType.hpp" @@ -25,6 +26,7 @@ #include "fastcdr/FastBuffer.h" #include "fastcdr/Cdr.h" +#include "fastrtps/utils/md5.h" #include "rcutils/logging_macros.h" @@ -61,15 +63,15 @@ class TypeSupport : public eprosima::fastdds::dds::TopicDataType virtual bool deserializeROSmessage( eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const = 0; + virtual bool get_key_hash_from_ros_message( + void * ros_message, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, bool force_md5, + const void * impl) const = 0; + RMW_FASTRTPS_SHARED_CPP_PUBLIC bool getKey( void * data, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, - bool force_md5 = false) override - { - (void)data; (void)ihandle; (void)force_md5; - return false; - } + bool force_md5 = false) override; RMW_FASTRTPS_SHARED_CPP_PUBLIC bool serialize(void * data, eprosima::fastrtps::rtps::SerializedPayload_t * payload) override; @@ -110,6 +112,12 @@ class TypeSupport : public eprosima::fastdds::dds::TopicDataType return is_plain_ && rep == eprosima::fastdds::dds::XCDR_DATA_REPRESENTATION; } + RMW_FASTRTPS_SHARED_CPP_PUBLIC + inline bool is_key_unbounded() const + { + return key_is_unbounded_; + } + RMW_FASTRTPS_SHARED_CPP_PUBLIC virtual ~TypeSupport() {} @@ -119,6 +127,11 @@ class TypeSupport : public eprosima::fastdds::dds::TopicDataType bool max_size_bound_; bool is_plain_; + bool key_is_unbounded_; + mutable size_t key_max_serialized_size_; + mutable MD5 md5_; + mutable std::vector key_buffer_; + mutable std::mutex mtx_; }; RMW_FASTRTPS_SHARED_CPP_PUBLIC diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/utils.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/utils.hpp index 67f1a5ad6..7973ca738 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/utils.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/utils.hpp @@ -126,6 +126,21 @@ create_datareader( CustomDataReaderListener * listener, eprosima::fastdds::dds::DataReader ** data_reader); +/** +* Apply specific resource limits when using keys. +* Max samples per instance is set to history depth if KEEP_LAST +* else UNLIMITED. +* +* \param[in] history_qos History entitiy QoS. +* \param[in, out] res_limits_qos Resource limits entitiy QoS. +* +*/ +RMW_FASTRTPS_SHARED_CPP_PUBLIC +void +apply_qos_resource_limits_for_keys( + const eprosima::fastdds::dds::HistoryQosPolicy & history_qos, + eprosima::fastdds::dds::ResourceLimitsQosPolicy & res_limits_qos); + } // namespace rmw_fastrtps_shared_cpp #endif // RMW_FASTRTPS_SHARED_CPP__UTILS_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp b/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp index 62f8cc24f..b34ae77f3 100644 --- a/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp +++ b/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp @@ -64,6 +64,58 @@ void * TypeSupport::createData() return new eprosima::fastcdr::FastBuffer(); } +bool TypeSupport::getKey( + void * data, + eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, + bool force_md5) +{ + assert(data); + + bool ret = false; + + if (!m_isGetKeyDefined) { + return ret; + } + + auto ser_data = static_cast(data); + + switch (ser_data->type) { + case FASTRTPS_SERIALIZED_DATA_TYPE_ROS_MESSAGE: + { + std::lock_guard lock(this->mtx_); + ret = + this->get_key_hash_from_ros_message(ser_data->data, ihandle, force_md5, ser_data->impl); + break; + } + + case FASTRTPS_SERIALIZED_DATA_TYPE_CDR_BUFFER: + { + // TODO(MiguelCompany): In order to support keys in rmw_publish_serialized_message, + // we would need a get_key_hash_from_payload method + break; + } + + case FASTRTPS_SERIALIZED_DATA_TYPE_DYNAMIC_MESSAGE: + { + auto m_type = std::make_shared(); + + // Retrieves the key (ihandle) from the dynamic data stored in data->data + return m_type->getKey( + static_cast(ser_data->data), + ihandle, + force_md5); + + break; + } + default: + { + break; + } + } + + return ret; +} + bool TypeSupport::serialize( void * data, eprosima::fastrtps::rtps::SerializedPayload_t * payload) { diff --git a/rmw_fastrtps_shared_cpp/src/utils.cpp b/rmw_fastrtps_shared_cpp/src/utils.cpp index 47429680a..86500685c 100644 --- a/rmw_fastrtps_shared_cpp/src/utils.cpp +++ b/rmw_fastrtps_shared_cpp/src/utils.cpp @@ -184,5 +184,18 @@ create_datareader( return true; } +void +apply_qos_resource_limits_for_keys( + const eprosima::fastdds::dds::HistoryQosPolicy & history_qos, + eprosima::fastdds::dds::ResourceLimitsQosPolicy & res_limits_qos) +{ + res_limits_qos.max_instances = 0; + res_limits_qos.max_samples = 0; + if (history_qos.kind == eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS) { + res_limits_qos.max_samples_per_instance = history_qos.depth; + } else { + res_limits_qos.max_samples_per_instance = 0; + } +} } // namespace rmw_fastrtps_shared_cpp