diff --git a/src/api/qpid-proton/reactor/TxReceivingClient.cpp b/src/api/qpid-proton/reactor/TxReceivingClient.cpp index 88ee27c..12dea9c 100644 --- a/src/api/qpid-proton/reactor/TxReceivingClient.cpp +++ b/src/api/qpid-proton/reactor/TxReceivingClient.cpp @@ -344,7 +344,7 @@ int TxReceivingClient::run(int argc, char **argv) const tx_action = options["tx-action"]; } - string tx_endloop_action = "commit"; + string tx_endloop_action = "none"; if (options.is_set("tx-endloop-action")) { tx_endloop_action = options["tx-endloop-action"]; } @@ -398,7 +398,7 @@ int TxReceivingClient::run(int argc, char **argv) const handler.setSelector(selector); } - int tx_size = 1; + int tx_size = 0; if (options.is_set("tx-size")) { tx_size = static_cast (options.get("tx-size")); } diff --git a/src/api/qpid-proton/reactor/TxSendingClient.cpp b/src/api/qpid-proton/reactor/TxSendingClient.cpp index e6bf7a5..3be4350 100644 --- a/src/api/qpid-proton/reactor/TxSendingClient.cpp +++ b/src/api/qpid-proton/reactor/TxSendingClient.cpp @@ -494,7 +494,7 @@ int TxSendingClient::run(int argc, char **argv) const tx_action = options["tx-action"]; } - string tx_endloop_action = "commit"; + string tx_endloop_action = "none"; if (options.is_set("tx-endloop-action")) { tx_endloop_action = options["tx-endloop-action"]; } @@ -575,7 +575,7 @@ int TxSendingClient::run(int argc, char **argv) const } handler.setCount(count); - int tx_size = 1; + int tx_size = 0; if (options.is_set("tx-size")) { tx_size = static_cast (options.get("tx-size")); } diff --git a/src/api/qpid-proton/reactor/handler/TxReceiverHandler.cpp b/src/api/qpid-proton/reactor/handler/TxReceiverHandler.cpp index 635a3cb..5475b12 100644 --- a/src/api/qpid-proton/reactor/handler/TxReceiverHandler.cpp +++ b/src/api/qpid-proton/reactor/handler/TxReceiverHandler.cpp @@ -266,6 +266,13 @@ void TxReceiverHandler::on_transaction_declared(transaction t) { void TxReceiverHandler::on_transaction_aborted(transaction t) { confirmed += current_batch; logger(debug) << "[on_transaction_aborted] messages aborted"; + if(confirmed == count) { + logger(info) << "[on_transaction_committed] All messages proccessed"; + t.connection().close(); + } + else { + sess.declare_transaction(*this); + } } void TxReceiverHandler::on_transaction_committed(transaction t) { @@ -273,7 +280,7 @@ void TxReceiverHandler::on_transaction_committed(transaction t) { current_batch = 0; logger(trace) << "[on_transaction_committed] Committed:"<< confirmed; if(confirmed == count) { - logger(info) << "[on_transaction_committed] All messages committed"; + logger(info) << "[on_transaction_committed] All messages proccessed"; t.connection().close(); } else { @@ -461,6 +468,7 @@ void TxReceiverHandler::on_container_start(container &c) void TxReceiverHandler::on_message(delivery &d, message &m) { + // TODO useless now ?? usew confirmed ? msg_received_cnt += 1; logger(debug) << "Processing received message"; @@ -534,11 +542,23 @@ void TxReceiverHandler::on_message(delivery &d, message &m) tx.accept(d); current_batch += 1; logger(debug) << "[on_message] current batch: " << current_batch; - if(current_batch == batch_size) { + if (confirmed + current_batch == count) { + logger(debug) << "[on_message] Transaction attempt (endloop): " << tx_endloop_action; + if (tx_endloop_action == "commit") { + tx.commit(); + } else if (tx_endloop_action == "rollback") { + tx.abort(); + } + } else if(current_batch == batch_size) { //tx = transaction(); // null // TODO: I think we should do a commit here ! rakhi is not doing logger(debug) << "[on_message] messages commited: " << current_batch; - tx.commit(); + if (tx_endloop_action == "commit") { + tx.commit(); + } else if (tx_endloop_action == "rollback") { + tx.abort(); + } + // Rakhi ? //sess.declare_transaction(*this); } } diff --git a/src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp b/src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp index e4da818..2fda473 100644 --- a/src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp +++ b/src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp @@ -88,10 +88,9 @@ TxSenderHandler::TxSenderHandler( count(1), duration_time(duration_time), duration_mode(duration_mode), - batch_size(1), + batch_size(0), current_batch(0), processed(0), - confirmedSent(0), tx_action(tx_action), tx_endloop_action(tx_endloop_action), m(), @@ -152,7 +151,7 @@ message TxSenderHandler::getMessage() const } void TxSenderHandler::checkIfCanSend() { - if (confirmedSent < count) { + if (processed < count) { work_q->schedule(interval, make_work(&TxSenderHandler::checkIfCanSend, this)); if (sndr.credit() > 0) { @@ -165,17 +164,17 @@ void TxSenderHandler::checkIfCanSend() { void TxSenderHandler::send() { - logger(debug) << "Preparing to send message"; + logger(debug) << "[send] Preparing to send message"; int credit = sndr.credit(); if (credit == 0) { - logger(warning) << "There not enough credit to send messages"; + logger(warning) << "[send] There not enough credit to send messages"; } - logger(debug) << "The handler has enough credit to send " << credit + logger(debug) << "[send] The handler has enough credit to send " << credit << " message" << (credit > 1 ? "s" : "" ); - logger(trace) << "Sending messages through the link"; + logger(trace) << "[send] Sending messages through the link"; message message_to_send = message(m); @@ -183,23 +182,22 @@ void TxSenderHandler::send() if (get(message_to_send.body()).find("%d") != string::npos) { size_t percent_position = get(message_to_send.body()).find("%d"); stringstream ss; - ss << confirmedSent; + ss << processed; string replaced_number = get(message_to_send.body()).replace(percent_position, 2, ss.str()); message_to_send.body(replaced_number); } } catch (conversion_error &) { } - logger(trace) << "Transaction variables"; - logger(trace) << "confirmed: " << confirmedSent; - logger(trace) << "processed: " << processed; - logger(trace) << "count: " << count; - logger(trace) << "is_empty: " << tx.is_empty(); - logger(trace) << "current_batch: " << current_batch; + logger(trace) << "[send] Transaction is empty: " << tx.is_empty(); + logger(trace) << "[send] Current_batch: " << sndr.credit(); + logger(debug) << "[send] Messages processed: " << processed; + logger(trace) << "[send] Current batch: " << current_batch; while (!tx.is_empty() && sndr.credit() && (processed + current_batch) < count) { tx.send(sndr, message_to_send); + current_batch += 1; if (log_msgs == "dict") { ReactorDecoder decoder = ReactorDecoder(message_to_send); @@ -222,34 +220,36 @@ void TxSenderHandler::send() // TODO: Not implemented yet } - confirmedSent++; - - current_batch += 1; - - logger(trace) << "Transaction variables"; - logger(trace) << "confirmed: " << confirmedSent; - logger(trace) << "processed: " << processed; - logger(trace) << "count: " << count; - logger(trace) << "is_empty: " << tx.is_empty(); - logger(trace) << "current_batch: " << current_batch; + logger(debug) << "[send] Messages processed: " << processed; + logger(debug) << "[send] Current batch: " << current_batch; - if (confirmedSent + current_batch == count) { - logger(debug) << "[send] Transaction attempt (endloop): " << tx_endloop_action; - if (tx_endloop_action == "commit") { + if(current_batch == batch_size) { + logger(debug) << "[send] Transaction attempt: " << tx_action; + if (tx_action == "commit") { tx.commit(); - } else if (tx_endloop_action == "rollback") { + } else if (tx_action == "rollback") { tx.abort(); } - sndr.connection().close(); - } else if(current_batch == batch_size) { - logger(debug) << "[send] Transaction attempt: " << tx_action; + tx = transaction(); // null ? + + if (tx_action == "none") { + if (processed + current_batch == count) { + sndr.connection().close(); + } else { + processed += current_batch; + current_batch = 0; + sess.declare_transaction(*this); + } + } + } else if (processed + current_batch == count) { + logger(debug) << "[send] Transaction attempt (endloop): " << tx_endloop_action; if (tx_endloop_action == "commit") { tx.commit(); } else if (tx_endloop_action == "rollback") { tx.abort(); } - tx = transaction(); - } + sndr.connection().close(); + } } #if defined(__REACTOR_HAS_TIMER) @@ -259,19 +259,20 @@ void TxSenderHandler::send() } // reactor methods -void TxSenderHandler::on_sendable(sender &s) -{ - logger(trace) << "[on_sendable] transaction: " << &tx; - if (ready) { - send(); - } -} + +// TODO I guess not needed +// void TxSenderHandler::on_sendable(sender &s) +// { +// logger(trace) << "[on_sendable] transaction: " << &tx; +// if (ready) { +// send(); +// } +// } void TxSenderHandler::on_tracker_accept(tracker &t) { - confirmedSent += 1; - logger(trace) << "[on_tracker_accept] Message accepted, confirmed message delivery: " << confirmedSent; + logger(trace) << "[on_tracker_accept] Message accepted, confirmed message delivery: " << processed; } void TxSenderHandler::on_tracker_reject(tracker &t) @@ -281,7 +282,7 @@ void TxSenderHandler::on_tracker_reject(tracker &t) } void TxSenderHandler::on_transport_error(transport &t) { - logger(error) << "The connection with " << broker_url.getHost() << ":" << broker_url.getPort() << " was interrupted: " << t.error().what(); + logger(error) << "[on_transport_error] The connection with " << broker_url.getHost() << ":" << broker_url.getPort() << " was interrupted: " << t.error().what(); if (t.error().what().find("unauthorized") != string::npos) { exit(1); @@ -289,7 +290,7 @@ void TxSenderHandler::on_transport_error(transport &t) { } void TxSenderHandler::on_transport_close(transport &t) { - logger(debug) << "Closing the transport"; + logger(debug) << "[on_transport_close] Closing the transport"; if (conn_reconnect == "false") { exit(1); @@ -298,13 +299,8 @@ void TxSenderHandler::on_transport_close(transport &t) { void TxSenderHandler::on_connection_close(connection &c) { - logger(debug) << "Closing connection"; - logger(info) << "Transactions status"; - logger(info) << "Transaction batch size: " << batch_size; - logger(info) << "Transaction current batch: " << current_batch; - logger(info) << "Transaction confirmed: " << confirmedSent; - logger(info) << "Transaction processed: " << processed; current_batch = 0; + logger(debug) << "[on_connection_close] Closing connection"; } void TxSenderHandler::on_connection_error(connection &c) @@ -327,10 +323,12 @@ void TxSenderHandler::on_transaction_declared(transaction t) { void TxSenderHandler::on_transaction_committed(transaction t) { logger(trace) << "[on_transaction_committed] Messages committed"; processed += current_batch; + logger(debug) << "[on_transaction_committed] Messages processed" << processed; if (processed == count) { logger(trace) << "[on_transaction_committed] All messages processed"; t.connection().close(); } else { + logger(trace) << "[on_transaction_committed] Declaring new transaction"; current_batch = 0; sess.declare_transaction(*this); } @@ -339,10 +337,12 @@ void TxSenderHandler::on_transaction_committed(transaction t) { void TxSenderHandler::on_transaction_aborted(transaction t) { logger(trace) << "[on_transaction_aborted] Messages aborted"; processed += current_batch; + logger(debug) << "[on_transaction_committed] Messages processed" << processed; if (processed == count) { logger(trace) << "[on_transaction_aborted] All messages processed"; t.connection().close(); } else { + logger(trace) << "[on_transaction_committed] Declaring new transaction"; current_batch = 0; sess.declare_transaction(*this); } @@ -361,18 +361,18 @@ void TxSenderHandler::on_session_open(session &s) { void TxSenderHandler::on_container_start(container &c) { - logger(debug) << "Starting messaging handler"; - - logger(debug) << "User: " << user; - logger(debug) << "Password: " << password; - logger(debug) << "SASL mechanisms: " << sasl_mechanisms; - logger(debug) << "SASL enabled: " << conn_sasl_enabled; - - logger(debug) << "Maximum frame size: " << max_frame_size; - - logger(info) << "Transaction batch size" << batch_size; - - logger(debug) << "Topic: " << is_topic; + logger(debug) << "[on_container_start] Starting messaging transaction handler"; + logger(debug) << "[on_container_start] User: " << user; + logger(debug) << "[on_container_start] Password: " << password; + logger(debug) << "[on_container_start] SASL mechanisms: " << sasl_mechanisms; + logger(debug) << "[on_container_start] SASL enabled: " << conn_sasl_enabled; + logger(debug) << "[on_container_start] Maximum frame size: " << max_frame_size; + logger(debug) << "[on_container_start] Topic: " << is_topic; + logger(debug) << "[on_container_start] Transaction batch size: " << batch_size; + logger(debug) << "[on_container_start] Transaction action: " << tx_action; + logger(debug) << "[on_container_start] Transaction endloop action: " << tx_endloop_action; + logger(trace) << "[on_container_start] Messages count: " << count; + logger(debug) << "[on_container_start] Messages Processed: " << processed; std::vector< ::proton::symbol > caps; @@ -380,7 +380,7 @@ void TxSenderHandler::on_container_start(container &c) caps.push_back("topic"); } - logger(debug) << "Source capabilities: "; + logger(debug) << "[on_container_start] Source capabilities: "; for (std::vector< ::proton::symbol >::const_iterator i = caps.begin(); i != caps.end(); ++i) { logger(debug) << *i; } @@ -400,21 +400,21 @@ void TxSenderHandler::on_container_start(container &c) // conn_opts.max_frame_size(max_frame_size); conn_opts.failover_urls(conn_urls); - logger(debug) << "Setting a reconnect timer: " << conn_reconnect; - logger(debug) << "Custom reconnect: " << conn_reconnect_custom; + logger(debug) << "[on_container_start] Setting a reconnect timer: " << conn_reconnect; + logger(debug) << "[on_container_start] Custom reconnect: " << conn_reconnect_custom; configure_reconnect(conn_opts); configure_ssl(c); if (conn_heartbeat != 0) { - logger(debug) << "Heartbeat: " << conn_heartbeat; + logger(debug) << "[on_container_start] Heartbeat: " << conn_heartbeat; duration heartbeat_seconds = conn_heartbeat * duration::SECOND; conn_opts.idle_timeout(heartbeat_seconds); } - logger(debug) << "Creating a sender"; + logger(debug) << "[on_container_start] Creating a sender"; connection conn; if (conn_use_config_file) { @@ -433,12 +433,12 @@ void TxSenderHandler::on_container_start(container &c) work_q = &sndr.work_queue(); - logger(trace) << "Setting up timer"; + logger(trace) << "[on_container_start] Setting up timer"; if (duration_time > 0 && count > 0) { interval = duration((duration_time * duration::SECOND) / count); - logger(trace) << "Interval for duration: " << interval.milliseconds() << " ms"; + logger(trace) << "[on_container_start] Interval for duration: " << interval.milliseconds() << " ms"; } #if defined(__REACTOR_HAS_TIMER) work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::timerEvent, this)); diff --git a/src/api/qpid-proton/reactor/handler/TxSenderHandler.h b/src/api/qpid-proton/reactor/handler/TxSenderHandler.h index cbec183..15db772 100644 --- a/src/api/qpid-proton/reactor/handler/TxSenderHandler.h +++ b/src/api/qpid-proton/reactor/handler/TxSenderHandler.h @@ -165,7 +165,7 @@ class TxSenderHandler : public CommonHandler, transaction_handler { // common reactor methods void on_container_start(container &c); void on_session_open(session &s); - void on_sendable(sender &s); +// void on_sendable(sender &s); void on_tracker_accept(tracker &t); void on_tracker_reject(tracker &t); void on_transport_error(transport &t); @@ -191,7 +191,6 @@ class TxSenderHandler : public CommonHandler, transaction_handler { int batch_size = 0; int current_batch = 0; int processed = 0; - int confirmedSent = 0; string tx_action = "commit"; string tx_endloop_action = "commit";