Skip to content

Commit

Permalink
Global order writes should send relative uris over serialization (#3557
Browse files Browse the repository at this point in the history
…) (#3575)

Fragment URI was a required part of the global write states serialization, but we need to strip away any location sensitive info out of the uri before sending it back on the client.
This PR is addressing this issue, but also changes the way intermediate multipart upload states are stored on the client side as the (uri, multipart_state) were maintained using absolute URIs. Absolute URIs don't make sense on the client or at least not until we also have serialization support for ArrayDirectory.

---
TYPE: BUG
DESC: global order writes should send relative uris over serialization

Co-authored-by: Robert Bindar <[email protected]>
  • Loading branch information
github-actions[bot] and robertbindar authored Oct 15, 2022
1 parent 44a70bf commit c8b41a7
Show file tree
Hide file tree
Showing 17 changed files with 187 additions and 397 deletions.
10 changes: 7 additions & 3 deletions tiledb/sm/filesystem/s3.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1775,9 +1775,13 @@ std::optional<S3::MultiPartUploadState> S3::multipart_upload_state(
}

Status S3::set_multipart_upload_state(
const URI& uri, const MultiPartUploadState& state) {
const Aws::Http::URI aws_uri(uri.c_str());
const std::string uri_path(aws_uri.GetPath().c_str());
const std::string& uri, MultiPartUploadState& state) {
Aws::Http::URI aws_uri(uri.c_str());
std::string uri_path(aws_uri.GetPath().c_str());

state.bucket = aws_uri.GetAuthority();
state.key = aws_uri.GetPath();

UniqueWriteLock unique_wl(&multipart_upload_rwlock_);
multipart_upload_states_[uri_path] = state;

Expand Down
2 changes: 1 addition & 1 deletion tiledb/sm/filesystem/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ class S3 {
* @return Status
*/
Status set_multipart_upload_state(
const URI& uri, const S3::MultiPartUploadState& state);
const std::string& uri, S3::MultiPartUploadState& state);

private:
/* ********************************* */
Expand Down
21 changes: 12 additions & 9 deletions tiledb/sm/filesystem/vfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1827,7 +1827,9 @@ Status VFS::write(const URI& uri, const void* buffer, uint64_t buffer_size) {

std::pair<Status, std::optional<VFS::MultiPartUploadState>>
VFS::multipart_upload_state(const URI& uri) {
if (uri.is_s3()) {
if (uri.is_file()) {
return {Status::Ok(), {}};
} else if (uri.is_s3()) {
#ifdef HAVE_S3
VFS::MultiPartUploadState state;
auto s3_state = s3_.multipart_upload_state(uri);
Expand All @@ -1836,8 +1838,6 @@ VFS::multipart_upload_state(const URI& uri) {
}
state.upload_id = s3_state->upload_id;
state.part_number = s3_state->part_number;
state.bucket = s3_state->bucket;
state.s3_key = s3_state->key;
state.status = s3_state->st;
auto& completed_parts = s3_state->completed_parts;
for (auto& entry : completed_parts) {
Expand Down Expand Up @@ -1868,26 +1868,28 @@ VFS::multipart_upload_state(const URI& uri) {
#endif
}

return {Status::Ok(), nullopt};
return {LOG_STATUS(
Status_VFSError("Unsupported URI schemes: " + uri.to_string())),
nullopt};
}

Status VFS::set_multipart_upload_state(
const URI& uri, const MultiPartUploadState& state) {
(void)state;
if (uri.is_s3()) {
if (uri.is_file()) {
return Status::Ok();
} else if (uri.is_s3()) {
#ifdef HAVE_S3
S3::MultiPartUploadState s3_state;
s3_state.part_number = state.part_number;
s3_state.upload_id = *state.upload_id;
s3_state.bucket = *state.bucket;
s3_state.key = *state.s3_key;
s3_state.st = state.status;
for (auto& part : state.completed_parts) {
auto rv = s3_state.completed_parts.try_emplace(part.part_number);
rv.first->second.SetETag(part.e_tag->c_str());
rv.first->second.SetPartNumber(part.part_number);
}
return s3_.set_multipart_upload_state(uri, s3_state);
return s3_.set_multipart_upload_state(uri.to_string(), s3_state);
#else
return LOG_STATUS(Status_VFSError("TileDB was built without S3 support"));
#endif
Expand All @@ -1906,7 +1908,8 @@ Status VFS::set_multipart_upload_state(
#endif
}

return Status::Ok();
return LOG_STATUS(
Status_VFSError("Unsupported URI schemes: " + uri.to_string()));
}

Status VFS::flush_multipart_file_buffer(const URI& uri) {
Expand Down
2 changes: 0 additions & 2 deletions tiledb/sm/filesystem/vfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ class VFS {

uint64_t part_number;
optional<std::string> upload_id;
optional<std::string> bucket;
optional<std::string> s3_key;
std::vector<CompletedParts> completed_parts;
Status status;
};
Expand Down
29 changes: 23 additions & 6 deletions tiledb/sm/query/writers/global_order_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1221,7 +1221,11 @@ GlobalOrderWriter::GlobalWriteState* GlobalOrderWriter::get_global_state() {
}

std::pair<Status, std::unordered_map<std::string, VFS::MultiPartUploadState>>
GlobalOrderWriter::multipart_upload_state() {
GlobalOrderWriter::multipart_upload_state(bool client) {
if (client) {
return {Status::Ok(), global_write_state_->multipart_upload_state_};
}

auto meta = global_write_state_->frag_meta_;
std::unordered_map<std::string, VFS::MultiPartUploadState> result;

Expand All @@ -1239,7 +1243,7 @@ GlobalOrderWriter::multipart_upload_state() {
if (!state.has_value()) {
return {Status::Ok(), {}};
}
result[uri->to_string()] = std::move(*state);
result[uri->remove_trailing_slash().last_path_part()] = std::move(*state);

if (array_schema_.var_size(name)) {
auto&& [status, var_uri] = meta->var_uri(name);
Expand All @@ -1248,7 +1252,8 @@ GlobalOrderWriter::multipart_upload_state() {
auto&& [st, var_state] =
storage_manager_->vfs()->multipart_upload_state(*var_uri);
RETURN_NOT_OK_TUPLE(st, {});
result[var_uri->to_string()] = std::move(*var_state);
result[var_uri->remove_trailing_slash().last_path_part()] =
std::move(*var_state);
}
if (array_schema_.is_nullable(name)) {
auto&& [status, validity_uri] = meta->validity_uri(name);
Expand All @@ -1257,16 +1262,28 @@ GlobalOrderWriter::multipart_upload_state() {
auto&& [st, val_state] =
storage_manager_->vfs()->multipart_upload_state(*validity_uri);
RETURN_NOT_OK_TUPLE(st, {});
result[validity_uri->to_string()] = std::move(*val_state);
result[validity_uri->remove_trailing_slash().last_path_part()] =
std::move(*val_state);
}
}

return {Status::Ok(), result};
}

Status GlobalOrderWriter::set_multipart_upload_state(
const URI& uri, const VFS::MultiPartUploadState& state) {
return storage_manager_->vfs()->set_multipart_upload_state(uri, state);
const std::string& uri,
const VFS::MultiPartUploadState& state,
bool client) {
if (client) {
global_write_state_->multipart_upload_state_[uri] = state;
return Status::Ok();
}

// uri in this case holds only the buffer name
auto absolute_uri =
global_write_state_->frag_meta_->fragment_uri().join_path(uri);
return storage_manager_->vfs()->set_multipart_upload_state(
absolute_uri, state);
}

} // namespace sm
Expand Down
23 changes: 21 additions & 2 deletions tiledb/sm/query/writers/global_order_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ class GlobalOrderWriter : public WriterBase {

/** The last hilbert value written. */
uint64_t last_hilbert_value_;

/** A mapping of buffer names to multipart upload state used by clients
* to track the write state in remote global order writes */
std::unordered_map<std::string, VFS::MultiPartUploadState>
multipart_upload_state_;
};

/* ********************************* */
Expand Down Expand Up @@ -146,16 +151,30 @@ class GlobalOrderWriter : public WriterBase {
/**
* Used in serialization to share the multipart upload state
* among cloud executors
*
* @param client true if the code is executed from a client context
* @return A mapping of buffer names to VFS multipart upload states read from
* within this instance's `multipart_upload_state_` if the caller is a client,
* or from within the cloud backend internal mappings if the code is executed
* on the rest server.
*/
std::pair<Status, std::unordered_map<std::string, VFS::MultiPartUploadState>>
multipart_upload_state();
multipart_upload_state(bool client);

/**
* Used in serialization of global order writes to set the multipart upload
* state in the internal maps of cloud backends
*
* @param uri complete uri of a buffer file or just the buffer name if client
* is true
* @param state VFS multipart upload state to be set
* @param client true if the code is executed from a client context
* @return Status
*/
Status set_multipart_upload_state(
const URI& uri, const VFS::MultiPartUploadState& state);
const std::string& uri,
const VFS::MultiPartUploadState& state,
bool client);

private:
/* ********************************* */
Expand Down
2 changes: 2 additions & 0 deletions tiledb/sm/serialization/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ Status array_from_capnp(
for (auto array_schema_build : entries) {
auto schema{array_schema_from_capnp(
array_schema_build.getValue(), array->array_uri())};
schema.set_array_uri(array->array_uri());
all_schemas[array_schema_build.getKey()] =
make_shared<ArraySchema>(HERE(), schema);
}
Expand All @@ -211,6 +212,7 @@ Status array_from_capnp(
auto array_schema_latest_reader = array_reader.getArraySchemaLatest();
auto array_schema_latest{array_schema_from_capnp(
array_schema_latest_reader, array->array_uri())};
array_schema_latest.set_array_uri(array->array_uri());
array->set_array_schema_latest(
make_shared<ArraySchema>(HERE(), array_schema_latest));
}
Expand Down
2 changes: 2 additions & 0 deletions tiledb/sm/serialization/fragment_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ Status fragment_info_from_capnp(
fragment_info_reader.getArraySchemaLatest();
auto array_schema_latest{
array_schema_from_capnp(array_schema_latest_reader, array_uri)};
array_schema_latest.set_array_uri(array_uri);
fragment_info->array_schema_latest() =
make_shared<ArraySchema>(HERE(), array_schema_latest);
}
Expand All @@ -292,6 +293,7 @@ Status fragment_info_from_capnp(
for (auto array_schema_build : entries) {
auto schema{
array_schema_from_capnp(array_schema_build.getValue(), array_uri)};
schema.set_array_uri(array_uri);
fragment_info->array_schemas_all()[array_schema_build.getKey()] =
make_shared<ArraySchema>(HERE(), schema);
}
Expand Down
10 changes: 8 additions & 2 deletions tiledb/sm/serialization/fragment_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,12 @@ Status fragment_metadata_from_capnp(
}
}
if (frag_meta_reader.hasFragmentUri()) {
frag_meta->fragment_uri() = URI(frag_meta_reader.getFragmentUri().cStr());
// Reconstruct the fragment uri out of the received fragment name
auto write_version = array_schema->write_version();
auto frag_dir_uri = ArrayDirectory::generate_fragment_dir_uri(
write_version, array_schema->array_uri().add_trailing_slash());
auto fragment_name = std::string(frag_meta_reader.getFragmentUri().cStr());
frag_meta->fragment_uri() = frag_dir_uri.join_path(fragment_name);
}
frag_meta->has_timestamps() = frag_meta_reader.getHasTimestamps();
frag_meta->has_delete_meta() = frag_meta_reader.getHasDeleteMeta();
Expand Down Expand Up @@ -281,7 +286,8 @@ Status fragment_metadata_to_capnp(
}
}

frag_meta_builder->setFragmentUri(frag_meta.fragment_uri());
frag_meta_builder->setFragmentUri(
frag_meta.fragment_uri().remove_trailing_slash().last_path_part());
frag_meta_builder->setHasTimestamps(frag_meta.has_timestamps());
frag_meta_builder->setHasDeleteMeta(frag_meta.has_delete_meta());
frag_meta_builder->setHasConsolidatedFooter(
Expand Down
70 changes: 20 additions & 50 deletions tiledb/sm/serialization/posix/tiledb-rest.capnp.c++

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c8b41a7

Please sign in to comment.