Fix channel subscriber that was blocking sending new requests (#1465)

In RequestMessagesSync subscriber is listening to a feed where all whisper
events are posted. After we received event with a request hash - subscriber will
stop actively consuming messages from a feed, as a subscription channel will
get overflow and whole feed will get blocked.

Some events are posted to a feed before request is sent, so blocked feed results
in blocked sending.

Now we will unsubscribe after relevant event was received, and terminate subscriber
explicitly by timeout.
This commit is contained in:
Dmitry Shulyak 2019-05-20 11:10:26 +03:00 committed by GitHub
parent 4ab08629f6
commit d32cd18c09
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 6 deletions

View file

@ -217,22 +217,24 @@ func (api *PublicAPI) RequestMessagesSync(conf RetryConfig, r MessagesRequest) (
shh := api.service.w
events := make(chan whisper.EnvelopeEvent, 10)
sub := shh.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
var (
requestID hexutil.Bytes
err error
retries int
)
for retries <= conf.MaxRetries {
sub := shh.SubscribeEnvelopeEvents(events)
r.Timeout = conf.BaseTimeout + conf.StepTimeout*time.Duration(retries)
timeout := r.Timeout
// FIXME this weird conversion is required because MessagesRequest expects seconds but defines time.Duration
r.Timeout = time.Duration(int(r.Timeout.Seconds()))
requestID, err = api.RequestMessages(context.Background(), r)
if err != nil {
sub.Unsubscribe()
return resp, err
}
mailServerResp, err := waitForExpiredOrCompleted(common.BytesToHash(requestID), events)
mailServerResp, err := waitForExpiredOrCompleted(common.BytesToHash(requestID), events, timeout)
sub.Unsubscribe()
if err == nil {
resp.Cursor = hex.EncodeToString(mailServerResp.Cursor)
resp.Error = mailServerResp.Error
@ -244,9 +246,17 @@ func (api *PublicAPI) RequestMessagesSync(conf RetryConfig, r MessagesRequest) (
return resp, fmt.Errorf("failed to request messages after %d retries", retries)
}
func waitForExpiredOrCompleted(requestID common.Hash, events chan whisper.EnvelopeEvent) (*whisper.MailServerResponse, error) {
func waitForExpiredOrCompleted(requestID common.Hash, events chan whisper.EnvelopeEvent, timeout time.Duration) (*whisper.MailServerResponse, error) {
expired := fmt.Errorf("request %x expired", requestID)
after := time.NewTimer(timeout)
defer after.Stop()
for {
ev := <-events
var ev whisper.EnvelopeEvent
select {
case ev = <-events:
case <-after.C:
return nil, expired
}
if ev.Hash != requestID {
continue
}
@ -258,7 +268,7 @@ func waitForExpiredOrCompleted(requestID common.Hash, events chan whisper.Envelo
}
return nil, errors.New("invalid event data type")
case whisper.EventMailServerRequestExpired:
return nil, errors.New("request expired")
return nil, expired
}
}
}

View file

@ -220,3 +220,20 @@ func TestSyncMessagesErrors(t *testing.T) {
})
}
}
func TestExpiredOrCompleted(t *testing.T) {
timeout := time.Millisecond
events := make(chan whisper.EnvelopeEvent)
errors := make(chan error, 1)
hash := common.Hash{1}
go func() {
_, err := waitForExpiredOrCompleted(hash, events, timeout)
errors <- err
}()
select {
case <-time.After(time.Second):
require.FailNow(t, "timed out waiting for waitForExpiredOrCompleted to complete")
case err := <-errors:
require.EqualError(t, err, fmt.Sprintf("request %x expired", hash))
}
}