Skip to content

Commit

Permalink
Merge branch 'main' into fix/mv-mounted-disk
Browse files Browse the repository at this point in the history
  • Loading branch information
Ja7ad authored Jan 18, 2025
2 parents 3ae6bbc + 0a9ecfd commit def79a1
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 7 deletions.
2 changes: 1 addition & 1 deletion www/zmq/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (b *basePub) HWM() int {
// - Message body (varies based on provided parts)
// - Sequence number (4 Bytes).
func (b *basePub) makeTopicMsg(parts ...any) []byte {
result := make([]byte, 0, 64)
result := make([]byte, 0)

// Append Topic ID to the message (2 Bytes)
result = append(result, b.topic.Bytes()...)
Expand Down
2 changes: 2 additions & 0 deletions www/zmq/publisher_block_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func (b *blockInfoPub) onNewBlock(blk *block.Block) {

if err := b.zmqSocket.Send(message); err != nil {
b.logger.Error("zmq publish message error", "err", err, "publisher", b.TopicName())

return
}

b.logger.Debug("zmq published message success",
Expand Down
29 changes: 26 additions & 3 deletions www/zmq/publisher_raw_block.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package zmq

import (
"bytes"

"github.com/go-zeromq/zmq4"
"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/util/logger"
Expand All @@ -20,7 +22,28 @@ func newRawBlockPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
}
}

func (*rawBlockPub) onNewBlock(_ *block.Block) {
// TODO implement me
panic("implement me")
func (r *rawBlockPub) onNewBlock(blk *block.Block) {
rawHeader := make([]byte, 0)
buf := bytes.NewBuffer(rawHeader)

if err := blk.Header().Encode(buf); err != nil {
r.logger.Error("failed to encode block header", "err", err, "publisher", r.TopicName())

return
}

rawMsg := r.makeTopicMsg(buf.Bytes(), blk.Height())
message := zmq4.NewMsg(rawMsg)

if err := r.zmqSocket.Send(message); err != nil {
r.logger.Error("zmq publish message error", "err", err, "publisher", r.TopicName())

return
}

r.logger.Debug("zmq published message success",
"publisher", r.TopicName(),
"block_height", blk.Height())

r.seqNo++
}
65 changes: 65 additions & 0 deletions www/zmq/publisher_raw_block_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package zmq

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"testing"

"github.com/go-zeromq/zmq4"
"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/util/testsuite"
"github.com/stretchr/testify/require"
)

func TestRawBlockPublisher(t *testing.T) {
port := testsuite.FindFreePort()
addr := fmt.Sprintf("tcp://localhost:%d", port)
conf := DefaultConfig()
conf.ZmqPubRawBlock = addr

td := setup(t, conf)
defer td.closeServer()

td.server.Publishers()

sub := zmq4.NewSub(context.TODO(), zmq4.WithAutomaticReconnect(false))

err := sub.Dial(addr)
require.NoError(t, err)

err = sub.SetOption(zmq4.OptionSubscribe, string(TopicRawBlock.Bytes()))
require.NoError(t, err)

blk, _ := td.TestSuite.GenerateTestBlock(td.RandHeight())

td.eventCh <- blk

received, err := sub.Recv()
require.NoError(t, err)

require.NotNil(t, received.Frames)
require.GreaterOrEqual(t, len(received.Frames), 1)

msg := received.Frames[0]

topic := msg[:2]
blockHeader := msg[2 : len(msg)-8]
height := binary.BigEndian.Uint32(msg[140 : len(msg)-4])
seqNo := binary.BigEndian.Uint32(msg[len(msg)-4:])

buf := bytes.NewBuffer(blockHeader)
header := new(block.Header)

require.NoError(t, header.Decode(buf))

require.NotNil(t, header)
require.Equal(t, uint32(0), seqNo)
require.Equal(t, blk.Height(), height)
require.Equal(t, TopicRawBlock.Bytes(), topic)
require.Equal(t, header.PrevBlockHash(), blk.Header().PrevBlockHash())
require.Equal(t, header.StateRoot(), blk.Header().StateRoot())

require.NoError(t, sub.Close())
}
22 changes: 19 additions & 3 deletions www/zmq/publisher_tx_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,23 @@ func newTxInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
}
}

func (*txInfoPub) onNewBlock(_ *block.Block) {
// TODO implement me
panic("implement me")
func (t *txInfoPub) onNewBlock(blk *block.Block) {
for _, txn := range blk.Transactions() {
rawMsg := t.makeTopicMsg(txn.ID().Bytes(), blk.Height())
message := zmq4.NewMsg(rawMsg)

if err := t.zmqSocket.Send(message); err != nil {
t.logger.Error("zmq publish message error", "err", err, "publisher", t.TopicName())

continue
}

t.logger.Debug("zmq published message success",
"publisher", t.TopicName(),
"block_height", blk.Height(),
"tx_hash", txn.ID().String(),
)

t.seqNo++
}
}
59 changes: 59 additions & 0 deletions www/zmq/publisher_tx_info_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package zmq

import (
"context"
"encoding/binary"
"fmt"
"testing"

"github.com/go-zeromq/zmq4"
"github.com/pactus-project/pactus/util/testsuite"
"github.com/stretchr/testify/require"
)

func TestTxInfoPublisher(t *testing.T) {
port := testsuite.FindFreePort()
addr := fmt.Sprintf("tcp://localhost:%d", port)
conf := DefaultConfig()
conf.ZmqPubTxInfo = addr

td := setup(t, conf)
defer td.closeServer()

td.server.Publishers()

sub := zmq4.NewSub(context.TODO(), zmq4.WithAutomaticReconnect(false))

err := sub.Dial(addr)
require.NoError(t, err)

err = sub.SetOption(zmq4.OptionSubscribe, string(TopicTransactionInfo.Bytes()))
require.NoError(t, err)

blk, _ := td.TestSuite.GenerateTestBlock(td.RandHeight())

td.eventCh <- blk

for i := 0; i < len(blk.Transactions()); i++ {
received, err := sub.Recv()
require.NoError(t, err)

require.NotNil(t, received.Frames)
require.GreaterOrEqual(t, len(received.Frames), 1)

msg := received.Frames[0]
require.Len(t, msg, 42)

topic := msg[:2]
txHash := msg[2:34]
height := binary.BigEndian.Uint32(msg[34:38])
seqNo := binary.BigEndian.Uint32(msg[38:])

require.Equal(t, TopicTransactionInfo.Bytes(), topic)
require.Equal(t, blk.Transactions()[i].ID().Bytes(), txHash)
require.Equal(t, blk.Height(), height)
require.Equal(t, uint32(i), seqNo)
}

require.NoError(t, sub.Close())
}

0 comments on commit def79a1

Please sign in to comment.