From e32c5546e1e5d13f597d2c4d56c92cf831d35ea8 Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Sat, 25 Nov 2023 23:24:20 +0000 Subject: [PATCH] test: request community from storenode (#4364) * feat: request community info from storenode test * shutdownWaitGroup * fix requestCommunityInfoFromMailserver timestamp roundin --- protocol/common/shard/shard.go | 3 +- .../communities_messenger_helpers_test.go | 82 ---------- ...nities_messenger_token_permissions_test.go | 3 +- protocol/messenger.go | 76 +++++++--- protocol/messenger_communities.go | 11 +- protocol/messenger_config.go | 8 +- protocol/messenger_handler.go | 8 +- protocol/messenger_mailserver_cycle.go | 139 ++++++++--------- protocol/messenger_messages_tracking_test.go | 2 +- protocol/messenger_storenode_request_test.go | 140 ++++++++++++++++++ protocol/messenger_sync_raw_messages.go | 15 +- protocol/messenger_testing_utils.go | 138 +++++++++++++++++ protocol/transport/filters_manager.go | 7 +- services/ext/service.go | 40 +---- wakuv2/config.go | 3 +- wakuv2/waku.go | 14 +- 16 files changed, 444 insertions(+), 245 deletions(-) create mode 100644 protocol/messenger_storenode_request_test.go diff --git a/protocol/common/shard/shard.go b/protocol/common/shard/shard.go index c1354aa0e..efbbd75a9 100644 --- a/protocol/common/shard/shard.go +++ b/protocol/common/shard/shard.go @@ -1,8 +1,9 @@ package shard import ( - "github.com/status-im/status-go/protocol/protobuf" wakuproto "github.com/waku-org/go-waku/waku/v2/protocol" + + "github.com/status-im/status-go/protocol/protobuf" ) type Shard struct { diff --git a/protocol/communities_messenger_helpers_test.go b/protocol/communities_messenger_helpers_test.go index d3735a85b..82ef20631 100644 --- a/protocol/communities_messenger_helpers_test.go +++ b/protocol/communities_messenger_helpers_test.go @@ -5,8 +5,6 @@ import ( "crypto/ecdsa" "encoding/json" "errors" - "os" - "io/ioutil" "sync" "time" @@ -21,7 +19,6 @@ import ( "github.com/status-im/status-go/account" "github.com/status-im/status-go/account/generator" "github.com/status-im/status-go/appdatabase" - gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/multiaccounts" @@ -29,7 +26,6 @@ import ( "github.com/status-im/status-go/multiaccounts/settings" "github.com/status-im/status-go/params" "github.com/status-im/status-go/protocol/common" - "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/requests" @@ -38,7 +34,6 @@ import ( walletToken "github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/t/helpers" "github.com/status-im/status-go/transactions" - waku "github.com/status-im/status-go/wakuv2" "github.com/status-im/status-go/walletdatabase" ) @@ -146,83 +141,6 @@ func (c *CollectiblesServiceMock) DeploymentSignatureDigest(chainID uint64, addr return gethcommon.Hex2Bytes("ccbb375343347491706cf4b43796f7b96ccc89c9e191a8b78679daeba1684ec7"), nil } -func newWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool) *waku.Waku { - config := &waku.Config{ - DefaultShardPubsubTopic: shard.DefaultShardPubsubTopic(), - } - - var onPeerStats func(connStatus types.ConnStatus) - var connStatusChan chan struct{} - if !useLocalWaku { - enrTreeAddress := testENRBootstrap - envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS") - if envEnrTreeAddress != "" { - enrTreeAddress = envEnrTreeAddress - } - config.EnableDiscV5 = true - config.DiscV5BootstrapNodes = []string{enrTreeAddress} - config.DiscoveryLimit = 20 - config.WakuNodes = []string{enrTreeAddress} - - connStatusChan = make(chan struct{}) - terminator := sync.Once{} - onPeerStats = func(connStatus types.ConnStatus) { - if connStatus.IsOnline { - terminator.Do(func() { - connStatusChan <- struct{}{} - }) - } - - } - } - - waku, err := waku.New("", "", config, logger, nil, nil, nil, onPeerStats) - s.Require().NoError(err) - s.Require().NoError(waku.Start()) - - if !useLocalWaku { - select { - case <-time.After(30 * time.Second): - s.Require().Fail("timeout elapsed") - case <-connStatusChan: - // proceed, peers found - close(connStatusChan) - } - } - - return waku -} - -func createWakuNetwork(s *suite.Suite, parentLogger *zap.Logger, nodeNames []string) []types.Waku { - nodes := make([]*waku.Waku, len(nodeNames)) - for i, name := range nodeNames { - logger := parentLogger.With(zap.String("name", name+"-waku")) - node := newWakuV2(s, logger, true) - nodes[i] = node - } - - // Setup local network graph - for i := 0; i < len(nodes); i++ { - for j := 0; j < len(nodes); j++ { - if i == j { - continue - } - - addrs := nodes[j].ListenAddresses() - s.Require().Greater(len(addrs), 0) - _, err := nodes[i].AddRelayPeer(addrs[0]) - s.Require().NoError(err) - err = nodes[i].DialPeer(addrs[0]) - s.Require().NoError(err) - } - } - wrappers := make([]types.Waku, len(nodes)) - for i, n := range nodes { - wrappers[i] = gethbridge.NewGethWakuV2Wrapper(n) - } - return wrappers -} - func newMessenger(s *suite.Suite, shh types.Waku, logger *zap.Logger, password string, walletAddresses []string, mockedBalances *map[uint64]map[gethcommon.Address]map[gethcommon.Address]*hexutil.Big, collectiblesService communitytokens.ServiceInterface) *Messenger { accountsManagerMock := &AccountManagerMock{} diff --git a/protocol/communities_messenger_token_permissions_test.go b/protocol/communities_messenger_token_permissions_test.go index 1eeb7c525..1d0f574c7 100644 --- a/protocol/communities_messenger_token_permissions_test.go +++ b/protocol/communities_messenger_token_permissions_test.go @@ -26,7 +26,6 @@ import ( const testChainID1 = 1 -const testENRBootstrap = "enrtree://AL65EKLJAUXKKPG43HVTML5EFFWEZ7L4LOKTLZCLJASG4DSESQZEC@prod.status.nodes.status.im" const ownerPassword = "123456" const alicePassword = "qwerty" const bobPassword = "bob123" @@ -130,7 +129,7 @@ type MessengerCommunitiesTokenPermissionsSuite struct { func (s *MessengerCommunitiesTokenPermissionsSuite) SetupTest() { s.logger = tt.MustCreateTestLogger() - wakuNodes := createWakuNetwork(&s.Suite, s.logger, []string{"owner", "bob", "alice"}) + wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, []string{"owner", "bob", "alice"}) ownerLogger := s.logger.With(zap.String("name", "owner")) s.ownerWaku = wakuNodes[0] diff --git a/protocol/messenger.go b/protocol/messenger.go index 11759e898..d17522970 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -62,6 +62,7 @@ import ( "github.com/status-im/status-go/services/communitytokens" ensservice "github.com/status-im/status-go/services/ens" "github.com/status-im/status-go/services/ext/mailservers" + localnotifications "github.com/status-im/status-go/services/local-notifications" mailserversDB "github.com/status-im/status-go/services/mailservers" "github.com/status-im/status-go/services/wallet" "github.com/status-im/status-go/services/wallet/token" @@ -135,16 +136,16 @@ type Messenger struct { mailserverCycle mailserverCycle database *sql.DB multiAccounts *multiaccounts.Database - mailservers *mailserversDB.Database settings *accounts.Database account *multiaccounts.Account mailserversDatabase *mailserversDB.Database browserDatabase *browsers.Database httpServer *server.MediaServer - quit chan struct{} - ctx context.Context - cancel context.CancelFunc + quit chan struct{} + ctx context.Context + cancel context.CancelFunc + shutdownWaitGroup sync.WaitGroup importingCommunities map[string]bool importingChannels map[string]bool @@ -160,13 +161,12 @@ type Messenger struct { requestedContactsLock sync.RWMutex requestedContacts map[string]*transport.Filter - connectionState connection.State - telemetryClient *telemetry.Client - contractMaker *contracts.ContractMaker - downloadHistoryArchiveTasksWaitGroup sync.WaitGroup - verificationDatabase *verification.Persistence - savedAddressesManager *wallet.SavedAddressesManager - walletAPI *wallet.API + connectionState connection.State + telemetryClient *telemetry.Client + contractMaker *contracts.ContractMaker + verificationDatabase *verification.Persistence + savedAddressesManager *wallet.SavedAddressesManager + walletAPI *wallet.API // TODO(samyoul) Determine if/how the remaining usage of this mutex can be removed mutex sync.Mutex @@ -198,6 +198,7 @@ type peerStatus struct { } type mailserverCycle struct { sync.RWMutex + allMailservers []mailserversDB.Mailserver activeMailserver *mailserversDB.Mailserver peers map[string]peerStatus events chan *p2p.PeerEvent @@ -480,8 +481,6 @@ func NewMessenger( return nil, err } - mailservers := mailserversDB.NewDB(database) - savedAddressesManager := wallet.NewSavedAddressesManager(c.walletDb) selfContact, err := buildSelfContact(identity, settings, c.multiAccount, c.account) @@ -529,7 +528,6 @@ func NewMessenger( settings: settings, peerStore: peerStore, verificationDatabase: verification.NewPersistence(database), - mailservers: mailservers, mailserverCycle: mailserverCycle{ peers: make(map[string]peerStatus), availabilitySubscriptions: make([]chan struct{}, 0), @@ -838,7 +836,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) { } response.Mailservers = mailservers - err = m.StartMailserverCycle() + err = m.StartMailserverCycle(mailservers) if err != nil { return nil, err } @@ -1444,13 +1442,7 @@ func (m *Messenger) handleENSVerified(records []*ens.VerificationRecord) { } } - m.logger.Info("calling on contacts") - if m.config.onContactENSVerified != nil { - m.logger.Info("called on contacts") - response := &MessengerResponse{Contacts: contacts} - m.config.onContactENSVerified(response) - } - + m.PublishMessengerResponse(&MessengerResponse{Contacts: contacts}) } func (m *Messenger) handleENSVerificationSubscription(c chan []*ens.VerificationRecord) { @@ -1892,7 +1884,7 @@ func (m *Messenger) Init() error { func (m *Messenger) Shutdown() (err error) { close(m.quit) m.cancel() - m.downloadHistoryArchiveTasksWaitGroup.Wait() + m.shutdownWaitGroup.Wait() for i, task := range m.shutdownTasks { m.logger.Debug("running shutdown task", zap.Int("n", i)) if tErr := task(); tErr != nil { @@ -3279,6 +3271,44 @@ func (m *Messenger) RetrieveAll() (*MessengerResponse, error) { return m.handleRetrievedMessages(chatWithMessages, true, false) } +func (m *Messenger) StartRetrieveMessagesLoop(tick time.Duration, cancel <-chan struct{}) { + m.shutdownWaitGroup.Add(1) + go func() { + defer m.shutdownWaitGroup.Done() + ticker := time.NewTicker(tick) + defer ticker.Stop() + for { + select { + case <-ticker.C: + m.ProcessAllMessages() + case <-cancel: + return + } + } + }() +} + +func (m *Messenger) ProcessAllMessages() { + response, err := m.RetrieveAll() + if err != nil { + m.logger.Error("failed to retrieve raw messages", zap.Error(err)) + return + } + m.PublishMessengerResponse(response) +} + +func (m *Messenger) PublishMessengerResponse(response *MessengerResponse) { + if response.IsEmpty() { + return + } + + notifications := response.Notifications() + // Clear notifications as not used for now + response.ClearNotifications() + signal.SendNewMessages(response) + localnotifications.PushMessages(notifications) +} + func (m *Messenger) GetStats() types.StatsSummary { return m.transport.GetStats() } diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 3be317cad..5457c4808 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -5,6 +5,7 @@ import ( "crypto/ecdsa" "errors" "fmt" + "math" "sync" "time" @@ -2589,6 +2590,11 @@ func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard } filter = filters[0] m.requestedCommunities[communityID] = filter + m.logger.Debug("created filter for community", + zap.String("filterID", filter.FilterID), + zap.String("communityID", communityID), + zap.String("PubsubTopic", filter.PubsubTopic), + ) } else { //we don't remember filter id associated with community because it was already installed m.requestedCommunities[communityID] = nil @@ -2596,7 +2602,7 @@ func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard defer m.forgetCommunityRequest(communityID) - to := uint32(m.transport.GetCurrentTime() / 1000) + to := uint32(math.Ceil(float64(m.GetCurrentTimeInMillis()) / 1000)) from := to - oneMonthInSeconds _, err := m.performMailserverRequest(func() (*MessengerResponse, error) { @@ -2624,6 +2630,9 @@ func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard return nil, nil } + // TODO: We can force to process all messages then we don't have to wait? + //m.ProcessAllMessages() + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() diff --git a/protocol/messenger_config.go b/protocol/messenger_config.go index 6e9919476..2391a4ee7 100644 --- a/protocol/messenger_config.go +++ b/protocol/messenger_config.go @@ -68,11 +68,6 @@ type MessengerSignalsHandler interface { } type config struct { - // This needs to be exposed until we move here mailserver logic - // as otherwise the client is not notified of a new filter and - // won't be pulling messages from mailservers until it reloads the chats/filters - onContactENSVerified func(*MessengerResponse) - // systemMessagesTranslations holds translations for system-messages systemMessagesTranslations *systemMessageTranslationsMap // Config for the envelopes monitor @@ -283,9 +278,8 @@ func WithSignalsHandler(h MessengerSignalsHandler) Option { } } -func WithENSVerificationConfig(onENSVerified func(*MessengerResponse), url, address string) Option { +func WithENSVerificationConfig(url, address string) Option { return func(c *config) error { - c.onContactENSVerified = onENSVerified c.verifyENSURL = url c.verifyENSContractAddress = address return nil diff --git a/protocol/messenger_handler.go b/protocol/messenger_handler.go index 3ea27616b..9bd8f6c3f 100644 --- a/protocol/messenger_handler.go +++ b/protocol/messenger_handler.go @@ -1333,8 +1333,8 @@ func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessage defer task.Waiter.Done() // this wait groups tracks all ongoing tasks across communities - m.downloadHistoryArchiveTasksWaitGroup.Add(1) - defer m.downloadHistoryArchiveTasksWaitGroup.Done() + m.shutdownWaitGroup.Add(1) + defer m.shutdownWaitGroup.Done() m.downloadAndImportHistoryArchives(communityID, magnetlink, task.CancelChan) }(currentTask, id) @@ -1646,8 +1646,8 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS task.Waiter.Add(1) defer task.Waiter.Done() - m.downloadHistoryArchiveTasksWaitGroup.Add(1) - defer m.downloadHistoryArchiveTasksWaitGroup.Done() + m.shutdownWaitGroup.Add(1) + defer m.shutdownWaitGroup.Done() m.downloadAndImportHistoryArchives(community.ID(), magnetlink, task.CancelChan) }(currentTask) diff --git a/protocol/messenger_mailserver_cycle.go b/protocol/messenger_mailserver_cycle.go index 4bdc42f09..02daa0907 100644 --- a/protocol/messenger_mailserver_cycle.go +++ b/protocol/messenger_mailserver_cycle.go @@ -3,6 +3,7 @@ package protocol import ( "context" "crypto/rand" + "fmt" "math" "math/big" "sort" @@ -59,20 +60,34 @@ func (m *Messenger) activeMailserverID() ([]byte, error) { return m.mailserverCycle.activeMailserver.IDBytes() } -func (m *Messenger) StartMailserverCycle() error { +func (m *Messenger) StartMailserverCycle(mailservers []mailservers.Mailserver) error { + m.mailserverCycle.allMailservers = mailservers - if m.server == nil { - m.logger.Warn("not starting mailserver cycle") - return nil + version := m.transport.WakuVersion() + + switch version { + case 1: + if m.server == nil { + m.logger.Warn("not starting mailserver cycle") + return nil + } + + m.mailserverCycle.events = make(chan *p2p.PeerEvent, 20) + m.mailserverCycle.subscription = m.server.SubscribeEvents(m.mailserverCycle.events) + go m.updateWakuV1PeerStatus() + + case 2: + go m.updateWakuV2PeerStatus() + + default: + return fmt.Errorf("unsupported waku version: %d", version) } - m.logger.Debug("started mailserver cycle") + m.logger.Debug("starting mailserver cycle", + zap.Uint("WakuVersion", m.transport.WakuVersion()), + zap.Any("mailservers", mailservers), + ) - m.mailserverCycle.events = make(chan *p2p.PeerEvent, 20) - m.mailserverCycle.subscription = m.server.SubscribeEvents(m.mailserverCycle.events) - - go m.updateWakuV1PeerStatus() - go m.updateWakuV2PeerStatus() return nil } @@ -169,27 +184,41 @@ func (m *Messenger) getFleet() (string, error) { } func (m *Messenger) allMailservers() ([]mailservers.Mailserver, error) { - // Append user mailservers + // Get configured fleet fleet, err := m.getFleet() if err != nil { return nil, err } + // Get default mailservers for given fleet allMailservers := m.mailserversByFleet(fleet) - customMailservers, err := m.mailservers.Mailservers() - if err != nil { - return nil, err - } + // Add custom configured mailservers + if m.mailserversDatabase != nil { + customMailservers, err := m.mailserversDatabase.Mailservers() + if err != nil { + return nil, err + } - for _, c := range customMailservers { - if c.Fleet == fleet { - c.Version = m.transport.WakuVersion() - allMailservers = append(allMailservers, c) + for _, c := range customMailservers { + if c.Fleet == fleet { + c.Version = m.transport.WakuVersion() + allMailservers = append(allMailservers, c) + } } } - return allMailservers, nil + // Filter mailservers by configured waku version + wakuVersion := m.transport.WakuVersion() + matchingMailservers := make([]mailservers.Mailserver, 0, len(allMailservers)) + + for _, ms := range allMailservers { + if ms.Version == wakuVersion { + matchingMailservers = append(matchingMailservers, ms) + } + } + + return matchingMailservers, nil } type SortedMailserver struct { @@ -208,24 +237,7 @@ func (m *Messenger) findNewMailserver() error { return m.connectToMailserver(*pinnedMailserver) } - // Append user mailservers - fleet, err := m.getFleet() - if err != nil { - return err - } - - allMailservers := m.mailserversByFleet(fleet) - - customMailservers, err := m.mailservers.Mailservers() - if err != nil { - return err - } - - for _, c := range customMailservers { - if c.Fleet == fleet { - allMailservers = append(allMailservers, c) - } - } + allMailservers := m.mailserverCycle.allMailservers m.logger.Info("Finding a new mailserver...") @@ -340,6 +352,10 @@ func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error { return err } + if ms.Version != m.transport.WakuVersion() { + return errors.New("mailserver waku version doesn't match") + } + if activeMailserverStatus != connected { // Attempt to connect to mailserver by adding it as a peer @@ -442,16 +458,12 @@ func (m *Messenger) penalizeMailserver(id string) { } func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) error { - m.logger.Debug("connected peers", zap.Any("connected", connectedPeers)) - m.logger.Debug("peers info", zap.Any("peer-info", m.mailserverCycle.peers)) + m.logger.Debug("mailserver cycle event", + zap.Any("connected", connectedPeers), + zap.Any("peer-info", m.mailserverCycle.peers)) m.mailPeersMutex.Lock() - allMailservers, err := m.allMailservers() - if err != nil { - return err - } - for pID, pInfo := range m.mailserverCycle.peers { if pInfo.status == disconnected { continue @@ -461,7 +473,7 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e found := false for _, connectedPeer := range connectedPeers { - id, err := m.mailserverAddressToID(connectedPeer.UniqueID, allMailservers) + id, err := m.mailserverAddressToID(connectedPeer.UniqueID, m.mailserverCycle.allMailservers) if err != nil { m.logger.Error("failed to convert id to hex", zap.Error(err)) return err @@ -487,7 +499,7 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e // not available error if m.mailserverCycle.activeMailserver != nil { for _, connectedPeer := range connectedPeers { - id, err := m.mailserverAddressToID(connectedPeer.UniqueID, allMailservers) + id, err := m.mailserverAddressToID(connectedPeer.UniqueID, m.mailserverCycle.allMailservers) if err != nil { m.logger.Error("failed to convert id to hex", zap.Error(err)) return err @@ -562,12 +574,6 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e } func (m *Messenger) updateWakuV1PeerStatus() { - - if m.transport.WakuVersion() != 1 { - m.logger.Debug("waku version not 1, returning") - return - } - ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() @@ -609,11 +615,6 @@ func (m *Messenger) updateWakuV1PeerStatus() { } func (m *Messenger) updateWakuV2PeerStatus() { - if m.transport.WakuVersion() != 2 { - m.logger.Debug("waku version not 2, returning") - return - } - connSubscription, err := m.transport.SubscribeToConnStatusChanges() if err != nil { m.logger.Error("Could not subscribe to connection status changes", zap.Error(err)) @@ -643,8 +644,6 @@ func (m *Messenger) updateWakuV2PeerStatus() { } case <-m.quit: - close(m.mailserverCycle.events) - m.mailserverCycle.subscription.Unsubscribe() connSubscription.Unsubscribe() return } @@ -667,11 +666,6 @@ func (m *Messenger) getPinnedMailserver() (*mailservers.Mailserver, error) { return nil, nil } - customMailservers, err := m.mailservers.Mailservers() - if err != nil { - return nil, err - } - fleetMailservers := mailservers.DefaultMailservers() for _, c := range fleetMailservers { @@ -680,10 +674,17 @@ func (m *Messenger) getPinnedMailserver() (*mailservers.Mailserver, error) { } } - for _, c := range customMailservers { - if c.Fleet == fleet && c.ID == pinnedMailserver { - c.Version = m.transport.WakuVersion() - return &c, nil + if m.mailserversDatabase != nil { + customMailservers, err := m.mailserversDatabase.Mailservers() + if err != nil { + return nil, err + } + + for _, c := range customMailservers { + if c.Fleet == fleet && c.ID == pinnedMailserver { + c.Version = m.transport.WakuVersion() + return &c, nil + } } } diff --git a/protocol/messenger_messages_tracking_test.go b/protocol/messenger_messages_tracking_test.go index 9edf0c74c..4f5ff801e 100644 --- a/protocol/messenger_messages_tracking_test.go +++ b/protocol/messenger_messages_tracking_test.go @@ -91,7 +91,7 @@ type MessengerMessagesTrackingSuite struct { func (s *MessengerMessagesTrackingSuite) SetupTest() { s.logger = tt.MustCreateTestLogger() - wakuNodes := createWakuNetwork(&s.Suite, s.logger, []string{"bob", "alice"}) + wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, []string{"bob", "alice"}) s.bobWaku = wakuNodes[0] s.bob, s.bobInterceptor = s.newMessenger(s.bobWaku, s.logger.With(zap.String("name", "bob"))) diff --git a/protocol/messenger_storenode_request_test.go b/protocol/messenger_storenode_request_test.go new file mode 100644 index 000000000..78fccd325 --- /dev/null +++ b/protocol/messenger_storenode_request_test.go @@ -0,0 +1,140 @@ +package protocol + +import ( + "testing" + "time" + + "github.com/stretchr/testify/suite" + "go.uber.org/zap" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/status-im/status-go/appdatabase" + gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" + "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/params" + "github.com/status-im/status-go/protocol/protobuf" + "github.com/status-im/status-go/protocol/requests" + "github.com/status-im/status-go/protocol/tt" + "github.com/status-im/status-go/t/helpers" + + mailserversDB "github.com/status-im/status-go/services/mailservers" + waku2 "github.com/status-im/status-go/wakuv2" +) + +const ( + localFleet = "local-test-fleet-1" + localMailserverID = "local-test-mailserver" +) + +func TestMessengerStoreNodeRequestSuite(t *testing.T) { + suite.Run(t, new(MessengerStoreNodeRequestSuite)) +} + +type MessengerStoreNodeRequestSuite struct { + suite.Suite + + owner *Messenger + bob *Messenger + + wakuStoreNode *waku2.Waku + ownerWaku types.Waku + bobWaku types.Waku + + logger *zap.Logger +} + +func (s *MessengerStoreNodeRequestSuite) newMessenger(shh types.Waku, logger *zap.Logger, mailserverAddress string) *Messenger { + privateKey, err := crypto.GenerateKey() + s.Require().NoError(err) + + mailserversSQLDb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + s.Require().NoError(err) + + mailserversDatabase := mailserversDB.NewDB(mailserversSQLDb) + err = mailserversDatabase.Add(mailserversDB.Mailserver{ + ID: localMailserverID, + Name: localMailserverID, + Address: mailserverAddress, + Fleet: localFleet, + }) + s.Require().NoError(err) + + options := []Option{ + WithMailserversDatabase(mailserversDatabase), + WithClusterConfig(params.ClusterConfig{ + Fleet: localFleet, + }), + } + + messenger, err := newMessengerWithKey(shh, privateKey, logger, options) + + s.Require().NoError(err) + return messenger +} + +func (s *MessengerStoreNodeRequestSuite) SetupTest() { + s.logger = tt.MustCreateTestLogger() + + // Create store node + + storeNodeLogger := s.logger.With(zap.String("name", "store-node-waku")) + s.wakuStoreNode = NewWakuV2(&s.Suite, storeNodeLogger, true, true) + + storeNodeListenAddresses := s.wakuStoreNode.ListenAddresses() + s.Require().LessOrEqual(1, len(storeNodeListenAddresses)) + + storeNodeAddress := storeNodeListenAddresses[0] + s.logger.Info("store node ready", zap.String("address", storeNodeAddress)) + + // Create community owner + + ownerWakuLogger := s.logger.With(zap.String("name", "owner-waku")) + s.ownerWaku = gethbridge.NewGethWakuV2Wrapper(NewWakuV2(&s.Suite, ownerWakuLogger, true, false)) + + ownerLogger := s.logger.With(zap.String("name", "owner")) + s.owner = s.newMessenger(s.ownerWaku, ownerLogger, storeNodeAddress) + + // Create an independent user + + bobWakuLogger := s.logger.With(zap.String("name", "owner-waku")) + s.bobWaku = gethbridge.NewGethWakuV2Wrapper(NewWakuV2(&s.Suite, bobWakuLogger, true, false)) + + bobLogger := s.logger.With(zap.String("name", "bob")) + s.bob = s.newMessenger(s.bobWaku, bobLogger, storeNodeAddress) + s.bob.StartRetrieveMessagesLoop(time.Second, nil) +} + +func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfo() { + + WaitForAvailableStoreNode(&s.Suite, s.owner, time.Second) + + createCommunityRequest := &requests.CreateCommunity{ + Name: "panda-lovers", + Description: "we love pandas", + Membership: protobuf.CommunityPermissions_AUTO_ACCEPT, + Color: "#ff0000", + Tags: []string{"Web3"}, + } + + response, err := s.owner.CreateCommunity(createCommunityRequest, true) + s.Require().NoError(err) + s.Require().NotNil(response) + s.Require().Len(response.Communities(), 1) + + community := response.Communities()[0] + communityID := community.IDString() + + WaitForAvailableStoreNode(&s.Suite, s.bob, time.Second) + + request := FetchCommunityRequest{ + CommunityKey: communityID, + Shard: nil, + TryDatabase: false, + WaitForResponse: true, + } + + fetchedCommunity, err := s.bob.FetchCommunity(&request) + s.Require().NoError(err) + s.Require().NotNil(fetchedCommunity) + s.Require().Equal(communityID, fetchedCommunity.IDString()) +} diff --git a/protocol/messenger_sync_raw_messages.go b/protocol/messenger_sync_raw_messages.go index 3baf323c9..73db99398 100644 --- a/protocol/messenger_sync_raw_messages.go +++ b/protocol/messenger_sync_raw_messages.go @@ -10,8 +10,6 @@ import ( "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/encryption/multidevice" "github.com/status-im/status-go/protocol/protobuf" - localnotifications "github.com/status-im/status-go/services/local-notifications" - "github.com/status-im/status-go/signal" ) type RawMessageHandler func(ctx context.Context, rawMessage common.RawMessage) (common.RawMessage, error) @@ -313,17 +311,6 @@ func (m *Messenger) HandleSyncRawMessages(rawMessages []*protobuf.RawMessage) er if err != nil { return err } - publishMessengerResponse(response) + m.PublishMessengerResponse(response) return nil } - -// this is a copy implementation of the one in ext/service.go, we should refactor this? -func publishMessengerResponse(response *MessengerResponse) { - if !response.IsEmpty() { - notifications := response.Notifications() - // Clear notifications as not used for now - response.ClearNotifications() - signal.SendNewMessages(response) - localnotifications.PushMessages(notifications) - } -} diff --git a/protocol/messenger_testing_utils.go b/protocol/messenger_testing_utils.go index 1d72e5bdd..9769125e5 100644 --- a/protocol/messenger_testing_utils.go +++ b/protocol/messenger_testing_utils.go @@ -2,17 +2,30 @@ package protocol import ( "context" + "database/sql" "errors" + "os" "sync" "time" + "go.uber.org/zap" + "github.com/stretchr/testify/suite" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + + "github.com/status-im/status-go/appdatabase" + gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" + "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/tt" + "github.com/status-im/status-go/t/helpers" + waku2 "github.com/status-im/status-go/wakuv2" ) +const testENRBootstrap = "enrtree://AL65EKLJAUXKKPG43HVTML5EFFWEZ7L4LOKTLZCLJASG4DSESQZEC@prod.status.nodes.status.im" + // WaitOnMessengerResponse Wait until the condition is true or the timeout is reached. func WaitOnMessengerResponse(m *Messenger, condition func(*MessengerResponse) bool, errorMessage string) (*MessengerResponse, error) { response := &MessengerResponse{} @@ -185,3 +198,128 @@ func SetIdentityImagesAndWaitForChange(s *suite.Suite, messenger *Messenger, tim s.Require().True(ok) } + +func WaitForAvailableStoreNode(s *suite.Suite, m *Messenger, timeout time.Duration) { + finish := make(chan struct{}) + cancel := make(chan struct{}) + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + defer func() { + wg.Done() + }() + for !m.isActiveMailserverAvailable() { + select { + case <-m.SubscribeMailserverAvailable(): + case <-cancel: + return + } + } + }() + + go func() { + defer func() { + close(finish) + }() + wg.Wait() + }() + + select { + case <-finish: + case <-time.After(timeout): + close(cancel) + } + + s.Require().True(m.isActiveMailserverAvailable()) +} + +func NewWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool, enableStore bool) *waku2.Waku { + wakuConfig := &waku2.Config{ + DefaultShardPubsubTopic: relay.DefaultWakuTopic, // shard.DefaultShardPubsubTopic(), + } + + var onPeerStats func(connStatus types.ConnStatus) + var connStatusChan chan struct{} + var db *sql.DB + + if !useLocalWaku { + enrTreeAddress := testENRBootstrap + envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS") + if envEnrTreeAddress != "" { + enrTreeAddress = envEnrTreeAddress + } + + wakuConfig.EnableDiscV5 = true + wakuConfig.DiscV5BootstrapNodes = []string{enrTreeAddress} + wakuConfig.DiscoveryLimit = 20 + wakuConfig.WakuNodes = []string{enrTreeAddress} + + connStatusChan = make(chan struct{}) + terminator := sync.Once{} + onPeerStats = func(connStatus types.ConnStatus) { + if connStatus.IsOnline { + terminator.Do(func() { + connStatusChan <- struct{}{} + }) + } + } + } + + if enableStore { + var err error + db, err = helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + s.Require().NoError(err) + + wakuConfig.EnableStore = true + wakuConfig.StoreCapacity = 200 + wakuConfig.StoreSeconds = 200 + } + + wakuNode, err := waku2.New("", "", wakuConfig, logger, db, nil, nil, onPeerStats) + s.Require().NoError(err) + s.Require().NoError(wakuNode.Start()) + + if !useLocalWaku { + select { + case <-time.After(30 * time.Second): + s.Require().Fail("timeout elapsed") + case <-connStatusChan: + // proceed, peers found + close(connStatusChan) + } + } + + return wakuNode +} + +func CreateWakuV2Network(s *suite.Suite, parentLogger *zap.Logger, nodeNames []string) []types.Waku { + nodes := make([]*waku2.Waku, len(nodeNames)) + for i, name := range nodeNames { + logger := parentLogger.With(zap.String("name", name+"-waku")) + wakuNode := NewWakuV2(s, logger, true, false) + nodes[i] = wakuNode + } + + // Setup local network graph + for i := 0; i < len(nodes); i++ { + for j := 0; j < len(nodes); j++ { + if i == j { + continue + } + + addrs := nodes[j].ListenAddresses() + s.Require().Greater(len(addrs), 0) + _, err := nodes[i].AddRelayPeer(addrs[0]) + s.Require().NoError(err) + err = nodes[i].DialPeer(addrs[0]) + s.Require().NoError(err) + } + } + wrappers := make([]types.Waku, len(nodes)) + for i, n := range nodes { + wrappers[i] = gethbridge.NewGethWakuV2Wrapper(n) + } + return wrappers +} diff --git a/protocol/transport/filters_manager.go b/protocol/transport/filters_manager.go index 94d21c84d..8016ca5f9 100644 --- a/protocol/transport/filters_manager.go +++ b/protocol/transport/filters_manager.go @@ -562,7 +562,12 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, f.filters[chatID] = chat - f.logger.Debug("registering filter for", zap.String("chatID", chatID), zap.String("type", "public"), zap.String("topic", filterAndTopic.Topic.String())) + f.logger.Debug("registering filter for", + zap.String("chatID", chatID), + zap.String("type", "public"), + zap.String("ContentTopic", filterAndTopic.Topic.String()), + zap.String("PubsubTopic", pubsubTopic), + ) return chat, nil } diff --git a/services/ext/service.go b/services/ext/service.go index f98fcf2a0..7feb6da89 100644 --- a/services/ext/service.go +++ b/services/ext/service.go @@ -51,7 +51,6 @@ import ( "github.com/status-im/status-go/services/browsers" "github.com/status-im/status-go/services/communitytokens" "github.com/status-im/status-go/services/ext/mailservers" - localnotifications "github.com/status-im/status-go/services/local-notifications" mailserversDB "github.com/status-im/status-go/services/mailservers" "github.com/status-im/status-go/services/wallet" w_common "github.com/status-im/status-go/services/wallet/common" @@ -194,7 +193,7 @@ func (s *Service) StartMessenger() (*protocol.MessengerResponse, error) { if err != nil { return nil, err } - go s.retrieveMessagesLoop(time.Second, s.cancelMessenger) + s.messenger.StartRetrieveMessagesLoop(time.Second, s.cancelMessenger) go s.verifyTransactionLoop(30*time.Second, s.cancelMessenger) if s.config.ShhextConfig.BandwidthStatsEnabled { @@ -204,39 +203,6 @@ func (s *Service) StartMessenger() (*protocol.MessengerResponse, error) { return response, nil } -func publishMessengerResponse(response *protocol.MessengerResponse) { - if !response.IsEmpty() { - notifications := response.Notifications() - // Clear notifications as not used for now - response.ClearNotifications() - PublisherSignalHandler{}.NewMessages(response) - localnotifications.PushMessages(notifications) - } -} - -func (s *Service) retrieveMessagesLoop(tick time.Duration, cancel <-chan struct{}) { - ticker := time.NewTicker(tick) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - // We might be shutting down here - if s.messenger == nil { - return - } - response, err := s.messenger.RetrieveAll() - if err != nil { - log.Error("failed to retrieve raw messages", "err", err) - continue - } - publishMessengerResponse(response) - case <-cancel: - return - } - } -} - func (s *Service) retrieveStats(tick time.Duration, cancel <-chan struct{}) { ticker := time.NewTicker(tick) defer ticker.Stop() @@ -346,7 +312,7 @@ func (s *Service) verifyTransactionLoop(tick time.Duration, cancel <-chan struct log.Error("failed to validate transactions", "err", err) continue } - publishMessengerResponse(response) + s.messenger.PublishMessengerResponse(response) case <-cancel: cancelVerifyTransaction() @@ -436,7 +402,7 @@ func buildMessengerOptions( protocol.WithBrowserDatabase(browsers.NewDB(appDb)), protocol.WithEnvelopesMonitorConfig(envelopesMonitorConfig), protocol.WithSignalsHandler(messengerSignalsHandler), - protocol.WithENSVerificationConfig(publishMessengerResponse, config.ShhextConfig.VerifyENSURL, config.ShhextConfig.VerifyENSContractAddress), + protocol.WithENSVerificationConfig(config.ShhextConfig.VerifyENSURL, config.ShhextConfig.VerifyENSContractAddress), protocol.WithClusterConfig(config.ClusterConfig), protocol.WithTorrentConfig(&config.TorrentConfig), protocol.WithHTTPServer(httpServer), diff --git a/wakuv2/config.go b/wakuv2/config.go index 4f2fd3fcc..19df98f01 100644 --- a/wakuv2/config.go +++ b/wakuv2/config.go @@ -19,8 +19,9 @@ package wakuv2 import ( - "github.com/status-im/status-go/wakuv2/common" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + + "github.com/status-im/status-go/wakuv2/common" ) // Config represents the configuration state of a waku node. diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 9cdc3a759..418a93074 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1324,7 +1324,12 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag w.statusTelemetryClient.PushReceivedEnvelope(envelope) } - logger := w.logger.With(zap.String("hash", recvMessage.Hash().Hex()), zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().Timestamp)) + logger := w.logger.With( + zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), + zap.String("contentTopic", envelope.Message().ContentTopic), + zap.Int64("timestamp", envelope.Message().Timestamp), + ) + logger.Debug("received new envelope") trouble := false @@ -1396,7 +1401,12 @@ func (w *Waku) processQueue() { case <-w.ctx.Done(): return case e := <-w.msgQueue: - logger := w.logger.With(zap.String("hash", e.Hash().String()), zap.String("contentTopic", e.ContentTopic.ContentTopic()), zap.Int64("timestamp", e.Envelope.Message().Timestamp)) + logger := w.logger.With( + zap.String("envelopeHash", hexutil.Encode(e.Envelope.Hash())), + zap.String("pubsubTopic", e.PubsubTopic), + zap.String("contentTopic", e.ContentTopic.ContentTopic()), + zap.Int64("timestamp", e.Envelope.Message().Timestamp), + ) if e.MsgType == common.StoreMessageType { // We need to insert it first, and then remove it if not matched, // as messages are processed asynchronously