Skip to content

Commit

Permalink
fix: move makeTopicMsg into basePub
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f committed Jan 14, 2025
1 parent d33412a commit bbee887
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 116 deletions.
13 changes: 4 additions & 9 deletions www/zmq/block_info_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,26 @@ import (
)

type blockInfoPub struct {
seqNo uint32
basePub
}

func newBlockInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
return &blockInfoPub{
basePub: basePub{
topic: BlockInfo,
seqNo: 0,
zmqSocket: socket,
logger: logger,
},
}
}

func (b *blockInfoPub) onNewBlock(blk *block.Block) {
seq := b.seqNo + 1

rawMsg := makeTopicMsg(
b.topic,
rawMsg := b.makeTopicMsg(
blk.Header().ProposerAddress(),
blk.Header().UnixTime(),
uint16(len(blk.Transactions())),
blk.Height(),
seq,
)

message := zmq4.NewMsg(rawMsg)
Expand All @@ -41,8 +37,7 @@ func (b *blockInfoPub) onNewBlock(blk *block.Block) {

b.logger.Debug("zmq published message success",
"publisher", b.TopicName(),
"block_height", blk.Height(),
)
"block_height", blk.Height())

b.seqNo = seq
b.seqNo++
}
2 changes: 1 addition & 1 deletion www/zmq/block_info_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,5 @@ func TestBlockInfoPublisher(t *testing.T) {
require.Equal(t, blk.Header().UnixTime(), timestamp)
require.Equal(t, uint16(len(blk.Transactions())), txCount)
require.Equal(t, blk.Height(), height)
require.Equal(t, uint32(1), seqNo)
require.Equal(t, uint32(0), seqNo)
}
2 changes: 1 addition & 1 deletion www/zmq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *Config) BasicCheck() error {
func validateTopicSocket(socket string) error {
addr, err := url.Parse(socket)
if err != nil {
return errors.New("failed to parse ZmqPub value: " + err.Error())
return errors.New("failed to parse URL: " + err.Error())
}

if addr.Scheme != "tcp" {
Expand Down
2 changes: 1 addition & 1 deletion www/zmq/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestBasicCheck(t *testing.T) {
{
name: "Empty host",
config: &Config{
ZmqPubBlockInfo: "tcp://:28332",
ZmqPubTxInfo: "tcp://:28332",
},
expectErr: true,
},
Expand Down
37 changes: 37 additions & 0 deletions www/zmq/publisher.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package zmq

import (
"encoding/binary"

"github.com/go-zeromq/zmq4"
"github.com/pactus-project/pactus/crypto"
"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/util/logger"
)
Expand All @@ -15,6 +18,7 @@ type Publisher interface {

type basePub struct {
topic Topic
seqNo uint32
zmqSocket zmq4.Socket
logger *logger.SubLogger
}
Expand All @@ -26,3 +30,36 @@ func (b *basePub) Address() string {
func (b *basePub) TopicName() string {
return b.topic.String()
}

// makeTopicMsg constructs a ZMQ message with a topic ID, message body, and sequence number.
// The message is constructed as a byte slice with the following structure:
// - Topic ID (2 Bytes)
// - Message body (varies based on provided parts)
// - Sequence number (4 Bytes).
func (b *basePub) makeTopicMsg(parts ...any) []byte {
result := make([]byte, 0, 64)

// Append Topic ID to the message (2 Bytes)
result = append(result, b.topic.Bytes()...)

// Append message body based on the provided parts
for _, part := range parts {
switch castedVal := part.(type) {
case crypto.Address:
result = append(result, castedVal.Bytes()...)
case []byte:
result = append(result, castedVal...)
case uint32:
result = binary.BigEndian.AppendUint32(result, castedVal)
case uint16:
result = binary.BigEndian.AppendUint16(result, castedVal)
default:
panic("implement me!!")
}
}

// Append sequence number to the message (4 Bytes, Big Endian encoding)
result = binary.BigEndian.AppendUint32(result, b.seqNo)

return result
}
30 changes: 0 additions & 30 deletions www/zmq/util.go

This file was deleted.

74 changes: 0 additions & 74 deletions www/zmq/util_test.go

This file was deleted.

0 comments on commit bbee887

Please sign in to comment.