From bbee88783b02569b3bf428b0a5c6b0f55b10912f Mon Sep 17 00:00:00 2001 From: Mostafa Date: Tue, 14 Jan 2025 21:15:43 +0800 Subject: [PATCH] fix: move makeTopicMsg into basePub --- www/zmq/block_info_publisher.go | 13 ++--- www/zmq/block_info_publisher_test.go | 2 +- www/zmq/config.go | 2 +- www/zmq/config_test.go | 2 +- www/zmq/publisher.go | 37 ++++++++++++++ www/zmq/util.go | 30 ----------- www/zmq/util_test.go | 74 ---------------------------- 7 files changed, 44 insertions(+), 116 deletions(-) delete mode 100644 www/zmq/util.go delete mode 100644 www/zmq/util_test.go diff --git a/www/zmq/block_info_publisher.go b/www/zmq/block_info_publisher.go index 5e6e4d5a7..fa7659f07 100644 --- a/www/zmq/block_info_publisher.go +++ b/www/zmq/block_info_publisher.go @@ -7,7 +7,6 @@ import ( ) type blockInfoPub struct { - seqNo uint32 basePub } @@ -15,6 +14,7 @@ func newBlockInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher { return &blockInfoPub{ basePub: basePub{ topic: BlockInfo, + seqNo: 0, zmqSocket: socket, logger: logger, }, @@ -22,15 +22,11 @@ func newBlockInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher { } func (b *blockInfoPub) onNewBlock(blk *block.Block) { - seq := b.seqNo + 1 - - rawMsg := makeTopicMsg( - b.topic, + rawMsg := b.makeTopicMsg( blk.Header().ProposerAddress(), blk.Header().UnixTime(), uint16(len(blk.Transactions())), blk.Height(), - seq, ) message := zmq4.NewMsg(rawMsg) @@ -41,8 +37,7 @@ func (b *blockInfoPub) onNewBlock(blk *block.Block) { b.logger.Debug("zmq published message success", "publisher", b.TopicName(), - "block_height", blk.Height(), - ) + "block_height", blk.Height()) - b.seqNo = seq + b.seqNo++ } diff --git a/www/zmq/block_info_publisher_test.go b/www/zmq/block_info_publisher_test.go index dc10f45dc..0927c2b15 100644 --- a/www/zmq/block_info_publisher_test.go +++ b/www/zmq/block_info_publisher_test.go @@ -63,5 +63,5 @@ func TestBlockInfoPublisher(t *testing.T) { require.Equal(t, blk.Header().UnixTime(), timestamp) require.Equal(t, uint16(len(blk.Transactions())), txCount) require.Equal(t, blk.Height(), height) - require.Equal(t, uint32(1), seqNo) + require.Equal(t, uint32(0), seqNo) } diff --git a/www/zmq/config.go b/www/zmq/config.go index 2fd96a0e4..9d165c207 100644 --- a/www/zmq/config.go +++ b/www/zmq/config.go @@ -65,7 +65,7 @@ func (c *Config) BasicCheck() error { func validateTopicSocket(socket string) error { addr, err := url.Parse(socket) if err != nil { - return errors.New("failed to parse ZmqPub value: " + err.Error()) + return errors.New("failed to parse URL: " + err.Error()) } if addr.Scheme != "tcp" { diff --git a/www/zmq/config_test.go b/www/zmq/config_test.go index 26c2770be..85af31ee5 100644 --- a/www/zmq/config_test.go +++ b/www/zmq/config_test.go @@ -51,7 +51,7 @@ func TestBasicCheck(t *testing.T) { { name: "Empty host", config: &Config{ - ZmqPubBlockInfo: "tcp://:28332", + ZmqPubTxInfo: "tcp://:28332", }, expectErr: true, }, diff --git a/www/zmq/publisher.go b/www/zmq/publisher.go index 8999c86b4..f1db90776 100644 --- a/www/zmq/publisher.go +++ b/www/zmq/publisher.go @@ -1,7 +1,10 @@ package zmq import ( + "encoding/binary" + "github.com/go-zeromq/zmq4" + "github.com/pactus-project/pactus/crypto" "github.com/pactus-project/pactus/types/block" "github.com/pactus-project/pactus/util/logger" ) @@ -15,6 +18,7 @@ type Publisher interface { type basePub struct { topic Topic + seqNo uint32 zmqSocket zmq4.Socket logger *logger.SubLogger } @@ -26,3 +30,36 @@ func (b *basePub) Address() string { func (b *basePub) TopicName() string { return b.topic.String() } + +// makeTopicMsg constructs a ZMQ message with a topic ID, message body, and sequence number. +// The message is constructed as a byte slice with the following structure: +// - Topic ID (2 Bytes) +// - Message body (varies based on provided parts) +// - Sequence number (4 Bytes). +func (b *basePub) makeTopicMsg(parts ...any) []byte { + result := make([]byte, 0, 64) + + // Append Topic ID to the message (2 Bytes) + result = append(result, b.topic.Bytes()...) + + // Append message body based on the provided parts + for _, part := range parts { + switch castedVal := part.(type) { + case crypto.Address: + result = append(result, castedVal.Bytes()...) + case []byte: + result = append(result, castedVal...) + case uint32: + result = binary.BigEndian.AppendUint32(result, castedVal) + case uint16: + result = binary.BigEndian.AppendUint16(result, castedVal) + default: + panic("implement me!!") + } + } + + // Append sequence number to the message (4 Bytes, Big Endian encoding) + result = binary.BigEndian.AppendUint32(result, b.seqNo) + + return result +} diff --git a/www/zmq/util.go b/www/zmq/util.go deleted file mode 100644 index f64832a6a..000000000 --- a/www/zmq/util.go +++ /dev/null @@ -1,30 +0,0 @@ -package zmq - -import ( - "encoding/binary" - - "github.com/pactus-project/pactus/crypto" -) - -func makeTopicMsg(parts ...any) []byte { - result := make([]byte, 0, 64) - - for _, part := range parts { - switch castedVal := part.(type) { - case crypto.Address: - result = append(result, castedVal.Bytes()...) - case Topic: - result = append(result, castedVal.Bytes()...) - case []byte: - result = append(result, castedVal...) - case uint32: - result = binary.BigEndian.AppendUint32(result, castedVal) - case uint16: - result = binary.BigEndian.AppendUint16(result, castedVal) - default: - panic("implement me!!") - } - } - - return result -} diff --git a/www/zmq/util_test.go b/www/zmq/util_test.go deleted file mode 100644 index 08e4ef470..000000000 --- a/www/zmq/util_test.go +++ /dev/null @@ -1,74 +0,0 @@ -package zmq - -import ( - "encoding/binary" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestMakeTopicMsg(t *testing.T) { - td := setup(t) - - _, addr := td.TestSuite.GenerateTestAccount() - randHeight := td.TestSuite.RandHeight() - - tests := []struct { - name string - parts []any - want []byte - wantPanic bool - }{ - { - name: "single crypto.Address", - parts: []any{addr}, - want: addr.Bytes(), - }, - { - name: "single Topic", - parts: []any{BlockInfo}, - want: BlockInfo.Bytes(), - }, - { - name: "uint32 value", - parts: []any{randHeight}, - want: binary.BigEndian.AppendUint32([]byte{}, randHeight), - }, - { - name: "uint16 value", - parts: []any{uint16(0x0506)}, - want: binary.BigEndian.AppendUint16([]byte{}, 0x0506), - }, - { - name: "multiple types", - parts: []any{addr, BlockInfo, uint32(0x0A0B0C0D), []byte{0x0E}}, - want: func() []byte { - b := addr.Bytes() - b = append(b, BlockInfo.Bytes()...) - b = binary.BigEndian.AppendUint32(b, 0x0A0B0C0D) - b = append(b, 0x0E) - - return b - }(), - }, - { - name: "unknown type", - parts: []any{"unknown"}, - wantPanic: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if tt.wantPanic { - assert.Panics(t, func() { - makeTopicMsg(tt.parts...) - }) - - return - } - got := makeTopicMsg(tt.parts...) - assert.Equal(t, tt.want, got) - }) - } -}