Add tests for sendBundle and fix flaky tests

This commit is contained in:
Andrea Maria Piana 2020-04-30 10:10:21 +02:00
parent 299b3fc093
commit 0e27e464d3
5 changed files with 175 additions and 42 deletions

View file

@ -303,8 +303,8 @@ func (s *WakuNodeMockSuite) SetupTest() {
pkey, err := crypto.GenerateKey()
s.Require().NoError(err)
node := enode.NewV4(&pkey.PublicKey, net.ParseIP("127.0.0.1"), 1, 1)
peer := p2p.NewPeer(node.ID(), "1", []p2p.Cap{{"shh", 6}})
rw1, rw2 := p2p.MsgPipe()
peer := v0.NewPeer(w, p2p.NewPeer(node.ID(), "1", []p2p.Cap{{"shh", 6}}), rw2, nil)
go func() {
err := w.HandlePeer(peer, rw2)
panic(err)

View file

@ -26,6 +26,7 @@ import (
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/p2p"
"github.com/status-im/status-go/waku/common"
)
@ -40,3 +41,29 @@ func TestEncodeDecodeVersionedResponse(t *testing.T) {
require.NoError(t, err)
require.Equal(t, response.Response.Hash, v1resp.Hash)
}
func TestSendBundle(t *testing.T) {
rw1, rw2 := p2p.MsgPipe()
defer func() { handleError(t, rw1.Close()) }()
defer func() { handleError(t, rw2.Close()) }()
envelopes := []*common.Envelope{{
Expiry: 0,
TTL: 30,
Topic: common.TopicType{1},
Data: []byte{1, 1, 1},
}}
errc := make(chan error)
go func() {
_, err := sendBundle(rw1, envelopes)
errc <- err
}()
require.NoError(t, p2p.ExpectMsg(rw2, messagesCode, envelopes))
require.NoError(t, <-errc)
}
func handleError(t *testing.T, err error) {
if err != nil {
t.Logf("deferred function error: '%s'", err)
}
}

View file

@ -19,11 +19,25 @@
package v0
import (
mrand "math/rand"
"testing"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/status-im/status-go/waku/common"
)
var seed int64
// initSingleTest should be called in the beginning of every
// test, which uses RNG, in order to make the tests
// reproduciblity independent of their sequence.
func initSingleTest() {
seed = time.Now().Unix()
mrand.Seed(seed)
}
var sharedTopic = common.TopicType{0xF, 0x1, 0x2, 0}
var wrongTopic = common.TopicType{0, 0, 0, 0}
@ -58,3 +72,54 @@ func TestTopicOrBloomMatchFullNode(t *testing.T) {
t.Fatal("envelope should not match")
}
}
func TestPeerBasic(t *testing.T) {
initSingleTest()
params, err := generateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d.", seed)
}
params.PoW = 0.001
msg, err := common.NewSentMessage(params)
if err != nil {
t.Fatalf("failed to create new message with seed %d: %s.", seed, err)
}
env, err := msg.Wrap(params, time.Now())
if err != nil {
t.Fatalf("failed Wrap with seed %d.", seed)
}
p := NewPeer(nil, nil, nil, nil)
p.Mark(env)
if !p.Marked(env) {
t.Fatalf("failed mark with seed %d.", seed)
}
}
func generateMessageParams() (*common.MessageParams, error) {
// set all the parameters except p.Dst and p.Padding
buf := make([]byte, 4)
mrand.Read(buf) // nolint: gosec
sz := mrand.Intn(400)
var p common.MessageParams
p.PoW = 0.01
p.WorkTime = 1
p.TTL = uint32(mrand.Intn(1024))
p.Payload = make([]byte, sz)
p.KeySym = make([]byte, common.AESKeyLength)
mrand.Read(p.Payload) // nolint: gosec
mrand.Read(p.KeySym) // nolint: gosec
p.Topic = common.BytesToTopic(buf)
var err error
p.Src, err = crypto.GenerateKey()
if err != nil {
return nil, err
}
return &p, nil
}

View file

@ -26,6 +26,7 @@ import (
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/p2p"
"github.com/status-im/status-go/waku/common"
)
@ -40,3 +41,29 @@ func TestEncodeDecodeVersionedResponse(t *testing.T) {
require.NoError(t, err)
require.Equal(t, response.Response.Hash, v1resp.Hash)
}
func TestSendBundle(t *testing.T) {
rw1, rw2 := p2p.MsgPipe()
defer func() { handleError(t, rw1.Close()) }()
defer func() { handleError(t, rw2.Close()) }()
envelopes := []*common.Envelope{{
Expiry: 0,
TTL: 30,
Topic: common.TopicType{1},
Data: []byte{1, 1, 1},
}}
errc := make(chan error)
go func() {
_, err := sendBundle(rw1, envelopes)
errc <- err
}()
require.NoError(t, p2p.ExpectMsg(rw2, messagesCode, envelopes))
require.NoError(t, <-errc)
}
func handleError(t *testing.T, err error) {
if err != nil {
t.Logf("deferred function error: '%s'", err)
}
}

View file

@ -98,17 +98,14 @@ func (s *WakuTestSuite) TestHandleP2PMessageCode() {
rw1, rw2 := p2p.MsgPipe()
go func() {
// This will eventually error as we disconnect one the peers
handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw1, nil), rw1))
}()
go func() {
select {
case <-time.After(time.Second * 5):
handleError(s.T(), rw1.Close())
handleError(s.T(), rw2.Close())
}
s.Require().Error(w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw1, nil), rw1))
}()
timer := time.AfterFunc(time.Second*5, func() {
handleError(s.T(), rw1.Close())
handleError(s.T(), rw2.Close())
})
peer1 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw2, nil)
peer1.SetPeerTrusted(true)
@ -122,6 +119,9 @@ func (s *WakuTestSuite) TestHandleP2PMessageCode() {
e := <-envelopeEvents
s.Require().Equal(e.Hash, env.Hash(), "envelopes not equal")
peer1.Stop()
s.Require().NoError(rw1.Close())
s.Require().NoError(rw2.Close())
timer.Stop()
}
func (s *WakuTestSuite) testConfirmationsHandshake(expectConfirmations bool) {
@ -134,7 +134,7 @@ func (s *WakuTestSuite) testConfirmationsHandshake(expectConfirmations bool) {
rw1, rw2 := p2p.MsgPipe()
// so that actual read won't hang forever
time.AfterFunc(5*time.Second, func() {
timer := time.AfterFunc(5*time.Second, func() {
handleError(s.T(), rw1.Close())
handleError(s.T(), rw2.Close())
})
@ -142,7 +142,8 @@ func (s *WakuTestSuite) testConfirmationsHandshake(expectConfirmations bool) {
p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 1}}), rw1, nil)
go func() {
handleError(s.T(), w1.HandlePeer(p1, rw1))
// This will always fail eventually as we close the channels
s.Require().Error(w1.HandlePeer(p1, rw1))
}()
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
@ -151,9 +152,12 @@ func (s *WakuTestSuite) testConfirmationsHandshake(expectConfirmations bool) {
peers := w1.getPeers()
s.Require().Len(peers, 1)
s.Require().Equal(expectConfirmations, peers[0].ConfirmationsEnabled())
timer.Stop()
s.Require().NoError(rw1.Close())
s.Require().NoError(rw2.Close())
}
func (s *WakuTestSuite) TestConfirmationHadnshakeExtension() {
func (s *WakuTestSuite) TestConfirmationHandshakeExtension() {
s.testConfirmationsHandshake(true)
}
@ -161,7 +165,6 @@ func (s *WakuTestSuite) TestHandshakeWithConfirmationsDisabled() {
s.testConfirmationsHandshake(false)
}
// FLAKY
func (s *WakuTestSuite) TestMessagesResponseWithError() {
conf := &Config{
MinimumAcceptedPoW: 0,
@ -198,7 +201,7 @@ func (s *WakuTestSuite) TestMessagesResponseWithError() {
Nonce: 1,
}
normal := common.Envelope{
Expiry: uint32(time.Now().Unix()),
Expiry: uint32(time.Now().Unix()) + 5,
TTL: 10,
Topic: common.TopicType{1},
Data: make([]byte, 1<<10),
@ -255,7 +258,7 @@ func (s *WakuTestSuite) testConfirmationEvents(envelope common.Envelope, envelop
errorc <- err
}()
time.AfterFunc(5*time.Second, func() {
timer := time.AfterFunc(5*time.Second, func() {
if err := rw1.Close(); err != nil {
s.T().Errorf("error closing MsgPipe 1, '%s'", err)
}
@ -275,30 +278,39 @@ func (s *WakuTestSuite) testConfirmationEvents(envelope common.Envelope, envelop
errorc <- err
}()
s.Require().NoError(w1.Send(&envelope))
w1.addEnvelope(&envelope)
var hash gethcommon.Hash
select {
case err := <-errorc:
s.Require().NoError(err)
case ev := <-events:
switch ev.Event {
case common.EventEnvelopeSent:
s.Require().Equal(p1.EnodeID(), ev.Peer)
s.Require().NotEqual(gethcommon.Hash{}, ev.Batch)
hash = ev.Batch
case common.EventBatchAcknowledged:
s.Require().Equal(p1.EnodeID(), ev.Peer)
s.Require().Equal(hash, ev.Batch)
s.Require().Equal(envelopeErrors, ev.Data)
default:
s.Require().FailNow("invalid event message", ev.Event)
var e1, e2 *common.EnvelopeEvent
var count int
for count < 2 {
select {
case ev := <-events:
switch ev.Event {
case common.EventEnvelopeSent:
if e1 == nil {
e1 = &ev
count++
}
case common.EventBatchAcknowledged:
if e2 == nil {
e2 = &ev
count++
}
}
case <-time.After(5 * time.Second):
s.Require().FailNow("timed out waiting for an envelope.sent event")
}
case <-time.After(5 * time.Second):
s.Require().FailNow("timed out waiting for an envelope.sent event")
}
s.Require().Equal(p1.EnodeID(), e1.Peer)
s.Require().NotEqual(gethcommon.Hash{}, e1.Batch)
s.Require().Equal(p1.EnodeID(), e2.Peer)
s.Require().Equal(e1.Batch, e2.Batch)
s.Require().Equal(envelopeErrors, e2.Data)
s.Require().NoError(rw1.Close())
s.Require().NoError(rw2.Close())
timer.Stop()
}
func (s *WakuTestSuite) TestConfirmationEventsReceived() {
@ -314,7 +326,7 @@ func (s *WakuTestSuite) TestConfirmationEventsReceived() {
func (s *WakuTestSuite) TestConfirmationEventsExtendedWithErrors() {
e := common.Envelope{
Expiry: uint32(time.Now().Unix()),
Expiry: uint32(time.Now().Unix()) - 4*common.DefaultSyncAllowance,
TTL: 10,
Topic: common.TopicType{1},
Data: make([]byte, 1<<10),
@ -324,7 +336,7 @@ func (s *WakuTestSuite) TestConfirmationEventsExtendedWithErrors() {
{
Hash: e.Hash(),
Code: common.EnvelopeTimeNotSynced,
Description: "test error",
Description: "very old envelope",
}},
)
}
@ -347,8 +359,8 @@ func (s *WakuTestSuite) TestEventsWithoutConfirmation() {
handleError(s.T(), w1.HandlePeer(p1, rw2))
}()
time.AfterFunc(5*time.Second, func() {
rw1.Close()
timer := time.AfterFunc(5*time.Second, func() {
handleError(s.T(), rw1.Close())
})
peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw1, nil)
s.Require().NoError(peer2.Start())
@ -374,6 +386,8 @@ func (s *WakuTestSuite) TestEventsWithoutConfirmation() {
case <-time.After(5 * time.Second):
s.Require().FailNow("timed out waiting for an envelope.sent event")
}
s.Require().NoError(rw1.Close())
timer.Stop()
}
func discardPipe() *p2p.MsgPipeRW {
@ -628,10 +642,10 @@ func (s *WakuTestSuite) TestPeerHandshakeWithTwoFullNode() {
//two generic waku node handshake. one don't send light flag
func (s *WakuTestSuite) TestHandshakeWithOldVersionWithoutLightModeFlag() {
rw1, rw2 := p2p.MsgPipe()
go func() {
defer func() {
handleError(s.T(), rw1.Close())
}()
go func() {
defer func() {
handleError(s.T(), rw2.Close())
}()