From 84cb5ca9174b289ec2dbfb7247004145f62059df Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 4 May 2018 08:00:55 +0300 Subject: [PATCH] Ensure that wg.Add is called before wg.Wait Now if Add is to be called it will be called before Wait, this is achieved by doing following: - if cancel() gets lock first and closes channelCh before spawnSync is called we will exit right away - if not than we will ensure that we hold a lock until syncers are spawned so that cancel() will be blocked for this time and it will prevent whole Terminate() from progressing --- .../patches/geth/0016-fix-leveldb-issue.patch | 105 +++++++++--------- .../go-ethereum/eth/downloader/downloader.go | 3 + 2 files changed, 58 insertions(+), 50 deletions(-) diff --git a/_assets/patches/geth/0016-fix-leveldb-issue.patch b/_assets/patches/geth/0016-fix-leveldb-issue.patch index a4a0c2b12..0bcf0bb19 100644 --- a/_assets/patches/geth/0016-fix-leveldb-issue.patch +++ b/_assets/patches/geth/0016-fix-leveldb-issue.patch @@ -1,8 +1,8 @@ -diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go -index cbdaa0ca2..60ff8dbc6 100644 ---- a/eth/downloader/downloader.go -+++ b/eth/downloader/downloader.go -@@ -143,6 +143,8 @@ +diff --git i/eth/downloader/downloader.go w/eth/downloader/downloader.go +index 43f0e3db9..b337f95c9 100644 +--- i/eth/downloader/downloader.go ++++ w/eth/downloader/downloader.go +@@ -143,6 +143,8 @@ type Downloader struct { quitCh chan struct{} // Quit channel to signal termination quitLock sync.RWMutex // Lock to prevent double closes @@ -11,7 +11,7 @@ index cbdaa0ca2..60ff8dbc6 100644 // Testing hooks syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch -@@ -403,7 +405,9 @@ +@@ -403,7 +405,9 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode // specified peer and head hash. func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) { d.mux.Post(StartEvent{}) @@ -21,7 +21,7 @@ index cbdaa0ca2..60ff8dbc6 100644 // reset on error if err != nil { d.mux.Post(FailedEvent{err}) -@@ -471,12 +475,17 @@ +@@ -471,14 +475,22 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I } else if d.mode == FullSync { fetchers = append(fetchers, d.processFullSyncContent) } @@ -33,15 +33,20 @@ index cbdaa0ca2..60ff8dbc6 100644 // separate goroutines, returning the first error that appears. -func (d *Downloader) spawnSync(fetchers []func() error) error { +func (d *Downloader) spawnSync(errCancel error, fetchers []func() error) error { ++ d.cancelLock.Lock() + select { + case <-d.cancelCh: ++ d.cancelLock.Unlock() + return errCancel + default: + } errc := make(chan error, len(fetchers)) d.cancelWg.Add(len(fetchers)) ++ d.cancelLock.Unlock() for _, fn := range fetchers { -@@ -539,6 +548,10 @@ + fn := fn + go func() { defer d.cancelWg.Done(); errc <- fn() }() +@@ -539,6 +551,10 @@ func (d *Downloader) Terminate() { // Cancel any pending download requests d.Cancel() @@ -52,75 +57,75 @@ index cbdaa0ca2..60ff8dbc6 100644 } // fetchHeight retrieves the head header of the remote peer to aid in estimating -diff --git a/eth/handler.go b/eth/handler.go -index c2426544f..08818a507 100644 ---- a/eth/handler.go -+++ b/eth/handler.go +diff --git i/eth/handler.go w/eth/handler.go +index 4069359c9..da9ebb243 100644 +--- i/eth/handler.go ++++ w/eth/handler.go @@ -230,6 +230,9 @@ func (pm *ProtocolManager) Stop() { // Quit fetcher, txsyncLoop. close(pm.quitSync) - + + // Stop downloader and make sure that all the running downloads are complete. + pm.downloader.Terminate() + // Disconnect existing sessions. // This also closes the gate for any new registrations on the peer set. // sessions which are already established but not added to pm.peers yet -diff --git a/eth/sync.go b/eth/sync.go -index 2da1464bc..f14e84303 100644 ---- a/eth/sync.go -+++ b/eth/sync.go +diff --git i/eth/sync.go w/eth/sync.go +index e49e40087..4367434a6 100644 +--- i/eth/sync.go ++++ w/eth/sync.go @@ -135,7 +135,6 @@ func (pm *ProtocolManager) syncer() { // Start and ensure cleanup of sync mechanisms pm.fetcher.Start() defer pm.fetcher.Stop() - defer pm.downloader.Terminate() - + // Wait for different events to fire synchronisation operations forceSync := time.NewTicker(forceSyncCycle) -diff --git a/les/handler.go b/les/handler.go -index 864abe605..67e459594 100644 ---- a/les/handler.go -+++ b/les/handler.go +diff --git i/les/backend.go w/les/backend.go +index 6a324cb04..e3844bf84 100644 +--- i/les/backend.go ++++ w/les/backend.go +@@ -20,7 +20,6 @@ package les + import ( + "fmt" + "sync" +- "time" + + "github.com/ethereum/go-ethereum/accounts" + "github.com/ethereum/go-ethereum/common" +@@ -250,7 +249,6 @@ func (s *LightEthereum) Stop() error { + + s.eventMux.Stop() + +- time.Sleep(time.Millisecond * 200) + s.chainDb.Close() + close(s.shutdownChan) + +diff --git i/les/handler.go w/les/handler.go +index 9627f392b..f2bbe899f 100644 +--- i/les/handler.go ++++ w/les/handler.go @@ -241,6 +241,9 @@ func (pm *ProtocolManager) Stop() { - + close(pm.quitSync) // quits syncer, fetcher - + + // Stop downloader and make sure that all the running downloads are complete. + pm.downloader.Terminate() + // Disconnect existing sessions. // This also closes the gate for any new registrations on the peer set. // sessions which are already established but not added to pm.peers yet -diff --git a/les/sync.go b/les/sync.go -index c0e17f97d..983dc7f36 100644 ---- a/les/sync.go -+++ b/les/sync.go -@@ -36,7 +36,6 @@ func (pm *ProtocolManager) syncer() { +diff --git i/les/sync.go w/les/sync.go +index c3d37e2f3..fc1f076c7 100644 +--- i/les/sync.go ++++ w/les/sync.go +@@ -31,7 +31,6 @@ func (pm *ProtocolManager) syncer() { // Start and ensure cleanup of sync mechanisms //pm.fetcher.Start() //defer pm.fetcher.Stop() - defer pm.downloader.Terminate() - + // Wait for different events to fire synchronisation operations //forceSync := time.Tick(forceSyncCycle) -diff --git a/les/backend.go b/les/backend.go -index 6a324cb0..e3844bf8 100644 ---- a/les/backend.go -+++ b/les/backend.go -@@ -20,7 +20,6 @@ package les - import ( - "fmt" - "sync" -- "time" - - "github.com/ethereum/go-ethereum/accounts" - "github.com/ethereum/go-ethereum/common" -@@ -250,7 +249,6 @@ func (s *LightEthereum) Stop() error { - - s.eventMux.Stop() - -- time.Sleep(time.Millisecond * 200) - s.chainDb.Close() - close(s.shutdownChan) - diff --git a/vendor/github.com/ethereum/go-ethereum/eth/downloader/downloader.go b/vendor/github.com/ethereum/go-ethereum/eth/downloader/downloader.go index 7b9951ff2..b337f95c9 100644 --- a/vendor/github.com/ethereum/go-ethereum/eth/downloader/downloader.go +++ b/vendor/github.com/ethereum/go-ethereum/eth/downloader/downloader.go @@ -481,13 +481,16 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I // spawnSync runs d.process and all given fetcher functions to completion in // separate goroutines, returning the first error that appears. func (d *Downloader) spawnSync(errCancel error, fetchers []func() error) error { + d.cancelLock.Lock() select { case <-d.cancelCh: + d.cancelLock.Unlock() return errCancel default: } errc := make(chan error, len(fetchers)) d.cancelWg.Add(len(fetchers)) + d.cancelLock.Unlock() for _, fn := range fetchers { fn := fn go func() { defer d.cancelWg.Done(); errc <- fn() }()