Skip to content

Commit

Permalink
fixups and loging
Browse files Browse the repository at this point in the history
  • Loading branch information
Petr Matousek committed Jan 16, 2025
1 parent 94636b7 commit 31e1fe0
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 78 deletions.
4 changes: 2 additions & 2 deletions src/api/qpid-proton/reactor/TxReceivingClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ int TxReceivingClient::run(int argc, char **argv) const
tx_action = options["tx-action"];
}

string tx_endloop_action = "commit";
string tx_endloop_action = "none";
if (options.is_set("tx-endloop-action")) {
tx_endloop_action = options["tx-endloop-action"];
}
Expand Down Expand Up @@ -398,7 +398,7 @@ int TxReceivingClient::run(int argc, char **argv) const
handler.setSelector(selector);
}

int tx_size = 1;
int tx_size = 0;
if (options.is_set("tx-size")) {
tx_size = static_cast<int> (options.get("tx-size"));
}
Expand Down
4 changes: 2 additions & 2 deletions src/api/qpid-proton/reactor/TxSendingClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ int TxSendingClient::run(int argc, char **argv) const
tx_action = options["tx-action"];
}

string tx_endloop_action = "commit";
string tx_endloop_action = "none";
if (options.is_set("tx-endloop-action")) {
tx_endloop_action = options["tx-endloop-action"];
}
Expand Down Expand Up @@ -575,7 +575,7 @@ int TxSendingClient::run(int argc, char **argv) const
}
handler.setCount(count);

int tx_size = 1;
int tx_size = 0;
if (options.is_set("tx-size")) {
tx_size = static_cast<int> (options.get("tx-size"));
}
Expand Down
26 changes: 23 additions & 3 deletions src/api/qpid-proton/reactor/handler/TxReceiverHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,21 @@ void TxReceiverHandler::on_transaction_declared(transaction t) {
void TxReceiverHandler::on_transaction_aborted(transaction t) {
confirmed += current_batch;
logger(debug) << "[on_transaction_aborted] messages aborted";
if(confirmed == count) {
logger(info) << "[on_transaction_committed] All messages proccessed";
t.connection().close();
}
else {
sess.declare_transaction(*this);
}
}

void TxReceiverHandler::on_transaction_committed(transaction t) {
confirmed += current_batch;
current_batch = 0;
logger(trace) << "[on_transaction_committed] Committed:"<< confirmed;
if(confirmed == count) {
logger(info) << "[on_transaction_committed] All messages committed";
logger(info) << "[on_transaction_committed] All messages proccessed";
t.connection().close();
}
else {
Expand Down Expand Up @@ -461,6 +468,7 @@ void TxReceiverHandler::on_container_start(container &c)

void TxReceiverHandler::on_message(delivery &d, message &m)
{
// TODO useless now ?? usew confirmed ?
msg_received_cnt += 1;

logger(debug) << "Processing received message";
Expand Down Expand Up @@ -534,11 +542,23 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
tx.accept(d);
current_batch += 1;
logger(debug) << "[on_message] current batch: " << current_batch;
if(current_batch == batch_size) {
if (confirmed + current_batch == count) {
logger(debug) << "[on_message] Transaction attempt (endloop): " << tx_endloop_action;
if (tx_endloop_action == "commit") {
tx.commit();
} else if (tx_endloop_action == "rollback") {
tx.abort();
}
} else if(current_batch == batch_size) {
//tx = transaction(); // null
// TODO: I think we should do a commit here ! rakhi is not doing
logger(debug) << "[on_message] messages commited: " << current_batch;
tx.commit();
if (tx_endloop_action == "commit") {
tx.commit();
} else if (tx_endloop_action == "rollback") {
tx.abort();
}
// Rakhi ?
//sess.declare_transaction(*this);
}
}
Expand Down
138 changes: 69 additions & 69 deletions src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ TxSenderHandler::TxSenderHandler(
count(1),
duration_time(duration_time),
duration_mode(duration_mode),
batch_size(1),
batch_size(0),
current_batch(0),
processed(0),
confirmedSent(0),
tx_action(tx_action),
tx_endloop_action(tx_endloop_action),
m(),
Expand Down Expand Up @@ -152,7 +151,7 @@ message TxSenderHandler::getMessage() const
}

void TxSenderHandler::checkIfCanSend() {
if (confirmedSent < count) {
if (processed < count) {
work_q->schedule(interval, make_work(&TxSenderHandler::checkIfCanSend, this));

if (sndr.credit() > 0) {
Expand All @@ -165,41 +164,40 @@ void TxSenderHandler::checkIfCanSend() {

void TxSenderHandler::send()
{
logger(debug) << "Preparing to send message";
logger(debug) << "[send] Preparing to send message";
int credit = sndr.credit();

if (credit == 0) {
logger(warning) << "There not enough credit to send messages";
logger(warning) << "[send] There not enough credit to send messages";
}

logger(debug) << "The handler has enough credit to send " << credit
logger(debug) << "[send] The handler has enough credit to send " << credit
<< " message" << (credit > 1 ? "s" : "" );

logger(trace) << "Sending messages through the link";
logger(trace) << "[send] Sending messages through the link";

message message_to_send = message(m);

try {
if (get<string>(message_to_send.body()).find("%d") != string::npos) {
size_t percent_position = get<string>(message_to_send.body()).find("%d");
stringstream ss;
ss << confirmedSent;
ss << processed;
string replaced_number = get<string>(message_to_send.body()).replace(percent_position, 2, ss.str());
message_to_send.body(replaced_number);
}
} catch (conversion_error &) {
}

logger(trace) << "Transaction variables";
logger(trace) << "confirmed: " << confirmedSent;
logger(trace) << "processed: " << processed;
logger(trace) << "count: " << count;
logger(trace) << "is_empty: " << tx.is_empty();
logger(trace) << "current_batch: " << current_batch;

logger(trace) << "[send] Transaction is empty: " << tx.is_empty();
logger(trace) << "[send] Current_batch: " << sndr.credit();
logger(debug) << "[send] Messages processed: " << processed;
logger(trace) << "[send] Current batch: " << current_batch;
while (!tx.is_empty() && sndr.credit() && (processed + current_batch) < count)
{
tx.send(sndr, message_to_send);
current_batch += 1;

if (log_msgs == "dict") {
ReactorDecoder decoder = ReactorDecoder(message_to_send);
Expand All @@ -222,34 +220,36 @@ void TxSenderHandler::send()
// TODO: Not implemented yet
}

confirmedSent++;

current_batch += 1;

logger(trace) << "Transaction variables";
logger(trace) << "confirmed: " << confirmedSent;
logger(trace) << "processed: " << processed;
logger(trace) << "count: " << count;
logger(trace) << "is_empty: " << tx.is_empty();
logger(trace) << "current_batch: " << current_batch;
logger(debug) << "[send] Messages processed: " << processed;
logger(debug) << "[send] Current batch: " << current_batch;

if (confirmedSent + current_batch == count) {
logger(debug) << "[send] Transaction attempt (endloop): " << tx_endloop_action;
if (tx_endloop_action == "commit") {
if(current_batch == batch_size) {
logger(debug) << "[send] Transaction attempt: " << tx_action;
if (tx_action == "commit") {
tx.commit();
} else if (tx_endloop_action == "rollback") {
} else if (tx_action == "rollback") {
tx.abort();
}
sndr.connection().close();
} else if(current_batch == batch_size) {
logger(debug) << "[send] Transaction attempt: " << tx_action;
tx = transaction(); // null ?

if (tx_action == "none") {
if (processed + current_batch == count) {
sndr.connection().close();
} else {
processed += current_batch;
current_batch = 0;
sess.declare_transaction(*this);
}
}
} else if (processed + current_batch == count) {
logger(debug) << "[send] Transaction attempt (endloop): " << tx_endloop_action;
if (tx_endloop_action == "commit") {
tx.commit();
} else if (tx_endloop_action == "rollback") {
tx.abort();
}
tx = transaction();
}
sndr.connection().close();
}
}

#if defined(__REACTOR_HAS_TIMER)
Expand All @@ -259,19 +259,20 @@ void TxSenderHandler::send()
}

// reactor methods
void TxSenderHandler::on_sendable(sender &s)
{
logger(trace) << "[on_sendable] transaction: " << &tx;
if (ready) {
send();
}
}

// TODO I guess not needed
// void TxSenderHandler::on_sendable(sender &s)
// {
// logger(trace) << "[on_sendable] transaction: " << &tx;
// if (ready) {
// send();
// }
// }


void TxSenderHandler::on_tracker_accept(tracker &t)
{
confirmedSent += 1;
logger(trace) << "[on_tracker_accept] Message accepted, confirmed message delivery: " << confirmedSent;
logger(trace) << "[on_tracker_accept] Message accepted, confirmed message delivery: " << processed;
}

void TxSenderHandler::on_tracker_reject(tracker &t)
Expand All @@ -281,15 +282,15 @@ void TxSenderHandler::on_tracker_reject(tracker &t)
}

void TxSenderHandler::on_transport_error(transport &t) {
logger(error) << "The connection with " << broker_url.getHost() << ":" << broker_url.getPort() << " was interrupted: " << t.error().what();
logger(error) << "[on_transport_error] The connection with " << broker_url.getHost() << ":" << broker_url.getPort() << " was interrupted: " << t.error().what();

if (t.error().what().find("unauthorized") != string::npos) {
exit(1);
}
}

void TxSenderHandler::on_transport_close(transport &t) {
logger(debug) << "Closing the transport";
logger(debug) << "[on_transport_close] Closing the transport";

if (conn_reconnect == "false") {
exit(1);
Expand All @@ -298,13 +299,8 @@ void TxSenderHandler::on_transport_close(transport &t) {

void TxSenderHandler::on_connection_close(connection &c)
{
logger(debug) << "Closing connection";
logger(info) << "Transactions status";
logger(info) << "Transaction batch size: " << batch_size;
logger(info) << "Transaction current batch: " << current_batch;
logger(info) << "Transaction confirmed: " << confirmedSent;
logger(info) << "Transaction processed: " << processed;
current_batch = 0;
logger(debug) << "[on_connection_close] Closing connection";
}

void TxSenderHandler::on_connection_error(connection &c)
Expand All @@ -327,10 +323,12 @@ void TxSenderHandler::on_transaction_declared(transaction t) {
void TxSenderHandler::on_transaction_committed(transaction t) {
logger(trace) << "[on_transaction_committed] Messages committed";
processed += current_batch;
logger(debug) << "[on_transaction_committed] Messages processed" << processed;
if (processed == count) {
logger(trace) << "[on_transaction_committed] All messages processed";
t.connection().close();
} else {
logger(trace) << "[on_transaction_committed] Declaring new transaction";
current_batch = 0;
sess.declare_transaction(*this);
}
Expand All @@ -339,10 +337,12 @@ void TxSenderHandler::on_transaction_committed(transaction t) {
void TxSenderHandler::on_transaction_aborted(transaction t) {
logger(trace) << "[on_transaction_aborted] Messages aborted";
processed += current_batch;
logger(debug) << "[on_transaction_committed] Messages processed" << processed;
if (processed == count) {
logger(trace) << "[on_transaction_aborted] All messages processed";
t.connection().close();
} else {
logger(trace) << "[on_transaction_committed] Declaring new transaction";
current_batch = 0;
sess.declare_transaction(*this);
}
Expand All @@ -361,26 +361,26 @@ void TxSenderHandler::on_session_open(session &s) {

void TxSenderHandler::on_container_start(container &c)
{
logger(debug) << "Starting messaging handler";

logger(debug) << "User: " << user;
logger(debug) << "Password: " << password;
logger(debug) << "SASL mechanisms: " << sasl_mechanisms;
logger(debug) << "SASL enabled: " << conn_sasl_enabled;

logger(debug) << "Maximum frame size: " << max_frame_size;

logger(info) << "Transaction batch size" << batch_size;

logger(debug) << "Topic: " << is_topic;
logger(debug) << "[on_container_start] Starting messaging transaction handler";
logger(debug) << "[on_container_start] User: " << user;
logger(debug) << "[on_container_start] Password: " << password;
logger(debug) << "[on_container_start] SASL mechanisms: " << sasl_mechanisms;
logger(debug) << "[on_container_start] SASL enabled: " << conn_sasl_enabled;
logger(debug) << "[on_container_start] Maximum frame size: " << max_frame_size;
logger(debug) << "[on_container_start] Topic: " << is_topic;
logger(debug) << "[on_container_start] Transaction batch size: " << batch_size;
logger(debug) << "[on_container_start] Transaction action: " << tx_action;
logger(debug) << "[on_container_start] Transaction endloop action: " << tx_endloop_action;
logger(trace) << "[on_container_start] Messages count: " << count;
logger(debug) << "[on_container_start] Messages Processed: " << processed;

std::vector< ::proton::symbol > caps;

if (is_topic) {
caps.push_back("topic");
}

logger(debug) << "Source capabilities: ";
logger(debug) << "[on_container_start] Source capabilities: ";
for (std::vector< ::proton::symbol >::const_iterator i = caps.begin(); i != caps.end(); ++i) {
logger(debug) << *i;
}
Expand All @@ -400,21 +400,21 @@ void TxSenderHandler::on_container_start(container &c)
// conn_opts.max_frame_size(max_frame_size);
conn_opts.failover_urls(conn_urls);

logger(debug) << "Setting a reconnect timer: " << conn_reconnect;
logger(debug) << "Custom reconnect: " << conn_reconnect_custom;
logger(debug) << "[on_container_start] Setting a reconnect timer: " << conn_reconnect;
logger(debug) << "[on_container_start] Custom reconnect: " << conn_reconnect_custom;

configure_reconnect(conn_opts);
configure_ssl(c);

if (conn_heartbeat != 0) {
logger(debug) << "Heartbeat: " << conn_heartbeat;
logger(debug) << "[on_container_start] Heartbeat: " << conn_heartbeat;

duration heartbeat_seconds = conn_heartbeat * duration::SECOND;

conn_opts.idle_timeout(heartbeat_seconds);
}

logger(debug) << "Creating a sender";
logger(debug) << "[on_container_start] Creating a sender";

connection conn;
if (conn_use_config_file) {
Expand All @@ -433,12 +433,12 @@ void TxSenderHandler::on_container_start(container &c)

work_q = &sndr.work_queue();

logger(trace) << "Setting up timer";
logger(trace) << "[on_container_start] Setting up timer";

if (duration_time > 0 && count > 0) {
interval = duration((duration_time * duration::SECOND) / count);

logger(trace) << "Interval for duration: " << interval.milliseconds() << " ms";
logger(trace) << "[on_container_start] Interval for duration: " << interval.milliseconds() << " ms";
}
#if defined(__REACTOR_HAS_TIMER)
work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::timerEvent, this));
Expand Down
Loading

0 comments on commit 31e1fe0

Please sign in to comment.