[#797] Unify EnsureSync on t/utils/utils.go and StatusManager (#806)

This commit is contained in:
Adrià Cidre 2018-04-09 09:16:43 +02:00 committed by GitHub
parent 359b3621e9
commit df35ad6dbe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 67 additions and 149 deletions

View file

@ -24,14 +24,13 @@ import (
// errors // errors
var ( var (
ErrNodeExists = errors.New("node is already running") ErrNodeExists = errors.New("node is already running")
ErrNoRunningNode = errors.New("there is no running node") ErrNoRunningNode = errors.New("there is no running node")
ErrInvalidStatusNode = errors.New("status node is not properly initialized") ErrInvalidStatusNode = errors.New("status node is not properly initialized")
ErrInvalidWhisperService = errors.New("whisper service is unavailable") ErrInvalidService = errors.New("service is unavailable")
ErrInvalidLightEthereumService = errors.New("LES service is unavailable") ErrInvalidAccountManager = errors.New("could not retrieve account manager")
ErrInvalidAccountManager = errors.New("could not retrieve account manager") ErrAccountKeyStoreMissing = errors.New("account key store is not set")
ErrAccountKeyStoreMissing = errors.New("account key store is not set") ErrRPCClient = errors.New("failed to init RPC client")
ErrRPCClient = errors.New("failed to init RPC client")
) )
// RPCClientError reported when rpc client is initialized. // RPCClientError reported when rpc client is initialized.
@ -43,14 +42,12 @@ type EthNodeError error
// StatusNode abstracts contained geth node and provides helper methods to // StatusNode abstracts contained geth node and provides helper methods to
// interact with it. // interact with it.
type StatusNode struct { type StatusNode struct {
mu sync.RWMutex mu sync.RWMutex
config *params.NodeConfig // Status node configuration config *params.NodeConfig // Status node configuration
node *node.Node // reference to Geth P2P stack/node gethNode *node.Node // reference to Geth P2P stack/node
whisperService *whisper.Whisper // reference to Whisper service rpcClient *rpc.Client // reference to RPC client
lesService *les.LightEthereum // reference to LES service log log.Logger
rpcClient *rpc.Client // reference to RPC client
log log.Logger
} }
// New makes new instance of StatusNode. // New makes new instance of StatusNode.
@ -77,7 +74,7 @@ func (n *StatusNode) start(config *params.NodeConfig) error {
if err != nil { if err != nil {
return err return err
} }
n.node = ethNode n.gethNode = ethNode
n.config = config n.config = config
// activate MailService required for Offline Inboxing // activate MailService required for Offline Inboxing
@ -92,7 +89,7 @@ func (n *StatusNode) start(config *params.NodeConfig) error {
return EthNodeError(err) return EthNodeError(err)
} }
// init RPC client for this node // init RPC client for this node
localRPCClient, err := n.node.Attach() localRPCClient, err := n.gethNode.Attach()
if err == nil { if err == nil {
n.rpcClient, err = rpc.NewClient(localRPCClient, n.config.UpstreamConfig) n.rpcClient, err = rpc.NewClient(localRPCClient, n.config.UpstreamConfig)
} }
@ -115,13 +112,11 @@ func (n *StatusNode) stop() error {
if err := n.isAvailable(); err != nil { if err := n.isAvailable(); err != nil {
return err return err
} }
if err := n.node.Stop(); err != nil { if err := n.gethNode.Stop(); err != nil {
return err return err
} }
n.node = nil n.gethNode = nil
n.config = nil n.config = nil
n.lesService = nil
n.whisperService = nil
n.rpcClient = nil n.rpcClient = nil
return nil return nil
} }
@ -164,7 +159,7 @@ func (n *StatusNode) GethNode() (*node.Node, error) {
if err := n.isAvailable(); err != nil { if err := n.isAvailable(); err != nil {
return nil, err return nil, err
} }
return n.node, nil return n.gethNode, nil
} }
// populateStaticPeers connects current node with our publicly available LES/SHH/Swarm cluster // populateStaticPeers connects current node with our publicly available LES/SHH/Swarm cluster
@ -194,7 +189,7 @@ func (n *StatusNode) removeStaticPeers() error {
n.log.Info("Static peers are disabled") n.log.Info("Static peers are disabled")
return nil return nil
} }
server := n.node.Server() server := n.gethNode.Server()
if server == nil { if server == nil {
return ErrNoRunningNode return ErrNoRunningNode
} }
@ -236,7 +231,7 @@ func (n *StatusNode) addPeer(url string) error {
if err != nil { if err != nil {
return err return err
} }
n.node.Server().AddPeer(parsedNode) n.gethNode.Server().AddPeer(parsedNode)
return nil return nil
} }
@ -245,7 +240,7 @@ func (n *StatusNode) removePeer(url string) error {
if err != nil { if err != nil {
return err return err
} }
n.node.Server().RemovePeer(parsedNode) n.gethNode.Server().RemovePeer(parsedNode)
return nil return nil
} }
@ -254,7 +249,7 @@ func (n *StatusNode) PeerCount() int {
if !n.IsRunning() { if !n.IsRunning() {
return 0 return 0
} }
return n.node.Server().PeerCount() return n.gethNode.Server().PeerCount()
} }
// Config exposes reference to running node's configuration // Config exposes reference to running node's configuration
@ -268,44 +263,28 @@ func (n *StatusNode) Config() (*params.NodeConfig, error) {
return n.config, nil return n.config, nil
} }
// LightEthereumService exposes reference to LES service running on top of the node // gethService is a wrapper for gethNode.Service which retrieves a currently
func (n *StatusNode) LightEthereumService() (*les.LightEthereum, error) { // running service registered of a specific type.
n.mu.RLock() func (n *StatusNode) gethService(serviceInstance interface{}, serviceName string) error {
defer n.mu.RUnlock()
if err := n.isAvailable(); err != nil { if err := n.isAvailable(); err != nil {
return nil, err return err
} }
if n.lesService == nil { if err := n.gethNode.Service(serviceInstance); err != nil || serviceInstance == nil {
if err := n.node.Service(&n.lesService); err != nil { n.log.Warn("Cannot obtain ", serviceName, " service", "error", err)
n.log.Warn("Cannot obtain LES service", "error", err) return ErrInvalidService
return nil, ErrInvalidLightEthereumService
}
} }
if n.lesService == nil {
return nil, ErrInvalidLightEthereumService return nil
} }
return n.lesService, nil
// LightEthereumService exposes reference to LES service running on top of the node
func (n *StatusNode) LightEthereumService() (l *les.LightEthereum, err error) {
return l, n.gethService(&l, "LES")
} }
// WhisperService exposes reference to Whisper service running on top of the node // WhisperService exposes reference to Whisper service running on top of the node
func (n *StatusNode) WhisperService() (*whisper.Whisper, error) { func (n *StatusNode) WhisperService() (w *whisper.Whisper, err error) {
n.mu.RLock() return w, n.gethService(&w, "whisper")
defer n.mu.RUnlock()
if err := n.isAvailable(); err != nil {
return nil, err
}
if n.whisperService == nil {
if err := n.node.Service(&n.whisperService); err != nil {
n.log.Warn("Cannot obtain whisper service", "error", err)
return nil, ErrInvalidWhisperService
}
}
if n.whisperService == nil {
return nil, ErrInvalidWhisperService
}
return n.whisperService, nil
} }
// AccountManager exposes reference to node's accounts manager // AccountManager exposes reference to node's accounts manager
@ -316,7 +295,7 @@ func (n *StatusNode) AccountManager() (*accounts.Manager, error) {
if err := n.isAvailable(); err != nil { if err := n.isAvailable(); err != nil {
return nil, err return nil, err
} }
accountManager := n.node.AccountManager() accountManager := n.gethNode.AccountManager()
if accountManager == nil { if accountManager == nil {
return nil, ErrInvalidAccountManager return nil, ErrInvalidAccountManager
} }
@ -331,7 +310,7 @@ func (n *StatusNode) AccountKeyStore() (*keystore.KeyStore, error) {
if err := n.isAvailable(); err != nil { if err := n.isAvailable(); err != nil {
return nil, err return nil, err
} }
accountManager := n.node.AccountManager() accountManager := n.gethNode.AccountManager()
if accountManager == nil { if accountManager == nil {
return nil, ErrInvalidAccountManager return nil, ErrInvalidAccountManager
} }
@ -358,7 +337,7 @@ func (n *StatusNode) RPCClient() *rpc.Client {
// isAvailable check if we have a node running and make sure is fully started // isAvailable check if we have a node running and make sure is fully started
func (n *StatusNode) isAvailable() error { func (n *StatusNode) isAvailable() error {
if n.node == nil || n.node.Server() == nil { if n.gethNode == nil || n.gethNode.Server() == nil {
return ErrNoRunningNode return ErrNoRunningNode
} }
return nil return nil

View file

@ -255,7 +255,7 @@ func testResetChainData(t *testing.T) bool {
return false return false
} }
EnsureNodeSync(statusAPI.StatusNode()) EnsureNodeSync(statusAPI.StatusNode().EnsureSync)
testCompleteTransaction(t) testCompleteTransaction(t)
return true return true
@ -771,7 +771,7 @@ func testCompleteTransaction(t *testing.T) bool {
txQueue := txQueueManager.TransactionQueue() txQueue := txQueueManager.TransactionQueue()
txQueue.Reset() txQueue.Reset()
EnsureNodeSync(statusAPI.StatusNode()) EnsureNodeSync(statusAPI.StatusNode().EnsureSync)
// log into account from which transactions will be sent // log into account from which transactions will be sent
if err := statusAPI.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password); err != nil { if err := statusAPI.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password); err != nil {
@ -1439,7 +1439,7 @@ func startTestNode(t *testing.T) <-chan struct{} {
// sync // sync
if syncRequired { if syncRequired {
t.Logf("Sync is required") t.Logf("Sync is required")
EnsureNodeSync(statusAPI.StatusNode()) EnsureNodeSync(statusAPI.StatusNode().EnsureSync)
} else { } else {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
} }

View file

@ -234,7 +234,7 @@ func (s *APIBackendTestSuite) TestResetChainData() {
s.StartTestBackend(e2e.WithDataDir(path)) s.StartTestBackend(e2e.WithDataDir(path))
defer s.StopTestBackend() defer s.StopTestBackend()
EnsureNodeSync(s.Backend.StatusNode()) EnsureNodeSync(s.Backend.StatusNode().EnsureSync)
require.NoError(s.Backend.ResetChainData()) require.NoError(s.Backend.ResetChainData())

View file

@ -39,7 +39,7 @@ func (s *JailRPCTestSuite) TestJailRPCSend() {
s.StartTestBackend() s.StartTestBackend()
defer s.StopTestBackend() defer s.StopTestBackend()
EnsureNodeSync(s.Backend.StatusNode()) EnsureNodeSync(s.Backend.StatusNode().EnsureSync)
// load Status JS and add test command to it // load Status JS and add test command to it
s.jail.SetBaseJS(baseStatusJSCode) s.jail.SetBaseJS(baseStatusJSCode)
@ -110,7 +110,7 @@ func (s *JailRPCTestSuite) TestContractDeployment() {
s.StartTestBackend() s.StartTestBackend()
defer s.StopTestBackend() defer s.StopTestBackend()
EnsureNodeSync(s.Backend.StatusNode()) EnsureNodeSync(s.Backend.StatusNode().EnsureSync)
// obtain VM for a given chat (to send custom JS to jailed version of Send()) // obtain VM for a given chat (to send custom JS to jailed version of Send())
s.jail.CreateAndInitCell(testChatID) s.jail.CreateAndInitCell(testChatID)
@ -193,7 +193,7 @@ func (s *JailRPCTestSuite) TestJailVMPersistence() {
s.StartTestBackend() s.StartTestBackend()
defer s.StopTestBackend() defer s.StopTestBackend()
EnsureNodeSync(s.Backend.StatusNode()) EnsureNodeSync(s.Backend.StatusNode().EnsureSync)
// log into account from which transactions will be sent // log into account from which transactions will be sent
err := s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password) err := s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)

View file

@ -164,7 +164,7 @@ func (s *RPCTestSuite) TestCallContextResult() {
s.StartTestNode() s.StartTestNode()
defer s.StopTestNode() defer s.StopTestNode()
EnsureNodeSync(s.StatusNode) EnsureNodeSync(s.StatusNode.EnsureSync)
client := s.StatusNode.RPCClient() client := s.StatusNode.RPCClient()
s.NotNil(client) s.NotNil(client)

View file

@ -3,7 +3,6 @@ package e2e
import ( import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/les"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6" whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/status-im/status-go/geth/api" "github.com/status-im/status-go/geth/api"
@ -124,15 +123,6 @@ func (s *BackendTestSuite) WhisperService() *whisper.Whisper {
return whisperService return whisperService
} }
// LightEthereumService returns a reference to the LES service.
func (s *BackendTestSuite) LightEthereumService() *les.LightEthereum {
lightEthereum, err := s.Backend.StatusNode().LightEthereumService()
s.NoError(err)
s.NotNil(lightEthereum)
return lightEthereum
}
// TxQueueManager returns a reference to the TxQueueManager. // TxQueueManager returns a reference to the TxQueueManager.
func (s *BackendTestSuite) TxQueueManager() *transactions.Manager { func (s *BackendTestSuite) TxQueueManager() *transactions.Manager {
return s.Backend.TxQueueManager() return s.Backend.TxQueueManager()

View file

@ -37,7 +37,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransaction() {
s.StartTestBackend() s.StartTestBackend()
defer s.StopTestBackend() defer s.StopTestBackend()
EnsureNodeSync(s.Backend.StatusNode()) EnsureNodeSync(s.Backend.StatusNode().EnsureSync)
err := s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password) err := s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)
s.NoError(err) s.NoError(err)
@ -187,7 +187,7 @@ func (s *TransactionsTestSuite) testSendContractTx(setInputAndDataValue initFunc
s.StartTestBackend() s.StartTestBackend()
defer s.StopTestBackend() defer s.StopTestBackend()
EnsureNodeSync(s.Backend.StatusNode()) EnsureNodeSync(s.Backend.StatusNode().EnsureSync)
sampleAddress, _, _, err := s.Backend.AccountManager().CreateAccount(TestConfig.Account1.Password) sampleAddress, _, _, err := s.Backend.AccountManager().CreateAccount(TestConfig.Account1.Password)
s.NoError(err) s.NoError(err)
@ -282,7 +282,7 @@ func (s *TransactionsTestSuite) TestSendEther() {
s.StartTestBackend() s.StartTestBackend()
defer s.StopTestBackend() defer s.StopTestBackend()
EnsureNodeSync(s.Backend.StatusNode()) EnsureNodeSync(s.Backend.StatusNode().EnsureSync)
// create an account // create an account
sampleAddress, _, _, err := s.Backend.AccountManager().CreateAccount(TestConfig.Account1.Password) sampleAddress, _, _, err := s.Backend.AccountManager().CreateAccount(TestConfig.Account1.Password)
@ -419,7 +419,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() {
s.StartTestBackend() s.StartTestBackend()
defer s.StopTestBackend() defer s.StopTestBackend()
EnsureNodeSync(s.Backend.StatusNode()) EnsureNodeSync(s.Backend.StatusNode().EnsureSync)
// log into account from which transactions will be sent // log into account from which transactions will be sent
s.NoError(s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)) s.NoError(s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password))
@ -493,7 +493,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
s.StartTestBackend() s.StartTestBackend()
defer s.StopTestBackend() defer s.StopTestBackend()
EnsureNodeSync(s.Backend.StatusNode()) EnsureNodeSync(s.Backend.StatusNode().EnsureSync)
// reset queue // reset queue
s.Backend.TxQueueManager().TransactionQueue().Reset() s.Backend.TxQueueManager().TransactionQueue().Reset()
@ -583,7 +583,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
s.StartTestBackend() s.StartTestBackend()
defer s.StopTestBackend() defer s.StopTestBackend()
EnsureNodeSync(s.Backend.StatusNode()) EnsureNodeSync(s.Backend.StatusNode().EnsureSync)
// reset queue // reset queue
s.Backend.TxQueueManager().TransactionQueue().Reset() s.Backend.TxQueueManager().TransactionQueue().Reset()
@ -780,7 +780,7 @@ func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactionsUpstream()
func (s *TransactionsTestSuite) setupLocalNode() { func (s *TransactionsTestSuite) setupLocalNode() {
s.StartTestBackend() s.StartTestBackend()
EnsureNodeSync(s.Backend.StatusNode()) EnsureNodeSync(s.Backend.StatusNode().EnsureSync)
} }
func (s *TransactionsTestSuite) setupUpstreamNode() { func (s *TransactionsTestSuite) setupUpstreamNode() {

View file

@ -2,6 +2,7 @@ package utils
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"flag" "flag"
@ -16,7 +17,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/static" "github.com/status-im/status-go/static"
@ -52,6 +52,8 @@ var (
// All general log messages in this package should be routed through this logger. // All general log messages in this package should be routed through this logger.
logger = log.New("package", "status-go/t/utils") logger = log.New("package", "status-go/t/utils")
syncTimeout = 50 * time.Minute
) )
func init() { func init() {
@ -77,7 +79,7 @@ func init() {
// setup auxiliary directories // setup auxiliary directories
TestDataDir = filepath.Join(RootDir, ".ethereumtest") TestDataDir = filepath.Join(RootDir, ".ethereumtest")
TestConfig, err = loadTestConfig(GetNetworkID()) TestConfig, err = loadTestConfig()
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -98,71 +100,18 @@ func LoadFromFile(filename string) string {
return buf.String() return buf.String()
} }
// LightEthereumProvider interface to be used on EnsureNodeSync // EnsureSync waits until blockchain synchronization is complete and returns.
// TODO (adriacidre) This interface name should be reviewed as it has a lot type EnsureSync func(context.Context) error
// of unrelated methods.
type LightEthereumProvider interface {
// NodeConfig returns reference to running node's configuration
Config() (*params.NodeConfig, error)
// LightEthereumService exposes reference to LES service running on top of the node
LightEthereumService() (*les.LightEthereum, error)
// PeerCount returns number of connected peers
PeerCount() int
}
// EnsureNodeSync waits until node synchronzation is done to continue // EnsureNodeSync waits until node synchronzation is done to continue
// with tests afterwards. Panics in case of an error or a timeout. // with tests afterwards. Panics in case of an error or a timeout.
func EnsureNodeSync(lesProvider LightEthereumProvider) { func EnsureNodeSync(ensureSync EnsureSync) {
nc, err := lesProvider.Config() ctx, cancel := context.WithTimeout(context.Background(), syncTimeout)
if err != nil { defer cancel()
panic("can't retrieve NodeConfig")
}
// Don't wait for any blockchain sync for the local private chain as blocks are never mined.
if nc.NetworkID == params.StatusChainNetworkID {
return
}
les, err := lesProvider.LightEthereumService() if err := ensureSync(ctx); err != nil {
if err != nil {
panic(err) panic(err)
} }
if les == nil {
panic("LightEthereumService is nil")
}
// todo(@jeka): we should extract it into config
timeout := time.NewTimer(50 * time.Minute)
defer timeout.Stop()
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-timeout.C:
panic("timeout during node synchronization")
case <-ticker.C:
downloader := les.Downloader()
if downloader == nil {
continue
}
if lesProvider.PeerCount() == 0 {
logger.Debug("No establishished connections with a peers, continue waiting for a sync")
continue
}
if downloader.Synchronising() {
logger.Debug("synchronization is in progress")
continue
}
progress := downloader.Progress()
if progress.CurrentBlock >= progress.HighestBlock {
return
}
logger.Debug(
fmt.Sprintf("synchronization is not finished yet: current block %d < highest block %d",
progress.CurrentBlock, progress.HighestBlock),
)
}
}
} }
// GetRemoteURLFromNetworkID returns associated network url for giving network id. // GetRemoteURLFromNetworkID returns associated network url for giving network id.
@ -305,7 +254,7 @@ type testConfig struct {
const passphraseEnvName = "ACCOUNT_PASSWORD" const passphraseEnvName = "ACCOUNT_PASSWORD"
// loadTestConfig loads test configuration values from disk // loadTestConfig loads test configuration values from disk
func loadTestConfig(networkID int) (*testConfig, error) { func loadTestConfig() (*testConfig, error) {
var config testConfig var config testConfig
configData := static.MustAsset("config/test-data.json") configData := static.MustAsset("config/test-data.json")
@ -313,7 +262,7 @@ func loadTestConfig(networkID int) (*testConfig, error) {
return nil, err return nil, err
} }
if networkID == params.StatusChainNetworkID { if GetNetworkID() == params.StatusChainNetworkID {
accountsData := static.MustAsset("config/status-chain-accounts.json") accountsData := static.MustAsset("config/status-chain-accounts.json")
if err := json.Unmarshal(accountsData, &config); err != nil { if err := json.Unmarshal(accountsData, &config); err != nil {
return nil, err return nil, err