diff --git a/www/zmq/publisher.go b/www/zmq/publisher.go index bfa3147be..b59b02ad6 100644 --- a/www/zmq/publisher.go +++ b/www/zmq/publisher.go @@ -44,7 +44,7 @@ func (b *basePub) HWM() int { // - Message body (varies based on provided parts) // - Sequence number (4 Bytes). func (b *basePub) makeTopicMsg(parts ...any) []byte { - result := make([]byte, 0, 64) + result := make([]byte, 0) // Append Topic ID to the message (2 Bytes) result = append(result, b.topic.Bytes()...) diff --git a/www/zmq/publisher_tx_info.go b/www/zmq/publisher_tx_info.go index a56350314..f05116357 100644 --- a/www/zmq/publisher_tx_info.go +++ b/www/zmq/publisher_tx_info.go @@ -20,7 +20,23 @@ func newTxInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher { } } -func (*txInfoPub) onNewBlock(_ *block.Block) { - // TODO implement me - panic("implement me") +func (t *txInfoPub) onNewBlock(blk *block.Block) { + for _, txn := range blk.Transactions() { + rawMsg := t.makeTopicMsg(txn.ID().Bytes(), blk.Height()) + message := zmq4.NewMsg(rawMsg) + + if err := t.zmqSocket.Send(message); err != nil { + t.logger.Error("zmq publish message error", "err", err, "publisher", t.TopicName()) + + continue + } + + t.logger.Debug("zmq published message success", + "publisher", t.TopicName(), + "block_height", blk.Height(), + "tx_hash", txn.ID().String(), + ) + + t.seqNo++ + } } diff --git a/www/zmq/publisher_tx_info_test.go b/www/zmq/publisher_tx_info_test.go new file mode 100644 index 000000000..2265debdf --- /dev/null +++ b/www/zmq/publisher_tx_info_test.go @@ -0,0 +1,59 @@ +package zmq + +import ( + "context" + "encoding/binary" + "fmt" + "testing" + + "github.com/go-zeromq/zmq4" + "github.com/pactus-project/pactus/util/testsuite" + "github.com/stretchr/testify/require" +) + +func TestTxInfoPublisher(t *testing.T) { + port := testsuite.FindFreePort() + addr := fmt.Sprintf("tcp://localhost:%d", port) + conf := DefaultConfig() + conf.ZmqPubTxInfo = addr + + td := setup(t, conf) + defer td.closeServer() + + td.server.Publishers() + + sub := zmq4.NewSub(context.TODO(), zmq4.WithAutomaticReconnect(false)) + + err := sub.Dial(addr) + require.NoError(t, err) + + err = sub.SetOption(zmq4.OptionSubscribe, string(TopicTransactionInfo.Bytes())) + require.NoError(t, err) + + blk, _ := td.TestSuite.GenerateTestBlock(td.RandHeight()) + + td.eventCh <- blk + + for i := 0; i < len(blk.Transactions()); i++ { + received, err := sub.Recv() + require.NoError(t, err) + + require.NotNil(t, received.Frames) + require.GreaterOrEqual(t, len(received.Frames), 1) + + msg := received.Frames[0] + require.Len(t, msg, 42) + + topic := msg[:2] + txHash := msg[2:34] + height := binary.BigEndian.Uint32(msg[34:38]) + seqNo := binary.BigEndian.Uint32(msg[38:]) + + require.Equal(t, TopicTransactionInfo.Bytes(), topic) + require.Equal(t, blk.Transactions()[i].ID().Bytes(), txHash) + require.Equal(t, blk.Height(), height) + require.Equal(t, uint32(i), seqNo) + } + + require.NoError(t, sub.Close()) +}