Move networking code for waku under v0 namespace

Why make the change?

As discussed previously, the way we will move across versions is to maintain completely separate
codebases and eventually remove those that are not supported anymore.

This has the drawback of some code duplication, but the advantage is that is more
explicit what each version requires, and changes in one version will not
impact the other, so we won't pile up backward compatible code.
This is the same strategy used by `whisper` in go ethereum and is influenced by
https://www.youtube.com/watch?v=oyLBGkS5ICk .

All the code that is used for the networking protocol is now under `v0/`.
Some of the common parts might still be refactored out.
The main namespace `waku` deals with `host`->`waku` interactions (through RPC),
while `v0` deals with `waku`->`remote-waku` interactions.

In order to support `v1`, the namespace `v0` will be copied over, and changed to
support `v1`. Once `v0` will be not used anymore, the whole namespace will be removed.

This PR does not actually implement `v1`, I'd rather get things looked over to
make sure the structure is what we would like before implementing the changes.

What has changed?

- Moved all code for the common parts under `waku/common/` namespace
- Moved code used for bloomfilters in `waku/common/bloomfilter.go`
- Removed all version specific code from `waku/common/const` (`ProtocolVersion`, status-codes etc)
- Added interfaces for `WakuHost` and `Peer` under `waku/common/protocol.go`

Things still to do

Some tests in `waku/` are still testing by stubbing components of a particular version (`v0`).
I started moving those tests to instead of stubbing using the actual component, which increases
the testing surface. Some other tests that can't be easily ported should be likely moved under
`v0` instead. Ideally no version specif code should be exported from a version namespace (for
example the various codes, as those might change across versions). But this will be a work-in-progress.

Some code that will be common in `v0`/`v1` could still be extract to avoid duplication, and duplicated only
when implementations diverge across versions.
This commit is contained in:
Andrea Maria Piana 2020-04-21 14:40:30 +02:00
parent a98e31c20f
commit aa7f591587
46 changed files with 1998 additions and 1713 deletions

View file

@ -1 +1 @@
0.52.4
0.52.5

View file

@ -7,6 +7,7 @@ import (
"go.uber.org/zap"
"github.com/status-im/status-go/waku"
wakucommon "github.com/status-im/status-go/waku/common"
"github.com/status-im/status-go/whisper/v6"
)
@ -20,8 +21,8 @@ type Bridge struct {
whisperIn chan *whisper.Envelope
whisperOut chan *whisper.Envelope
wakuIn chan *waku.Envelope
wakuOut chan *waku.Envelope
wakuIn chan *wakucommon.Envelope
wakuOut chan *wakucommon.Envelope
}
func New(shh *whisper.Whisper, w *waku.Waku, logger *zap.Logger) *Bridge {
@ -31,8 +32,8 @@ func New(shh *whisper.Whisper, w *waku.Waku, logger *zap.Logger) *Bridge {
logger: logger,
whisperOut: make(chan *whisper.Envelope),
whisperIn: make(chan *whisper.Envelope),
wakuIn: make(chan *waku.Envelope),
wakuOut: make(chan *waku.Envelope),
wakuIn: make(chan *wakucommon.Envelope),
wakuOut: make(chan *wakucommon.Envelope),
}
}
@ -48,7 +49,7 @@ type bridgeWaku struct {
*Bridge
}
func (b *bridgeWaku) Pipe() (<-chan *waku.Envelope, chan<- *waku.Envelope) {
func (b *bridgeWaku) Pipe() (<-chan *wakucommon.Envelope, chan<- *wakucommon.Envelope) {
return b.wakuOut, b.wakuIn
}
@ -84,7 +85,7 @@ func (b *Bridge) Start() {
case <-b.cancel:
return
case env := <-b.whisperIn:
wakuEnvelope := (*waku.Envelope)(unsafe.Pointer(env)) // nolint: gosec
wakuEnvelope := (*wakucommon.Envelope)(unsafe.Pointer(env)) // nolint: gosec
b.logger.Debug(
"received waku envelope from whisper",
zap.ByteString("hash", wakuEnvelope.Hash().Bytes()),

View file

@ -13,17 +13,18 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/status-im/status-go/waku"
wakucommon "github.com/status-im/status-go/waku/common"
"github.com/status-im/status-go/whisper/v6"
)
func TestEnvelopesBeingIdentical(t *testing.T) {
// whisper.Envelope --> waku.Envelope
// whisper.Envelope --> wakucommon.Envelope
whisperEnvelope, err := createWhisperEnvelope()
require.NoError(t, err)
wakuEnvelope := (*waku.Envelope)(unsafe.Pointer(whisperEnvelope)) // nolint: gosec
wakuEnvelope := (*wakucommon.Envelope)(unsafe.Pointer(whisperEnvelope)) // nolint: gosec
require.Equal(t, whisperEnvelope.Hash(), wakuEnvelope.Hash())
// waku.Envelope --> whisper.Envelope
// wakucommon.Envelope --> whisper.Envelope
wakuEnvelope, err = createWakuEnvelope()
require.NoError(t, err)
whisperEnvelope = (*whisper.Envelope)(unsafe.Pointer(wakuEnvelope)) // nolint: gosec
@ -47,7 +48,7 @@ func TestBridgeWhisperToWaku(t *testing.T) {
require.NoError(t, err)
// Subscribe for envelope events in Waku.
eventsWaku := make(chan waku.EnvelopeEvent, 10)
eventsWaku := make(chan wakucommon.EnvelopeEvent, 10)
sub1 := wak.SubscribeEnvelopeEvents(eventsWaku)
defer sub1.Unsubscribe()
@ -106,7 +107,7 @@ func TestBridgeWakuToWhisper(t *testing.T) {
defer sub1.Unsubscribe()
// Subscribe for envelope events in Waku.
eventsWaku := make(chan waku.EnvelopeEvent, 10)
eventsWaku := make(chan wakucommon.EnvelopeEvent, 10)
sub2 := wak.SubscribeEnvelopeEvents(eventsWaku)
defer sub2.Unsubscribe()
@ -167,20 +168,20 @@ func createWhisperEnvelope() (*whisper.Envelope, error) {
return envelope, nil
}
func createWakuEnvelope() (*waku.Envelope, error) {
messageParams := &waku.MessageParams{
func createWakuEnvelope() (*wakucommon.Envelope, error) {
messageParams := &wakucommon.MessageParams{
TTL: 120,
KeySym: []byte{0xaa, 0xbb, 0xcc},
Topic: waku.BytesToTopic([]byte{0x01}),
Topic: wakucommon.BytesToTopic([]byte{0x01}),
WorkTime: 10,
PoW: 2.0,
Payload: []byte("hello!"),
}
sentMessage, err := waku.NewSentMessage(messageParams)
sentMessage, err := wakucommon.NewSentMessage(messageParams)
if err != nil {
return nil, err
}
envelope := waku.NewEnvelope(120, waku.BytesToTopic([]byte{0x01}), sentMessage, time.Now())
envelope := wakucommon.NewEnvelope(120, wakucommon.BytesToTopic([]byte{0x01}), sentMessage, time.Now())
if err := envelope.Seal(messageParams); err != nil {
return nil, err
}

View file

@ -5,7 +5,7 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/waku"
waku "github.com/status-im/status-go/waku/common"
"github.com/status-im/status-go/whisper/v6"
)

View file

@ -2,7 +2,7 @@ package gethbridge
import (
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/waku"
waku "github.com/status-im/status-go/waku/common"
"github.com/status-im/status-go/whisper/v6"
)

View file

@ -3,6 +3,7 @@ package gethbridge
import (
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/waku"
wakucommon "github.com/status-im/status-go/waku/common"
"github.com/status-im/status-go/whisper/v6"
)
@ -34,14 +35,14 @@ func NewWhisperEnvelopeEventWrapper(envelopeEvent *whisper.EnvelopeEvent) *types
}
// NewWakuEnvelopeEventWrapper returns a types.EnvelopeEvent object that mimics Geth's EnvelopeEvent
func NewWakuEnvelopeEventWrapper(envelopeEvent *waku.EnvelopeEvent) *types.EnvelopeEvent {
func NewWakuEnvelopeEventWrapper(envelopeEvent *wakucommon.EnvelopeEvent) *types.EnvelopeEvent {
if envelopeEvent == nil {
panic("envelopeEvent should not be nil")
}
wrappedData := envelopeEvent.Data
switch data := envelopeEvent.Data.(type) {
case []waku.EnvelopeError:
case []wakucommon.EnvelopeError:
wrappedData := make([]types.EnvelopeError, len(data))
for index, envError := range data {
wrappedData[index] = *NewWakuEnvelopeErrorWrapper(&envError)

View file

@ -7,6 +7,7 @@ import (
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/waku"
wakucommon "github.com/status-im/status-go/waku/common"
)
type gethPublicWakuAPIWrapper struct {
@ -42,9 +43,9 @@ func (w *gethPublicWakuAPIWrapper) DeleteKeyPair(ctx context.Context, key string
// NewMessageFilter creates a new filter that can be used to poll for
// (new) messages that satisfy the given criteria.
func (w *gethPublicWakuAPIWrapper) NewMessageFilter(req types.Criteria) (string, error) {
topics := make([]waku.TopicType, len(req.Topics))
topics := make([]wakucommon.TopicType, len(req.Topics))
for index, tt := range req.Topics {
topics[index] = waku.TopicType(tt)
topics[index] = wakucommon.TopicType(tt)
}
criteria := waku.Criteria{
@ -92,7 +93,7 @@ func (w *gethPublicWakuAPIWrapper) Post(ctx context.Context, req types.NewMessag
PublicKey: req.PublicKey,
Sig: req.SigID, // Sig is really a SigID
TTL: req.TTL,
Topic: waku.TopicType(req.Topic),
Topic: wakucommon.TopicType(req.Topic),
Payload: req.Payload,
Padding: req.Padding,
PowTime: req.PowTime,

View file

@ -6,6 +6,7 @@ import (
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/waku"
wakucommon "github.com/status-im/status-go/waku/common"
)
type gethWakuWrapper struct {
@ -56,7 +57,7 @@ func (w *gethWakuWrapper) SetTimeSource(timesource func() time.Time) {
}
func (w *gethWakuWrapper) SubscribeEnvelopeEvents(eventsProxy chan<- types.EnvelopeEvent) types.Subscription {
events := make(chan waku.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper
events := make(chan wakucommon.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper
go func() {
for e := range events {
eventsProxy <- *NewWakuEnvelopeEventWrapper(&e)
@ -139,18 +140,18 @@ func (w *gethWakuWrapper) Unsubscribe(id string) error {
}
func (w *gethWakuWrapper) createFilterWrapper(id string, keyAsym *ecdsa.PrivateKey, keySym []byte, pow float64, topics [][]byte) (types.Filter, error) {
return NewWakuFilterWrapper(&waku.Filter{
return NewWakuFilterWrapper(&wakucommon.Filter{
KeyAsym: keyAsym,
KeySym: keySym,
PoW: pow,
AllowP2P: true,
Topics: topics,
Messages: waku.NewMemoryMessageStore(),
Messages: wakucommon.NewMemoryMessageStore(),
}, id), nil
}
func (w *gethWakuWrapper) SendMessagesRequest(peerID []byte, r types.MessagesRequest) error {
return w.waku.SendMessagesRequest(peerID, waku.MessagesRequest{
return w.waku.SendMessagesRequest(peerID, wakucommon.MessagesRequest{
ID: r.ID,
From: r.From,
To: r.To,
@ -166,16 +167,16 @@ func (w *gethWakuWrapper) SendMessagesRequest(peerID []byte, r types.MessagesReq
// which are not supposed to be forwarded any further.
// The whisper protocol is agnostic of the format and contents of envelope.
func (w *gethWakuWrapper) RequestHistoricMessagesWithTimeout(peerID []byte, envelope types.Envelope, timeout time.Duration) error {
return w.waku.RequestHistoricMessagesWithTimeout(peerID, envelope.Unwrap().(*waku.Envelope), timeout)
return w.waku.RequestHistoricMessagesWithTimeout(peerID, envelope.Unwrap().(*wakucommon.Envelope), timeout)
}
type wakuFilterWrapper struct {
filter *waku.Filter
filter *wakucommon.Filter
id string
}
// NewWakuFilterWrapper returns an object that wraps Geth's Filter in a types interface
func NewWakuFilterWrapper(f *waku.Filter, id string) types.Filter {
func NewWakuFilterWrapper(f *wakucommon.Filter, id string) types.Filter {
if f.Messages == nil {
panic("Messages should not be nil")
}
@ -187,7 +188,7 @@ func NewWakuFilterWrapper(f *waku.Filter, id string) types.Filter {
}
// GetWakuFilterFrom retrieves the underlying whisper Filter struct from a wrapped Filter interface
func GetWakuFilterFrom(f types.Filter) *waku.Filter {
func GetWakuFilterFrom(f types.Filter) *wakucommon.Filter {
return f.(*wakuFilterWrapper).filter
}

2
go.sum
View file

@ -138,6 +138,7 @@ github.com/elastic/gosigar v0.10.4/go.mod h1:cdorVVzy1fhmEqmtgqkoE3bYtCfSCkVyjTy
github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
github.com/ethereum/go-ethereum v1.8.20/go.mod h1:PwpWDrCLZrV+tfrhqqF6kPknbISMHaJv9Ln3kPCZLwY=
github.com/ethereum/go-ethereum v1.9.2/go.mod h1:PwpWDrCLZrV+tfrhqqF6kPknbISMHaJv9Ln3kPCZLwY=
github.com/ethereum/go-ethereum v1.9.13 h1:rOPqjSngvs1VSYH2H+PMPiWt4VEulvNRbFgqiGqJM3E=
github.com/fatih/color v1.3.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fjl/memsize v0.0.0-20180418122429-ca190fb6ffbc h1:jtW8jbpkO4YirRSyepBOH8E+2HEw6/hKkBvFPwhUN8c=
github.com/fjl/memsize v0.0.0-20180418122429-ca190fb6ffbc/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0=
@ -635,6 +636,7 @@ github.com/status-im/rendezvous v1.3.0/go.mod h1:+hzjuP+j/XzLPeF6E50b88pWOTLdTcw
github.com/status-im/status-go/extkeys v1.0.0/go.mod h1:GdqJbrcpkNm5ZsSCpp+PdMxnXx+OcRBdm3PI0rs1FpU=
github.com/status-im/status-go/extkeys v1.1.2 h1:FSjARgDathJ3rIapJt851LsIXP9Oyuu2M2jPJKuzloU=
github.com/status-im/status-go/extkeys v1.1.2/go.mod h1:hCmFzb2jiiVF2voZKYbzuhOQiHHCmyLJsZJXrFFg7BY=
github.com/status-im/status-go/waku v1.3.1 h1:hXvWsS/5ZKJ5iUXJvIZRE4Z78OH5u4d7OwBEPLNY9Gs=
github.com/status-im/status-go/whisper/v6 v6.2.6 h1:xELIv/8QB9CQlJjChnCPt4COWOFmgsc2kl03Y3Dspmo=
github.com/status-im/status-go/whisper/v6 v6.2.6/go.mod h1:csqMoPMkCPW1NJO56HJzNTWAl9UMdetnQzkPbPjsAC4=
github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501 h1:oa0KU5jJRNtXaM/P465MhvSFo/HM2O8qi2DDuPcd7ro=

View file

@ -36,6 +36,7 @@ import (
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/waku"
wakucommon "github.com/status-im/status-go/waku/common"
"github.com/status-im/status-go/whisper/v6"
)
@ -332,8 +333,8 @@ type WakuMailServer struct {
shh *waku.Waku
minRequestPoW float64
symFilter *waku.Filter
asymFilter *waku.Filter
symFilter *wakucommon.Filter
asymFilter *wakucommon.Filter
}
func (s *WakuMailServer) Init(waku *waku.Waku, cfg *params.WakuConfig) error {
@ -370,11 +371,11 @@ func (s *WakuMailServer) Close() {
s.ms.Close()
}
func (s *WakuMailServer) Archive(env *waku.Envelope) {
func (s *WakuMailServer) Archive(env *wakucommon.Envelope) {
s.ms.Archive(gethbridge.NewWakuEnvelope(env))
}
func (s *WakuMailServer) Deliver(peerID []byte, req waku.MessagesRequest) {
func (s *WakuMailServer) Deliver(peerID []byte, req wakucommon.MessagesRequest) {
s.ms.DeliverMail(types.BytesToHash(peerID), types.BytesToHash(req.ID), MessagesRequestPayload{
Lower: req.From,
Upper: req.To,
@ -387,7 +388,7 @@ func (s *WakuMailServer) Deliver(peerID []byte, req waku.MessagesRequest) {
}
// DEPRECATED; user Deliver instead
func (s *WakuMailServer) DeliverMail(peerID []byte, req *waku.Envelope) {
func (s *WakuMailServer) DeliverMail(peerID []byte, req *wakucommon.Envelope) {
payload, err := s.decodeRequest(peerID, req)
if err != nil {
deliveryFailuresCounter.WithLabelValues("validation").Inc()
@ -419,7 +420,7 @@ func (s *WakuMailServer) setupDecryptor(password, asymKey string) error {
return fmt.Errorf("save symmetric key: %v", err)
}
s.symFilter = &waku.Filter{KeySym: symKey}
s.symFilter = &wakucommon.Filter{KeySym: symKey}
}
if asymKey != "" {
@ -427,7 +428,7 @@ func (s *WakuMailServer) setupDecryptor(password, asymKey string) error {
if err != nil {
return err
}
s.asymFilter = &waku.Filter{KeyAsym: keyAsym}
s.asymFilter = &wakucommon.Filter{KeyAsym: keyAsym}
}
return nil
@ -435,7 +436,7 @@ func (s *WakuMailServer) setupDecryptor(password, asymKey string) error {
// openEnvelope tries to decrypt an envelope, first based on asymetric key (if
// provided) and second on the symetric key (if provided)
func (s *WakuMailServer) openEnvelope(request *waku.Envelope) *waku.ReceivedMessage {
func (s *WakuMailServer) openEnvelope(request *wakucommon.Envelope) *wakucommon.ReceivedMessage {
if s.asymFilter != nil {
if d := request.Open(s.asymFilter); d != nil {
return d
@ -449,7 +450,7 @@ func (s *WakuMailServer) openEnvelope(request *waku.Envelope) *waku.ReceivedMess
return nil
}
func (s *WakuMailServer) decodeRequest(peerID []byte, request *waku.Envelope) (MessagesRequestPayload, error) {
func (s *WakuMailServer) decodeRequest(peerID []byte, request *wakucommon.Envelope) (MessagesRequestPayload, error) {
var payload MessagesRequestPayload
if s.minRequestPoW > 0.0 && request.PoW() < s.minRequestPoW {

View file

@ -43,6 +43,7 @@ import (
"github.com/status-im/status-go/static"
"github.com/status-im/status-go/timesource"
"github.com/status-im/status-go/waku"
wakucommon "github.com/status-im/status-go/waku/common"
"github.com/status-im/status-go/whisper/v6"
)
@ -456,7 +457,7 @@ func createShhService(ctx *node.ServiceContext, whisperConfig *params.WhisperCon
func createWakuService(ctx *node.ServiceContext, wakuCfg *params.WakuConfig, clusterCfg *params.ClusterConfig) (*waku.Waku, error) {
cfg := &waku.Config{
MaxMessageSize: waku.DefaultMaxMessageSize,
MaxMessageSize: wakucommon.DefaultMaxMessageSize,
BloomFilterMode: wakuCfg.BloomFilterMode,
MinimumAcceptedPoW: params.WakuMinimumPoW,
}
@ -613,7 +614,7 @@ func whisperRateLimiter(whisperConfig *params.WhisperConfig, clusterConfig *para
)
}
func wakuRateLimiter(wakuCfg *params.WakuConfig, clusterCfg *params.ClusterConfig) *waku.PeerRateLimiter {
func wakuRateLimiter(wakuCfg *params.WakuConfig, clusterCfg *params.ClusterConfig) *wakucommon.PeerRateLimiter {
enodes := append(
parseNodes(clusterCfg.StaticNodes),
parseNodes(clusterCfg.TrustedMailServers)...,
@ -626,8 +627,8 @@ func wakuRateLimiter(wakuCfg *params.WakuConfig, clusterCfg *params.ClusterConfi
ips = append(ips, item.IP().String())
peerIDs = append(peerIDs, item.ID())
}
return waku.NewPeerRateLimiter(
&waku.PeerRateLimiterConfig{
return wakucommon.NewPeerRateLimiter(
&wakucommon.PeerRateLimiterConfig{
LimitPerSecIP: wakuCfg.RateLimitIP,
LimitPerSecPeerID: wakuCfg.RateLimitPeerID,
WhitelistedIPs: ips,

View file

@ -11,7 +11,7 @@ import (
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/services/ext"
"github.com/status-im/status-go/waku"
waku "github.com/status-im/status-go/waku/common"
)
const (

View file

@ -31,6 +31,8 @@ import (
"github.com/status-im/status-go/sqlite"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/waku"
wakucommon "github.com/status-im/status-go/waku/common"
v0 "github.com/status-im/status-go/waku/v0"
)
func TestRequestMessagesErrors(t *testing.T) {
@ -273,7 +275,6 @@ func (s *ShhExtSuite) TestFailedRequestWithUnknownMailServerPeer() {
const (
// internal waku protocol codes
statusCode = 0
p2pRequestCompleteCode = 125
)
@ -297,6 +298,7 @@ func (s *WakuNodeMockSuite) SetupTest() {
EnableConfirmations: true,
}
w := waku.New(conf, nil)
w2 := waku.New(nil, nil)
s.Require().NoError(w.Start(nil))
pkey, err := crypto.GenerateKey()
s.Require().NoError(err)
@ -308,8 +310,10 @@ func (s *WakuNodeMockSuite) SetupTest() {
panic(err)
}()
wakuWrapper := gethbridge.NewGethWakuWrapper(w)
s.Require().NoError(p2p.ExpectMsg(rw1, statusCode, nil))
s.Require().NoError(p2p.SendItems(rw1, statusCode, waku.ProtocolVersion, []interface{}{}))
peer1 := v0.NewPeer(w2, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw1, nil)
err = peer1.Start()
s.Require().NoError(err, "failed run message loop")
nodeWrapper := ext.NewTestNodeWrapper(nil, wakuWrapper)
s.localService = New(
@ -352,7 +356,7 @@ func (s *RequestMessagesSyncSuite) TestExpired() {
},
ext.MessagesRequest{
MailServerPeer: s.localNode.String(),
Topics: []types.TopicType{{0x01, 0x02, 0x03, 0x04}},
Topics: []common.TopicType{{0x01, 0x02, 0x03, 0x04}},
},
)
s.Require().EqualError(err, "failed to request messages after 1 retries")
@ -374,7 +378,7 @@ func (s *RequestMessagesSyncSuite) testCompletedFromAttempt(target int) {
s.Require().NoError(msg.Discard())
continue
}
var e waku.Envelope
var e wakucommon.Envelope
s.Require().NoError(msg.Decode(&e))
s.Require().NoError(p2p.Send(s.remoteRW, p2pRequestCompleteCode, waku.CreateMailServerRequestCompletedPayload(e.Hash(), common.Hash{}, cursor[:])))
}

View file

@ -26,6 +26,8 @@ import (
"sync"
"time"
"github.com/status-im/status-go/waku/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
@ -61,11 +63,6 @@ func NewPublicWakuAPI(w *Waku) *PublicWakuAPI {
return api
}
// Version returns the Waku sub-protocol version.
func (api *PublicWakuAPI) Version(ctx context.Context) string {
return ProtocolVersionStr
}
// Info contains diagnostic information.
type Info struct {
Messages int `json:"messages"` // Number of floating messages.
@ -207,16 +204,16 @@ func (api *PublicWakuAPI) CancelLightClient(ctx context.Context) bool {
// NewMessage represents a new waku message that is posted through the RPC.
type NewMessage struct {
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
Sig string `json:"sig"`
TTL uint32 `json:"ttl"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
Sig string `json:"sig"`
TTL uint32 `json:"ttl"`
Topic common.TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
}
type newMessageOverride struct { // nolint: deadcode,unused
@ -239,7 +236,7 @@ func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Byt
return nil, ErrSymAsym
}
params := &MessageParams{
params := &common.MessageParams{
TTL: req.TTL,
Payload: req.Payload,
Padding: req.Padding,
@ -257,13 +254,13 @@ func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Byt
// Set symmetric key that is used to encrypt the message
if symKeyGiven {
if params.Topic == (TopicType{}) { // topics are mandatory with symmetric encryption
if params.Topic == (common.TopicType{}) { // topics are mandatory with symmetric encryption
return nil, ErrNoTopics
}
if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil {
return nil, err
}
if !validateDataIntegrity(params.KeySym, aesKeyLength) {
if !common.ValidateDataIntegrity(params.KeySym, common.AESKeyLength) {
return nil, ErrInvalidSymmetricKey
}
}
@ -276,7 +273,7 @@ func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Byt
}
// encrypt and sent message
msg, err := NewSentMessage(params)
msg, err := common.NewSentMessage(params)
if err != nil {
return nil, err
}
@ -326,12 +323,12 @@ func (api *PublicWakuAPI) Unsubscribe(id string) {
// Criteria holds various filter options for inbound messages.
type Criteria struct {
SymKeyID string `json:"symKeyID"`
PrivateKeyID string `json:"privateKeyID"`
Sig []byte `json:"sig"`
MinPow float64 `json:"minPow"`
Topics []TopicType `json:"topics"`
AllowP2P bool `json:"allowP2P"`
SymKeyID string `json:"symKeyID"`
PrivateKeyID string `json:"privateKeyID"`
Sig []byte `json:"sig"`
MinPow float64 `json:"minPow"`
Topics []common.TopicType `json:"topics"`
AllowP2P bool `json:"allowP2P"`
}
// Messages set up a subscription that fires events when messages arrive that match
@ -354,9 +351,9 @@ func (api *PublicWakuAPI) Messages(ctx context.Context, crit Criteria) (*rpc.Sub
return nil, ErrSymAsym
}
filter := Filter{
filter := common.Filter{
PoW: crit.MinPow,
Messages: NewMemoryMessageStore(),
Messages: common.NewMemoryMessageStore(),
AllowP2P: crit.AllowP2P,
}
@ -382,7 +379,7 @@ func (api *PublicWakuAPI) Messages(ctx context.Context, crit Criteria) (*rpc.Sub
if err != nil {
return nil, err
}
if !validateDataIntegrity(key, aesKeyLength) {
if !common.ValidateDataIntegrity(key, common.AESKeyLength) {
return nil, ErrInvalidSymmetricKey
}
filter.KeySym = key
@ -433,16 +430,16 @@ func (api *PublicWakuAPI) Messages(ctx context.Context, crit Criteria) (*rpc.Sub
// Message is the RPC representation of a waku message.
type Message struct {
Sig []byte `json:"sig,omitempty"`
TTL uint32 `json:"ttl"`
Timestamp uint32 `json:"timestamp"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PoW float64 `json:"pow"`
Hash []byte `json:"hash"`
Dst []byte `json:"recipientPublicKey,omitempty"`
P2P bool `json:"bool,omitempty"`
Sig []byte `json:"sig,omitempty"`
TTL uint32 `json:"ttl"`
Timestamp uint32 `json:"timestamp"`
Topic common.TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PoW float64 `json:"pow"`
Hash []byte `json:"hash"`
Dst []byte `json:"recipientPublicKey,omitempty"`
P2P bool `json:"bool,omitempty"`
}
type messageOverride struct { // nolint: deadcode,unused
@ -454,7 +451,7 @@ type messageOverride struct { // nolint: deadcode,unused
}
// ToWakuMessage converts an internal message into an API version.
func ToWakuMessage(message *ReceivedMessage) *Message {
func ToWakuMessage(message *common.ReceivedMessage) *Message {
msg := Message{
Payload: message.Payload,
Padding: message.Padding,
@ -473,7 +470,7 @@ func ToWakuMessage(message *ReceivedMessage) *Message {
}
}
if isMessageSigned(message.Raw[0]) {
if common.IsMessageSigned(message.Raw[0]) {
b := crypto.FromECDSAPub(message.SigToPubKey())
if b != nil {
msg.Sig = b
@ -484,7 +481,7 @@ func ToWakuMessage(message *ReceivedMessage) *Message {
}
// toMessage converts a set of messages to its RPC representation.
func toMessage(messages []*ReceivedMessage) []*Message {
func toMessage(messages []*common.ReceivedMessage) []*Message {
msgs := make([]*Message, len(messages))
for i, msg := range messages {
msgs[i] = ToWakuMessage(msg)
@ -552,7 +549,7 @@ func (api *PublicWakuAPI) NewMessageFilter(req Criteria) (string, error) {
if keySym, err = api.w.GetSymKey(req.SymKeyID); err != nil {
return "", err
}
if !validateDataIntegrity(keySym, aesKeyLength) {
if !common.ValidateDataIntegrity(keySym, common.AESKeyLength) {
return "", ErrInvalidSymmetricKey
}
}
@ -566,19 +563,19 @@ func (api *PublicWakuAPI) NewMessageFilter(req Criteria) (string, error) {
if len(req.Topics) > 0 {
topics = make([][]byte, len(req.Topics))
for i, topic := range req.Topics {
topics[i] = make([]byte, TopicLength)
topics[i] = make([]byte, common.TopicLength)
copy(topics[i], topic[:])
}
}
f := &Filter{
f := &common.Filter{
Src: src,
KeySym: keySym,
KeyAsym: keyAsym,
PoW: req.MinPow,
AllowP2P: req.AllowP2P,
Topics: topics,
Messages: NewMemoryMessageStore(),
Messages: common.NewMemoryMessageStore(),
}
id, err := api.w.Subscribe(f)

View file

@ -22,6 +22,8 @@ import (
"bytes"
"testing"
"time"
"github.com/status-im/status-go/waku/common"
)
func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) {
@ -41,7 +43,7 @@ func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) {
crit := Criteria{
SymKeyID: keyID,
Topics: []TopicType{TopicType(t1), TopicType(t2)},
Topics: []common.TopicType{common.TopicType(t1), common.TopicType(t2)},
}
_, err = api.NewMessageFilter(crit)
@ -50,7 +52,7 @@ func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) {
}
found := false
candidates := w.filters.getWatchersByTopic(TopicType(t1))
candidates := w.filters.GetWatchersByTopic(common.TopicType(t1))
for _, f := range candidates {
if len(f.Topics) == 2 {
if bytes.Equal(f.Topics[0], t1[:]) && bytes.Equal(f.Topics[1], t2[:]) {

View file

@ -0,0 +1,37 @@
package common
func IsFullNode(bloom []byte) bool {
if bloom == nil {
return true
}
for _, b := range bloom {
if b != 255 {
return false
}
}
return true
}
func BloomFilterMatch(filter, sample []byte) bool {
if filter == nil {
return true
}
for i := 0; i < BloomFilterSize; i++ {
f := filter[i]
s := sample[i]
if (f | s) != f {
return false
}
}
return true
}
func MakeFullNodeBloom() []byte {
bloom := make([]byte, BloomFilterSize)
for i := 0; i < BloomFilterSize; i++ {
bloom[i] = 0xFF
}
return bloom
}

View file

@ -16,7 +16,7 @@
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
package common
import (
"time"
@ -26,29 +26,14 @@ import (
// Waku protocol parameters
const (
ProtocolVersion = uint64(0) // Protocol version number
ProtocolVersionStr = "0" // The same, as a string
ProtocolName = "waku" // Nickname of the protocol
// Waku protocol message codes, according to https://github.com/vacp2p/specs/blob/master/waku.md
statusCode = 0 // used in the handshake
messagesCode = 1 // regular message
statusUpdateCode = 22 // update of settings
batchAcknowledgedCode = 11 // confirmation that batch of envelopes was received
messageResponseCode = 12 // includes confirmation for delivery and information about errors
p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol
p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol
p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
NumberOfMessageCodes = 128
SizeMask = byte(3) // mask used to extract the size of payload size field from the flags
signatureFlag = byte(4)
TopicLength = 4 // in bytes
signatureLength = crypto.SignatureLength // in bytes
aesKeyLength = 32 // in bytes
AESKeyLength = 32 // in bytes
aesNonceLength = 12 // in bytes; for more info please see cipher.gcmStandardNonceSize & aesgcm.NonceSize()
keyIDSize = 32 // in bytes
KeyIDSize = 32 // in bytes
BloomFilterSize = 64 // in bytes
MaxTopicInterest = 10000
flagsLength = 1
@ -59,11 +44,10 @@ const (
DefaultMaxMessageSize = uint32(1024 * 1024)
DefaultMinimumPoW = 0.2
padSizeLimit = 256 // just an arbitrary number, could be changed without breaking the protocol
messageQueueLimit = 1024
padSizeLimit = 256 // just an arbitrary number, could be changed without breaking the protocol
expirationCycle = time.Second
transmissionCycle = 300 * time.Millisecond
ExpirationCycle = time.Second
TransmissionCycle = 300 * time.Millisecond
DefaultTTL = 50 // seconds
DefaultSyncAllowance = 10 // seconds

View file

@ -16,7 +16,7 @@
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
package common
import (
"crypto/ecdsa"
@ -48,8 +48,8 @@ type Envelope struct {
bloom []byte
}
// size returns the size of envelope as it is sent (i.e. public fields only)
func (e *Envelope) size() int {
// Size returns the size of envelope as it is sent (i.e. public fields only)
func (e *Envelope) Size() int {
return EnvelopeHeaderLength + len(e.Data)
}
@ -124,12 +124,12 @@ func (e *Envelope) Seal(options *MessageParams) error {
// of the envelope.
func (e *Envelope) PoW() float64 {
if e.pow == 0 {
e.calculatePoW(0)
e.CalculatePoW(0)
}
return e.pow
}
func (e *Envelope) calculatePoW(diff uint32) {
func (e *Envelope) CalculatePoW(diff uint32) {
rlp := e.rlpWithoutNonce()
buf := make([]byte, len(rlp)+8)
copy(buf, rlp)
@ -144,7 +144,7 @@ func (e *Envelope) calculatePoW(diff uint32) {
func (e *Envelope) powToFirstBit(pow float64) int {
x := pow
x *= float64(e.size())
x *= float64(e.Size())
x *= float64(e.TTL)
bits := math.Log2(x)
bits = math.Ceil(bits)

View file

@ -16,7 +16,7 @@
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
package common
import (
mrand "math/rand"
@ -33,7 +33,7 @@ func TestPoWCalculationsWithNoLeadingZeros(t *testing.T) {
Nonce: 100000,
}
e.calculatePoW(0)
e.CalculatePoW(0)
if e.pow != 0.07692307692307693 {
t.Fatalf("invalid PoW calculation. Expected 0.07692307692307693, got %v", e.pow)
@ -46,7 +46,7 @@ func TestPoWCalculationsWith8LeadingZeros(t *testing.T) {
Data: []byte{0xde, 0xad, 0xbe, 0xef},
Nonce: 276,
}
e.calculatePoW(0)
e.CalculatePoW(0)
if e.pow != 19.692307692307693 {
t.Fatalf("invalid PoW calculation. Expected 19.692307692307693, got %v", e.pow)
@ -54,7 +54,7 @@ func TestPoWCalculationsWith8LeadingZeros(t *testing.T) {
}
func TestEnvelopeOpenAcceptsOnlyOneKeyTypeInFilter(t *testing.T) {
symKey := make([]byte, aesKeyLength)
symKey := make([]byte, AESKeyLength)
mrand.Read(symKey) //nolint: gosec
asymKey, err := crypto.GenerateKey()

4
waku/common/errors.go Normal file
View file

@ -0,0 +1,4 @@
package common
// TimeSyncError error for clock skew errors.
type TimeSyncError error

View file

@ -16,7 +16,7 @@
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
package common
import (
"github.com/ethereum/go-ethereum/common"

View file

@ -16,7 +16,7 @@
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
package common
import (
"crypto/ecdsa"
@ -49,17 +49,15 @@ type Filters struct {
topicMatcher map[TopicType]map[*Filter]struct{} // map a topic to the filters that are interested in being notified when a message matches that topic
allTopicsMatcher map[*Filter]struct{} // list all the filters that will be notified of a new message, no matter what its topic is
waku *Waku
mutex sync.RWMutex
}
// NewFilters returns a newly created filter collection
func NewFilters(w *Waku) *Filters {
func NewFilters() *Filters {
return &Filters{
watchers: make(map[string]*Filter),
topicMatcher: make(map[TopicType]map[*Filter]struct{}),
allTopicsMatcher: make(map[*Filter]struct{}),
waku: w,
}
}
@ -129,9 +127,9 @@ func (fs *Filters) removeFromTopicMatchers(watcher *Filter) {
}
}
// getWatchersByTopic returns a slice containing the filters that
// GetWatchersByTopic returns a slice containing the filters that
// match a specific topic
func (fs *Filters) getWatchersByTopic(topic TopicType) []*Filter {
func (fs *Filters) GetWatchersByTopic(topic TopicType) []*Filter {
res := make([]*Filter, 0, len(fs.allTopicsMatcher))
for watcher := range fs.allTopicsMatcher {
res = append(res, watcher)
@ -157,7 +155,7 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
fs.mutex.RLock()
defer fs.mutex.RUnlock()
candidates := fs.getWatchersByTopic(env.Topic)
candidates := fs.GetWatchersByTopic(env.Topic)
for _, watcher := range candidates {
if p2pMessage && !watcher.AllowP2P {
log.Trace(fmt.Sprintf("msg [%x], filter [%s]: p2p messages are not allowed", env.Hash(), watcher.id))
@ -241,14 +239,3 @@ func (f *Filter) MatchMessage(msg *ReceivedMessage) bool {
func (f *Filter) MatchEnvelope(envelope *Envelope) bool {
return f.PoW <= 0 || envelope.pow >= f.PoW
}
// IsPubKeyEqual checks that two public keys are equal
func IsPubKeyEqual(a, b *ecdsa.PublicKey) bool {
if !ValidatePublicKey(a) {
return false
} else if !ValidatePublicKey(b) {
return false
}
// the curve is always the same, just compare the points
return a.X.Cmp(b.X) == 0 && a.Y.Cmp(b.Y) == 0
}

View file

@ -16,7 +16,7 @@
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
package common
import (
"math/big"
@ -65,7 +65,7 @@ func generateFilter(t *testing.T, symmetric bool) (*Filter, error) {
f.Src = &key.PublicKey
if symmetric {
f.KeySym = make([]byte, aesKeyLength)
f.KeySym = make([]byte, AESKeyLength)
mrand.Read(f.KeySym) // nolint: gosec
f.SymKeyHash = crypto.Keccak256Hash(f.KeySym)
} else {
@ -94,8 +94,7 @@ func TestInstallFilters(t *testing.T) {
InitSingleTest()
const SizeTestFilters = 256
w := New(&Config{}, nil)
filters := NewFilters(w)
filters := NewFilters()
tst := generateTestCases(t, SizeTestFilters)
var err error
@ -106,7 +105,7 @@ func TestInstallFilters(t *testing.T) {
t.Fatalf("seed %d: failed to install filter: %s", seed, err)
}
tst[i].id = j
if len(j) != keyIDSize*2 {
if len(j) != KeyIDSize*2 {
t.Fatalf("seed %d: wrong filter id size [%d]", seed, len(j))
}
}
@ -132,8 +131,7 @@ func TestInstallFilters(t *testing.T) {
func TestInstallSymKeyGeneratesHash(t *testing.T) {
InitSingleTest()
w := New(&Config{}, nil)
filters := NewFilters(w)
filters := NewFilters()
filter, _ := generateFilter(t, true)
// save the current SymKeyHash for comparison
@ -159,8 +157,7 @@ func TestInstallSymKeyGeneratesHash(t *testing.T) {
func TestInstallIdenticalFilters(t *testing.T) {
InitSingleTest()
w := New(&Config{}, nil)
filters := NewFilters(w)
filters := NewFilters()
filter1, _ := generateFilter(t, true)
// Copy the first filter since some of its fields
@ -185,7 +182,7 @@ func TestInstallIdenticalFilters(t *testing.T) {
t.Fatalf("Error installing the second filter with seed %d: %s", seed, err)
}
params, err := generateMessageParams()
params, err := GenerateMessageParams()
if err != nil {
t.Fatalf("Error generating message parameters with seed %d: %s", seed, err)
}
@ -229,8 +226,7 @@ func TestInstallIdenticalFilters(t *testing.T) {
func TestInstallFilterWithSymAndAsymKeys(t *testing.T) {
InitSingleTest()
w := New(&Config{}, nil)
filters := NewFilters(w)
filters := NewFilters()
filter1, _ := generateFilter(t, true)
asymKey, err := crypto.GenerateKey()
@ -295,9 +291,9 @@ func TestMatchEnvelope(t *testing.T) {
t.Fatalf("failed generateFilter() with seed %d: %s.", seed, err)
}
params, err := generateMessageParams()
params, err := GenerateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
t.Fatalf("failed GenerateMessageParams with seed %d: %s.", seed, err)
}
params.Topic[0] = 0xFF // topic mismatch
@ -428,9 +424,9 @@ func TestMatchEnvelope(t *testing.T) {
func TestMatchMessageSym(t *testing.T) {
InitSingleTest()
params, err := generateMessageParams()
params, err := GenerateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
t.Fatalf("failed GenerateMessageParams with seed %d: %s.", seed, err)
}
f, err := generateFilter(t, true)
@ -525,9 +521,9 @@ func TestMatchMessageAsym(t *testing.T) {
t.Fatalf("failed generateFilter with seed %d: %s.", seed, err)
}
params, err := generateMessageParams()
params, err := GenerateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
t.Fatalf("failed GenerateMessageParams with seed %d: %s.", seed, err)
}
const index = 1
@ -613,9 +609,9 @@ func cloneFilter(orig *Filter) *Filter {
}
func generateCompatibeEnvelope(t *testing.T, f *Filter) *Envelope {
params, err := generateMessageParams()
params, err := GenerateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
t.Fatalf("failed GenerateMessageParams with seed %d: %s.", seed, err)
return nil
}
@ -644,8 +640,7 @@ func TestWatchers(t *testing.T) {
var x, firstID string
var err error
w := New(&Config{}, nil)
filters := NewFilters(w)
filters := NewFilters()
tst := generateTestCases(t, NumFilters)
for i = 0; i < NumFilters; i++ {
tst[i].f.Src = nil
@ -796,9 +791,9 @@ func TestVariableTopics(t *testing.T) {
const lastTopicByte = 3
var match bool
params, err := generateMessageParams()
params, err := GenerateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
t.Fatalf("failed GenerateMessageParams with seed %d: %s.", seed, err)
}
msg, err := NewSentMessage(params)
if err != nil {

78
waku/common/helper.go Normal file
View file

@ -0,0 +1,78 @@
package common
import (
"crypto/ecdsa"
"fmt"
"github.com/ethereum/go-ethereum/common"
)
// IsPubKeyEqual checks that two public keys are equal
func IsPubKeyEqual(a, b *ecdsa.PublicKey) bool {
if !ValidatePublicKey(a) {
return false
} else if !ValidatePublicKey(b) {
return false
}
// the curve is always the same, just compare the points
return a.X.Cmp(b.X) == 0 && a.Y.Cmp(b.Y) == 0
}
// ValidatePublicKey checks the format of the given public key.
func ValidatePublicKey(k *ecdsa.PublicKey) bool {
return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0
}
// BytesToUintLittleEndian converts the slice to 64-bit unsigned integer.
func BytesToUintLittleEndian(b []byte) (res uint64) {
mul := uint64(1)
for i := 0; i < len(b); i++ {
res += uint64(b[i]) * mul
mul *= 256
}
return res
}
// BytesToUintBigEndian converts the slice to 64-bit unsigned integer.
func BytesToUintBigEndian(b []byte) (res uint64) {
for i := 0; i < len(b); i++ {
res *= 256
res += uint64(b[i])
}
return res
}
// ContainsOnlyZeros checks if the data contain only zeros.
func ContainsOnlyZeros(data []byte) bool {
for _, b := range data {
if b != 0 {
return false
}
}
return true
}
// GenerateRandomID generates a random string, which is then returned to be used as a key id
func GenerateRandomID() (id string, err error) {
buf, err := GenerateSecureRandomData(KeyIDSize)
if err != nil {
return "", err
}
if !ValidateDataIntegrity(buf, KeyIDSize) {
return "", fmt.Errorf("error in generateRandomID: crypto/rand failed to generate random data")
}
id = common.Bytes2Hex(buf)
return id, err
}
// ValidateDataIntegrity returns false if the data have the wrong or contains all zeros,
// which is the simplest and the most common bug.
func ValidateDataIntegrity(k []byte, expectedSize int) bool {
if len(k) != expectedSize {
return false
}
if expectedSize > 3 && ContainsOnlyZeros(k) {
return false
}
return true
}

View file

@ -16,7 +16,7 @@
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
package common
import (
"crypto/aes"
@ -35,7 +35,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/ecies"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
// MessageParams specifies the exact way a message should be wrapped
@ -137,35 +136,7 @@ type MessagesResponse struct {
Errors []EnvelopeError
}
// MultiVersionResponse allows to decode response into chosen version.
type MultiVersionResponse struct {
Version uint
Response rlp.RawValue
}
// DecodeResponse1 decodes response into first version of the messages response.
func (m MultiVersionResponse) DecodeResponse1() (resp MessagesResponse, err error) {
return resp, rlp.DecodeBytes(m.Response, &resp)
}
// Version1MessageResponse first version of the message response.
type Version1MessageResponse struct {
Version uint
Response MessagesResponse
}
// NewMessagesResponse returns instance of the version messages response.
func NewMessagesResponse(batch common.Hash, errors []EnvelopeError) Version1MessageResponse {
return Version1MessageResponse{
Version: 1,
Response: MessagesResponse{
Hash: batch,
Errors: errors,
},
}
}
func isMessageSigned(flags byte) bool {
func IsMessageSigned(flags byte) bool {
return (flags & signatureFlag) != 0
}
@ -229,7 +200,7 @@ func (msg *sentMessage) appendPadding(params *MessageParams) error {
if err != nil {
return err
}
if !validateDataIntegrity(pad, paddingSize) {
if !ValidateDataIntegrity(pad, paddingSize) {
return errors.New("failed to generate random padding of size " + strconv.Itoa(paddingSize))
}
msg.Raw = append(msg.Raw, pad...)
@ -239,7 +210,7 @@ func (msg *sentMessage) appendPadding(params *MessageParams) error {
// sign calculates and sets the cryptographic signature for the message,
// also setting the sign flag.
func (msg *sentMessage) sign(key *ecdsa.PrivateKey) error {
if isMessageSigned(msg.Raw[0]) {
if IsMessageSigned(msg.Raw[0]) {
// this should not happen, but no reason to panic
log.Error("failed to sign the message: already signed")
return nil
@ -271,7 +242,7 @@ func (msg *sentMessage) encryptAsymmetric(key *ecdsa.PublicKey) error {
// encryptSymmetric encrypts a message with a topic key, using AES-GCM-256.
// nonce size should be 12 bytes (see cipher.gcmStandardNonceSize).
func (msg *sentMessage) encryptSymmetric(key []byte) (err error) {
if !validateDataIntegrity(key, aesKeyLength) {
if !ValidateDataIntegrity(key, AESKeyLength) {
return errors.New("invalid key provided for symmetric encryption, size: " + strconv.Itoa(len(key)))
}
block, err := aes.NewCipher(key)
@ -282,7 +253,7 @@ func (msg *sentMessage) encryptSymmetric(key []byte) (err error) {
if err != nil {
return err
}
salt, err := generateSecureRandomData(aesNonceLength) // never use more than 2^32 random nonces with a given key
salt, err := GenerateSecureRandomData(aesNonceLength) // never use more than 2^32 random nonces with a given key
if err != nil {
return err
}
@ -291,12 +262,12 @@ func (msg *sentMessage) encryptSymmetric(key []byte) (err error) {
return nil
}
// generateSecureRandomData generates random data where extra security is required.
// GenerateSecureRandomData generates random data where extra security is required.
// The purpose of this function is to prevent some bugs in software or in hardware
// from delivering not-very-random data. This is especially useful for AES nonce,
// where true randomness does not really matter, but it is very important to have
// a unique nonce for every message.
func generateSecureRandomData(length int) ([]byte, error) {
func GenerateSecureRandomData(length int) ([]byte, error) {
x := make([]byte, length)
y := make([]byte, length)
res := make([]byte, length)
@ -304,19 +275,19 @@ func generateSecureRandomData(length int) ([]byte, error) {
_, err := crand.Read(x)
if err != nil {
return nil, err
} else if !validateDataIntegrity(x, length) {
} else if !ValidateDataIntegrity(x, length) {
return nil, errors.New("crypto/rand failed to generate secure random data")
}
_, err = mrand.Read(y) // nolint: gosec
if err != nil {
return nil, err
} else if !validateDataIntegrity(y, length) {
} else if !ValidateDataIntegrity(y, length) {
return nil, errors.New("math/rand failed to generate secure random data")
}
for i := 0; i < length; i++ {
res[i] = x[i] ^ y[i]
}
if !validateDataIntegrity(res, length) {
if !ValidateDataIntegrity(res, length) {
return nil, errors.New("failed to generate secure random data")
}
return res, nil
@ -392,7 +363,7 @@ func (msg *ReceivedMessage) ValidateAndParse() bool {
return false
}
if isMessageSigned(msg.Raw[0]) {
if IsMessageSigned(msg.Raw[0]) {
end -= signatureLength
if end <= 1 {
return false
@ -411,7 +382,7 @@ func (msg *ReceivedMessage) ValidateAndParse() bool {
if end < beg+sizeOfPayloadSizeField {
return false
}
payloadSize = int(bytesToUintLittleEndian(msg.Raw[beg : beg+sizeOfPayloadSizeField]))
payloadSize = int(BytesToUintLittleEndian(msg.Raw[beg : beg+sizeOfPayloadSizeField]))
beg += sizeOfPayloadSizeField
if beg+payloadSize > end {
return false
@ -440,7 +411,7 @@ func (msg *ReceivedMessage) SigToPubKey() *ecdsa.PublicKey {
// hash calculates the SHA3 checksum of the message flags, payload size field, payload and padding.
func (msg *ReceivedMessage) hash() []byte {
if isMessageSigned(msg.Raw[0]) {
if IsMessageSigned(msg.Raw[0]) {
sz := len(msg.Raw) - signatureLength
return crypto.Keccak256(msg.Raw[:sz])
}

View file

@ -16,7 +16,7 @@
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
package common
import (
"bytes"
@ -26,15 +26,12 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
)
func generateMessageParams() (*MessageParams, error) {
func GenerateMessageParams() (*MessageParams, error) {
// set all the parameters except p.Dst and p.Padding
buf := make([]byte, 4)
@ -46,7 +43,7 @@ func generateMessageParams() (*MessageParams, error) {
p.WorkTime = 1
p.TTL = uint32(mrand.Intn(1024))
p.Payload = make([]byte, sz)
p.KeySym = make([]byte, aesKeyLength)
p.KeySym = make([]byte, AESKeyLength)
mrand.Read(p.Payload) // nolint: gosec
mrand.Read(p.KeySym) // nolint: gosec
p.Topic = BytesToTopic(buf)
@ -61,9 +58,9 @@ func generateMessageParams() (*MessageParams, error) {
}
func singleMessageTest(t *testing.T, symmetric bool) {
params, err := generateMessageParams()
params, err := GenerateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
t.Fatalf("failed GenerateMessageParams with seed %d: %s.", seed, err)
}
key, err := crypto.GenerateKey()
@ -106,7 +103,7 @@ func singleMessageTest(t *testing.T, symmetric bool) {
if !bytes.Equal(text, decrypted.Payload) {
t.Fatalf("failed with seed %d: compare payload.", seed)
}
if !isMessageSigned(decrypted.Raw[0]) {
if !IsMessageSigned(decrypted.Raw[0]) {
t.Fatalf("failed with seed %d: unsigned.", seed)
}
if len(decrypted.Signature) != signatureLength {
@ -132,9 +129,9 @@ func TestMessageWrap(t *testing.T) {
mrand.Seed(seed)
target := 128.0
params, err := generateMessageParams()
params, err := GenerateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
t.Fatalf("failed GenerateMessageParams with seed %d: %s.", seed, err)
}
msg, err := NewSentMessage(params)
@ -173,9 +170,9 @@ func TestMessageSeal(t *testing.T) {
seed = int64(1976726903)
mrand.Seed(seed)
params, err := generateMessageParams()
params, err := GenerateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
t.Fatalf("failed GenerateMessageParams with seed %d: %s.", seed, err)
}
msg, err := NewSentMessage(params)
@ -192,7 +189,7 @@ func TestMessageSeal(t *testing.T) {
params.PoW = target
env.Seal(params) // nolint: errcheck
env.calculatePoW(0)
env.CalculatePoW(0)
pow := env.PoW()
if pow < target {
t.Fatalf("failed Wrap with seed %d: pow < target (%f vs. %f).", seed, pow, target)
@ -201,7 +198,7 @@ func TestMessageSeal(t *testing.T) {
params.WorkTime = 1
params.PoW = 1000000000.0
env.Seal(params) // nolint: errcheck
env.calculatePoW(0)
env.CalculatePoW(0)
pow = env.PoW()
if pow < 2*target {
t.Fatalf("failed Wrap with seed %d: pow too small %f.", seed, pow)
@ -219,9 +216,9 @@ func TestEnvelopeOpen(t *testing.T) {
}
func singleEnvelopeOpenTest(t *testing.T, symmetric bool) {
params, err := generateMessageParams()
params, err := GenerateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
t.Fatalf("failed GenerateMessageParams with seed %d: %s.", seed, err)
}
key, err := crypto.GenerateKey()
@ -260,7 +257,7 @@ func singleEnvelopeOpenTest(t *testing.T, symmetric bool) {
if !bytes.Equal(text, decrypted.Payload) {
t.Fatalf("failed with seed %d: compare payload.", seed)
}
if !isMessageSigned(decrypted.Raw[0]) {
if !IsMessageSigned(decrypted.Raw[0]) {
t.Fatalf("failed with seed %d: unsigned.", seed)
}
if len(decrypted.Signature) != signatureLength {
@ -288,23 +285,23 @@ func singleEnvelopeOpenTest(t *testing.T, symmetric bool) {
func TestEncryptWithZeroKey(t *testing.T) {
InitSingleTest()
params, err := generateMessageParams()
params, err := GenerateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
t.Fatalf("failed GenerateMessageParams with seed %d: %s.", seed, err)
}
msg, err := NewSentMessage(params)
if err != nil {
t.Fatalf("failed to create new message with seed %d: %s.", seed, err)
}
params.KeySym = make([]byte, aesKeyLength)
params.KeySym = make([]byte, AESKeyLength)
_, err = msg.Wrap(params, time.Now())
if err == nil {
t.Fatalf("wrapped with zero key, seed: %d.", seed)
}
params, err = generateMessageParams()
params, err = GenerateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
t.Fatalf("failed GenerateMessageParams with seed %d: %s.", seed, err)
}
msg, err = NewSentMessage(params)
if err != nil {
@ -316,9 +313,9 @@ func TestEncryptWithZeroKey(t *testing.T) {
t.Fatalf("wrapped with empty key, seed: %d.", seed)
}
params, err = generateMessageParams()
params, err = GenerateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
t.Fatalf("failed GenerateMessageParams with seed %d: %s.", seed, err)
}
msg, err = NewSentMessage(params)
if err != nil {
@ -334,9 +331,9 @@ func TestEncryptWithZeroKey(t *testing.T) {
func TestRlpEncode(t *testing.T) {
InitSingleTest()
params, err := generateMessageParams()
params, err := GenerateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
t.Fatalf("failed GenerateMessageParams with seed %d: %s.", seed, err)
}
msg, err := NewSentMessage(params)
if err != nil {
@ -367,9 +364,9 @@ func TestRlpEncode(t *testing.T) {
}
func singlePaddingTest(t *testing.T, padSize int) {
params, err := generateMessageParams()
params, err := GenerateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d and sz=%d: %s.", seed, padSize, err)
t.Fatalf("failed GenerateMessageParams with seed %d and sz=%d: %s.", seed, padSize, err)
}
params.Padding = make([]byte, padSize)
params.PoW = 0.0000000001
@ -426,7 +423,7 @@ func TestPadding(t *testing.T) {
func TestPaddingAppendedToSymMessagesWithSignature(t *testing.T) {
params := &MessageParams{
Payload: make([]byte, 246),
KeySym: make([]byte, aesKeyLength),
KeySym: make([]byte, AESKeyLength),
}
pSrc, err := crypto.GenerateKey()
@ -495,15 +492,3 @@ func TestValidateAndParseSizeOfPayloadSize(t *testing.T) {
})
}
}
func TestEncodeDecodeVersionedResponse(t *testing.T) {
response := NewMessagesResponse(common.Hash{1}, []EnvelopeError{{Code: 1}})
b, err := rlp.EncodeToBytes(response)
require.NoError(t, err)
var mresponse MultiVersionResponse
require.NoError(t, rlp.DecodeBytes(b, &mresponse))
v1resp, err := mresponse.DecodeResponse1()
require.NoError(t, err)
require.Equal(t, response.Response.Hash, v1resp.Hash)
}

View file

@ -16,71 +16,69 @@
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
package common
import (
prom "github.com/prometheus/client_golang/prometheus"
)
var (
envelopesReceivedCounter = prom.NewCounter(prom.CounterOpts{
EnvelopesReceivedCounter = prom.NewCounter(prom.CounterOpts{
Name: "waku_envelopes_received_total",
Help: "Number of envelopes received.",
})
envelopesValidatedCounter = prom.NewCounter(prom.CounterOpts{
EnvelopesValidatedCounter = prom.NewCounter(prom.CounterOpts{
Name: "waku_envelopes_validated_total",
Help: "Number of envelopes processed successfully.",
})
envelopesRejectedCounter = prom.NewCounterVec(prom.CounterOpts{
EnvelopesRejectedCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "waku_envelopes_rejected_total",
Help: "Number of envelopes rejected.",
}, []string{"reason"})
envelopesCacheFailedCounter = prom.NewCounterVec(prom.CounterOpts{
EnvelopesCacheFailedCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "waku_envelopes_cache_failures_total",
Help: "Number of envelopes which failed to be cached.",
}, []string{"type"})
envelopesCachedCounter = prom.NewCounterVec(prom.CounterOpts{
EnvelopesCachedCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "waku_envelopes_cached_total",
Help: "Number of envelopes cached.",
}, []string{"cache"})
envelopesSizeMeter = prom.NewHistogram(prom.HistogramOpts{
EnvelopesSizeMeter = prom.NewHistogram(prom.HistogramOpts{
Name: "waku_envelopes_size_bytes",
Help: "Size of processed Waku envelopes in bytes.",
Buckets: prom.ExponentialBuckets(256, 4, 10),
})
// rate limiter metrics
rateLimitsProcessed = prom.NewCounter(prom.CounterOpts{
RateLimitsProcessed = prom.NewCounter(prom.CounterOpts{
Name: "waku_rate_limits_processed_total",
Help: "Number of packets Waku rate limiter processed.",
})
rateLimitsExceeded = prom.NewCounterVec(prom.CounterOpts{
RateLimitsExceeded = prom.NewCounterVec(prom.CounterOpts{
Name: "waku_rate_limits_exceeded_total",
Help: "Number of times the Waku rate limits were exceeded",
}, []string{"type"})
// bridging
bridgeSent = prom.NewCounter(prom.CounterOpts{
BridgeSent = prom.NewCounter(prom.CounterOpts{
Name: "waku_bridge_sent_total",
Help: "Number of envelopes bridged from Waku",
})
bridgeReceivedSucceed = prom.NewCounter(prom.CounterOpts{
BridgeReceivedSucceed = prom.NewCounter(prom.CounterOpts{
Name: "waku_bridge_received_success_total",
Help: "Number of envelopes bridged to Waku and successfully added",
})
bridgeReceivedFailed = prom.NewCounter(prom.CounterOpts{
BridgeReceivedFailed = prom.NewCounter(prom.CounterOpts{
Name: "waku_bridge_received_failure_total",
Help: "Number of envelopes bridged to Waku and failed to be added",
})
)
func init() {
prom.MustRegister(envelopesReceivedCounter)
prom.MustRegister(envelopesRejectedCounter)
prom.MustRegister(envelopesCacheFailedCounter)
prom.MustRegister(envelopesCachedCounter)
prom.MustRegister(envelopesSizeMeter)
prom.MustRegister(rateLimitsProcessed)
prom.MustRegister(rateLimitsExceeded)
prom.MustRegister(bridgeSent)
prom.MustRegister(bridgeReceivedSucceed)
prom.MustRegister(bridgeReceivedFailed)
prom.MustRegister(EnvelopesReceivedCounter)
prom.MustRegister(EnvelopesRejectedCounter)
prom.MustRegister(EnvelopesCacheFailedCounter)
prom.MustRegister(EnvelopesCachedCounter)
prom.MustRegister(EnvelopesSizeMeter)
prom.MustRegister(RateLimitsProcessed)
prom.MustRegister(RateLimitsExceeded)
prom.MustRegister(BridgeSent)
prom.MustRegister(BridgeReceivedSucceed)
prom.MustRegister(BridgeReceivedFailed)
}

94
waku/common/protocol.go Normal file
View file

@ -0,0 +1,94 @@
package common
import (
"net"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
)
type Peer interface {
// Start performs the handshake and initialize the broadcasting of messages
Start() error
Stop()
// Run start the polling loop
Run() error
// NotifyAboutPowRequirementChange notifies the peer that POW for the host has changed
NotifyAboutPowRequirementChange(float64) error
// NotifyAboutBloomFilterChange notifies the peer that bloom filter for the host has changed
NotifyAboutBloomFilterChange([]byte) error
// NotifyAboutTopicInterestChange notifies the peer that topics for the host have changed
NotifyAboutTopicInterestChange([]TopicType) error
// SetPeerTrusted sets the value of trusted, meaning we will
// allow p2p messages from them, which is necessary to interact
// with mailservers.
SetPeerTrusted(bool)
// SetRWWriter sets the socket to read/write
SetRWWriter(p2p.MsgReadWriter)
RequestHistoricMessages(*Envelope) error
SendMessagesRequest(MessagesRequest) error
SendHistoricMessageResponse([]byte) error
SendP2PMessages([]*Envelope) error
SendRawP2PDirect([]rlp.RawValue) error
// Mark marks an envelope known to the peer so that it won't be sent back.
Mark(*Envelope)
// Marked checks if an envelope is already known to the remote peer.
Marked(*Envelope) bool
ID() []byte
IP() net.IP
EnodeID() enode.ID
PoWRequirement() float64
BloomFilter() []byte
ConfirmationsEnabled() bool
}
// WakuHost is the local instance of waku, which both interacts with remote clients
// (peers) and local clients (through RPC API)
type WakuHost interface {
// MaxMessageSize returns the maximum accepted message size.
MaxMessageSize() uint32
// LightClientMode returns whether the host is running in light client mode
LightClientMode() bool
// Mailserver returns whether the host is running a mailserver
Mailserver() bool
// LightClientModeConnectionRestricted indicates that connection to light client in light client mode not allowed
LightClientModeConnectionRestricted() bool
// ConfirmationsEnabled returns true if message confirmations are enabled.
ConfirmationsEnabled() bool
// RateLimits returns the current rate limits for the host
RateLimits() RateLimits
// MinPow returns the MinPow for the host
MinPow() float64
// BloomFilterMode returns whether the host is using bloom filter
BloomFilterMode() bool
// BloomFilter returns the bloom filter for the host
BloomFilter() []byte
//TopicInterest returns the topics for the host
TopicInterest() []TopicType
// IsEnvelopeCached checks if envelope with specific hash has already been received and cached.
IsEnvelopeCached(common.Hash) bool
// Envelopes returns all the envelopes queued
Envelopes() []*Envelope
SendEnvelopeEvent(EnvelopeEvent) int
// OnNewEnvelopes handles newly received envelopes from a peer
OnNewEnvelopes([]*Envelope, Peer) ([]EnvelopeError, error)
// OnNewP2PEnvelopes handles envelopes received though the P2P
// protocol (i.e from a mailserver in most cases)
OnNewP2PEnvelopes([]*Envelope, Peer) error
// OnMessagesResponse handles when the peer receive a message response
// from a mailserver
OnMessagesResponse(MessagesResponse, Peer) error
// OnMessagesRequest handles when the peer receive a message request
// this only works if the peer is a mailserver
OnMessagesRequest(MessagesRequest, Peer) error
OnBatchAcknowledged(common.Hash, Peer) error
OnP2PRequestCompleted([]byte, Peer) error
}

View file

@ -16,12 +16,13 @@
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
package common
import (
"bytes"
"errors"
"fmt"
"net"
"time"
"github.com/tsenart/tb"
@ -30,7 +31,12 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
)
type runLoop func(p *Peer, rw p2p.MsgReadWriter) error
type runLoop func(rw p2p.MsgReadWriter) error
type RateLimiterPeer interface {
ID() []byte
IP() net.IP
}
type RateLimiterHandler interface {
ExceedPeerLimit() error
@ -40,11 +46,11 @@ type RateLimiterHandler interface {
type MetricsRateLimiterHandler struct{}
func (MetricsRateLimiterHandler) ExceedPeerLimit() error {
rateLimitsExceeded.WithLabelValues("peer_id").Inc()
RateLimitsExceeded.WithLabelValues("peer_id").Inc()
return nil
}
func (MetricsRateLimiterHandler) ExceedIPLimit() error {
rateLimitsExceeded.WithLabelValues("ip").Inc()
RateLimitsExceeded.WithLabelValues("ip").Inc()
return nil
}
@ -105,8 +111,8 @@ type PeerRateLimiter struct {
peerIDThrottler *tb.Throttler
ipThrottler *tb.Throttler
limitPerSecIP int64
limitPerSecPeerID int64
LimitPerSecIP int64
LimitPerSecPeerID int64
whitelistedPeerIDs []enode.ID
whitelistedIPs []string
@ -123,15 +129,15 @@ func NewPeerRateLimiter(cfg *PeerRateLimiterConfig, handlers ...RateLimiterHandl
return &PeerRateLimiter{
peerIDThrottler: tb.NewThrottler(time.Millisecond * 100),
ipThrottler: tb.NewThrottler(time.Millisecond * 100),
limitPerSecIP: cfg.LimitPerSecIP,
limitPerSecPeerID: cfg.LimitPerSecPeerID,
LimitPerSecIP: cfg.LimitPerSecIP,
LimitPerSecPeerID: cfg.LimitPerSecPeerID,
whitelistedPeerIDs: cfg.WhitelistedPeerIDs,
whitelistedIPs: cfg.WhitelistedIPs,
handlers: handlers,
}
}
func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoop) error {
func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runLoop runLoop) error {
in, out := p2p.MsgPipe()
defer in.Close()
defer out.Close()
@ -146,11 +152,13 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
return
}
rateLimitsProcessed.Inc()
RateLimitsProcessed.Inc()
var ip string
if p != nil && p.peer != nil {
ip = p.peer.Node().IP().String()
if p != nil {
// this relies on <nil> being the string representation of nil
// as IP() might return a nil value
ip = p.IP().String()
}
if halted := r.throttleIP(ip); halted {
for _, h := range r.handlers {
@ -197,7 +205,7 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
}()
go func() {
errC <- runLoop(p, out)
errC <- runLoop(out)
}()
return <-errC
@ -206,19 +214,19 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
// throttleIP throttles a number of messages incoming from a given IP.
// It allows 10 packets per second.
func (r *PeerRateLimiter) throttleIP(ip string) bool {
if r.limitPerSecIP == 0 {
if r.LimitPerSecIP == 0 {
return false
}
if stringSliceContains(r.whitelistedIPs, ip) {
return false
}
return r.ipThrottler.Halt(ip, 1, r.limitPerSecIP)
return r.ipThrottler.Halt(ip, 1, r.LimitPerSecIP)
}
// throttlePeer throttles a number of messages incoming from a peer.
// It allows 3 packets per second.
func (r *PeerRateLimiter) throttlePeer(peerID []byte) bool {
if r.limitPerSecIP == 0 {
if r.LimitPerSecIP == 0 {
return false
}
var id enode.ID
@ -226,7 +234,7 @@ func (r *PeerRateLimiter) throttlePeer(peerID []byte) bool {
if enodeIDSliceContains(r.whitelistedPeerIDs, id) {
return false
}
return r.peerIDThrottler.Halt(id.String(), 1, r.limitPerSecPeerID)
return r.peerIDThrottler.Halt(id.String(), 1, r.LimitPerSecPeerID)
}
func stringSliceContains(s []string, searched string) bool {

View file

@ -16,10 +16,11 @@
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
package common
import (
"bytes"
"net"
"testing"
"time"
@ -45,7 +46,7 @@ func TestPeerRateLimiterDecorator(t *testing.T) {
}()
messages := make(chan p2p.Msg, 1)
runLoop := func(p *Peer, rw p2p.MsgReadWriter) error {
runLoop := func(rw p2p.MsgReadWriter) error {
msg, err := rw.ReadMsg()
if err != nil {
return err
@ -55,7 +56,7 @@ func TestPeerRateLimiterDecorator(t *testing.T) {
}
r := NewPeerRateLimiter(nil, &mockRateLimiterHandler{})
err := r.decorate(nil, out, runLoop)
err := r.Decorate(nil, out, runLoop)
require.NoError(t, err)
receivedMsg := <-messages
@ -79,7 +80,7 @@ func TestPeerLimiterThrottlingWithZeroLimit(t *testing.T) {
func TestPeerLimiterHandler(t *testing.T) {
h := &mockRateLimiterHandler{}
r := NewPeerRateLimiter(nil, h)
p := &Peer{
p := &TestWakuPeer{
peer: p2p.NewPeer(enode.ID{0xaa, 0xbb, 0xcc}, "test-peer", nil),
}
rw1, rw2 := p2p.MsgPipe()
@ -119,7 +120,7 @@ func TestPeerLimiterHandlerWithWhitelisting(t *testing.T) {
WhitelistedIPs: []string{"<nil>"}, // no IP is represented as <nil> string
WhitelistedPeerIDs: []enode.ID{{0xaa, 0xbb, 0xcc}},
}, h)
p := &Peer{
p := &TestWakuPeer{
peer: p2p.NewPeer(enode.ID{0xaa, 0xbb, 0xcc}, "test-peer", nil),
}
rw1, rw2 := p2p.MsgPipe()
@ -151,8 +152,8 @@ func TestPeerLimiterHandlerWithWhitelisting(t *testing.T) {
require.Equal(t, 0, h.exceedPeerLimit)
}
func echoMessages(r *PeerRateLimiter, p *Peer, rw p2p.MsgReadWriter) error {
return r.decorate(p, rw, func(p *Peer, rw p2p.MsgReadWriter) error {
func echoMessages(r *PeerRateLimiter, p RateLimiterPeer, rw p2p.MsgReadWriter) error {
return r.Decorate(p, rw, func(rw p2p.MsgReadWriter) error {
for {
msg, err := rw.ReadMsg()
if err != nil {
@ -173,3 +174,16 @@ type mockRateLimiterHandler struct {
func (m *mockRateLimiterHandler) ExceedPeerLimit() error { m.exceedPeerLimit++; return nil }
func (m *mockRateLimiterHandler) ExceedIPLimit() error { m.exceedIPLimit++; return nil }
type TestWakuPeer struct {
peer *p2p.Peer
}
func (p *TestWakuPeer) IP() net.IP {
return p.peer.Node().IP()
}
func (p *TestWakuPeer) ID() []byte {
id := p.peer.ID()
return id[:]
}

View file

@ -16,7 +16,7 @@
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
package common
import (
"github.com/ethereum/go-ethereum/common/hexutil"

View file

@ -16,7 +16,7 @@
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
package common
import (
"encoding/json"

View file

@ -18,6 +18,10 @@
package waku
import (
"github.com/status-im/status-go/waku/common"
)
// Config represents the configuration state of a waku node.
type Config struct {
MaxMessageSize uint32 `toml:",omitempty"`
@ -30,7 +34,7 @@ type Config struct {
}
var DefaultConfig = Config{
MaxMessageSize: DefaultMaxMessageSize,
MinimumAcceptedPoW: DefaultMinimumPoW,
MaxMessageSize: common.DefaultMaxMessageSize,
MinimumAcceptedPoW: common.DefaultMinimumPoW,
RestrictLightClientsConn: true,
}

View file

@ -23,7 +23,9 @@ import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/waku/common"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
)
@ -39,14 +41,14 @@ const (
// DeliverMail should use p2pMessageCode for delivery,
// in order to bypass the expiry checks.
type MailServer interface {
Archive(env *Envelope)
DeliverMail(peerID []byte, request *Envelope) // DEPRECATED; use Deliver()
Deliver(peerID []byte, request MessagesRequest)
Archive(env *common.Envelope)
DeliverMail(peerID []byte, request *common.Envelope) // DEPRECATED; use Deliver()
Deliver(peerID []byte, request common.MessagesRequest)
}
// MailServerResponse is the response payload sent by the mailserver.
type MailServerResponse struct {
LastEnvelopeHash common.Hash
LastEnvelopeHash gethcommon.Hash
Cursor []byte
Error error
}
@ -57,7 +59,7 @@ func invalidResponseSizeError(size int) error {
// CreateMailServerRequestCompletedPayload creates a payload representing
// a successful request to mailserver
func CreateMailServerRequestCompletedPayload(requestID, lastEnvelopeHash common.Hash, cursor []byte) []byte {
func CreateMailServerRequestCompletedPayload(requestID, lastEnvelopeHash gethcommon.Hash, cursor []byte) []byte {
payload := make([]byte, len(requestID))
copy(payload, requestID[:])
payload = append(payload, lastEnvelopeHash[:]...)
@ -67,7 +69,7 @@ func CreateMailServerRequestCompletedPayload(requestID, lastEnvelopeHash common.
// CreateMailServerRequestFailedPayload creates a payload representing
// a failed request to a mailserver
func CreateMailServerRequestFailedPayload(requestID common.Hash, err error) []byte {
func CreateMailServerRequestFailedPayload(requestID gethcommon.Hash, err error) []byte {
payload := []byte(mailServerFailedPayloadPrefix)
payload = append(payload, requestID[:]...)
payload = append(payload, []byte(err.Error())...)
@ -79,8 +81,8 @@ func CreateMailServerRequestFailedPayload(requestID common.Hash, err error) []by
// * request completed successfully
// * request failed
// If the payload is unknown/unparseable, it returns `nil`
func CreateMailServerEvent(nodeID enode.ID, payload []byte) (*EnvelopeEvent, error) {
if len(payload) < common.HashLength {
func CreateMailServerEvent(nodeID enode.ID, payload []byte) (*common.EnvelopeEvent, error) {
if len(payload) < gethcommon.HashLength {
return nil, invalidResponseSizeError(len(payload))
}
@ -94,8 +96,8 @@ func CreateMailServerEvent(nodeID enode.ID, payload []byte) (*EnvelopeEvent, err
return tryCreateMailServerRequestCompletedEvent(nodeID, payload)
}
func tryCreateMailServerRequestFailedEvent(nodeID enode.ID, payload []byte) (*EnvelopeEvent, error) {
if len(payload) < common.HashLength+len(mailServerFailedPayloadPrefix) {
func tryCreateMailServerRequestFailedEvent(nodeID enode.ID, payload []byte) (*common.EnvelopeEvent, error) {
if len(payload) < gethcommon.HashLength+len(mailServerFailedPayloadPrefix) {
return nil, nil
}
@ -106,17 +108,17 @@ func tryCreateMailServerRequestFailedEvent(nodeID enode.ID, payload []byte) (*En
}
var (
requestID common.Hash
requestID gethcommon.Hash
errorMsg string
)
requestID, remainder = extractHash(remainder)
errorMsg = string(remainder)
event := EnvelopeEvent{
event := common.EnvelopeEvent{
Peer: nodeID,
Hash: requestID,
Event: EventMailServerRequestCompleted,
Event: common.EventMailServerRequestCompleted,
Data: &MailServerResponse{
Error: errors.New(errorMsg),
},
@ -126,7 +128,7 @@ func tryCreateMailServerRequestFailedEvent(nodeID enode.ID, payload []byte) (*En
}
func tryCreateMailServerRequestCompletedEvent(nodeID enode.ID, payload []byte) (*EnvelopeEvent, error) {
func tryCreateMailServerRequestCompletedEvent(nodeID enode.ID, payload []byte) (*common.EnvelopeEvent, error) {
// check if payload is
// - requestID or
// - requestID + lastEnvelopeHash or
@ -134,19 +136,19 @@ func tryCreateMailServerRequestCompletedEvent(nodeID enode.ID, payload []byte) (
// requestID is the hash of the request envelope.
// lastEnvelopeHash is the last envelope sent by the mail server
// cursor is the db key, 36 bytes: 4 for the timestamp + 32 for the envelope hash.
if len(payload) > common.HashLength*2+cursorSize {
if len(payload) > gethcommon.HashLength*2+cursorSize {
return nil, invalidResponseSizeError(len(payload))
}
var (
requestID common.Hash
lastEnvelopeHash common.Hash
requestID gethcommon.Hash
lastEnvelopeHash gethcommon.Hash
cursor []byte
)
requestID, remainder := extractHash(payload)
if len(remainder) >= common.HashLength {
if len(remainder) >= gethcommon.HashLength {
lastEnvelopeHash, remainder = extractHash(remainder)
}
@ -154,10 +156,10 @@ func tryCreateMailServerRequestCompletedEvent(nodeID enode.ID, payload []byte) (
cursor = remainder
}
event := EnvelopeEvent{
event := common.EnvelopeEvent{
Peer: nodeID,
Hash: requestID,
Event: EventMailServerRequestCompleted,
Event: common.EventMailServerRequestCompleted,
Data: &MailServerResponse{
LastEnvelopeHash: lastEnvelopeHash,
Cursor: cursor,
@ -167,9 +169,9 @@ func tryCreateMailServerRequestCompletedEvent(nodeID enode.ID, payload []byte) (
return &event, nil
}
func extractHash(payload []byte) (common.Hash, []byte) {
prefix, remainder := extractPrefix(payload, common.HashLength)
return common.BytesToHash(prefix), remainder
func extractHash(payload []byte) (gethcommon.Hash, []byte) {
prefix, remainder := extractPrefix(payload, gethcommon.HashLength)
return gethcommon.BytesToHash(prefix), remainder
}
func extractPrefix(payload []byte, size int) ([]byte, []byte) {

View file

@ -1,390 +0,0 @@
// Copyright 2019 The Waku Library Authors.
//
// The Waku library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Waku library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty off
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
//
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
import (
"bytes"
"fmt"
"math"
"sync"
"time"
mapset "github.com/deckarep/golang-set"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
)
// Peer represents a waku protocol peer connection.
type Peer struct {
host *Waku
peer *p2p.Peer
ws p2p.MsgReadWriter
logger *zap.Logger
trusted bool
powRequirement float64
// bloomMu is to allow thread safe access to
// the bloom filter
bloomMu sync.Mutex
bloomFilter []byte
// topicInterestMu is to allow thread safe access to
// the map of topic interests
topicInterestMu sync.Mutex
topicInterest map[TopicType]bool
// fullNode is used to indicate that the node will be accepting any
// envelope. The opposite is an "empty node" , which is when
// a bloom filter is all 0s or topic interest is an empty map (not nil).
// In that case no envelope is accepted.
fullNode bool
confirmationsEnabled bool
rateLimitsMu sync.Mutex
rateLimits RateLimits
known mapset.Set // Messages already known by the peer to avoid wasting bandwidth
quit chan struct{}
}
// newPeer creates a new waku peer object, but does not run the handshake itself.
func newPeer(host *Waku, remote *p2p.Peer, rw p2p.MsgReadWriter, logger *zap.Logger) *Peer {
if logger == nil {
logger = zap.NewNop()
}
return &Peer{
host: host,
peer: remote,
ws: rw,
logger: logger,
trusted: false,
powRequirement: 0.0,
known: mapset.NewSet(),
quit: make(chan struct{}),
bloomFilter: MakeFullNodeBloom(),
fullNode: true,
}
}
// start initiates the peer updater, periodically broadcasting the waku packets
// into the network.
func (p *Peer) start() {
go p.update()
p.logger.Debug("starting peer", zap.Binary("peerID", p.ID()))
}
// stop terminates the peer updater, stopping message forwarding to it.
func (p *Peer) stop() {
close(p.quit)
p.logger.Debug("stopping peer", zap.Binary("peerID", p.ID()))
}
// handshake sends the protocol initiation status message to the remote peer and
// verifies the remote status too.
func (p *Peer) handshake() error {
// Send the handshake status message asynchronously
errc := make(chan error, 1)
opts := p.host.toStatusOptions()
go func() {
errc <- p2p.SendItems(p.ws, statusCode, ProtocolVersion, opts)
}()
// Fetch the remote status packet and verify protocol match
packet, err := p.ws.ReadMsg()
if err != nil {
return err
}
if packet.Code != statusCode {
return fmt.Errorf("p [%x] sent packet %x before status packet", p.ID(), packet.Code)
}
var (
peerProtocolVersion uint64
peerOptions statusOptions
)
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
if _, err := s.List(); err != nil {
return fmt.Errorf("p [%x]: failed to decode status packet: %v", p.ID(), err)
}
// Validate protocol version.
if err := s.Decode(&peerProtocolVersion); err != nil {
return fmt.Errorf("p [%x]: failed to decode peer protocol version: %v", p.ID(), err)
}
if peerProtocolVersion != ProtocolVersion {
return fmt.Errorf("p [%x]: protocol version mismatch %d != %d", p.ID(), peerProtocolVersion, ProtocolVersion)
}
// Decode and validate other status packet options.
if err := s.Decode(&peerOptions); err != nil {
return fmt.Errorf("p [%x]: failed to decode status options: %v", p.ID(), err)
}
if err := s.ListEnd(); err != nil {
return fmt.Errorf("p [%x]: failed to decode status packet: %v", p.ID(), err)
}
if err := p.setOptions(peerOptions.WithDefaults()); err != nil {
return fmt.Errorf("p [%x]: failed to set options: %v", p.ID(), err)
}
if err := <-errc; err != nil {
return fmt.Errorf("p [%x] failed to send status packet: %v", p.ID(), err)
}
return nil
}
func (p *Peer) setOptions(peerOptions statusOptions) error {
p.logger.Debug("settings options", zap.Binary("peerID", p.ID()), zap.Any("Options", peerOptions))
if err := peerOptions.Validate(); err != nil {
return fmt.Errorf("p [%x]: sent invalid options: %v", p.ID(), err)
}
// Validate and save peer's PoW.
pow := peerOptions.PoWRequirementF()
if pow != nil {
if math.IsInf(*pow, 0) || math.IsNaN(*pow) || *pow < 0.0 {
return fmt.Errorf("p [%x]: sent bad status message: invalid pow", p.ID())
}
p.powRequirement = *pow
}
if peerOptions.TopicInterest != nil {
p.setTopicInterest(peerOptions.TopicInterest)
} else if peerOptions.BloomFilter != nil {
// Validate and save peer's bloom filters.
bloom := peerOptions.BloomFilter
bloomSize := len(bloom)
if bloomSize != 0 && bloomSize != BloomFilterSize {
return fmt.Errorf("p [%x] sent bad status message: wrong bloom filter size %d", p.ID(), bloomSize)
}
p.setBloomFilter(bloom)
}
if peerOptions.LightNodeEnabled != nil {
// Validate and save other peer's options.
if *peerOptions.LightNodeEnabled && p.host.LightClientMode() && p.host.LightClientModeConnectionRestricted() {
return fmt.Errorf("p [%x] is useless: two light client communication restricted", p.ID())
}
}
if peerOptions.ConfirmationsEnabled != nil {
p.confirmationsEnabled = *peerOptions.ConfirmationsEnabled
}
if peerOptions.RateLimits != nil {
p.setRateLimits(*peerOptions.RateLimits)
}
return nil
}
// update executes periodic operations on the peer, including message transmission
// and expiration.
func (p *Peer) update() {
// Start the tickers for the updates
expire := time.NewTicker(expirationCycle)
transmit := time.NewTicker(transmissionCycle)
// Loop and transmit until termination is requested
for {
select {
case <-expire.C:
p.expire()
case <-transmit.C:
if err := p.broadcast(); err != nil {
p.logger.Debug("broadcasting failed", zap.Binary("peer", p.ID()), zap.Error(err))
return
}
case <-p.quit:
return
}
}
}
// mark marks an envelope known to the peer so that it won't be sent back.
func (p *Peer) mark(envelope *Envelope) {
p.known.Add(envelope.Hash())
}
// marked checks if an envelope is already known to the remote peer.
func (p *Peer) marked(envelope *Envelope) bool {
return p.known.Contains(envelope.Hash())
}
// expire iterates over all the known envelopes in the host and removes all
// expired (unknown) ones from the known list.
func (p *Peer) expire() {
unmark := make(map[common.Hash]struct{})
p.known.Each(func(v interface{}) bool {
if !p.host.isEnvelopeCached(v.(common.Hash)) {
unmark[v.(common.Hash)] = struct{}{}
}
return true
})
// Dump all known but no longer cached
for hash := range unmark {
p.known.Remove(hash)
}
}
// broadcast iterates over the collection of envelopes and transmits yet unknown
// ones over the network.
func (p *Peer) broadcast() error {
envelopes := p.host.Envelopes()
bundle := make([]*Envelope, 0, len(envelopes))
for _, envelope := range envelopes {
if !p.marked(envelope) && envelope.PoW() >= p.powRequirement && p.topicOrBloomMatch(envelope) {
bundle = append(bundle, envelope)
}
}
if len(bundle) == 0 {
return nil
}
batchHash, err := sendBundle(p.ws, bundle)
if err != nil {
p.logger.Debug("failed to deliver envelopes", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
// mark envelopes only if they were successfully sent
for _, e := range bundle {
p.mark(e)
event := EnvelopeEvent{
Event: EventEnvelopeSent,
Hash: e.Hash(),
Peer: p.peer.ID(),
}
if p.confirmationsEnabled {
event.Batch = batchHash
}
p.host.envelopeFeed.Send(event)
}
p.logger.Debug("broadcasted bundles successfully", zap.Binary("peer", p.ID()), zap.Int("count", len(bundle)))
return nil
}
// ID returns a peer's id
func (p *Peer) ID() []byte {
id := p.peer.ID()
return id[:]
}
func (p *Peer) notifyAboutPowRequirementChange(pow float64) error {
i := math.Float64bits(pow)
return p2p.Send(p.ws, statusUpdateCode, statusOptions{PoWRequirement: &i})
}
func (p *Peer) notifyAboutBloomFilterChange(bloom []byte) error {
return p2p.Send(p.ws, statusUpdateCode, statusOptions{BloomFilter: bloom})
}
func (p *Peer) notifyAboutTopicInterestChange(topics []TopicType) error {
return p2p.Send(p.ws, statusUpdateCode, statusOptions{TopicInterest: topics})
}
func (p *Peer) bloomMatch(env *Envelope) bool {
p.bloomMu.Lock()
defer p.bloomMu.Unlock()
return p.fullNode || BloomFilterMatch(p.bloomFilter, env.Bloom())
}
func (p *Peer) topicInterestMatch(env *Envelope) bool {
p.topicInterestMu.Lock()
defer p.topicInterestMu.Unlock()
if p.topicInterest == nil {
return false
}
return p.topicInterest[env.Topic]
}
// topicOrBloomMatch matches against topic-interest if topic interest
// is not nil. Otherwise it will match against the bloom-filter.
// If the bloom-filter is nil, or full, the node is considered a full-node
// and any envelope will be accepted. An empty topic-interest (but not nil)
// signals that we are not interested in any envelope.
func (p *Peer) topicOrBloomMatch(env *Envelope) bool {
p.topicInterestMu.Lock()
topicInterestMode := p.topicInterest != nil
p.topicInterestMu.Unlock()
if topicInterestMode {
return p.topicInterestMatch(env)
}
return p.bloomMatch(env)
}
func (p *Peer) setBloomFilter(bloom []byte) {
p.bloomMu.Lock()
defer p.bloomMu.Unlock()
p.bloomFilter = bloom
p.fullNode = isFullNode(bloom)
if p.fullNode && p.bloomFilter == nil {
p.bloomFilter = MakeFullNodeBloom()
}
p.topicInterest = nil
}
func (p *Peer) setTopicInterest(topicInterest []TopicType) {
p.topicInterestMu.Lock()
defer p.topicInterestMu.Unlock()
if topicInterest == nil {
p.topicInterest = nil
return
}
p.topicInterest = make(map[TopicType]bool)
for _, topic := range topicInterest {
p.topicInterest[topic] = true
}
p.fullNode = false
p.bloomFilter = nil
}
func (p *Peer) setRateLimits(r RateLimits) {
p.rateLimitsMu.Lock()
p.rateLimits = r
p.rateLimitsMu.Unlock()
}
func MakeFullNodeBloom() []byte {
bloom := make([]byte, BloomFilterSize)
for i := 0; i < BloomFilterSize; i++ {
bloom[i] = 0xFF
}
return bloom
}
func sendBundle(rw p2p.MsgWriter, bundle []*Envelope) (rst common.Hash, err error) {
data, err := rlp.EncodeToBytes(bundle)
if err != nil {
return
}
err = rw.WriteMsg(p2p.Msg{
Code: messagesCode,
Size: uint32(len(data)),
Payload: bytes.NewBuffer(data),
})
if err != nil {
return
}
return crypto.Keccak256Hash(data), nil
}

19
waku/v0/const.go Normal file
View file

@ -0,0 +1,19 @@
package v0
// Waku protocol parameters
const (
Version = uint64(0) // Peer version number
VersionStr = "0" // The same, as a string
Name = "waku" // Nickname of the protocol
// Waku protocol message codes, according to https://github.com/vacp2p/specs/blob/master/specs/waku/waku-0.md
StatusCode = 0 // used in the handshake
MessagesCode = 1 // regular message
StatusUpdateCode = 22 // update of settings
BatchAcknowledgedCode = 11 // confirmation that batch of envelopes was received
MessageResponseCode = 12 // includes confirmation for delivery and information about errors
P2PRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol
P2PRequestCode = 126 // peer-to-peer message, used by Dapp protocol
P2PMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
NumberOfMessageCodes = 128
)

5
waku/v0/init.go Normal file
View file

@ -0,0 +1,5 @@
package v0
func init() {
initRLPKeyFields()
}

55
waku/v0/message.go Normal file
View file

@ -0,0 +1,55 @@
package v0
import (
"bytes"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/waku/common"
)
// MultiVersionResponse allows to decode response into chosen version.
type MultiVersionResponse struct {
Version uint
Response rlp.RawValue
}
// DecodeResponse1 decodes response into first version of the messages response.
func (m MultiVersionResponse) DecodeResponse1() (resp common.MessagesResponse, err error) {
return resp, rlp.DecodeBytes(m.Response, &resp)
}
// Version1MessageResponse first version of the message response.
type Version1MessageResponse struct {
Version uint
Response common.MessagesResponse
}
// NewMessagesResponse returns instance of the version messages response.
func NewMessagesResponse(batch gethcommon.Hash, errors []common.EnvelopeError) Version1MessageResponse {
return Version1MessageResponse{
Version: 1,
Response: common.MessagesResponse{
Hash: batch,
Errors: errors,
},
}
}
func sendBundle(rw p2p.MsgWriter, bundle []*common.Envelope) (rst gethcommon.Hash, err error) {
data, err := rlp.EncodeToBytes(bundle)
if err != nil {
return
}
err = rw.WriteMsg(p2p.Msg{
Code: MessagesCode,
Size: uint32(len(data)),
Payload: bytes.NewBuffer(data),
})
if err != nil {
return
}
return crypto.Keccak256Hash(data), nil
}

42
waku/v0/message_test.go Normal file
View file

@ -0,0 +1,42 @@
// Copyright 2019 The Waku Library Authors.
//
// The Waku library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Waku library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty off
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
//
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package v0
import (
"testing"
"github.com/stretchr/testify/require"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/waku/common"
)
func TestEncodeDecodeVersionedResponse(t *testing.T) {
response := NewMessagesResponse(gethcommon.Hash{1}, []common.EnvelopeError{{Code: 1}})
bytes, err := rlp.EncodeToBytes(response)
require.NoError(t, err)
var mresponse MultiVersionResponse
require.NoError(t, rlp.DecodeBytes(bytes, &mresponse))
v1resp, err := mresponse.DecodeResponse1()
require.NoError(t, err)
require.Equal(t, response.Response.Hash, v1resp.Hash)
}

621
waku/v0/peer.go Normal file
View file

@ -0,0 +1,621 @@
package v0
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
"net"
"sync"
"time"
"go.uber.org/zap"
mapset "github.com/deckarep/golang-set"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/waku/common"
)
type Peer struct {
host common.WakuHost
rw p2p.MsgReadWriter
p2pPeer *p2p.Peer
logger *zap.Logger
quit chan struct{}
trusted bool
powRequirement float64
// bloomMu is to allow thread safe access to
// the bloom filter
bloomMu sync.Mutex
bloomFilter []byte
// topicInterestMu is to allow thread safe access to
// the map of topic interests
topicInterestMu sync.Mutex
topicInterest map[common.TopicType]bool
// fullNode is used to indicate that the node will be accepting any
// envelope. The opposite is an "empty node" , which is when
// a bloom filter is all 0s or topic interest is an empty map (not nil).
// In that case no envelope is accepted.
fullNode bool
confirmationsEnabled bool
rateLimitsMu sync.Mutex
rateLimits common.RateLimits
known mapset.Set // Messages already known by the peer to avoid wasting bandwidth
}
func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logger *zap.Logger) *Peer {
if logger == nil {
logger = zap.NewNop()
}
return &Peer{
host: host,
p2pPeer: p2pPeer,
logger: logger,
rw: rw,
trusted: false,
powRequirement: 0.0,
known: mapset.NewSet(),
quit: make(chan struct{}),
bloomFilter: common.MakeFullNodeBloom(),
fullNode: true,
}
}
func (p *Peer) Start() error {
if err := p.handshake(); err != nil {
return err
}
go p.update()
p.logger.Debug("starting peer", zap.Binary("peerID", p.ID()))
return nil
}
func (p *Peer) Stop() {
close(p.quit)
p.logger.Debug("stopping peer", zap.Binary("peerID", p.ID()))
}
func (p *Peer) NotifyAboutPowRequirementChange(pow float64) error {
i := math.Float64bits(pow)
return p2p.Send(p.rw, StatusUpdateCode, StatusOptions{PoWRequirement: &i})
}
func (p *Peer) NotifyAboutBloomFilterChange(bloom []byte) error {
return p2p.Send(p.rw, StatusUpdateCode, StatusOptions{BloomFilter: bloom})
}
func (p *Peer) NotifyAboutTopicInterestChange(topics []common.TopicType) error {
return p2p.Send(p.rw, StatusUpdateCode, StatusOptions{TopicInterest: topics})
}
func (p *Peer) SetPeerTrusted(trusted bool) {
p.trusted = trusted
}
func (p *Peer) RequestHistoricMessages(envelope *common.Envelope) error {
return p2p.Send(p.rw, P2PRequestCode, envelope)
}
func (p *Peer) SendMessagesRequest(request common.MessagesRequest) error {
return p2p.Send(p.rw, P2PRequestCode, request)
}
func (p *Peer) SendHistoricMessageResponse(payload []byte) error {
size, r, err := rlp.EncodeToReader(payload)
if err != nil {
return err
}
return p.rw.WriteMsg(p2p.Msg{Code: P2PRequestCompleteCode, Size: uint32(size), Payload: r})
}
func (p *Peer) SendP2PMessages(envelopes []*common.Envelope) error {
return p2p.Send(p.rw, P2PMessageCode, envelopes)
}
func (p *Peer) SendRawP2PDirect(envelopes []rlp.RawValue) error {
return p2p.Send(p.rw, P2PMessageCode, envelopes)
}
func (p *Peer) SetRWWriter(rw p2p.MsgReadWriter) {
p.rw = rw
}
// Mark marks an envelope known to the peer so that it won't be sent back.
func (p *Peer) Mark(envelope *common.Envelope) {
p.known.Add(envelope.Hash())
}
// Marked checks if an envelope is already known to the remote peer.
func (p *Peer) Marked(envelope *common.Envelope) bool {
return p.known.Contains(envelope.Hash())
}
func (p *Peer) BloomFilter() []byte {
p.bloomMu.Lock()
defer p.bloomMu.Unlock()
bloomFilterCopy := make([]byte, len(p.bloomFilter))
copy(bloomFilterCopy, p.bloomFilter)
return bloomFilterCopy
}
func (p *Peer) PoWRequirement() float64 {
return p.powRequirement
}
func (p *Peer) ConfirmationsEnabled() bool {
return p.confirmationsEnabled
}
// ID returns a peer's id
func (p *Peer) ID() []byte {
id := p.p2pPeer.ID()
return id[:]
}
func (p *Peer) EnodeID() enode.ID {
return p.p2pPeer.ID()
}
func (p *Peer) IP() net.IP {
return p.p2pPeer.Node().IP()
}
func (p *Peer) Run() error {
logger := p.logger.Named("Run")
for {
// fetch the next packet
packet, err := p.rw.ReadMsg()
if err != nil {
logger.Info("failed to read a message", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
if packet.Size > p.host.MaxMessageSize() {
logger.Warn("oversize message received", zap.Binary("peer", p.ID()), zap.Uint32("size", packet.Size))
return errors.New("oversize message received")
}
if err := p.handlePacket(packet); err != nil {
logger.Warn("failed to handle packet message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
}
_ = packet.Discard()
}
}
func (p *Peer) handlePacket(packet p2p.Msg) error {
switch packet.Code {
case MessagesCode:
if err := p.handleMessagesCode(packet); err != nil {
p.logger.Warn("failed to handle MessagesCode message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case MessageResponseCode:
if err := p.handleMessageResponseCode(packet); err != nil {
p.logger.Warn("failed to handle MessageResponseCode message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case BatchAcknowledgedCode:
if err := p.handleBatchAcknowledgeCode(packet); err != nil {
p.logger.Warn("failed to handle BatchAcknowledgedCode message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case StatusUpdateCode:
if err := p.handleStatusUpdateCode(packet); err != nil {
p.logger.Warn("failed to decode status update message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case P2PMessageCode:
if err := p.handleP2PMessageCode(packet); err != nil {
p.logger.Warn("failed to decode direct message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case P2PRequestCode:
if err := p.handleP2PRequestCode(packet); err != nil {
p.logger.Warn("failed to decode p2p request message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case P2PRequestCompleteCode:
if err := p.handleP2PRequestCompleteCode(packet); err != nil {
p.logger.Warn("failed to decode p2p request complete message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
default:
// New message common might be implemented in the future versions of Waku.
// For forward compatibility, just ignore.
p.logger.Debug("ignored packet with message code", zap.Uint64("code", packet.Code))
}
return nil
}
func (p *Peer) handleMessagesCode(packet p2p.Msg) error {
// decode the contained envelopes
data, err := ioutil.ReadAll(packet.Payload)
if err != nil {
common.EnvelopesRejectedCounter.WithLabelValues("failed_read").Inc()
return fmt.Errorf("failed to read packet payload: %v", err)
}
var envelopes []*common.Envelope
if err := rlp.DecodeBytes(data, &envelopes); err != nil {
common.EnvelopesRejectedCounter.WithLabelValues("invalid_data").Inc()
return fmt.Errorf("invalid payload: %v", err)
}
envelopeErrors, err := p.host.OnNewEnvelopes(envelopes, p)
if p.host.ConfirmationsEnabled() {
go p.sendConfirmation(data, envelopeErrors) // nolint: errcheck
}
return err
}
func (p *Peer) handleMessageResponseCode(packet p2p.Msg) error {
var resp MultiVersionResponse
if err := packet.Decode(&resp); err != nil {
common.EnvelopesRejectedCounter.WithLabelValues("failed_read").Inc()
return fmt.Errorf("invalid response message: %v", err)
}
if resp.Version != 1 {
p.logger.Info("received unsupported version of MultiVersionResponse for MessageResponseCode packet", zap.Uint("version", resp.Version))
return nil
}
response, err := resp.DecodeResponse1()
if err != nil {
common.EnvelopesRejectedCounter.WithLabelValues("invalid_data").Inc()
return fmt.Errorf("failed to decode response message: %v", err)
}
return p.host.OnMessagesResponse(response, p)
}
func (p *Peer) handleP2PRequestCode(packet p2p.Msg) error {
// Must be processed if mail server is implemented. Otherwise ignore.
if !p.host.Mailserver() {
return nil
}
// Read all data as we will try to decode it possibly twice.
data, err := ioutil.ReadAll(packet.Payload)
if err != nil {
return fmt.Errorf("invalid p2p request messages: %v", err)
}
r := bytes.NewReader(data)
packet.Payload = r
// As we failed to decode the request, let's set the offset
// to the beginning and try decode it again.
if _, err := r.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("invalid p2p request message: %v", err)
}
var request common.MessagesRequest
errReq := packet.Decode(&request)
if errReq == nil {
return p.host.OnMessagesRequest(request, p)
}
p.logger.Info("failed to decode p2p request message", zap.Binary("peer", p.ID()), zap.Error(errReq))
return errors.New("invalid p2p request message")
}
func (p *Peer) handleBatchAcknowledgeCode(packet p2p.Msg) error {
var batchHash gethcommon.Hash
if err := packet.Decode(&batchHash); err != nil {
return fmt.Errorf("invalid batch ack message: %v", err)
}
return p.host.OnBatchAcknowledged(batchHash, p)
}
func (p *Peer) handleStatusUpdateCode(packet p2p.Msg) error {
var StatusOptions StatusOptions
err := packet.Decode(&StatusOptions)
if err != nil {
p.logger.Error("failed to decode status-options", zap.Error(err))
common.EnvelopesRejectedCounter.WithLabelValues("invalid_settings_changed").Inc()
return err
}
return p.setOptions(StatusOptions)
}
func (p *Peer) handleP2PMessageCode(packet p2p.Msg) error {
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
// this message is not supposed to be forwarded to other peers, and
// therefore might not satisfy the PoW, expiry and other requirements.
// these messages are only accepted from the trusted peer.
if !p.trusted {
return nil
}
var (
envelopes []*common.Envelope
err error
)
if err = packet.Decode(&envelopes); err != nil {
return fmt.Errorf("invalid direct message payload: %v", err)
}
return p.host.OnNewP2PEnvelopes(envelopes, p)
}
func (p *Peer) handleP2PRequestCompleteCode(packet p2p.Msg) error {
if !p.trusted {
return nil
}
var payload []byte
if err := packet.Decode(&payload); err != nil {
return fmt.Errorf("invalid p2p request complete message: %v", err)
}
return p.host.OnP2PRequestCompleted(payload, p)
}
// sendConfirmation sends MessageResponseCode and BatchAcknowledgedCode messages.
func (p *Peer) sendConfirmation(data []byte, envelopeErrors []common.EnvelopeError) (err error) {
batchHash := crypto.Keccak256Hash(data)
err = p2p.Send(p.rw, MessageResponseCode, NewMessagesResponse(batchHash, envelopeErrors))
if err != nil {
return
}
err = p2p.Send(p.rw, BatchAcknowledgedCode, batchHash) // DEPRECATED
return
}
// handshake sends the protocol initiation status message to the remote peer and
// verifies the remote status too.
func (p *Peer) handshake() error {
// Send the handshake status message asynchronously
errc := make(chan error, 1)
opts := StatusOptionsFromHost(p.host)
go func() {
errc <- p2p.SendItems(p.rw, StatusCode, Version, opts)
}()
// Fetch the remote status packet and verify protocol match
packet, err := p.rw.ReadMsg()
if err != nil {
return err
}
if packet.Code != StatusCode {
return fmt.Errorf("p [%x] sent packet %x before status packet", p.ID(), packet.Code)
}
var (
peerProtocolVersion uint64
peerOptions StatusOptions
)
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
if _, err := s.List(); err != nil {
return fmt.Errorf("p [%x]: failed to decode status packet: %v", p.ID(), err)
}
// Validate protocol version.
if err := s.Decode(&peerProtocolVersion); err != nil {
return fmt.Errorf("p [%x]: failed to decode peer protocol version: %v", p.ID(), err)
}
if peerProtocolVersion != Version {
return fmt.Errorf("p [%x]: protocol version mismatch %d != %d", p.ID(), peerProtocolVersion, Version)
}
// Decode and validate other status packet options.
if err := s.Decode(&peerOptions); err != nil {
return fmt.Errorf("p [%x]: failed to decode status options: %v", p.ID(), err)
}
if err := s.ListEnd(); err != nil {
return fmt.Errorf("p [%x]: failed to decode status packet: %v", p.ID(), err)
}
if err := p.setOptions(peerOptions.WithDefaults()); err != nil {
return fmt.Errorf("p [%x]: failed to set options: %v", p.ID(), err)
}
if err := <-errc; err != nil {
return fmt.Errorf("p [%x] failed to send status packet: %v", p.ID(), err)
}
return nil
}
// update executes periodic operations on the peer, including message transmission
// and expiration.
func (p *Peer) update() {
// Start the tickers for the updates
expire := time.NewTicker(common.ExpirationCycle)
transmit := time.NewTicker(common.TransmissionCycle)
// Loop and transmit until termination is requested
for {
select {
case <-expire.C:
p.expire()
case <-transmit.C:
if err := p.broadcast(); err != nil {
p.logger.Debug("broadcasting failed", zap.Binary("peer", p.ID()), zap.Error(err))
return
}
case <-p.quit:
return
}
}
}
func (p *Peer) setOptions(peerOptions StatusOptions) error {
p.logger.Debug("settings options", zap.Binary("peerID", p.ID()), zap.Any("Options", peerOptions))
if err := peerOptions.Validate(); err != nil {
return fmt.Errorf("p [%x]: sent invalid options: %v", p.ID(), err)
}
// Validate and save peer's PoW.
pow := peerOptions.PoWRequirementF()
if pow != nil {
if math.IsInf(*pow, 0) || math.IsNaN(*pow) || *pow < 0.0 {
return fmt.Errorf("p [%x]: sent bad status message: invalid pow", p.ID())
}
p.powRequirement = *pow
}
if peerOptions.TopicInterest != nil {
p.setTopicInterest(peerOptions.TopicInterest)
} else if peerOptions.BloomFilter != nil {
// Validate and save peer's bloom filters.
bloom := peerOptions.BloomFilter
bloomSize := len(bloom)
if bloomSize != 0 && bloomSize != common.BloomFilterSize {
return fmt.Errorf("p [%x] sent bad status message: wrong bloom filter size %d", p.ID(), bloomSize)
}
p.setBloomFilter(bloom)
}
if peerOptions.LightNodeEnabled != nil {
// Validate and save other peer's options.
if *peerOptions.LightNodeEnabled && p.host.LightClientMode() && p.host.LightClientModeConnectionRestricted() {
return fmt.Errorf("p [%x] is useless: two light client communication restricted", p.ID())
}
}
if peerOptions.ConfirmationsEnabled != nil {
p.confirmationsEnabled = *peerOptions.ConfirmationsEnabled
}
if peerOptions.RateLimits != nil {
p.setRateLimits(*peerOptions.RateLimits)
}
return nil
}
// expire iterates over all the known envelopes in the host and removes all
// expired (unknown) ones from the known list.
func (p *Peer) expire() {
unmark := make(map[gethcommon.Hash]struct{})
p.known.Each(func(v interface{}) bool {
if !p.host.IsEnvelopeCached(v.(gethcommon.Hash)) {
unmark[v.(gethcommon.Hash)] = struct{}{}
}
return true
})
// Dump all known but no longer cached
for hash := range unmark {
p.known.Remove(hash)
}
}
// broadcast iterates over the collection of envelopes and transmits yet unknown
// ones over the network.
func (p *Peer) broadcast() error {
envelopes := p.host.Envelopes()
bundle := make([]*common.Envelope, 0, len(envelopes))
for _, envelope := range envelopes {
if !p.Marked(envelope) && envelope.PoW() >= p.powRequirement && p.topicOrBloomMatch(envelope) {
bundle = append(bundle, envelope)
}
}
if len(bundle) == 0 {
return nil
}
batchHash, err := sendBundle(p.rw, bundle)
if err != nil {
p.logger.Debug("failed to deliver envelopes", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
// mark envelopes only if they were successfully sent
for _, e := range bundle {
p.Mark(e)
event := common.EnvelopeEvent{
Event: common.EventEnvelopeSent,
Hash: e.Hash(),
Peer: p.EnodeID(),
}
if p.confirmationsEnabled {
event.Batch = batchHash
}
p.host.SendEnvelopeEvent(event)
}
p.logger.Debug("broadcasted bundles successfully", zap.Binary("peer", p.ID()), zap.Int("count", len(bundle)))
return nil
}
func (p *Peer) setBloomFilter(bloom []byte) {
p.bloomMu.Lock()
defer p.bloomMu.Unlock()
p.bloomFilter = bloom
p.fullNode = common.IsFullNode(bloom)
if p.fullNode && p.bloomFilter == nil {
p.bloomFilter = common.MakeFullNodeBloom()
}
p.topicInterest = nil
}
func (p *Peer) setTopicInterest(topicInterest []common.TopicType) {
p.topicInterestMu.Lock()
defer p.topicInterestMu.Unlock()
if topicInterest == nil {
p.topicInterest = nil
return
}
p.topicInterest = make(map[common.TopicType]bool)
for _, topic := range topicInterest {
p.topicInterest[topic] = true
}
p.fullNode = false
p.bloomFilter = nil
}
func (p *Peer) setRateLimits(r common.RateLimits) {
p.rateLimitsMu.Lock()
p.rateLimits = r
p.rateLimitsMu.Unlock()
}
// topicOrBloomMatch matches against topic-interest if topic interest
// is not nil. Otherwise it will match against the bloom-filter.
// If the bloom-filter is nil, or full, the node is considered a full-node
// and any envelope will be accepted. An empty topic-interest (but not nil)
// signals that we are not interested in any envelope.
func (p *Peer) topicOrBloomMatch(env *common.Envelope) bool {
p.topicInterestMu.Lock()
topicInterestMode := p.topicInterest != nil
p.topicInterestMu.Unlock()
if topicInterestMode {
return p.topicInterestMatch(env)
}
return p.bloomMatch(env)
}
func (p *Peer) topicInterestMatch(env *common.Envelope) bool {
p.topicInterestMu.Lock()
defer p.topicInterestMu.Unlock()
if p.topicInterest == nil {
return false
}
return p.topicInterest[env.Topic]
}
func (p *Peer) bloomMatch(env *common.Envelope) bool {
p.bloomMu.Lock()
defer p.bloomMu.Unlock()
return p.fullNode || common.BloomFilterMatch(p.bloomFilter, env.Bloom())
}

60
waku/v0/peer_test.go Normal file
View file

@ -0,0 +1,60 @@
// Copyright 2019 The Waku Library Authors.
//
// The Waku library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Waku library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty off
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
//
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package v0
import (
"testing"
"github.com/status-im/status-go/waku/common"
)
var sharedTopic = common.TopicType{0xF, 0x1, 0x2, 0}
var wrongTopic = common.TopicType{0, 0, 0, 0}
//two generic waku node handshake. one don't send light flag
func TestTopicOrBloomMatch(t *testing.T) {
p := Peer{}
p.setTopicInterest([]common.TopicType{sharedTopic})
envelope := &common.Envelope{Topic: sharedTopic}
if !p.topicOrBloomMatch(envelope) {
t.Fatal("envelope should match")
}
badEnvelope := &common.Envelope{Topic: wrongTopic}
if p.topicOrBloomMatch(badEnvelope) {
t.Fatal("envelope should not match")
}
}
func TestTopicOrBloomMatchFullNode(t *testing.T) {
p := Peer{}
// Set as full node
p.fullNode = true
p.setTopicInterest([]common.TopicType{sharedTopic})
envelope := &common.Envelope{Topic: sharedTopic}
if !p.topicOrBloomMatch(envelope) {
t.Fatal("envelope should match")
}
badEnvelope := &common.Envelope{Topic: wrongTopic}
if p.topicOrBloomMatch(badEnvelope) {
t.Fatal("envelope should not match")
}
}

View file

@ -1,4 +1,4 @@
package waku
package v0
import (
"errors"
@ -9,9 +9,11 @@ import (
"strings"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/waku/common"
)
// statusOptionKey is a current type used in statusOptions as a key.
// statusOptionKey is a current type used in StatusOptions as a key.
type statusOptionKey string
var (
@ -20,24 +22,49 @@ var (
keyFieldIdx = make(map[statusOptionKey]int)
)
// statusOptions defines additional information shared between peers
// StatusOptions defines additional information shared between peers
// during the handshake.
// There might be more options provided then fields in statusOptions
// There might be more options provided then fields in StatusOptions
// and they should be ignored during deserialization to stay forward compatible.
// In the case of RLP, options should be serialized to an array of tuples
// where the first item is a field name and the second is a RLP-serialized value.
type statusOptions struct {
PoWRequirement *uint64 `rlp:"key=0"` // RLP does not support float64 natively
BloomFilter []byte `rlp:"key=1"`
LightNodeEnabled *bool `rlp:"key=2"`
ConfirmationsEnabled *bool `rlp:"key=3"`
RateLimits *RateLimits `rlp:"key=4"`
TopicInterest []TopicType `rlp:"key=5"`
type StatusOptions struct {
PoWRequirement *uint64 `rlp:"key=0"` // RLP does not support float64 natively
BloomFilter []byte `rlp:"key=1"`
LightNodeEnabled *bool `rlp:"key=2"`
ConfirmationsEnabled *bool `rlp:"key=3"`
RateLimits *common.RateLimits `rlp:"key=4"`
TopicInterest []common.TopicType `rlp:"key=5"`
}
func StatusOptionsFromHost(host common.WakuHost) StatusOptions {
opts := StatusOptions{}
rateLimits := host.RateLimits()
opts.RateLimits = &rateLimits
lightNode := host.LightClientMode()
opts.LightNodeEnabled = &lightNode
minPoW := host.MinPow()
opts.SetPoWRequirementFromF(minPoW)
confirmationsEnabled := host.ConfirmationsEnabled()
opts.ConfirmationsEnabled = &confirmationsEnabled
bloomFilterMode := host.BloomFilterMode()
if bloomFilterMode {
opts.BloomFilter = host.BloomFilter()
} else {
opts.TopicInterest = host.TopicInterest()
}
return opts
}
// initFLPKeyFields initialises the values of `idxFieldKey` and `keyFieldIdx`
func initRLPKeyFields() {
o := statusOptions{}
o := StatusOptions{}
v := reflect.ValueOf(o)
for i := 0; i < v.NumField(); i++ {
@ -67,7 +94,7 @@ func initRLPKeyFields() {
// WithDefaults adds the default values for a given peer.
// This are not the host default values, but the default values that ought to
// be used when receiving from an update from a peer.
func (o statusOptions) WithDefaults() statusOptions {
func (o StatusOptions) WithDefaults() StatusOptions {
if o.PoWRequirement == nil {
o.PoWRequirement = &defaultMinPoW
}
@ -83,17 +110,17 @@ func (o statusOptions) WithDefaults() statusOptions {
}
if o.RateLimits == nil {
o.RateLimits = &RateLimits{}
o.RateLimits = &common.RateLimits{}
}
if o.BloomFilter == nil {
o.BloomFilter = MakeFullNodeBloom()
o.BloomFilter = common.MakeFullNodeBloom()
}
return o
}
func (o statusOptions) PoWRequirementF() *float64 {
func (o StatusOptions) PoWRequirementF() *float64 {
if o.PoWRequirement == nil {
return nil
}
@ -101,12 +128,12 @@ func (o statusOptions) PoWRequirementF() *float64 {
return &result
}
func (o *statusOptions) SetPoWRequirementFromF(val float64) {
func (o *StatusOptions) SetPoWRequirementFromF(val float64) {
requirement := math.Float64bits(val)
o.PoWRequirement = &requirement
}
func (o statusOptions) EncodeRLP(w io.Writer) error {
func (o StatusOptions) EncodeRLP(w io.Writer) error {
v := reflect.ValueOf(o)
var optionsList []interface{}
for i := 0; i < v.NumField(); i++ {
@ -125,7 +152,7 @@ func (o statusOptions) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, optionsList)
}
func (o *statusOptions) DecodeRLP(s *rlp.Stream) error {
func (o *StatusOptions) DecodeRLP(s *rlp.Stream) error {
_, err := s.List()
if err != nil {
return fmt.Errorf("expected an outer list: %v", err)
@ -154,7 +181,7 @@ loop:
// a higher index.
idx, ok := keyFieldIdx[key]
if !ok {
// Read the rest of the list items and dump them.
// Read the rest of the list items and dump peer.
_, err := s.Raw()
if err != nil {
return fmt.Errorf("failed to read the value of key %s: %v", key, err)
@ -172,8 +199,8 @@ loop:
return s.ListEnd()
}
func (o statusOptions) Validate() error {
if len(o.TopicInterest) > 1000 {
func (o StatusOptions) Validate() error {
if len(o.TopicInterest) > 10000 {
return errors.New("topic interest is limited by 1000 items")
}
return nil

View file

@ -1,4 +1,4 @@
package waku
package v0
import (
"math"
@ -7,29 +7,31 @@ import (
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/waku/common"
)
func TestEncodeDecodeRLP(t *testing.T) {
initRLPKeyFields()
pow := math.Float64bits(6.02)
lightNodeEnabled := true
confirmationsEnabled := true
opts := statusOptions{
opts := StatusOptions{
PoWRequirement: &pow,
BloomFilter: TopicToBloom(TopicType{0xaa, 0xbb, 0xcc, 0xdd}),
BloomFilter: common.TopicToBloom(common.TopicType{0xaa, 0xbb, 0xcc, 0xdd}),
LightNodeEnabled: &lightNodeEnabled,
ConfirmationsEnabled: &confirmationsEnabled,
RateLimits: &RateLimits{
RateLimits: &common.RateLimits{
IPLimits: 10,
PeerIDLimits: 5,
TopicLimits: 1,
},
TopicInterest: []TopicType{{0x01}, {0x02}, {0x03}, {0x04}},
TopicInterest: []common.TopicType{{0x01}, {0x02}, {0x03}, {0x04}},
}
data, err := rlp.EncodeToBytes(opts)
require.NoError(t, err)
var optsDecoded statusOptions
var optsDecoded StatusOptions
err = rlp.DecodeBytes(data, &optsDecoded)
require.NoError(t, err)
require.EqualValues(t, opts, optsDecoded)
@ -42,11 +44,11 @@ func TestBackwardCompatibility(t *testing.T) {
data, err := rlp.EncodeToBytes(alist)
require.NoError(t, err)
var optsDecoded statusOptions
var optsDecoded StatusOptions
err = rlp.DecodeBytes(data, &optsDecoded)
require.NoError(t, err)
pow := math.Float64bits(2.05)
require.EqualValues(t, statusOptions{PoWRequirement: &pow}, optsDecoded)
require.EqualValues(t, StatusOptions{PoWRequirement: &pow}, optsDecoded)
}
func TestForwardCompatibility(t *testing.T) {
@ -58,10 +60,10 @@ func TestForwardCompatibility(t *testing.T) {
data, err := rlp.EncodeToBytes(alist)
require.NoError(t, err)
var optsDecoded statusOptions
var optsDecoded StatusOptions
err = rlp.DecodeBytes(data, &optsDecoded)
require.NoError(t, err)
require.EqualValues(t, statusOptions{PoWRequirement: &pow}, optsDecoded)
require.EqualValues(t, StatusOptions{PoWRequirement: &pow}, optsDecoded)
}
func TestInitRLPKeyFields(t *testing.T) {

View file

@ -29,13 +29,17 @@ import (
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/waku/common"
v0 "github.com/status-im/status-go/waku/v0"
"github.com/stretchr/testify/require"
)
var keys = []string{
@ -81,7 +85,7 @@ type TestData struct {
}
type TestNode struct {
shh *Waku
waku *Waku
id *ecdsa.PrivateKey
server *p2p.Server
filerID string
@ -93,8 +97,8 @@ var result TestData
var nodes [NumNodes]*TestNode
var sharedKey = hexutil.MustDecode("0x03ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd31")
var wrongKey = hexutil.MustDecode("0xf91156714d7ec88d3edc1c652c2181dbb3044e8771c683f3b30d33c12b986b11")
var sharedTopic = TopicType{0xF, 0x1, 0x2, 0}
var wrongTopic = TopicType{0, 0, 0, 0}
var sharedTopic = common.TopicType{0xF, 0x1, 0x2, 0}
var wrongTopic = common.TopicType{0, 0, 0, 0}
var expectedMessage = []byte("per aspera ad astra")
var unexpectedMessage = []byte("per rectum ad astra")
var masterBloomFilter []byte
@ -143,31 +147,31 @@ func TestSimulationBloomFilter(t *testing.T) {
func resetParams(t *testing.T) {
// change pow only for node zero
masterPow = 7777777.0
_ = nodes[0].shh.SetMinimumPoW(masterPow, true)
_ = nodes[0].waku.SetMinimumPoW(masterPow, true)
// change bloom for all nodes
masterBloomFilter = TopicToBloom(sharedTopic)
masterBloomFilter = common.TopicToBloom(sharedTopic)
for i := 0; i < NumNodes; i++ {
_ = nodes[i].shh.SetBloomFilter(masterBloomFilter)
_ = nodes[i].waku.SetBloomFilter(masterBloomFilter)
}
round++
}
func initBloom(t *testing.T) {
masterBloomFilter = make([]byte, BloomFilterSize)
masterBloomFilter = make([]byte, common.BloomFilterSize)
_, err := mrand.Read(masterBloomFilter) // nolint: gosec
if err != nil {
t.Fatalf("rand failed: %s.", err)
}
msgBloom := TopicToBloom(sharedTopic)
msgBloom := common.TopicToBloom(sharedTopic)
masterBloomFilter = addBloom(masterBloomFilter, msgBloom)
for i := 0; i < 32; i++ {
masterBloomFilter[i] = 0xFF
}
if !BloomFilterMatch(masterBloomFilter, msgBloom) {
if !common.BloomFilterMatch(masterBloomFilter, msgBloom) {
t.Fatalf("bloom mismatch on initBloom.")
}
}
@ -179,22 +183,22 @@ func initializeBloomFilterMode(t *testing.T) {
for i := 0; i < NumNodes; i++ {
var node TestNode
b := make([]byte, BloomFilterSize)
b := make([]byte, common.BloomFilterSize)
copy(b, masterBloomFilter)
config := DefaultConfig
config.BloomFilterMode = true
node.shh = New(&config, nil)
_ = node.shh.SetMinimumPoW(masterPow, false)
_ = node.shh.SetBloomFilter(b)
if !bytes.Equal(node.shh.BloomFilter(), masterBloomFilter) {
node.waku = New(&config, nil)
_ = node.waku.SetMinimumPoW(masterPow, false)
_ = node.waku.SetBloomFilter(b)
if !bytes.Equal(node.waku.BloomFilter(), masterBloomFilter) {
t.Fatalf("bloom mismatch on init.")
}
_ = node.shh.Start(nil)
topics := make([]TopicType, 0)
_ = node.waku.Start(nil)
topics := make([]common.TopicType, 0)
topics = append(topics, sharedTopic)
f := Filter{KeySym: sharedKey, Messages: NewMemoryMessageStore()}
f := common.Filter{KeySym: sharedKey, Messages: common.NewMemoryMessageStore()}
f.Topics = [][]byte{topics[0][:]}
node.filerID, err = node.shh.Subscribe(&f)
node.filerID, err = node.waku.Subscribe(&f)
if err != nil {
t.Fatalf("failed to install the filter: %s.", err)
}
@ -202,14 +206,14 @@ func initializeBloomFilterMode(t *testing.T) {
if err != nil {
t.Fatalf("failed convert the key: %s.", keys[i])
}
name := common.MakeName("waku-go", "2.0")
name := gethcommon.MakeName("waku-go", "2.0")
node.server = &p2p.Server{
Config: p2p.Config{
PrivateKey: node.id,
MaxPeers: NumNodes/2 + 1,
Name: name,
Protocols: node.shh.Protocols(),
Protocols: node.waku.Protocols(),
ListenAddr: "127.0.0.1:0",
NAT: nat.Any(),
},
@ -245,8 +249,8 @@ func stopServers() {
for i := 0; i < NumNodes; i++ {
n := nodes[i]
if n != nil {
_ = n.shh.Unsubscribe(n.filerID)
_ = n.shh.Stop()
_ = n.waku.Unsubscribe(n.filerID)
_ = n.waku.Stop()
n.server.Stop()
}
}
@ -269,7 +273,7 @@ func checkPropagation(t *testing.T, includingNodeZero bool) {
for j := 0; j < iterations; j++ {
for i := first; i < NumNodes; i++ {
f := nodes[i].shh.GetFilter(nodes[i].filerID)
f := nodes[i].waku.GetFilter(nodes[i].filerID)
if f == nil {
t.Fatalf("failed to get filterId %s from node %d, round %d.", nodes[i].filerID, i, round)
}
@ -288,7 +292,7 @@ func checkPropagation(t *testing.T, includingNodeZero bool) {
}
if !includingNodeZero {
f := nodes[0].shh.GetFilter(nodes[0].filerID)
f := nodes[0].waku.GetFilter(nodes[0].filerID)
if f != nil {
t.Fatalf("node zero received a message with low PoW.")
}
@ -297,7 +301,7 @@ func checkPropagation(t *testing.T, includingNodeZero bool) {
t.Fatalf("Test was not complete (%d round): timeout %d seconds. nodes=%v", round, iterations*cycle/1000, nodes)
}
func validateMail(t *testing.T, index int, mail []*ReceivedMessage) {
func validateMail(t *testing.T, index int, mail []*common.ReceivedMessage) {
var cnt int
for _, m := range mail {
if bytes.Equal(m.Payload, expectedMessage) {
@ -329,7 +333,7 @@ func checkTestStatus() {
for i := 0; i < NumNodes; i++ {
arr[i] = nodes[i].server.PeerCount()
envelopes := nodes[i].shh.Envelopes()
envelopes := nodes[i].waku.Envelopes()
if len(envelopes) >= NumNodes {
cnt++
}
@ -356,7 +360,7 @@ func isTestComplete() bool {
}
for i := 0; i < NumNodes; i++ {
envelopes := nodes[i].shh.Envelopes()
envelopes := nodes[i].waku.Envelopes()
if len(envelopes) < NumNodes+1 {
return false
}
@ -370,7 +374,7 @@ func sendMsg(t *testing.T, expected bool, id int) {
return
}
opt := MessageParams{KeySym: sharedKey, Topic: sharedTopic, Payload: expectedMessage, PoW: 0.00000001, WorkTime: 1}
opt := common.MessageParams{KeySym: sharedKey, Topic: sharedTopic, Payload: expectedMessage, PoW: 0.00000001, WorkTime: 1}
if !expected {
opt.KeySym = wrongKey
opt.Topic = wrongTopic
@ -378,7 +382,7 @@ func sendMsg(t *testing.T, expected bool, id int) {
opt.Payload[0] = byte(id)
}
msg, err := NewSentMessage(&opt)
msg, err := common.NewSentMessage(&opt)
if err != nil {
t.Fatalf("failed to create new message with seed %d: %s.", seed, err)
}
@ -387,7 +391,7 @@ func sendMsg(t *testing.T, expected bool, id int) {
t.Fatalf("failed to seal message: %s", err)
}
err = nodes[id].shh.Send(envelope)
err = nodes[id].waku.Send(envelope)
if err != nil {
t.Fatalf("failed to send message: %s", err)
}
@ -402,7 +406,7 @@ func TestPeerBasic(t *testing.T) {
}
params.PoW = 0.001
msg, err := NewSentMessage(params)
msg, err := common.NewSentMessage(params)
if err != nil {
t.Fatalf("failed to create new message with seed %d: %s.", seed, err)
}
@ -411,9 +415,9 @@ func TestPeerBasic(t *testing.T) {
t.Fatalf("failed Wrap with seed %d.", seed)
}
p := newPeer(nil, nil, nil, nil)
p.mark(env)
if !p.marked(env) {
p := v0.NewPeer(nil, nil, nil, nil)
p.Mark(env)
if !p.Marked(env) {
t.Fatalf("failed mark with seed %d.", seed)
}
}
@ -433,10 +437,10 @@ func checkPowExchangeForNodeZero(t *testing.T) {
func checkPowExchangeForNodeZeroOnce(t *testing.T, mustPass bool) bool {
cnt := 0
for i, node := range nodes {
for peer := range node.shh.peers {
if peer.peer.ID() == nodes[0].server.Self().ID() {
for protocol := range node.waku.peers {
if protocol.EnodeID() == nodes[0].server.Self().ID() {
cnt++
if peer.powRequirement != masterPow {
if protocol.PoWRequirement() != masterPow {
if mustPass {
t.Fatalf("node %d: failed to set the new pow requirement for node zero.", i)
} else {
@ -454,11 +458,11 @@ func checkPowExchangeForNodeZeroOnce(t *testing.T, mustPass bool) bool {
func checkPowExchange(t *testing.T) {
for i, node := range nodes {
for peer := range node.shh.peers {
if peer.peer.ID() != nodes[0].server.Self().ID() {
if peer.powRequirement != masterPow {
for protocol := range node.waku.peers {
if protocol.EnodeID() != nodes[0].server.Self().ID() {
if protocol.PoWRequirement() != masterPow {
t.Fatalf("node %d: failed to exchange pow requirement in round %d; expected %f, got %f",
i, round, masterPow, peer.powRequirement)
i, round, masterPow, protocol.PoWRequirement())
}
}
}
@ -467,14 +471,12 @@ func checkPowExchange(t *testing.T) {
func checkBloomFilterExchangeOnce(t *testing.T, mustPass bool) bool {
for i, node := range nodes {
for peer := range node.shh.peers {
peer.bloomMu.Lock()
equals := bytes.Equal(peer.bloomFilter, masterBloomFilter)
peer.bloomMu.Unlock()
for protocol := range node.waku.peers {
equals := bytes.Equal(protocol.BloomFilter(), masterBloomFilter)
if !equals {
if mustPass {
t.Fatalf("node %d: failed to exchange bloom filter requirement in round %d. \n%x expected \n%x got",
i, round, masterBloomFilter, peer.bloomFilter)
i, round, masterBloomFilter, protocol.BloomFilter())
} else {
return false
}
@ -512,125 +514,82 @@ func waitForServersToStart(t *testing.T) {
//two generic waku node handshake
func TestPeerHandshakeWithTwoFullNode(t *testing.T) {
w1 := Waku{}
var pow uint64 = 123
p1 := newPeer(
&w1,
p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}),
&rwStub{[]interface{}{
ProtocolVersion,
statusOptions{PoWRequirement: &pow},
}},
nil,
)
err := p1.handshake()
rw1, rw2 := p2p.MsgPipe()
defer rw1.Close()
defer rw2.Close()
w1 := New(nil, nil)
var pow float64 = 0.1
err := w1.SetMinimumPoW(pow, true)
if err != nil {
t.Fatal(err)
}
w2 := New(nil, nil)
go w1.HandlePeer(p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1) // nolint: errcheck
p2 := v0.NewPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
err = p2.Start()
if err != nil {
t.Fatal(err)
}
require.Equal(t, pow, p2.PoWRequirement())
}
//two generic waku node handshake. one don't send light flag
func TestHandshakeWithOldVersionWithoutLightModeFlag(t *testing.T) {
rw1, rw2 := p2p.MsgPipe()
defer rw1.Close()
defer rw2.Close()
w1 := New(nil, nil)
w1.SetLightClientMode(true)
w2 := New(nil, nil)
go w1.HandlePeer(p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1) // nolint: errcheck
p2 := v0.NewPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
err := p2.Start()
if err != nil {
t.Fatal(err)
}
}
//two generic waku node handshake. one don't send light flag
func TestHandshakeWithOldVersionWithoutLightModeFlag(t *testing.T) {
w1 := Waku{}
var pow uint64 = 123
p1 := newPeer(
&w1,
p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}),
&rwStub{[]interface{}{
ProtocolVersion,
statusOptions{PoWRequirement: &pow},
}},
nil,
)
err := p1.handshake()
if err != nil {
t.Fatal()
}
}
//two generic waku node handshake. one don't send light flag
func TestTopicOrBloomMatch(t *testing.T) {
p := Peer{}
p.setTopicInterest([]TopicType{sharedTopic})
envelope := &Envelope{Topic: sharedTopic}
if !p.topicOrBloomMatch(envelope) {
t.Fatal("envelope should match")
}
badEnvelope := &Envelope{Topic: wrongTopic}
if p.topicOrBloomMatch(badEnvelope) {
t.Fatal("envelope should not match")
}
}
func TestTopicOrBloomMatchFullNode(t *testing.T) {
p := Peer{}
// Set as full node
p.fullNode = true
p.setTopicInterest([]TopicType{sharedTopic})
envelope := &Envelope{Topic: sharedTopic}
if !p.topicOrBloomMatch(envelope) {
t.Fatal("envelope should match")
}
badEnvelope := &Envelope{Topic: wrongTopic}
if p.topicOrBloomMatch(badEnvelope) {
t.Fatal("envelope should not match")
}
}
//two light nodes handshake. restriction disabled
//two light nodes handshake. restriction enable
func TestTwoLightPeerHandshakeRestrictionOff(t *testing.T) {
w1 := Waku{}
w1.settings.RestrictLightClientsConn = false
rw1, rw2 := p2p.MsgPipe()
defer rw1.Close()
defer rw2.Close()
w1 := New(nil, nil)
w1.SetLightClientMode(true)
var pow uint64 = 123
var lightNodeEnabled = true
p1 := newPeer(
&w1,
p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}),
&rwStub{[]interface{}{
ProtocolVersion,
statusOptions{PoWRequirement: &pow, LightNodeEnabled: &lightNodeEnabled},
}},
nil,
)
err := p1.handshake()
if err != nil {
t.FailNow()
}
w1.settings.RestrictLightClientsConn = false
w2 := New(nil, nil)
w2.SetLightClientMode(true)
w2.settings.RestrictLightClientsConn = false
go w1.HandlePeer(p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1) // nolint: errcheck
p2 := v0.NewPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
require.NoError(t, p2.Start())
}
//two light nodes handshake. restriction enabled
func TestTwoLightPeerHandshakeError(t *testing.T) {
w1 := Waku{}
w1.settings.RestrictLightClientsConn = true
rw1, rw2 := p2p.MsgPipe()
w1 := New(nil, nil)
w1.SetLightClientMode(true)
p1 := newPeer(
&w1,
p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}),
&rwStub{[]interface{}{ProtocolVersion, uint64(123), make([]byte, BloomFilterSize), true}},
nil,
)
err := p1.handshake()
if err == nil {
t.FailNow()
}
}
w1.settings.RestrictLightClientsConn = true
type rwStub struct {
payload []interface{}
}
w2 := New(nil, nil)
w2.SetLightClientMode(true)
w2.settings.RestrictLightClientsConn = true
func (stub *rwStub) ReadMsg() (p2p.Msg, error) {
size, r, err := rlp.EncodeToReader(stub.payload)
if err != nil {
return p2p.Msg{}, err
}
return p2p.Msg{Code: statusCode, Size: uint32(size), Payload: r}, nil
}
go w1.HandlePeer(p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1) // nolint: errcheck
func (stub *rwStub) WriteMsg(m p2p.Msg) error {
return nil
p2 := v0.NewPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
require.Error(t, p2.Start())
}

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff