feat: trigger collectibles refresh on transfer

This commit is contained in:
Dario Gabriel Lipicar 2023-10-09 09:43:53 -03:00 committed by dlipicar
parent addf6e4aaf
commit 094228871e
8 changed files with 518 additions and 190 deletions

View file

@ -12,6 +12,31 @@ type Commander interface {
Command() Command
}
// SingleShotCommand runs once.
type SingleShotCommand struct {
Interval time.Duration
Init func(context.Context) error
Runable func(context.Context) error
}
func (c SingleShotCommand) Run(ctx context.Context) error {
timer := time.NewTimer(c.Interval)
if c.Init != nil {
err := c.Init(ctx)
if err != nil {
return err
}
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
_ = c.Runable(ctx)
}
}
}
// FiniteCommand terminates when error is nil.
type FiniteCommand struct {
Interval time.Duration

View file

@ -16,8 +16,18 @@ import (
)
const (
fetchLimit = 50 // Limit number of collectibles we fetch per provider call
accountOwnershipUpdateInterval = 30 * time.Minute
fetchLimit = 50 // Limit number of collectibles we fetch per provider call
accountOwnershipUpdateInterval = 30 * time.Minute
accountOwnershipUpdateDelayInterval = 30 * time.Second
)
type OwnershipState = int
const (
OwnershipStateIdle OwnershipState = iota + 1
OwnershipStateDelayed
OwnershipStateUpdating
OwnershipStateError
)
type periodicRefreshOwnedCollectiblesCommand struct {
@ -43,6 +53,17 @@ func newPeriodicRefreshOwnedCollectiblesCommand(manager *Manager, ownershipDB *O
return ret
}
func (c *periodicRefreshOwnedCollectiblesCommand) DelayedCommand() async.Command {
return async.SingleShotCommand{
Interval: accountOwnershipUpdateDelayInterval,
Init: func(ctx context.Context) (err error) {
c.state.Store(OwnershipStateDelayed)
return nil
},
Runable: c.Command(),
}.Run
}
func (c *periodicRefreshOwnedCollectiblesCommand) Command() async.Command {
return async.InfiniteCommand{
Interval: accountOwnershipUpdateInterval,

View file

@ -0,0 +1,352 @@
package collectibles
import (
"context"
"database/sql"
"errors"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/services/accounts/accountsevent"
walletAccounts "github.com/status-im/status-go/services/wallet/accounts"
"github.com/status-im/status-go/services/wallet/async"
walletCommon "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/transfer"
"github.com/status-im/status-go/services/wallet/walletevent"
)
const (
activityRefetchMarginSeconds = 30 * 60 // Trigger a fetch if activity is detected this many seconds before the last fetch
)
type commandPerChainID = map[walletCommon.ChainID]*periodicRefreshOwnedCollectiblesCommand
type commandPerAddressAndChainID = map[common.Address]commandPerChainID
type timerPerChainID = map[walletCommon.ChainID]*time.Timer
type timerPerAddressAndChainID = map[common.Address]timerPerChainID
type Controller struct {
manager *Manager
ownershipDB *OwnershipDB
walletFeed *event.Feed
accountsDB *accounts.Database
accountsFeed *event.Feed
networkManager *network.Manager
cancelFn context.CancelFunc
commands commandPerAddressAndChainID
timers timerPerAddressAndChainID
group *async.Group
accountsWatcher *walletAccounts.Watcher
walletEventsWatcher *walletevent.Watcher
commandsLock sync.RWMutex
}
func NewController(db *sql.DB, walletFeed *event.Feed, accountsDB *accounts.Database, accountsFeed *event.Feed, networkManager *network.Manager, manager *Manager) *Controller {
return &Controller{
manager: manager,
ownershipDB: NewOwnershipDB(db),
walletFeed: walletFeed,
accountsDB: accountsDB,
accountsFeed: accountsFeed,
networkManager: networkManager,
commands: make(commandPerAddressAndChainID),
timers: make(timerPerAddressAndChainID),
}
}
func (c *Controller) Start() {
// Setup periodical collectibles refresh
_ = c.startPeriodicalOwnershipFetch()
// Setup collectibles fetch when a new account gets added
c.startAccountsWatcher()
// Setup collectibles fetch when relevant activity is detected
c.startWalletEventsWatcher()
}
func (c *Controller) Stop() {
c.stopWalletEventsWatcher()
c.stopAccountsWatcher()
c.stopPeriodicalOwnershipFetch()
}
func (c *Controller) RefetchOwnedCollectibles() {
c.stopPeriodicalOwnershipFetch()
c.manager.ResetConnectionStatus()
_ = c.startPeriodicalOwnershipFetch()
}
func (c *Controller) GetCommandState(chainID walletCommon.ChainID, address common.Address) OwnershipState {
c.commandsLock.RLock()
defer c.commandsLock.RUnlock()
state := OwnershipStateIdle
if c.commands[address] != nil && c.commands[address][chainID] != nil {
state = c.commands[address][chainID].GetState()
}
return state
}
func (c *Controller) isPeriodicalOwnershipFetchRunning() bool {
return c.group != nil
}
// Starts periodical fetching for the all wallet addresses and all chains
func (c *Controller) startPeriodicalOwnershipFetch() error {
c.commandsLock.Lock()
defer c.commandsLock.Unlock()
if c.isPeriodicalOwnershipFetchRunning() {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
c.cancelFn = cancel
c.group = async.NewGroup(ctx)
addresses, err := c.accountsDB.GetWalletAddresses()
if err != nil {
return err
}
for _, addr := range addresses {
err := c.startPeriodicalOwnershipFetchForAccount(common.Address(addr))
if err != nil {
log.Error("Error starting periodical collectibles fetch for accpunt", "address", addr, "error", err)
return err
}
}
return nil
}
func (c *Controller) stopPeriodicalOwnershipFetch() {
c.commandsLock.Lock()
defer c.commandsLock.Unlock()
if !c.isPeriodicalOwnershipFetchRunning() {
return
}
if c.cancelFn != nil {
c.cancelFn()
c.cancelFn = nil
}
if c.group != nil {
c.group.Stop()
c.group.Wait()
c.group = nil
c.commands = make(commandPerAddressAndChainID)
}
}
// Starts (or restarts) periodical fetching for the given account address for all chains
func (c *Controller) startPeriodicalOwnershipFetchForAccount(address common.Address) error {
log.Debug("wallet.api.collectibles.Controller", "Start periodical fetching", "address", address)
networks, err := c.networkManager.Get(false)
if err != nil {
return err
}
areTestNetworksEnabled, err := c.accountsDB.GetTestNetworksEnabled()
if err != nil {
return err
}
for _, network := range networks {
if network.IsTest != areTestNetworksEnabled {
continue
}
chainID := walletCommon.ChainID(network.ChainID)
err := c.startPeriodicalOwnershipFetchForAccountAndChainID(address, chainID, false)
if err != nil {
return err
}
}
return nil
}
// Starts (or restarts) periodical fetching for the given account address for all chains
func (c *Controller) startPeriodicalOwnershipFetchForAccountAndChainID(address common.Address, chainID walletCommon.ChainID, delayed bool) error {
log.Debug("wallet.api.collectibles.Controller", "Start periodical fetching", "address", address, "chainID", chainID, "delayed", delayed)
if !c.isPeriodicalOwnershipFetchRunning() {
return errors.New("periodical fetch not initialized")
}
err := c.stopPeriodicalOwnershipFetchForAccountAndChainID(address, chainID)
if err != nil {
return err
}
if _, ok := c.commands[address]; !ok {
c.commands[address] = make(commandPerChainID)
}
command := newPeriodicRefreshOwnedCollectiblesCommand(
c.manager,
c.ownershipDB,
c.walletFeed,
chainID,
address,
)
c.commands[address][chainID] = command
if delayed {
c.group.Add(command.DelayedCommand())
} else {
c.group.Add(command.Command())
}
return nil
}
// Stop periodical fetching for the given account address for all chains
func (c *Controller) stopPeriodicalOwnershipFetchForAccount(address common.Address) error {
log.Debug("wallet.api.collectibles.Controller", "Stop periodical fetching", "address", address)
if !c.isPeriodicalOwnershipFetchRunning() {
return errors.New("periodical fetch not initialized")
}
if _, ok := c.commands[address]; ok {
for chainID := range c.commands[address] {
err := c.stopPeriodicalOwnershipFetchForAccountAndChainID(address, chainID)
if err != nil {
return err
}
}
}
return nil
}
func (c *Controller) stopPeriodicalOwnershipFetchForAccountAndChainID(address common.Address, chainID walletCommon.ChainID) error {
log.Debug("wallet.api.collectibles.Controller", "Stop periodical fetching", "address", address, "chainID", chainID)
if !c.isPeriodicalOwnershipFetchRunning() {
return errors.New("periodical fetch not initialized")
}
if _, ok := c.commands[address]; ok {
if _, ok := c.commands[address][chainID]; ok {
c.commands[address][chainID].Stop()
delete(c.commands[address], chainID)
}
// If it was the last chain, delete the address as well
if len(c.commands[address]) == 0 {
delete(c.commands, address)
}
}
return nil
}
func (c *Controller) startAccountsWatcher() {
if c.accountsWatcher != nil {
return
}
accountChangeCb := func(changedAddresses []common.Address, eventType accountsevent.EventType, currentAddresses []common.Address) {
c.commandsLock.Lock()
defer c.commandsLock.Unlock()
// Whenever an account gets added, start fetching
if eventType == accountsevent.EventTypeAdded {
for _, address := range changedAddresses {
err := c.startPeriodicalOwnershipFetchForAccount(address)
if err != nil {
log.Error("Error starting periodical collectibles fetch", "address", address, "error", err)
}
}
} else if eventType == accountsevent.EventTypeRemoved {
for _, address := range changedAddresses {
err := c.stopPeriodicalOwnershipFetchForAccount(address)
if err != nil {
log.Error("Error starting periodical collectibles fetch", "address", address, "error", err)
}
}
}
}
c.accountsWatcher = walletAccounts.NewWatcher(c.accountsDB, c.accountsFeed, accountChangeCb)
c.accountsWatcher.Start()
}
func (c *Controller) stopAccountsWatcher() {
if c.accountsWatcher != nil {
c.accountsWatcher.Stop()
c.accountsWatcher = nil
}
}
func (c *Controller) startWalletEventsWatcher() {
if c.walletEventsWatcher != nil {
return
}
walletEventCb := func(event walletevent.Event) {
// EventRecentHistoryReady ?
if event.Type != transfer.EventInternalERC721TransferDetected {
return
}
chainID := walletCommon.ChainID(event.ChainID)
for _, account := range event.Accounts {
// Check last ownership update timestamp
timestamp, err := c.ownershipDB.GetOwnershipUpdateTimestamp(account, chainID)
if err != nil {
log.Error("Error getting ownership update timestamp", "error", err)
continue
}
if timestamp == InvalidTimestamp {
// Ownership was never fetched for this account
continue
}
timeCheck := timestamp - activityRefetchMarginSeconds
if timeCheck < 0 {
timeCheck = 0
}
if event.At > timeCheck {
// Restart fetching for account + chainID
c.commandsLock.Lock()
err := c.startPeriodicalOwnershipFetchForAccountAndChainID(account, chainID, true)
c.commandsLock.Unlock()
if err != nil {
log.Error("Error starting periodical collectibles fetch", "address", account, "error", err)
}
}
}
}
c.walletEventsWatcher = walletevent.NewWatcher(c.walletFeed, walletEventCb)
c.walletEventsWatcher.Start()
}
func (c *Controller) stopWalletEventsWatcher() {
if c.walletEventsWatcher != nil {
c.walletEventsWatcher.Stop()
c.walletEventsWatcher = nil
}
}

View file

@ -14,8 +14,6 @@ import (
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/services/accounts/accountsevent"
walletaccounts "github.com/status-im/status-go/services/wallet/accounts"
"github.com/status-im/status-go/services/wallet/async"
walletCommon "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/thirdparty"
@ -44,35 +42,21 @@ var (
}
)
type commandPerChainID = map[walletCommon.ChainID]*periodicRefreshOwnedCollectiblesCommand
type commandPerAddressAndChainID = map[common.Address]commandPerChainID
type Service struct {
manager *Manager
ownershipDB *OwnershipDB
walletFeed *event.Feed
accountsDB *accounts.Database
accountsFeed *event.Feed
networkManager *network.Manager
cancelFn context.CancelFunc
commands commandPerAddressAndChainID
group *async.Group
scheduler *async.MultiClientScheduler
accountsWatcher *walletaccounts.Watcher
manager *Manager
controller *Controller
ownershipDB *OwnershipDB
walletFeed *event.Feed
scheduler *async.MultiClientScheduler
}
func NewService(db *sql.DB, walletFeed *event.Feed, accountsDB *accounts.Database, accountsFeed *event.Feed, networkManager *network.Manager, manager *Manager) *Service {
return &Service{
manager: manager,
ownershipDB: NewOwnershipDB(db),
walletFeed: walletFeed,
accountsDB: accountsDB,
accountsFeed: accountsFeed,
networkManager: networkManager,
commands: make(commandPerAddressAndChainID),
scheduler: async.NewMultiClientScheduler(),
manager: manager,
controller: NewController(db, walletFeed, accountsDB, accountsFeed, networkManager, manager),
ownershipDB: NewOwnershipDB(db),
walletFeed: walletFeed,
scheduler: async.NewMultiClientScheduler(),
}
}
@ -84,14 +68,6 @@ const (
ErrorCodeFailed
)
type OwnershipState = int
const (
OwnershipStateIdle OwnershipState = iota + 1
OwnershipStateUpdating
OwnershipStateError
)
type OwnershipStatus struct {
State OwnershipState `json:"state"`
Timestamp int64 `json:"timestamp"`
@ -189,160 +165,15 @@ func (s *Service) GetCollectiblesDetailsAsync(requestID int32, uniqueIDs []third
}
func (s *Service) RefetchOwnedCollectibles() {
s.stopPeriodicalOwnershipFetch()
s.manager.ResetConnectionStatus()
_ = s.startPeriodicalOwnershipFetch()
}
// Starts periodical fetching for the all wallet addresses and all chains
func (s *Service) startPeriodicalOwnershipFetch() error {
if s.group != nil {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
s.cancelFn = cancel
s.group = async.NewGroup(ctx)
addresses, err := s.accountsDB.GetWalletAddresses()
if err != nil {
return err
}
for _, addr := range addresses {
err := s.startPeriodicalOwnershipFetchForAccount(common.Address(addr))
if err != nil {
log.Error("Error starting periodical collectibles fetch for accpunt", "address", addr, "error", err)
return err
}
}
return nil
}
func (s *Service) stopPeriodicalOwnershipFetch() {
if s.cancelFn != nil {
s.cancelFn()
s.cancelFn = nil
}
if s.group != nil {
s.group.Stop()
s.group.Wait()
s.group = nil
s.commands = make(commandPerAddressAndChainID)
}
}
// Starts (or restarts) periodical fetching for the given account address for all chains
func (s *Service) startPeriodicalOwnershipFetchForAccount(address common.Address) error {
if s.group == nil {
return errors.New("periodical fetch group not initialized")
}
networks, err := s.networkManager.Get(false)
if err != nil {
return err
}
areTestNetworksEnabled, err := s.accountsDB.GetTestNetworksEnabled()
if err != nil {
return err
}
if _, ok := s.commands[address]; ok {
for chainID, command := range s.commands[address] {
command.Stop()
delete(s.commands[address], chainID)
}
}
s.commands[address] = make(commandPerChainID)
for _, network := range networks {
if network.IsTest != areTestNetworksEnabled {
continue
}
chainID := walletCommon.ChainID(network.ChainID)
command := newPeriodicRefreshOwnedCollectiblesCommand(
s.manager,
s.ownershipDB,
s.walletFeed,
chainID,
address,
)
s.commands[address][chainID] = command
s.group.Add(command.Command())
}
return nil
}
// Stop periodical fetching for the given account address for all chains
func (s *Service) stopPeriodicalOwnershipFetchForAccount(address common.Address) error {
if s.group == nil {
return errors.New("periodical fetch group not initialized")
}
if _, ok := s.commands[address]; ok {
for _, command := range s.commands[address] {
command.Stop()
}
delete(s.commands, address)
}
return nil
}
func (s *Service) startAccountsWatcher() {
if s.accountsWatcher != nil {
return
}
accountChangeCb := func(changedAddresses []common.Address, eventType accountsevent.EventType, currentAddresses []common.Address) {
// Whenever an account gets added, start fetching
if eventType == accountsevent.EventTypeAdded {
for _, address := range changedAddresses {
err := s.startPeriodicalOwnershipFetchForAccount(address)
if err != nil {
log.Error("Error starting periodical collectibles fetch", "address", address, "error", err)
}
}
} else if eventType == accountsevent.EventTypeRemoved {
for _, address := range changedAddresses {
err := s.stopPeriodicalOwnershipFetchForAccount(address)
if err != nil {
log.Error("Error starting periodical collectibles fetch", "address", address, "error", err)
}
}
}
}
s.accountsWatcher = walletaccounts.NewWatcher(s.accountsDB, s.accountsFeed, accountChangeCb)
s.accountsWatcher.Start()
}
func (s *Service) stopAccountsWatcher() {
if s.accountsWatcher != nil {
s.accountsWatcher.Stop()
s.accountsWatcher = nil
}
s.controller.RefetchOwnedCollectibles()
}
func (s *Service) Start() {
// Setup periodical collectibles refresh
_ = s.startPeriodicalOwnershipFetch()
// Setup collectibles fetch when a new account gets added
s.startAccountsWatcher()
s.controller.Start()
}
func (s *Service) Stop() {
s.stopAccountsWatcher()
s.stopPeriodicalOwnershipFetch()
s.controller.Stop()
s.scheduler.Stop()
}
@ -398,12 +229,8 @@ func (s *Service) GetOwnershipStatus(chainIDs []walletCommon.ChainID, owners []c
if err != nil {
return nil, err
}
state := OwnershipStateIdle
if s.commands[address] != nil && s.commands[address][chainID] != nil {
state = s.commands[address][chainID].GetState()
}
ret[address][chainID] = OwnershipStatus{
State: state,
State: s.controller.GetCommandState(chainID, address),
Timestamp: timestamp,
}
}

View file

@ -33,6 +33,9 @@ const (
// EventNonArchivalNodeDetected emitted when a connection to a non archival node is detected
EventNonArchivalNodeDetected walletevent.EventType = "non-archival-node-detected"
// EventInternalERC721TransferDetected emitted when ERC721 transfer is detected
EventInternalERC721TransferDetected walletevent.EventType = walletevent.InternalEventTypePrefix + "erc721-transfer-detected"
numberOfBlocksCheckedPerIteration = 40
noBlockLimit = 0
)
@ -432,6 +435,7 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) {
c.fetchedTransfers = append(c.fetchedTransfers, allTransfers...)
c.notifyOfNewTransfers(blockNum, allTransfers)
c.notifyOfNewERC721Transfers(allTransfers)
log.Debug("transfersCommand block end", "chain", c.chainClient.NetworkID(), "address", c.address,
"block", blockNum, "tranfers.len", len(allTransfers), "fetchedTransfers.len", len(c.fetchedTransfers))
@ -621,6 +625,28 @@ func (c *transfersCommand) notifyOfNewTransfers(blockNum *big.Int, transfers []T
}
}
func (c *transfersCommand) notifyOfNewERC721Transfers(transfers []Transfer) {
if c.feed != nil {
// Internal event for ERC721 transfers
latestERC721TransferTimestamp := uint64(0)
for _, transfer := range transfers {
if transfer.Type == w_common.Erc721Transfer {
if transfer.Timestamp > latestERC721TransferTimestamp {
latestERC721TransferTimestamp = transfer.Timestamp
}
}
}
if latestERC721TransferTimestamp > 0 {
c.feed.Send(walletevent.Event{
Type: EventInternalERC721TransferDetected,
Accounts: []common.Address{c.address},
ChainID: c.chainClient.NetworkID(),
At: int64(latestERC721TransferTimestamp),
})
}
}
}
type loadTransfersCommand struct {
accounts []common.Address
db *Database

View file

@ -2,6 +2,7 @@ package walletevent
import (
"math/big"
"strings"
"github.com/ethereum/go-ethereum/common"
)
@ -9,6 +10,15 @@ import (
// EventType type for event types.
type EventType string
// EventType prefix to be used for internal events.
// These events are not forwarded to the client, they are only used
// within status-go.
const InternalEventTypePrefix = "INT-"
func (t EventType) IsInternal() bool {
return strings.HasPrefix(string(t), InternalEventTypePrefix)
}
// Event is a type for transfer events.
type Event struct {
Type EventType `json:"type"`

View file

@ -47,7 +47,9 @@ func (tmr *SignalsTransmitter) Start() error {
}
return
case event := <-events:
signal.SendWalletEvent(event)
if !event.Type.IsInternal() {
signal.SendWalletEvent(event)
}
}
}
}()

View file

@ -0,0 +1,65 @@
package walletevent
import (
"context"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/async"
)
type EventCb func(event Event)
// Watcher executes a given callback whenever a wallet event gets sent
type Watcher struct {
feed *event.Feed
group *async.Group
callback EventCb
}
func NewWatcher(feed *event.Feed, callback EventCb) *Watcher {
return &Watcher{
feed: feed,
callback: callback,
}
}
func (w *Watcher) Start() {
if w.group != nil {
return
}
w.group = async.NewGroup(context.Background())
w.group.Add(func(ctx context.Context) error {
return watch(ctx, w.feed, w.callback)
})
}
func (w *Watcher) Stop() {
if w.group != nil {
w.group.Stop()
w.group.Wait()
w.group = nil
}
}
func watch(ctx context.Context, feed *event.Feed, callback EventCb) error {
ch := make(chan Event, 10)
sub := feed.Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
case <-ctx.Done():
return nil
case err := <-sub.Err():
if err != nil {
log.Error("wallet event watcher subscription failed", "error", err)
}
case ev := <-ch:
if callback != nil {
callback(ev)
}
}
}
}