Skip to content

Commit

Permalink
wip: initial version refactoring n8
Browse files Browse the repository at this point in the history
  • Loading branch information
Petr Matousek committed Dec 2, 2024
1 parent e0651a0 commit ad9eae9
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/api/qpid-proton/reactor/handler/TxReceiverHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class TxReceiverHandler : public CommonHandler, transaction_handler {
// transaction variables
int batch_size = 0;
int current_batch = 0;
int committed = 0;
int processed = 0;
int confirmed = 0;
string tx_action = "commit";
string tx_endloop_action = "commit";
Expand Down
37 changes: 21 additions & 16 deletions src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ TxSenderHandler::TxSenderHandler(
duration_mode(duration_mode),
batch_size(1),
current_batch(0),
committed(0),
processed(0),
confirmedSent(0),
tx_action(tx_action),
tx_endloop_action(tx_endloop_action),
Expand Down Expand Up @@ -193,12 +193,12 @@ void TxSenderHandler::send()

logger(trace) << "Transaction variables";
logger(trace) << "confirmed: " << confirmedSent;
logger(trace) << "commited: " << committed;
logger(trace) << "processed: " << processed;
logger(trace) << "count: " << count;
logger(trace) << "is_empty: " << tx.is_empty();
logger(trace) << "current_batch: " << current_batch;

while (!tx.is_empty() && sndr.credit() && (committed + current_batch) < count)
while (!tx.is_empty() && sndr.credit() && (processed + current_batch) < count)
{
tx.send(sndr, message_to_send);

Expand Down Expand Up @@ -229,7 +229,7 @@ void TxSenderHandler::send()

logger(trace) << "Transaction variables";
logger(trace) << "confirmed: " << confirmedSent;
logger(trace) << "commited: " << committed;
logger(trace) << "processed: " << processed;
logger(trace) << "count: " << count;
logger(trace) << "is_empty: " << tx.is_empty();
logger(trace) << "current_batch: " << current_batch;
Expand Down Expand Up @@ -307,7 +307,7 @@ void TxSenderHandler::on_connection_close(connection &c)
logger(info) << "Transaction batch size: " << batch_size;
logger(info) << "Transaction current batch: " << current_batch;
logger(info) << "Transaction confirmed: " << confirmedSent;
logger(info) << "Transaction committed: " << committed;
logger(info) << "Transaction processed: " << processed;
current_batch = 0;
}

Expand All @@ -329,26 +329,31 @@ void TxSenderHandler::on_transaction_declared(transaction t) {
}

void TxSenderHandler::on_transaction_committed(transaction t) {
logger(trace) << "[on_transaction_commited] Messages committed";
committed += current_batch;
current_batch = 0;
logger(trace) << "[on_transaction_committed] Messages committed";
processed += current_batch;
connection conn = sndr.connection();
if(committed == count) {
logger(trace) << "[on_transaction_commited] All messages committed";
if (processed == count) {
logger(trace) << "[on_transaction_committed] All messages processed";
conn.close();
}
else {
} else {
current_batch = 0;
cont->declare_transaction(conn, *this);
}
}

// TODO jak je to s temi override ?
// ?? python to tak ma ?? stejne jak commited
// void TxSenderHandler::on_transaction_aborted(transaction t) override {
void TxSenderHandler::on_transaction_aborted(transaction t) {
logger(debug) << "[on_transaction_aborted] Mesages Aborted";
current_batch = 0;
cont->declare_transaction(sndr.connection(), *this);
logger(trace) << "[on_transaction_aborted] Messages aborted";
processed += current_batch;
connection conn = sndr.connection();
if (processed == count) {
logger(trace) << "[on_transaction_aborted] All messages processed";
conn.close();
} else {
current_batch = 0;
cont->declare_transaction(conn, *this);
}
}

void TxSenderHandler::on_sender_close(sender &s) {
Expand Down
2 changes: 1 addition & 1 deletion src/api/qpid-proton/reactor/handler/TxSenderHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class TxSenderHandler : public CommonHandler, transaction_handler {
// transactions related variables
int batch_size = 0;
int current_batch = 0;
int committed = 0;
int processed = 0;
int confirmedSent = 0;
string tx_action = "commit";
string tx_endloop_action = "commit";
Expand Down

0 comments on commit ad9eae9

Please sign in to comment.