From ad9eae926144092130a277d70e8d4b9d7fe88258 Mon Sep 17 00:00:00 2001 From: Petr Matousek Date: Mon, 2 Dec 2024 21:39:56 +0100 Subject: [PATCH] wip: initial version refactoring n8 --- .../reactor/handler/TxReceiverHandler.h | 2 +- .../reactor/handler/TxSenderHandler.cpp | 37 +++++++++++-------- .../reactor/handler/TxSenderHandler.h | 2 +- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/src/api/qpid-proton/reactor/handler/TxReceiverHandler.h b/src/api/qpid-proton/reactor/handler/TxReceiverHandler.h index 0fdb2f1..c17d0e4 100644 --- a/src/api/qpid-proton/reactor/handler/TxReceiverHandler.h +++ b/src/api/qpid-proton/reactor/handler/TxReceiverHandler.h @@ -247,7 +247,7 @@ class TxReceiverHandler : public CommonHandler, transaction_handler { // transaction variables int batch_size = 0; int current_batch = 0; - int committed = 0; + int processed = 0; int confirmed = 0; string tx_action = "commit"; string tx_endloop_action = "commit"; diff --git a/src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp b/src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp index 1adabd8..62e2ef3 100644 --- a/src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp +++ b/src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp @@ -90,7 +90,7 @@ TxSenderHandler::TxSenderHandler( duration_mode(duration_mode), batch_size(1), current_batch(0), - committed(0), + processed(0), confirmedSent(0), tx_action(tx_action), tx_endloop_action(tx_endloop_action), @@ -193,12 +193,12 @@ void TxSenderHandler::send() logger(trace) << "Transaction variables"; logger(trace) << "confirmed: " << confirmedSent; - logger(trace) << "commited: " << committed; + logger(trace) << "processed: " << processed; logger(trace) << "count: " << count; logger(trace) << "is_empty: " << tx.is_empty(); logger(trace) << "current_batch: " << current_batch; - while (!tx.is_empty() && sndr.credit() && (committed + current_batch) < count) + while (!tx.is_empty() && sndr.credit() && (processed + current_batch) < count) { tx.send(sndr, message_to_send); @@ -229,7 +229,7 @@ void TxSenderHandler::send() logger(trace) << "Transaction variables"; logger(trace) << "confirmed: " << confirmedSent; - logger(trace) << "commited: " << committed; + logger(trace) << "processed: " << processed; logger(trace) << "count: " << count; logger(trace) << "is_empty: " << tx.is_empty(); logger(trace) << "current_batch: " << current_batch; @@ -307,7 +307,7 @@ void TxSenderHandler::on_connection_close(connection &c) logger(info) << "Transaction batch size: " << batch_size; logger(info) << "Transaction current batch: " << current_batch; logger(info) << "Transaction confirmed: " << confirmedSent; - logger(info) << "Transaction committed: " << committed; + logger(info) << "Transaction processed: " << processed; current_batch = 0; } @@ -329,26 +329,31 @@ void TxSenderHandler::on_transaction_declared(transaction t) { } void TxSenderHandler::on_transaction_committed(transaction t) { - logger(trace) << "[on_transaction_commited] Messages committed"; - committed += current_batch; - current_batch = 0; + logger(trace) << "[on_transaction_committed] Messages committed"; + processed += current_batch; connection conn = sndr.connection(); - if(committed == count) { - logger(trace) << "[on_transaction_commited] All messages committed"; + if (processed == count) { + logger(trace) << "[on_transaction_committed] All messages processed"; conn.close(); - } - else { + } else { + current_batch = 0; cont->declare_transaction(conn, *this); } } // TODO jak je to s temi override ? -// ?? python to tak ma ?? stejne jak commited // void TxSenderHandler::on_transaction_aborted(transaction t) override { void TxSenderHandler::on_transaction_aborted(transaction t) { - logger(debug) << "[on_transaction_aborted] Mesages Aborted"; - current_batch = 0; - cont->declare_transaction(sndr.connection(), *this); + logger(trace) << "[on_transaction_aborted] Messages aborted"; + processed += current_batch; + connection conn = sndr.connection(); + if (processed == count) { + logger(trace) << "[on_transaction_aborted] All messages processed"; + conn.close(); + } else { + current_batch = 0; + cont->declare_transaction(conn, *this); + } } void TxSenderHandler::on_sender_close(sender &s) { diff --git a/src/api/qpid-proton/reactor/handler/TxSenderHandler.h b/src/api/qpid-proton/reactor/handler/TxSenderHandler.h index 8aa335f..14712b2 100644 --- a/src/api/qpid-proton/reactor/handler/TxSenderHandler.h +++ b/src/api/qpid-proton/reactor/handler/TxSenderHandler.h @@ -188,7 +188,7 @@ class TxSenderHandler : public CommonHandler, transaction_handler { // transactions related variables int batch_size = 0; int current_batch = 0; - int committed = 0; + int processed = 0; int confirmedSent = 0; string tx_action = "commit"; string tx_endloop_action = "commit";