diff --git a/services/wakuext/api_test.go b/services/wakuext/api_test.go index 8709f88f6..a8fb1a894 100644 --- a/services/wakuext/api_test.go +++ b/services/wakuext/api_test.go @@ -303,8 +303,8 @@ func (s *WakuNodeMockSuite) SetupTest() { pkey, err := crypto.GenerateKey() s.Require().NoError(err) node := enode.NewV4(&pkey.PublicKey, net.ParseIP("127.0.0.1"), 1, 1) - peer := p2p.NewPeer(node.ID(), "1", []p2p.Cap{{"shh", 6}}) rw1, rw2 := p2p.MsgPipe() + peer := v0.NewPeer(w, p2p.NewPeer(node.ID(), "1", []p2p.Cap{{"shh", 6}}), rw2, nil) go func() { err := w.HandlePeer(peer, rw2) panic(err) diff --git a/waku/v0/message_test.go b/waku/v0/message_test.go index 075172b2d..25531ea8e 100644 --- a/waku/v0/message_test.go +++ b/waku/v0/message_test.go @@ -26,6 +26,7 @@ import ( gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/p2p" "github.com/status-im/status-go/waku/common" ) @@ -40,3 +41,29 @@ func TestEncodeDecodeVersionedResponse(t *testing.T) { require.NoError(t, err) require.Equal(t, response.Response.Hash, v1resp.Hash) } + +func TestSendBundle(t *testing.T) { + rw1, rw2 := p2p.MsgPipe() + defer func() { handleError(t, rw1.Close()) }() + defer func() { handleError(t, rw2.Close()) }() + envelopes := []*common.Envelope{{ + Expiry: 0, + TTL: 30, + Topic: common.TopicType{1}, + Data: []byte{1, 1, 1}, + }} + + errc := make(chan error) + go func() { + _, err := sendBundle(rw1, envelopes) + errc <- err + }() + require.NoError(t, p2p.ExpectMsg(rw2, messagesCode, envelopes)) + require.NoError(t, <-errc) +} + +func handleError(t *testing.T, err error) { + if err != nil { + t.Logf("deferred function error: '%s'", err) + } +} diff --git a/waku/v0/peer_test.go b/waku/v0/peer_test.go index a130ced6a..3ee09111c 100644 --- a/waku/v0/peer_test.go +++ b/waku/v0/peer_test.go @@ -19,11 +19,25 @@ package v0 import ( + mrand "math/rand" "testing" + "time" + + "github.com/ethereum/go-ethereum/crypto" "github.com/status-im/status-go/waku/common" ) +var seed int64 + +// initSingleTest should be called in the beginning of every +// test, which uses RNG, in order to make the tests +// reproduciblity independent of their sequence. +func initSingleTest() { + seed = time.Now().Unix() + mrand.Seed(seed) +} + var sharedTopic = common.TopicType{0xF, 0x1, 0x2, 0} var wrongTopic = common.TopicType{0, 0, 0, 0} @@ -58,3 +72,54 @@ func TestTopicOrBloomMatchFullNode(t *testing.T) { t.Fatal("envelope should not match") } } + +func TestPeerBasic(t *testing.T) { + initSingleTest() + + params, err := generateMessageParams() + if err != nil { + t.Fatalf("failed generateMessageParams with seed %d.", seed) + } + + params.PoW = 0.001 + msg, err := common.NewSentMessage(params) + if err != nil { + t.Fatalf("failed to create new message with seed %d: %s.", seed, err) + } + env, err := msg.Wrap(params, time.Now()) + if err != nil { + t.Fatalf("failed Wrap with seed %d.", seed) + } + + p := NewPeer(nil, nil, nil, nil) + p.Mark(env) + if !p.Marked(env) { + t.Fatalf("failed mark with seed %d.", seed) + } +} + +func generateMessageParams() (*common.MessageParams, error) { + // set all the parameters except p.Dst and p.Padding + + buf := make([]byte, 4) + mrand.Read(buf) // nolint: gosec + sz := mrand.Intn(400) + + var p common.MessageParams + p.PoW = 0.01 + p.WorkTime = 1 + p.TTL = uint32(mrand.Intn(1024)) + p.Payload = make([]byte, sz) + p.KeySym = make([]byte, common.AESKeyLength) + mrand.Read(p.Payload) // nolint: gosec + mrand.Read(p.KeySym) // nolint: gosec + p.Topic = common.BytesToTopic(buf) + + var err error + p.Src, err = crypto.GenerateKey() + if err != nil { + return nil, err + } + + return &p, nil +} diff --git a/waku/v1/message_test.go b/waku/v1/message_test.go index ae45be3b7..327c2055c 100644 --- a/waku/v1/message_test.go +++ b/waku/v1/message_test.go @@ -26,6 +26,7 @@ import ( gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/p2p" "github.com/status-im/status-go/waku/common" ) @@ -40,3 +41,29 @@ func TestEncodeDecodeVersionedResponse(t *testing.T) { require.NoError(t, err) require.Equal(t, response.Response.Hash, v1resp.Hash) } + +func TestSendBundle(t *testing.T) { + rw1, rw2 := p2p.MsgPipe() + defer func() { handleError(t, rw1.Close()) }() + defer func() { handleError(t, rw2.Close()) }() + envelopes := []*common.Envelope{{ + Expiry: 0, + TTL: 30, + Topic: common.TopicType{1}, + Data: []byte{1, 1, 1}, + }} + + errc := make(chan error) + go func() { + _, err := sendBundle(rw1, envelopes) + errc <- err + }() + require.NoError(t, p2p.ExpectMsg(rw2, messagesCode, envelopes)) + require.NoError(t, <-errc) +} + +func handleError(t *testing.T, err error) { + if err != nil { + t.Logf("deferred function error: '%s'", err) + } +} diff --git a/waku/waku_version_test.go b/waku/waku_version_test.go index f24beab15..d8c57faa3 100644 --- a/waku/waku_version_test.go +++ b/waku/waku_version_test.go @@ -98,17 +98,14 @@ func (s *WakuTestSuite) TestHandleP2PMessageCode() { rw1, rw2 := p2p.MsgPipe() go func() { - // This will eventually error as we disconnect one the peers - handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw1, nil), rw1)) - }() - go func() { - select { - case <-time.After(time.Second * 5): - handleError(s.T(), rw1.Close()) - handleError(s.T(), rw2.Close()) - } + s.Require().Error(w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw1, nil), rw1)) }() + timer := time.AfterFunc(time.Second*5, func() { + handleError(s.T(), rw1.Close()) + handleError(s.T(), rw2.Close()) + }) + peer1 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw2, nil) peer1.SetPeerTrusted(true) @@ -122,6 +119,9 @@ func (s *WakuTestSuite) TestHandleP2PMessageCode() { e := <-envelopeEvents s.Require().Equal(e.Hash, env.Hash(), "envelopes not equal") peer1.Stop() + s.Require().NoError(rw1.Close()) + s.Require().NoError(rw2.Close()) + timer.Stop() } func (s *WakuTestSuite) testConfirmationsHandshake(expectConfirmations bool) { @@ -134,7 +134,7 @@ func (s *WakuTestSuite) testConfirmationsHandshake(expectConfirmations bool) { rw1, rw2 := p2p.MsgPipe() // so that actual read won't hang forever - time.AfterFunc(5*time.Second, func() { + timer := time.AfterFunc(5*time.Second, func() { handleError(s.T(), rw1.Close()) handleError(s.T(), rw2.Close()) }) @@ -142,7 +142,8 @@ func (s *WakuTestSuite) testConfirmationsHandshake(expectConfirmations bool) { p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 1}}), rw1, nil) go func() { - handleError(s.T(), w1.HandlePeer(p1, rw1)) + // This will always fail eventually as we close the channels + s.Require().Error(w1.HandlePeer(p1, rw1)) }() p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil) @@ -151,9 +152,12 @@ func (s *WakuTestSuite) testConfirmationsHandshake(expectConfirmations bool) { peers := w1.getPeers() s.Require().Len(peers, 1) s.Require().Equal(expectConfirmations, peers[0].ConfirmationsEnabled()) + timer.Stop() + s.Require().NoError(rw1.Close()) + s.Require().NoError(rw2.Close()) } -func (s *WakuTestSuite) TestConfirmationHadnshakeExtension() { +func (s *WakuTestSuite) TestConfirmationHandshakeExtension() { s.testConfirmationsHandshake(true) } @@ -161,7 +165,6 @@ func (s *WakuTestSuite) TestHandshakeWithConfirmationsDisabled() { s.testConfirmationsHandshake(false) } -// FLAKY func (s *WakuTestSuite) TestMessagesResponseWithError() { conf := &Config{ MinimumAcceptedPoW: 0, @@ -198,7 +201,7 @@ func (s *WakuTestSuite) TestMessagesResponseWithError() { Nonce: 1, } normal := common.Envelope{ - Expiry: uint32(time.Now().Unix()), + Expiry: uint32(time.Now().Unix()) + 5, TTL: 10, Topic: common.TopicType{1}, Data: make([]byte, 1<<10), @@ -255,7 +258,7 @@ func (s *WakuTestSuite) testConfirmationEvents(envelope common.Envelope, envelop errorc <- err }() - time.AfterFunc(5*time.Second, func() { + timer := time.AfterFunc(5*time.Second, func() { if err := rw1.Close(); err != nil { s.T().Errorf("error closing MsgPipe 1, '%s'", err) } @@ -275,30 +278,39 @@ func (s *WakuTestSuite) testConfirmationEvents(envelope common.Envelope, envelop errorc <- err }() - s.Require().NoError(w1.Send(&envelope)) + w1.addEnvelope(&envelope) - var hash gethcommon.Hash - select { - case err := <-errorc: - s.Require().NoError(err) - case ev := <-events: - switch ev.Event { - case common.EventEnvelopeSent: - s.Require().Equal(p1.EnodeID(), ev.Peer) - s.Require().NotEqual(gethcommon.Hash{}, ev.Batch) - hash = ev.Batch - case common.EventBatchAcknowledged: - s.Require().Equal(p1.EnodeID(), ev.Peer) - s.Require().Equal(hash, ev.Batch) - s.Require().Equal(envelopeErrors, ev.Data) - default: - s.Require().FailNow("invalid event message", ev.Event) + var e1, e2 *common.EnvelopeEvent + var count int + for count < 2 { + select { + case ev := <-events: + switch ev.Event { + case common.EventEnvelopeSent: + if e1 == nil { + e1 = &ev + count++ + } + case common.EventBatchAcknowledged: + if e2 == nil { + e2 = &ev + count++ + } + } + + case <-time.After(5 * time.Second): + s.Require().FailNow("timed out waiting for an envelope.sent event") } - - case <-time.After(5 * time.Second): - s.Require().FailNow("timed out waiting for an envelope.sent event") } + s.Require().Equal(p1.EnodeID(), e1.Peer) + s.Require().NotEqual(gethcommon.Hash{}, e1.Batch) + s.Require().Equal(p1.EnodeID(), e2.Peer) + s.Require().Equal(e1.Batch, e2.Batch) + s.Require().Equal(envelopeErrors, e2.Data) + s.Require().NoError(rw1.Close()) + s.Require().NoError(rw2.Close()) + timer.Stop() } func (s *WakuTestSuite) TestConfirmationEventsReceived() { @@ -314,7 +326,7 @@ func (s *WakuTestSuite) TestConfirmationEventsReceived() { func (s *WakuTestSuite) TestConfirmationEventsExtendedWithErrors() { e := common.Envelope{ - Expiry: uint32(time.Now().Unix()), + Expiry: uint32(time.Now().Unix()) - 4*common.DefaultSyncAllowance, TTL: 10, Topic: common.TopicType{1}, Data: make([]byte, 1<<10), @@ -324,7 +336,7 @@ func (s *WakuTestSuite) TestConfirmationEventsExtendedWithErrors() { { Hash: e.Hash(), Code: common.EnvelopeTimeNotSynced, - Description: "test error", + Description: "very old envelope", }}, ) } @@ -347,8 +359,8 @@ func (s *WakuTestSuite) TestEventsWithoutConfirmation() { handleError(s.T(), w1.HandlePeer(p1, rw2)) }() - time.AfterFunc(5*time.Second, func() { - rw1.Close() + timer := time.AfterFunc(5*time.Second, func() { + handleError(s.T(), rw1.Close()) }) peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw1, nil) s.Require().NoError(peer2.Start()) @@ -374,6 +386,8 @@ func (s *WakuTestSuite) TestEventsWithoutConfirmation() { case <-time.After(5 * time.Second): s.Require().FailNow("timed out waiting for an envelope.sent event") } + s.Require().NoError(rw1.Close()) + timer.Stop() } func discardPipe() *p2p.MsgPipeRW { @@ -628,10 +642,10 @@ func (s *WakuTestSuite) TestPeerHandshakeWithTwoFullNode() { //two generic waku node handshake. one don't send light flag func (s *WakuTestSuite) TestHandshakeWithOldVersionWithoutLightModeFlag() { rw1, rw2 := p2p.MsgPipe() - go func() { + defer func() { handleError(s.T(), rw1.Close()) }() - go func() { + defer func() { handleError(s.T(), rw2.Close()) }()