Notify users that envelope was discarded and retry sending it (#1446)

API for querying mail servers that skips already received data on subsequent requests
This commit is contained in:
Dmitry Shulyak 2019-04-30 09:46:12 +03:00 committed by GitHub
parent 28ec255d77
commit 218a35e609
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 1701 additions and 9 deletions

100
db/db.go
View file

@ -6,8 +6,10 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)
type storagePrefix byte
@ -20,8 +22,35 @@ const (
DeduplicatorCache
// MailserversCache is a list of mail servers provided by users.
MailserversCache
// TopicHistoryBucket isolated bucket for storing history metadata.
TopicHistoryBucket
// HistoryRequestBucket isolated bucket for storing list of pending requests.
HistoryRequestBucket
)
// NewMemoryDB returns leveldb with memory backend prefixed with a bucket.
func NewMemoryDB() (*leveldb.DB, error) {
return leveldb.Open(storage.NewMemStorage(), nil)
}
// NewDBNamespace returns instance that ensures isolated operations.
func NewDBNamespace(db *leveldb.DB, prefix storagePrefix) LevelDBNamespace {
return LevelDBNamespace{
db: db,
prefix: prefix,
}
}
// NewMemoryDBNamespace wraps in memory leveldb with provided bucket.
// Mostly used for tests. Including tests in other packages.
func NewMemoryDBNamespace(prefix storagePrefix) (pdb LevelDBNamespace, err error) {
db, err := NewMemoryDB()
if err != nil {
return pdb, err
}
return NewDBNamespace(db, prefix), nil
}
// Key creates a DB key for a specified service with specified data
func Key(prefix storagePrefix, data ...[]byte) []byte {
keyLength := 1
@ -59,3 +88,74 @@ func Open(path string, opts *opt.Options) (db *leveldb.DB, err error) {
}
return
}
// LevelDBNamespace database where all operations will be prefixed with a certain bucket.
type LevelDBNamespace struct {
db *leveldb.DB
prefix storagePrefix
}
func (db LevelDBNamespace) prefixedKey(key []byte) []byte {
endkey := make([]byte, len(key)+1)
endkey[0] = byte(db.prefix)
copy(endkey[1:], key)
return endkey
}
func (db LevelDBNamespace) Put(key, value []byte) error {
return db.db.Put(db.prefixedKey(key), value, nil)
}
func (db LevelDBNamespace) Get(key []byte) ([]byte, error) {
return db.db.Get(db.prefixedKey(key), nil)
}
// Range returns leveldb util.Range prefixed with a single byte.
// If prefix is nil range will iterate over all records in a given bucket.
func (db LevelDBNamespace) Range(prefix, limit []byte) *util.Range {
if limit == nil {
return util.BytesPrefix(db.prefixedKey(prefix))
}
return &util.Range{Start: db.prefixedKey(prefix), Limit: db.prefixedKey(limit)}
}
// Delete removes key from database.
func (db LevelDBNamespace) Delete(key []byte) error {
return db.db.Delete(db.prefixedKey(key), nil)
}
// NewIterator returns iterator for a given slice.
func (db LevelDBNamespace) NewIterator(slice *util.Range) NamespaceIterator {
return NamespaceIterator{db.db.NewIterator(slice, nil)}
}
// NamespaceIterator wraps leveldb iterator, works mostly the same way.
// The only difference is that first byte of the key is dropped.
type NamespaceIterator struct {
iter iterator.Iterator
}
// Key returns key of the current item.
func (iter NamespaceIterator) Key() []byte {
return iter.iter.Key()[1:]
}
// Value returns actual value of the current item.
func (iter NamespaceIterator) Value() []byte {
return iter.iter.Value()
}
// Error returns accumulated error.
func (iter NamespaceIterator) Error() error {
return iter.iter.Error()
}
// Prev moves cursor backward.
func (iter NamespaceIterator) Prev() bool {
return iter.iter.Prev()
}
// Next moves cursor forward.
func (iter NamespaceIterator) Next() bool {
return iter.iter.Next()
}

213
db/history.go Normal file
View file

@ -0,0 +1,213 @@
package db
import (
"encoding/binary"
"encoding/json"
"errors"
"time"
"github.com/ethereum/go-ethereum/common"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/syndtr/goleveldb/leveldb/util"
)
var (
// ErrEmptyKey returned if key is not expected to be empty.
ErrEmptyKey = errors.New("TopicHistoryKey is empty")
)
// DB is a common interface for DB operations.
type DB interface {
Get([]byte) ([]byte, error)
Put([]byte, []byte) error
Delete([]byte) error
Range([]byte, []byte) *util.Range
NewIterator(*util.Range) NamespaceIterator
}
// TopicHistoryKey defines bytes that are used as unique key for TopicHistory.
// first 4 bytes are whisper.TopicType bytes
// next 8 bytes are time.Duration encoded in big endian notation.
type TopicHistoryKey [12]byte
// LoadTopicHistoryFromKey unmarshalls key into topic and duration and loads value of topic history
// from given database.
func LoadTopicHistoryFromKey(db DB, key TopicHistoryKey) (th TopicHistory, err error) {
if (key == TopicHistoryKey{}) {
return th, ErrEmptyKey
}
topic := whisper.TopicType{}
copy(topic[:], key[:4])
duration := binary.BigEndian.Uint64(key[4:])
th = TopicHistory{db: db, Topic: topic, Duration: time.Duration(duration)}
return th, th.Load()
}
// TopicHistory stores necessary information.
type TopicHistory struct {
db DB
// whisper topic
Topic whisper.TopicType
Duration time.Duration
// Timestamp that was used for the first request with this topic.
// Used to identify overlapping ranges.
First time.Time
// Timestamp of the last synced envelope.
Current time.Time
End time.Time
RequestID common.Hash
}
// Key returns unique identifier for this TopicHistory.
func (t TopicHistory) Key() TopicHistoryKey {
key := TopicHistoryKey{}
copy(key[:], t.Topic[:])
binary.BigEndian.PutUint64(key[4:], uint64(t.Duration))
return key
}
// Value marshalls TopicHistory into bytes.
func (t TopicHistory) Value() ([]byte, error) {
return json.Marshal(t)
}
// Load TopicHistory from db using key and unmarshalls it.
func (t *TopicHistory) Load() error {
key := t.Key()
if (key == TopicHistoryKey{}) {
return errors.New("key is empty")
}
value, err := t.db.Get(key[:])
if err != nil {
return err
}
return json.Unmarshal(value, t)
}
// Save persists TopicHistory on disk.
func (t TopicHistory) Save() error {
key := t.Key()
val, err := t.Value()
if err != nil {
return err
}
return t.db.Put(key[:], val)
}
// Delete removes topic history from database.
func (t TopicHistory) Delete() error {
key := t.Key()
return t.db.Delete(key[:])
}
// SameRange returns true if topic has same range, which means:
// true if Current is zero and Duration is the same
// and true if Current is the same
func (t TopicHistory) SameRange(other TopicHistory) bool {
zero := time.Time{}
if t.Current == zero && other.Current == zero {
return t.Duration == other.Duration
}
return t.Current == other.Current
}
// Pending returns true if this topic was requested from a mail server.
func (t TopicHistory) Pending() bool {
return t.RequestID != common.Hash{}
}
// HistoryRequest is kept in the database while request is in the progress.
// Stores necessary information to identify topics with associated ranges included in the request.
type HistoryRequest struct {
requestDB DB
topicDB DB
histories []TopicHistory
// Generated ID
ID common.Hash
// List of the topics
TopicHistoryKeys []TopicHistoryKey
}
// AddHistory adds instance to internal list of instance and add instance key to the list
// which will be persisted on disk.
func (req *HistoryRequest) AddHistory(history TopicHistory) {
req.histories = append(req.histories, history)
req.TopicHistoryKeys = append(req.TopicHistoryKeys, history.Key())
}
// Histories returns internal lsit of topic histories.
func (req *HistoryRequest) Histories() []TopicHistory {
// TODO Lazy load from database on first access
return req.histories
}
// Value returns content of HistoryRequest as bytes.
func (req HistoryRequest) Value() ([]byte, error) {
return json.Marshal(req)
}
// Save persists all attached histories and request itself on the disk.
func (req HistoryRequest) Save() error {
for i := range req.histories {
th := &req.histories[i]
th.RequestID = req.ID
if err := th.Save(); err != nil {
return err
}
}
val, err := req.Value()
if err != nil {
return err
}
return req.requestDB.Put(req.ID.Bytes(), val)
}
// Delete HistoryRequest from store and update every topic.
func (req HistoryRequest) Delete() error {
return req.requestDB.Delete(req.ID.Bytes())
}
// Load reads request and topic histories content from disk and unmarshalls them.
func (req *HistoryRequest) Load() error {
val, err := req.requestDB.Get(req.ID.Bytes())
if err != nil {
return err
}
return req.RawUnmarshall(val)
}
func (req *HistoryRequest) loadHistories() error {
for _, hk := range req.TopicHistoryKeys {
th, err := LoadTopicHistoryFromKey(req.topicDB, hk)
if err != nil {
return err
}
req.histories = append(req.histories, th)
}
return nil
}
// RawUnmarshall unmarshall given bytes into the structure.
// Used in range queries to unmarshall content of the iter.Value directly into request struct.
func (req *HistoryRequest) RawUnmarshall(val []byte) error {
err := json.Unmarshal(val, req)
if err != nil {
return err
}
return req.loadHistories()
}
// Includes checks if TopicHistory is included into the request.
func (req *HistoryRequest) Includes(history TopicHistory) bool {
key := history.Key()
for i := range req.TopicHistoryKeys {
if key == req.TopicHistoryKeys[i] {
return true
}
}
return false
}

91
db/history_store.go Normal file
View file

@ -0,0 +1,91 @@
package db
import (
"time"
"github.com/ethereum/go-ethereum/common"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
)
// NewHistoryStore returns HistoryStore instance.
func NewHistoryStore(db *leveldb.DB) HistoryStore {
return HistoryStore{
topicDB: NewDBNamespace(db, TopicHistoryBucket),
requestDB: NewDBNamespace(db, HistoryRequestBucket),
}
}
// HistoryStore provides utility methods for quering history and requests store.
type HistoryStore struct {
topicDB DB
requestDB DB
}
// GetHistory creates history instance and loads history from database.
// Returns instance populated with topic and duration if history is not found in database.
func (h HistoryStore) GetHistory(topic whisper.TopicType, duration time.Duration) (TopicHistory, error) {
thist := h.NewHistory(topic, duration)
err := thist.Load()
if err != nil && err != errors.ErrNotFound {
return TopicHistory{}, err
}
return thist, nil
}
// NewRequest returns instance of the HistoryRequest.
func (h HistoryStore) NewRequest() HistoryRequest {
return HistoryRequest{requestDB: h.requestDB, topicDB: h.topicDB}
}
// NewHistory creates TopicHistory object with required values.
func (h HistoryStore) NewHistory(topic whisper.TopicType, duration time.Duration) TopicHistory {
return TopicHistory{db: h.topicDB, Duration: duration, Topic: topic}
}
// GetRequest loads HistoryRequest from database.
func (h HistoryStore) GetRequest(id common.Hash) (HistoryRequest, error) {
req := HistoryRequest{requestDB: h.requestDB, topicDB: h.topicDB, ID: id}
err := req.Load()
if err != nil {
return HistoryRequest{}, err
}
return req, nil
}
// GetAllRequests loads all not-finished history requests from database.
func (h HistoryStore) GetAllRequests() ([]HistoryRequest, error) {
rst := []HistoryRequest{}
iter := h.requestDB.NewIterator(h.requestDB.Range(nil, nil))
for iter.Next() {
req := HistoryRequest{
requestDB: h.requestDB,
topicDB: h.topicDB,
}
err := req.RawUnmarshall(iter.Value())
if err != nil {
return nil, err
}
rst = append(rst, req)
}
return rst, nil
}
// GetHistoriesByTopic returns all histories with a given topic.
// This is needed when we will have multiple range per single topic.
// TODO explain
func (h HistoryStore) GetHistoriesByTopic(topic whisper.TopicType) ([]TopicHistory, error) {
rst := []TopicHistory{}
iter := h.topicDB.NewIterator(h.topicDB.Range(topic[:], nil))
for iter.Next() {
key := TopicHistoryKey{}
copy(key[:], iter.Key())
th, err := LoadTopicHistoryFromKey(h.topicDB, key)
if err != nil {
return nil, err
}
rst = append(rst, th)
}
return rst, nil
}

82
db/history_store_test.go Normal file
View file

@ -0,0 +1,82 @@
package db
import (
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/require"
)
func createInMemStore(t *testing.T) HistoryStore {
db, err := NewMemoryDB()
require.NoError(t, err)
return NewHistoryStore(db)
}
func TestGetNewHistory(t *testing.T) {
topic := whisper.TopicType{1}
duration := time.Hour
store := createInMemStore(t)
th, err := store.GetHistory(topic, duration)
require.NoError(t, err)
require.Equal(t, duration, th.Duration)
require.Equal(t, topic, th.Topic)
}
func TestGetExistingHistory(t *testing.T) {
topic := whisper.TopicType{1}
duration := time.Hour
store := createInMemStore(t)
th, err := store.GetHistory(topic, duration)
require.NoError(t, err)
now := time.Now()
th.Current = now
require.NoError(t, th.Save())
th, err = store.GetHistory(topic, duration)
require.NoError(t, err)
require.Equal(t, now.Unix(), th.Current.Unix())
}
func TestNewHistoryRequest(t *testing.T) {
store := createInMemStore(t)
id := common.Hash{1}
req, err := store.GetRequest(id)
require.Error(t, err)
req = store.NewRequest()
req.ID = id
th, err := store.GetHistory(whisper.TopicType{1}, time.Hour)
require.NoError(t, err)
req.AddHistory(th)
require.NoError(t, req.Save())
req, err = store.GetRequest(id)
require.NoError(t, err)
require.Len(t, req.Histories(), 1)
}
func TestGetAllRequests(t *testing.T) {
store := createInMemStore(t)
idOne := common.Hash{1}
idTwo := common.Hash{2}
req := store.NewRequest()
req.ID = idOne
require.NoError(t, req.Save())
all, err := store.GetAllRequests()
require.NoError(t, err)
require.Len(t, all, 1)
req = store.NewRequest()
req.ID = idTwo
require.NoError(t, req.Save())
all, err = store.GetAllRequests()
require.NoError(t, err)
require.Len(t, all, 2)
}

133
db/history_test.go Normal file
View file

@ -0,0 +1,133 @@
package db
import (
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/require"
)
func TestTopicHistoryStoreLoadFromKey(t *testing.T) {
db, err := NewMemoryDBNamespace(TopicHistoryBucket)
require.NoError(t, err)
th := TopicHistory{
db: db,
Topic: whisper.TopicType{1, 1, 1},
Duration: 10 * time.Hour,
}
require.NoError(t, th.Save())
now := time.Now()
th.Current = now
require.NoError(t, th.Save())
th, err = LoadTopicHistoryFromKey(db, th.Key())
require.NoError(t, err)
require.Equal(t, now.Unix(), th.Current.Unix())
}
func TestTopicHistorySameRange(t *testing.T) {
now := time.Now()
testCases := []struct {
description string
result bool
histories [2]TopicHistory
}{
{
description: "SameDurationCurrentNotSet",
result: true,
histories: [2]TopicHistory{
{Duration: time.Minute}, {Duration: time.Minute},
},
},
{
description: "DifferentDurationCurrentNotset",
result: false,
histories: [2]TopicHistory{
{Duration: time.Minute}, {Duration: time.Hour},
},
},
{
description: "SameCurrent",
result: true,
histories: [2]TopicHistory{
{Current: now}, {Current: now},
},
},
{
description: "DifferentCurrent",
result: false,
histories: [2]TopicHistory{
{Current: now}, {Current: now.Add(time.Hour)},
},
},
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
require.Equal(t, tc.result, tc.histories[0].SameRange(tc.histories[1]))
})
}
}
func TestAddHistory(t *testing.T) {
topic := whisper.TopicType{1, 1, 1}
now := time.Now()
topicdb, err := NewMemoryDBNamespace(TopicHistoryBucket)
require.NoError(t, err)
requestdb, err := NewMemoryDBNamespace(HistoryRequestBucket)
require.NoError(t, err)
th := TopicHistory{db: topicdb, Topic: topic, Current: now}
id := common.Hash{1}
req := HistoryRequest{requestDB: requestdb, topicDB: topicdb, ID: id}
req.AddHistory(th)
require.NoError(t, req.Save())
req = HistoryRequest{requestDB: requestdb, topicDB: topicdb, ID: id}
require.NoError(t, req.Load())
require.Len(t, req.Histories(), 1)
require.Equal(t, th.Topic, req.Histories()[0].Topic)
}
func TestRequestIncludesMethod(t *testing.T) {
topicOne := whisper.TopicType{1}
topicTwo := whisper.TopicType{2}
testCases := []struct {
description string
result bool
topics []TopicHistory
input TopicHistory
}{
{
description: "EmptyTopic",
result: false,
input: TopicHistory{Topic: topicOne},
},
{
description: "MatchesTopic",
result: true,
topics: []TopicHistory{{Topic: topicOne}},
input: TopicHistory{Topic: topicOne},
},
{
description: "NotMatchesTopic",
result: false,
topics: []TopicHistory{{Topic: topicOne}},
input: TopicHistory{Topic: topicTwo},
},
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
req := HistoryRequest{}
for _, t := range tc.topics {
req.AddHistory(t)
}
require.Equal(t, tc.result, req.Includes(tc.input))
})
}
}

View file

@ -367,6 +367,10 @@ type ShhextConfig struct {
// MaxMessageDeliveryAttempts defines how many times we will try to deliver not-acknowledged envelopes.
MaxMessageDeliveryAttempts int
// WhisperCacheDir is a folder where whisper filters may persist messages before delivering them
// to a client.
WhisperCacheDir string
}
// Validate validates the ShhextConfig struct and returns an error if inconsistent values are found

View file

@ -16,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/db"
"github.com/status-im/status-go/mailserver"
"github.com/status-im/status-go/services/shhext/chat"
"github.com/status-im/status-go/services/shhext/dedup"
@ -154,6 +155,15 @@ type SyncMessagesResponse struct {
Error string `json:"error"`
}
// InitiateHistoryRequestParams type for initiating history requests from a peer.
type InitiateHistoryRequestParams struct {
Peer string
SymKeyID string
Requests []TopicRequest
Force bool
Timeout time.Duration
}
// -----
// PUBLIC API
// -----
@ -418,6 +428,14 @@ func (api *PublicAPI) GetNewFilterMessages(filterID string) ([]dedup.Deduplicate
// ConfirmMessagesProcessed is a method to confirm that messages was consumed by
// the client side.
func (api *PublicAPI) ConfirmMessagesProcessed(messages []*whisper.Message) error {
for _, msg := range messages {
if msg.P2P {
err := api.service.historyUpdates.UpdateTopicHistory(msg.Topic, time.Unix(int64(msg.Timestamp), 0))
if err != nil {
return err
}
}
}
return api.service.deduplicator.AddMessages(messages)
}
@ -493,6 +511,103 @@ func (api *PublicAPI) SendDirectMessage(ctx context.Context, msg chat.SendDirect
return api.Post(ctx, whisperMessage)
}
func (api *PublicAPI) requestMessagesUsingPayload(request db.HistoryRequest, peer, symkeyID string, payload []byte, force bool, timeout time.Duration, topics []whisper.TopicType) (hash common.Hash, err error) {
shh := api.service.w
now := api.service.w.GetCurrentTime()
mailServerNode, err := api.getPeer(peer)
if err != nil {
return hash, fmt.Errorf("%v: %v", ErrInvalidMailServerPeer, err)
}
var (
symKey []byte
publicKey *ecdsa.PublicKey
)
if symkeyID != "" {
symKey, err = shh.GetSymKey(symkeyID)
if err != nil {
return hash, fmt.Errorf("%v: %v", ErrInvalidSymKeyID, err)
}
} else {
publicKey = mailServerNode.Pubkey()
}
envelope, err := makeEnvelop(
payload,
symKey,
publicKey,
api.service.nodeID,
shh.MinPow(),
now,
)
if err != nil {
return hash, err
}
hash = envelope.Hash()
request.ID = hash
err = request.Save()
if err != nil {
return hash, err
}
if !force {
err = api.service.requestsRegistry.Register(hash, topics)
if err != nil {
return hash, err
}
}
if err := shh.RequestHistoricMessagesWithTimeout(mailServerNode.ID().Bytes(), envelope, timeout); err != nil {
err = request.Delete()
if err != nil {
return hash, err
}
if !force {
api.service.requestsRegistry.Unregister(hash)
}
return hash, err
}
return hash, nil
}
// InitiateHistoryRequests is a stateful API for initiating history request for each topic.
// Caller of this method needs to define only two parameters per each TopicRequest:
// - Topic
// - Duration in nanoseconds. Will be used to determine starting time for history request.
// After that status-go will guarantee that request for this topic and date will be performed.
func (api *PublicAPI) InitiateHistoryRequests(request InitiateHistoryRequestParams) ([]hexutil.Bytes, error) {
rst := []hexutil.Bytes{}
requests, err := api.service.historyUpdates.CreateRequests(request.Requests)
if err != nil {
return nil, err
}
for i := range requests {
req := requests[i]
options := CreateTopicOptionsFromRequest(req)
bloom := options.ToBloomFilterOption()
payload, err := bloom.ToMessagesRequestPayload()
if err != nil {
return rst, err
}
hash, err := api.requestMessagesUsingPayload(req, request.Peer, request.SymKeyID, payload, request.Force, request.Timeout, options.Topics())
if err != nil {
return rst, err
}
rst = append(rst, hash[:])
}
return rst, nil
}
// CompleteRequest client must mark request completed when all envelopes were processed.
func (api *PublicAPI) CompleteRequest(ctx context.Context, hex string) error {
return api.service.historyUpdates.UpdateFinishedRequest(common.HexToHash(hex))
}
// DEPRECATED: use SendDirectMessage with DH flag
// SendPairingMessage sends a 1:1 chat message to our own devices to initiate a pairing session
func (api *PublicAPI) SendPairingMessage(ctx context.Context, msg chat.SendDirectMessageRPC) ([]hexutil.Bytes, error) {

View file

@ -196,7 +196,7 @@ func (m *EnvelopesMonitor) handleAcknowledgedBatch(event whisper.EnvelopeEvent)
log.Debug("received a confirmation", "batch", event.Batch, "peer", event.Peer)
envelopeErrors, ok := event.Data.([]whisper.EnvelopeError)
if event.Data != nil && !ok {
log.Warn("received unexpected data for the confirmation event", "batch", event.Batch)
log.Error("received unexpected data in the the confirmation event", "batch", event.Batch)
}
failedEnvelopes := map[common.Hash]struct{}{}
for i := range envelopeErrors {

360
services/shhext/history.go Normal file
View file

@ -0,0 +1,360 @@
package shhext
import (
"errors"
"fmt"
"sort"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/db"
"github.com/status-im/status-go/mailserver"
whisper "github.com/status-im/whisper/whisperv6"
)
const (
// WhisperTimeAllowance is needed to ensure that we won't miss envelopes that were
// delivered to mail server after we made a request.
WhisperTimeAllowance = 20 * time.Second
)
// TimeSource is a function that returns current time.
type TimeSource func() time.Time
// NewHistoryUpdateReactor creates HistoryUpdateReactor instance.
func NewHistoryUpdateReactor(store db.HistoryStore, registry *RequestsRegistry, timeSource TimeSource) *HistoryUpdateReactor {
return &HistoryUpdateReactor{
store: store,
registry: registry,
timeSource: timeSource,
}
}
// HistoryUpdateReactor responsible for tracking progress for all history requests.
// It listens for 2 events:
// - when envelope from mail server is received we will update appropriate topic on disk
// - when confirmation for request completion is received - we will set last envelope timestamp as the last timestamp
// for all TopicLists in current request.
type HistoryUpdateReactor struct {
mu sync.Mutex
store db.HistoryStore
registry *RequestsRegistry
timeSource TimeSource
}
// UpdateFinishedRequest removes successfully finished request and updates every topic
// attached to the request.
func (reactor *HistoryUpdateReactor) UpdateFinishedRequest(id common.Hash) error {
reactor.mu.Lock()
defer reactor.mu.Unlock()
req, err := reactor.store.GetRequest(id)
if err != nil {
return err
}
for i := range req.Histories() {
th := &req.Histories()[i]
th.RequestID = common.Hash{}
th.Current = th.End
th.End = time.Time{}
if err := th.Save(); err != nil {
return err
}
}
return req.Delete()
}
// UpdateTopicHistory updates Current timestamp for the TopicHistory with a given timestamp.
func (reactor *HistoryUpdateReactor) UpdateTopicHistory(topic whisper.TopicType, timestamp time.Time) error {
reactor.mu.Lock()
defer reactor.mu.Unlock()
histories, err := reactor.store.GetHistoriesByTopic(topic)
if err != nil {
return err
}
if len(histories) == 0 {
return fmt.Errorf("no histories for topic 0x%x", topic)
}
for i := range histories {
th := &histories[i]
// this case could happen only iff envelopes were delivered out of order
// last envelope received, request completed, then others envelopes received
// request completed, last envelope received, and then all others envelopes received
if !th.Pending() {
continue
}
if timestamp.Before(th.End) && timestamp.After(th.Current) {
th.Current = timestamp
}
err := th.Save()
if err != nil {
return err
}
}
return nil
}
// TopicRequest defines what user has to provide.
type TopicRequest struct {
Topic whisper.TopicType
Duration time.Duration
}
// CreateRequests receives list of topic with desired timestamps and initiates both pending requests and requests
// that cover new topics.
func (reactor *HistoryUpdateReactor) CreateRequests(topicRequests []TopicRequest) ([]db.HistoryRequest, error) {
reactor.mu.Lock()
defer reactor.mu.Unlock()
seen := map[whisper.TopicType]struct{}{}
for i := range topicRequests {
if _, exist := seen[topicRequests[i].Topic]; exist {
return nil, errors.New("only one duration per topic is allowed")
}
seen[topicRequests[i].Topic] = struct{}{}
}
histories := map[whisper.TopicType]db.TopicHistory{}
for i := range topicRequests {
th, err := reactor.store.GetHistory(topicRequests[i].Topic, topicRequests[i].Duration)
if err != nil {
return nil, err
}
histories[th.Topic] = th
}
requests, err := reactor.store.GetAllRequests()
if err != nil {
return nil, err
}
filtered := []db.HistoryRequest{}
for i := range requests {
req := requests[i]
for _, th := range histories {
if th.Pending() {
delete(histories, th.Topic)
}
}
if !reactor.registry.Has(req.ID) {
filtered = append(filtered, req)
}
}
adjusted, err := adjustRequestedHistories(reactor.store, mapToList(histories))
if err != nil {
return nil, err
}
filtered = append(filtered,
GroupHistoriesByRequestTimespan(reactor.store, adjusted)...)
return RenewRequests(filtered, reactor.timeSource()), nil
}
// for every history that is not included in any request check if there are other ranges with such topic in db
// if so check if they can be merged
// if not then adjust second part so that End of it will be equal to First of previous
func adjustRequestedHistories(store db.HistoryStore, histories []db.TopicHistory) ([]db.TopicHistory, error) {
adjusted := []db.TopicHistory{}
for i := range histories {
all, err := store.GetHistoriesByTopic(histories[i].Topic)
if err != nil {
return nil, err
}
th, err := adjustRequestedHistory(&histories[i], all...)
if err != nil {
return nil, err
}
if th != nil {
adjusted = append(adjusted, *th)
}
}
return adjusted, nil
}
func adjustRequestedHistory(th *db.TopicHistory, others ...db.TopicHistory) (*db.TopicHistory, error) {
sort.Slice(others, func(i, j int) bool {
return others[i].Duration > others[j].Duration
})
if len(others) == 1 && others[0].Duration == th.Duration {
return th, nil
}
for j := range others {
if others[j].Duration == th.Duration {
// skip instance with same duration
continue
} else if th.Duration > others[j].Duration {
if th.Current.Equal(others[j].First) {
// this condition will be reached when query for new index successfully finished
th.Current = others[j].Current
// FIXME next two db operations must be completed atomically
err := th.Save()
if err != nil {
return nil, err
}
err = others[j].Delete()
if err != nil {
return nil, err
}
} else if (others[j].First != time.Time{}) {
// select First timestamp with lowest value. if there are multiple indexes that cover such ranges:
// 6:00 - 7:00 Duration: 3h
// 7:00 - 8:00 2h
// 8:00 - 9:00 1h
// and client created new index with Duration 4h
// 4h index must have End value set to 6:00
if (others[j].First.Before(th.End) || th.End == time.Time{}) {
th.End = others[j].First
}
} else {
// remove previous if it is covered by new one
// client created multiple indexes without any succsefully executed query
err := others[j].Delete()
if err != nil {
return nil, err
}
}
} else if th.Duration < others[j].Duration {
if !others[j].Pending() {
th = &others[j]
} else {
return nil, nil
}
}
}
return th, nil
}
// RenewRequests re-sets current, first and end timestamps.
// Changes should not be persisted on disk in this method.
func RenewRequests(requests []db.HistoryRequest, now time.Time) []db.HistoryRequest {
zero := time.Time{}
for i := range requests {
req := requests[i]
histories := req.Histories()
for j := range histories {
history := &histories[j]
if history.Current == zero {
history.Current = now.Add(-(history.Duration))
}
if history.First == zero {
history.First = history.Current
}
if history.End == zero {
history.End = now
}
}
}
return requests
}
// CreateTopicOptionsFromRequest transforms histories attached to a single request to a simpler format - TopicOptions.
func CreateTopicOptionsFromRequest(req db.HistoryRequest) TopicOptions {
histories := req.Histories()
rst := make(TopicOptions, len(histories))
for i := range histories {
history := histories[i]
rst[i] = TopicOption{
Topic: history.Topic,
Range: Range{
Start: uint64(history.Current.Add(-(WhisperTimeAllowance)).Unix()),
End: uint64(history.End.Unix()),
},
}
}
return rst
}
func mapToList(topics map[whisper.TopicType]db.TopicHistory) []db.TopicHistory {
rst := make([]db.TopicHistory, 0, len(topics))
for key := range topics {
rst = append(rst, topics[key])
}
return rst
}
// GroupHistoriesByRequestTimespan creates requests from provided histories.
// Multiple histories will be included into the same request only if they share timespan.
func GroupHistoriesByRequestTimespan(store db.HistoryStore, histories []db.TopicHistory) []db.HistoryRequest {
requests := []db.HistoryRequest{}
for _, th := range histories {
var added bool
for i := range requests {
req := &requests[i]
histories := req.Histories()
if histories[0].SameRange(th) {
req.AddHistory(th)
added = true
}
}
if !added {
req := store.NewRequest()
req.AddHistory(th)
requests = append(requests, req)
}
}
return requests
}
// Range of the request.
type Range struct {
Start uint64
End uint64
}
// TopicOption request for a single topic.
type TopicOption struct {
Topic whisper.TopicType
Range Range
}
// TopicOptions is a list of topic-based requsts.
type TopicOptions []TopicOption
// ToBloomFilterOption creates bloom filter request from a list of topics.
func (options TopicOptions) ToBloomFilterOption() BloomFilterOption {
topics := make([]whisper.TopicType, len(options))
var start, end uint64
for i := range options {
opt := options[i]
topics[i] = opt.Topic
if opt.Range.Start > start {
start = opt.Range.Start
}
if opt.Range.End > end {
end = opt.Range.End
}
}
return BloomFilterOption{
Range: Range{Start: start, End: end},
Filter: topicsToBloom(topics...),
}
}
// Topics returns list of whisper TopicType attached to each TopicOption.
func (options TopicOptions) Topics() []whisper.TopicType {
rst := make([]whisper.TopicType, len(options))
for i := range options {
rst[i] = options[i].Topic
}
return rst
}
// BloomFilterOption is a request based on bloom filter.
type BloomFilterOption struct {
Range Range
Filter []byte
}
// ToMessagesRequestPayload creates mailserver.MessagesRequestPayload and encodes it to bytes using rlp.
func (filter BloomFilterOption) ToMessagesRequestPayload() ([]byte, error) {
// TODO fix this conversion.
// we start from time.Duration which is int64, then convert to uint64 for rlp-serilizability
// why uint32 here? max uint32 is smaller than max int64
payload := mailserver.MessagesRequestPayload{
Lower: uint32(filter.Range.Start),
Upper: uint32(filter.Range.End),
Bloom: filter.Filter,
// Client must tell the MailServer if it supports batch responses.
// This can be removed in the future.
Batch: true,
Limit: 10000,
}
return rlp.EncodeToBytes(payload)
}

View file

@ -0,0 +1,350 @@
package shhext
import (
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/db"
"github.com/status-im/status-go/mailserver"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func createInMemStore(t *testing.T) db.HistoryStore {
mdb, err := db.NewMemoryDB()
require.NoError(t, err)
return db.NewHistoryStore(mdb)
}
func TestRenewRequest(t *testing.T) {
req := db.HistoryRequest{}
duration := time.Hour
req.AddHistory(db.TopicHistory{Duration: duration})
firstNow := time.Now()
RenewRequests([]db.HistoryRequest{req}, firstNow)
initial := firstNow.Add(-duration).Unix()
th := req.Histories()[0]
require.Equal(t, initial, th.Current.Unix())
require.Equal(t, initial, th.First.Unix())
require.Equal(t, firstNow.Unix(), th.End.Unix())
secondNow := time.Now()
RenewRequests([]db.HistoryRequest{req}, secondNow)
require.Equal(t, initial, th.Current.Unix())
require.Equal(t, initial, th.First.Unix())
require.Equal(t, secondNow.Unix(), th.End.Unix())
}
func TestCreateTopicOptionsFromRequest(t *testing.T) {
req := db.HistoryRequest{}
topic := whisper.TopicType{1}
now := time.Now()
req.AddHistory(db.TopicHistory{Topic: topic, Current: now, End: now})
options := CreateTopicOptionsFromRequest(req)
require.Len(t, options, len(req.Histories()),
"length must be equal to the number of topic histories attached to request")
require.Equal(t, topic, options[0].Topic)
require.Equal(t, uint64(now.Add(-WhisperTimeAllowance).Unix()), options[0].Range.Start,
"start of the range must be adjusted by the whisper time allowance")
require.Equal(t, uint64(now.Unix()), options[0].Range.End)
}
func TestTopicOptionsToBloom(t *testing.T) {
options := TopicOptions{
{Topic: whisper.TopicType{1}, Range: Range{Start: 1, End: 10}},
{Topic: whisper.TopicType{2}, Range: Range{Start: 3, End: 12}},
}
bloom := options.ToBloomFilterOption()
require.Equal(t, uint64(3), bloom.Range.Start, "Start must be the latest Start across all options")
require.Equal(t, uint64(12), bloom.Range.End, "End must be the latest End across all options")
require.Equal(t, topicsToBloom(options[0].Topic, options[1].Topic), bloom.Filter)
}
func TestBloomFilterToMessageRequestPayload(t *testing.T) {
var (
start uint32 = 10
end uint32 = 20
filter = []byte{1, 1, 1, 1}
message = mailserver.MessagesRequestPayload{
Lower: start,
Upper: end,
Bloom: filter,
Batch: true,
Limit: 10000,
}
bloomOption = BloomFilterOption{
Filter: filter,
Range: Range{
Start: uint64(start),
End: uint64(end),
},
}
)
expected, err := rlp.EncodeToBytes(message)
require.NoError(t, err)
payload, err := bloomOption.ToMessagesRequestPayload()
require.NoError(t, err)
require.Equal(t, expected, payload)
}
func TestCreateRequestsEmptyState(t *testing.T) {
now := time.Now()
reactor := NewHistoryUpdateReactor(
createInMemStore(t), NewRequestsRegistry(0),
func() time.Time { return now })
requests, err := reactor.CreateRequests([]TopicRequest{
{Topic: whisper.TopicType{1}, Duration: time.Hour},
{Topic: whisper.TopicType{2}, Duration: time.Hour},
{Topic: whisper.TopicType{3}, Duration: 10 * time.Hour},
})
require.NoError(t, err)
require.Len(t, requests, 2)
var (
oneTopic, twoTopic db.HistoryRequest
)
if len(requests[0].Histories()) == 1 {
oneTopic, twoTopic = requests[0], requests[1]
} else {
oneTopic, twoTopic = requests[1], requests[0]
}
require.Len(t, oneTopic.Histories(), 1)
require.Len(t, twoTopic.Histories(), 2)
}
func TestCreateRequestsWithExistingRequest(t *testing.T) {
store := createInMemStore(t)
req := store.NewRequest()
req.ID = common.Hash{1}
th := store.NewHistory(whisper.TopicType{1}, time.Hour)
req.AddHistory(th)
require.NoError(t, req.Save())
reactor := NewHistoryUpdateReactor(store, NewRequestsRegistry(0), time.Now)
requests, err := reactor.CreateRequests([]TopicRequest{
{Topic: whisper.TopicType{1}, Duration: time.Hour},
{Topic: whisper.TopicType{2}, Duration: time.Hour},
{Topic: whisper.TopicType{3}, Duration: time.Hour},
})
require.NoError(t, err)
require.Len(t, requests, 2)
var (
oneTopic, twoTopic db.HistoryRequest
)
if len(requests[0].Histories()) == 1 {
oneTopic, twoTopic = requests[0], requests[1]
} else {
oneTopic, twoTopic = requests[1], requests[0]
}
assert.Len(t, oneTopic.Histories(), 1)
assert.Len(t, twoTopic.Histories(), 2)
}
func TestCreateMultiRequestsWithSameTopic(t *testing.T) {
now := time.Now()
reactor := NewHistoryUpdateReactor(
createInMemStore(t), NewRequestsRegistry(0),
func() time.Time { return now })
topic := whisper.TopicType{1}
requests, err := reactor.CreateRequests([]TopicRequest{
{Topic: topic, Duration: time.Hour},
})
require.NoError(t, err)
require.Len(t, requests, 1)
requests[0].ID = common.Hash{1}
require.NoError(t, requests[0].Save())
// duration changed. request wasn't finished
requests, err = reactor.CreateRequests([]TopicRequest{
{Topic: topic, Duration: 10 * time.Hour},
})
require.NoError(t, err)
require.Len(t, requests, 2)
longest := 0
for i := range requests {
r := &requests[i]
r.ID = common.Hash{byte(i)}
require.NoError(t, r.Save())
require.Len(t, r.Histories(), 1)
if r.Histories()[0].Duration == 10*time.Hour {
longest = i
}
}
require.Equal(t, requests[longest].Histories()[0].End, requests[longest^1].Histories()[0].First)
for _, r := range requests {
require.NoError(t, reactor.UpdateFinishedRequest(r.ID))
}
requests, err = reactor.CreateRequests([]TopicRequest{
{Topic: topic, Duration: 10 * time.Hour},
})
require.NoError(t, err)
require.Len(t, requests, 1)
topics, err := reactor.store.GetHistoriesByTopic(topic)
require.NoError(t, err)
require.Len(t, topics, 1)
require.Equal(t, 10*time.Hour, topics[0].Duration)
}
func TestRequestFinishedUpdate(t *testing.T) {
store := createInMemStore(t)
req := store.NewRequest()
req.ID = common.Hash{1}
now := time.Now()
thOne := store.NewHistory(whisper.TopicType{1}, time.Hour)
thOne.End = now
thTwo := store.NewHistory(whisper.TopicType{2}, time.Hour)
thTwo.End = now
req.AddHistory(thOne)
req.AddHistory(thTwo)
require.NoError(t, req.Save())
reactor := NewHistoryUpdateReactor(store, NewRequestsRegistry(0), time.Now)
require.NoError(t, reactor.UpdateTopicHistory(thOne.Topic, now.Add(-time.Minute)))
require.NoError(t, reactor.UpdateFinishedRequest(req.ID))
_, err := store.GetRequest(req.ID)
require.EqualError(t, err, "leveldb: not found")
require.NoError(t, thOne.Load())
require.NoError(t, thTwo.Load())
require.Equal(t, now.Unix(), thOne.Current.Unix())
require.Equal(t, now.Unix(), thTwo.Current.Unix())
}
func TestTopicHistoryUpdate(t *testing.T) {
reqID := common.Hash{1}
store := createInMemStore(t)
request := store.NewRequest()
request.ID = reqID
now := time.Now()
require.NoError(t, request.Save())
th := store.NewHistory(whisper.TopicType{1}, time.Hour)
th.RequestID = request.ID
th.End = now
require.NoError(t, th.Save())
reactor := NewHistoryUpdateReactor(store, NewRequestsRegistry(0), time.Now)
timestamp := now.Add(-time.Minute)
require.NoError(t, reactor.UpdateTopicHistory(th.Topic, timestamp))
require.NoError(t, th.Load())
require.Equal(t, timestamp.Unix(), th.Current.Unix())
require.NoError(t, reactor.UpdateTopicHistory(th.Topic, now))
require.NoError(t, th.Load())
require.Equal(t, timestamp.Unix(), th.Current.Unix())
}
func TestGroupHistoriesByRequestTimestamp(t *testing.T) {
requests := GroupHistoriesByRequestTimespan(createInMemStore(t), []db.TopicHistory{
{Topic: whisper.TopicType{1}, Duration: time.Hour},
{Topic: whisper.TopicType{2}, Duration: time.Hour},
{Topic: whisper.TopicType{3}, Duration: 2 * time.Hour},
{Topic: whisper.TopicType{4}, Duration: 2 * time.Hour},
{Topic: whisper.TopicType{5}, Duration: 3 * time.Hour},
{Topic: whisper.TopicType{6}, Duration: 3 * time.Hour},
})
require.Len(t, requests, 3)
for _, req := range requests {
require.Len(t, req.Histories(), 2)
}
}
// initial creation of the history index. no other histories in store
func TestAdjustHistoryWithNoOtherHistories(t *testing.T) {
store := createInMemStore(t)
th := store.NewHistory(whisper.TopicType{1}, time.Hour)
adjusted, err := adjustRequestedHistories(store, []db.TopicHistory{th})
require.NoError(t, err)
require.Len(t, adjusted, 1)
require.Equal(t, th.Topic, adjusted[0].Topic)
}
// Duration for the history index with same topic was gradually incresed:
// {Duration: 1h} {Duration: 2h} {Duration: 3h}
// But actual request wasn't sent
// So when we receive {Duration: 4h} we can merge all of them into single index
// that covers all of them e.g. {Duration: 4h}
func TestAdjustHistoryWithExistingLowerRanges(t *testing.T) {
store := createInMemStore(t)
topic := whisper.TopicType{1}
histories := make([]db.TopicHistory, 3)
i := 0
for i = range histories {
histories[i] = store.NewHistory(topic, time.Duration(i+1)*time.Hour)
require.NoError(t, histories[i].Save())
}
i++
th := store.NewHistory(topic, time.Duration(i+1)*time.Hour)
adjusted, err := adjustRequestedHistories(store, []db.TopicHistory{th})
require.NoError(t, err)
require.Len(t, adjusted, 1)
require.Equal(t, th.Duration, adjusted[0].Duration)
all, err := store.GetHistoriesByTopic(topic)
require.NoError(t, err)
require.Len(t, all, 1)
require.Equal(t, th.Duration, all[0].Duration)
}
// Precondition is based on the previous test. We have same information in the database
// but now every history index request was successfully completed. And End timstamp is set to the First of the next index.
// So, we have:
// {First: now-1h, End: now} {First: now-2h, End: now-1h} {First: now-3h: End: now-2h}
// When we want to create new request with {Duration: 4h}
// We see that there is no reason to keep all indexes and we can squash them.
func TestAdjustHistoriesWithExistingCoveredLowerRanges(t *testing.T) {
store := createInMemStore(t)
topic := whisper.TopicType{1}
histories := make([]db.TopicHistory, 3)
i := 0
now := time.Now()
for i = range histories {
duration := time.Duration(i+1) * time.Hour
prevduration := time.Duration(i) * time.Hour
histories[i] = store.NewHistory(topic, duration)
histories[i].First = now.Add(-duration)
histories[i].Current = now.Add(-prevduration)
require.NoError(t, histories[i].Save())
}
i++
th := store.NewHistory(topic, time.Duration(i+1)*time.Hour)
th.Current = now.Add(-time.Duration(i) * time.Hour)
adjusted, err := adjustRequestedHistories(store, []db.TopicHistory{th})
require.NoError(t, err)
require.Len(t, adjusted, 1)
require.Equal(t, th.Duration, adjusted[0].Duration)
}
func TestAdjustHistoryReplaceTopicWithHigherDuration(t *testing.T) {
store := createInMemStore(t)
topic := whisper.TopicType{1}
hour := store.NewHistory(topic, time.Hour)
require.NoError(t, hour.Save())
minute := store.NewHistory(topic, time.Minute)
adjusted, err := adjustRequestedHistories(store, []db.TopicHistory{minute})
require.NoError(t, err)
require.Len(t, adjusted, 1)
require.Equal(t, hour.Duration, adjusted[0].Duration)
}
// if client requested lower duration than the one we have in the index already it will
// it will be discarded and we will use existing index
func TestAdjustHistoryRemoveTopicIfPendingWithHigherDuration(t *testing.T) {
store := createInMemStore(t)
topic := whisper.TopicType{1}
hour := store.NewHistory(topic, time.Hour)
hour.RequestID = common.Hash{1}
require.NoError(t, hour.Save())
minute := store.NewHistory(topic, time.Minute)
adjusted, err := adjustRequestedHistories(store, []db.TopicHistory{minute})
require.NoError(t, err)
require.Len(t, adjusted, 0)
}

View file

@ -57,6 +57,14 @@ func (r *RequestsRegistry) Register(uid common.Hash, topics []whisper.TopicType)
return nil
}
// Has returns true if given uid is stored in registry.
func (r *RequestsRegistry) Has(uid common.Hash) bool {
r.mu.Lock()
defer r.mu.Unlock()
_, exist := r.uidToTopics[uid]
return exist
}
// Unregister removes request with given UID from registry.
func (r *RequestsRegistry) Unregister(uid common.Hash) {
r.mu.Lock()

View file

@ -13,6 +13,7 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/db"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/shhext/chat"
"github.com/status-im/status-go/services/shhext/dedup"
@ -46,6 +47,7 @@ type Service struct {
envelopesMonitor *EnvelopesMonitor
mailMonitor *MailRequestMonitor
requestsRegistry *RequestsRegistry
historyUpdates *HistoryUpdateReactor
server *p2p.Server
nodeID *ecdsa.PrivateKey
deduplicator *dedup.Deduplicator
@ -53,25 +55,25 @@ type Service struct {
dataDir string
installationID string
pfsEnabled bool
peerStore *mailservers.PeerStore
cache *mailservers.Cache
connManager *mailservers.ConnectionManager
lastUsedMonitor *mailservers.LastUsedConnectionMonitor
peerStore *mailservers.PeerStore
cache *mailservers.Cache
connManager *mailservers.ConnectionManager
lastUsedMonitor *mailservers.LastUsedConnectionMonitor
}
// Make sure that Service implements node.Service interface.
var _ node.Service = (*Service)(nil)
// New returns a new Service. dataDir is a folder path to a network-independent location
func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, config params.ShhextConfig) *Service {
cache := mailservers.NewCache(db)
func New(w *whisper.Whisper, handler EnvelopeEventsHandler, ldb *leveldb.DB, config params.ShhextConfig) *Service {
cache := mailservers.NewCache(ldb)
ps := mailservers.NewPeerStore(cache)
delay := defaultRequestsDelay
if config.RequestsDelay != 0 {
delay = config.RequestsDelay
}
requestsRegistry := NewRequestsRegistry(delay)
historyUpdates := NewHistoryUpdateReactor(db.NewHistoryStore(ldb), requestsRegistry, w.GetCurrentTime)
mailMonitor := &MailRequestMonitor{
w: w,
handler: handler,
@ -85,7 +87,8 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, conf
envelopesMonitor: envelopesMonitor,
mailMonitor: mailMonitor,
requestsRegistry: requestsRegistry,
deduplicator: dedup.NewDeduplicator(w, db),
historyUpdates: historyUpdates,
deduplicator: dedup.NewDeduplicator(w, ldb),
dataDir: config.BackupDisabledDataDir,
installationID: config.InstallationID,
pfsEnabled: config.PFSEnabled,

View file

@ -3,6 +3,7 @@ package shhext
import (
"context"
"encoding/hex"
"errors"
"fmt"
"io/ioutil"
"math"
@ -13,10 +14,12 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/mailserver"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/t/utils"
@ -681,3 +684,230 @@ func (s *WhisperRetriesSuite) TestDeliveredFromFirstAttempt() {
func (s *WhisperRetriesSuite) TestDeliveredFromSecondAttempt() {
s.testDelivery(2)
}
func TestRequestWithTrackingHistorySuite(t *testing.T) {
suite.Run(t, new(RequestWithTrackingHistorySuite))
}
type RequestWithTrackingHistorySuite struct {
suite.Suite
envelopeSymkey string
envelopeSymkeyID string
localWhisperAPI *whisper.PublicWhisperAPI
localAPI *PublicAPI
localService *Service
mailSymKey string
remoteMailserver *mailserver.WMailServer
remoteNode *enode.Node
remoteWhisper *whisper.Whisper
}
func (s *RequestWithTrackingHistorySuite) SetupTest() {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
s.Require().NoError(err)
conf := &whisper.Config{
MinimumAcceptedPOW: 0,
MaxMessageSize: 100 << 10,
}
local := whisper.New(conf)
s.Require().NoError(local.Start(nil))
s.localWhisperAPI = whisper.NewPublicWhisperAPI(local)
s.localService = New(local, nil, db, params.ShhextConfig{})
localPkey, err := crypto.GenerateKey()
s.Require().NoError(err)
s.Require().NoError(s.localService.Start(&p2p.Server{Config: p2p.Config{PrivateKey: localPkey}}))
s.localAPI = NewPublicAPI(s.localService)
remote := whisper.New(conf)
s.remoteWhisper = remote
s.Require().NoError(remote.Start(nil))
s.remoteMailserver = &mailserver.WMailServer{}
remote.RegisterServer(s.remoteMailserver)
password := "test"
tmpdir, err := ioutil.TempDir("", "tracking-history-tests-")
s.Require().NoError(err)
s.Require().NoError(s.remoteMailserver.Init(remote, &params.WhisperConfig{
DataDir: tmpdir,
MailServerPassword: password,
}))
pkey, err := crypto.GenerateKey()
s.Require().NoError(err)
// we need proper enode for a remote node. it will be used when mail server request is made
s.remoteNode = enode.NewV4(&pkey.PublicKey, net.ParseIP("127.0.0.1"), 1, 1)
remotePeer := p2p.NewPeer(s.remoteNode.ID(), "1", []p2p.Cap{{"shh", 6}})
localPeer := p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"shh", 6}})
// FIXME close this in tear down
rw1, rw2 := p2p.MsgPipe()
go func() {
err := local.HandlePeer(remotePeer, rw1)
s.Require().NoError(err)
}()
go func() {
err := remote.HandlePeer(localPeer, rw2)
s.Require().NoError(err)
}()
s.mailSymKey, err = s.localWhisperAPI.GenerateSymKeyFromPassword(context.Background(), password)
s.Require().NoError(err)
s.envelopeSymkey = "topics"
s.envelopeSymkeyID, err = s.localWhisperAPI.GenerateSymKeyFromPassword(context.Background(), s.envelopeSymkey)
s.Require().NoError(err)
}
func (s *RequestWithTrackingHistorySuite) postEnvelopes(topics ...whisper.TopicType) []hexutil.Bytes {
var (
rst = make([]hexutil.Bytes, len(topics))
err error
)
for i, t := range topics {
rst[i], err = s.localWhisperAPI.Post(context.Background(), whisper.NewMessage{
SymKeyID: s.envelopeSymkeyID,
TTL: 10,
Topic: t,
})
s.Require().NoError(err)
}
return rst
}
func (s *RequestWithTrackingHistorySuite) waitForArchival(hexes []hexutil.Bytes) {
events := make(chan whisper.EnvelopeEvent, 2)
sub := s.remoteWhisper.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
s.Require().NoError(waitForArchival(events, 2*time.Second, hexes...))
}
func (s *RequestWithTrackingHistorySuite) createEmptyFilter(topics ...whisper.TopicType) string {
filterid, err := s.localWhisperAPI.NewMessageFilter(whisper.Criteria{
SymKeyID: s.envelopeSymkeyID,
Topics: topics,
AllowP2P: true,
})
s.Require().NoError(err)
s.Require().NotNil(filterid)
messages, err := s.localWhisperAPI.GetFilterMessages(filterid)
s.Require().NoError(err)
s.Require().Empty(messages)
return filterid
}
func (s *RequestWithTrackingHistorySuite) initiateHistoryRequest(topics ...TopicRequest) []hexutil.Bytes {
requests, err := s.localAPI.InitiateHistoryRequests(InitiateHistoryRequestParams{
Peer: s.remoteNode.String(),
SymKeyID: s.mailSymKey,
Timeout: 10 * time.Second,
Requests: topics,
})
s.Require().NoError(err)
return requests
}
func (s *RequestWithTrackingHistorySuite) waitMessagesDelivered(filterid string, hexes ...hexutil.Bytes) {
var received int
s.Require().NoError(utils.Eventually(func() error {
messages, err := s.localWhisperAPI.GetFilterMessages(filterid)
if err != nil {
return err
}
received += len(messages)
if received != len(hexes) {
return fmt.Errorf("expecting to receive %d messages, received %d", len(hexes), received)
}
return nil
}, 2*time.Second, 200*time.Millisecond))
}
func (s *RequestWithTrackingHistorySuite) waitNoRequests() {
store := s.localService.historyUpdates.store
s.Require().NoError(utils.Eventually(func() error {
reqs, err := store.GetAllRequests()
if err != nil {
return err
}
if len(reqs) != 0 {
return fmt.Errorf("not all requests were removed. count %d", len(reqs))
}
return nil
}, 2*time.Second, 200*time.Millisecond))
}
func (s *RequestWithTrackingHistorySuite) TestMultipleMergeIntoOne() {
topic1 := whisper.TopicType{1, 1, 1, 1}
topic2 := whisper.TopicType{2, 2, 2, 2}
topic3 := whisper.TopicType{3, 3, 3, 3}
hexes := s.postEnvelopes(topic1, topic2, topic3)
s.waitForArchival(hexes)
filterid := s.createEmptyFilter(topic1, topic2, topic3)
requests := s.initiateHistoryRequest(
TopicRequest{Topic: topic1, Duration: time.Hour},
TopicRequest{Topic: topic2, Duration: time.Hour},
TopicRequest{Topic: topic3, Duration: 10 * time.Hour},
)
// since we are using different duration for 3rd topic there will be 2 requests
s.Require().Len(requests, 2)
s.waitMessagesDelivered(filterid, hexes...)
s.Require().NoError(s.localService.historyUpdates.UpdateTopicHistory(topic1, time.Now()))
s.Require().NoError(s.localService.historyUpdates.UpdateTopicHistory(topic2, time.Now()))
s.Require().NoError(s.localService.historyUpdates.UpdateTopicHistory(topic3, time.Now()))
for _, r := range requests {
s.Require().NoError(s.localAPI.CompleteRequest(context.TODO(), r.String()))
}
s.waitNoRequests()
requests = s.initiateHistoryRequest(
TopicRequest{Topic: topic1, Duration: time.Hour},
TopicRequest{Topic: topic2, Duration: time.Hour},
TopicRequest{Topic: topic3, Duration: 10 * time.Hour},
)
s.Len(requests, 1)
}
func (s *RequestWithTrackingHistorySuite) TestSingleRequest() {
topic1 := whisper.TopicType{1, 1, 1, 1}
topic2 := whisper.TopicType{255, 255, 255, 255}
hexes := s.postEnvelopes(topic1, topic2)
s.waitForArchival(hexes)
filterid := s.createEmptyFilter(topic1, topic2)
requests := s.initiateHistoryRequest(
TopicRequest{Topic: topic1, Duration: time.Hour},
TopicRequest{Topic: topic2, Duration: time.Hour},
)
s.Require().Len(requests, 1)
s.waitMessagesDelivered(filterid, hexes...)
}
func waitForArchival(events chan whisper.EnvelopeEvent, duration time.Duration, hashes ...hexutil.Bytes) error {
waiting := map[common.Hash]struct{}{}
for _, hash := range hashes {
waiting[common.BytesToHash(hash)] = struct{}{}
}
timeout := time.After(duration)
for {
select {
case <-timeout:
return errors.New("timed out while waiting for mailserver to archive envelopes")
case ev := <-events:
if ev.Event != whisper.EventMailServerEnvelopeArchived {
continue
}
if _, exist := waiting[ev.Hash]; exist {
delete(waiting, ev.Hash)
if len(waiting) == 0 {
return nil
}
}
}
}
}

View file

@ -14,6 +14,9 @@ const (
// to any peer
EventEnvelopeExpired = "envelope.expired"
// EventEnvelopeDiscarded is triggerd when envelope was discarded by a peer for some reason.
EventEnvelopeDiscarded = "envelope.discarded"
// EventMailServerRequestCompleted is triggered when whisper receives a message ack from the mailserver
EventMailServerRequestCompleted = "mailserver.request.completed"