Fetch conversations once, clean up ConversationController API (#1420)

* Fetch conversations once, clean up ConversationController API

Race conditions around re-fetching have caused some problems recently,
so this removes the need to re-fetch conversations. They are fetched
once or saved once, and that is it. All interaction goes through the
ConversationController, which is the central source of truth.

We have two rules for Conversations:

1. If a conversation is in the ConversationController it doesn't need
   to be fetched, but its initial fetch/save might be in progress. You
   can wait for that fetch/save with conversation.initialPromise.
2. If a conversation is not already in the ConversationController, it's
   not yet in the database. It needs to be added to the
   ConversationController and saved to the database.

FREEBIE

* Remove Conversation.fetch() call in Message.handleDataMessage()

FREEBIE

* ConversationController.API cleanup: Fix two missing spots

FREEBIE
This commit is contained in:
Scott Nonnenberg 2017-09-01 09:10:41 -07:00 committed by GitHub
parent 51cd28bb4a
commit d8ce198f55
10 changed files with 276 additions and 294 deletions

View File

@ -193,7 +193,7 @@
return;
}
return ConversationController.findOrCreateById(id, 'private')
return ConversationController.getOrCreateAndWait(id, 'private')
.then(function(conversation) {
return new Promise(function(resolve, reject) {
conversation.save({
@ -229,7 +229,7 @@
var details = ev.groupDetails;
var id = details.id;
return ConversationController.findOrCreateById(id, 'group').then(function(conversation) {
return ConversationController.getOrCreateAndWait(id, 'group').then(function(conversation) {
var updates = {
name: details.name,
members: details.members,
@ -258,8 +258,19 @@
return;
}
return message.handleDataMessage(data.message, ev.confirm, {
initialLoadComplete: initialLoadComplete
var type, id;
if (data.message.group) {
type = 'group';
id = data.message.group.id;
} else {
type = 'private';
id = data.source;
}
return ConversationController.getOrCreateAndWait(id, type).then(function() {
return message.handleDataMessage(data.message, ev.confirm, {
initialLoadComplete: initialLoadComplete
});
});
});
}
@ -286,8 +297,19 @@
return;
}
return message.handleDataMessage(data.message, ev.confirm, {
initialLoadComplete: initialLoadComplete
var type, id;
if (data.message.group) {
type = 'group';
id = data.message.group.id;
} else {
type = 'private';
id = data.destination;
}
return ConversationController.getOrCreateAndWait(id, type).then(function() {
return message.handleDataMessage(data.message, ev.confirm, {
initialLoadComplete: initialLoadComplete
});
});
});
}
@ -374,7 +396,7 @@
return message.saveErrors(error).then(function() {
var id = message.get('conversationId');
return ConversationController.findOrCreateById(id, 'private').then(function(conversation) {
return ConversationController.getOrCreateAndWait(id, 'private').then(function(conversation) {
conversation.set({
active_at: Date.now(),
unreadCount: conversation.get('unreadCount') + 1
@ -455,7 +477,7 @@
console.log('got verified sync for', number, state,
ev.viaContactSync ? 'via contact sync' : '');
return ConversationController.findOrCreateById(number, 'private').then(function(contact) {
return ConversationController.getOrCreateAndWait(number, 'private').then(function(contact) {
var options = {
viaSyncMessage: true,
viaContactSync: ev.viaContactSync,

View File

@ -85,37 +85,46 @@
get: function(id) {
return conversations.get(id);
},
add: function(attrs) {
return conversations.add(attrs, {merge: true});
createTemporary: function(attributes) {
return conversations.add(attributes);
},
create: function(attrs) {
if (typeof attrs !== 'object') {
throw new Error('ConversationController.create requires an object, got', attrs);
getOrCreate: function(id, type) {
var conversation = conversations.get(id);
if (conversation) {
return conversation;
}
var conversation = conversations.add(attrs, {merge: true});
return conversation;
},
findOrCreateById: function(id, type) {
var conversation = conversations.add({
conversation = conversations.add({
id: id,
type: type
});
return new Promise(function(resolve, reject) {
conversation.fetch().then(function() {
conversation.initialPromise = new Promise(function(resolve, reject) {
var deferred = conversation.save();
if (!deferred) {
console.log('Conversation save failed! ', id, type);
return reject(new Error('getOrCreate: Conversation save failed'));
}
deferred.then(function() {
resolve(conversation);
}, function() {
var deferred = conversation.save();
if (!deferred) {
console.log('Conversation save failed! ', id, type);
return reject(new Error('findOrCreateById: Conversation save failed'));
}
deferred.then(function() {
resolve(conversation);
}, reject);
});
}, reject);
});
return conversation;
},
getOrCreateAndWait: function(id, type) {
var conversation = this.getOrCreate(id, type);
if (conversation) {
return conversation.initialPromise.then(function() {
return conversation;
});
}
return Promise.reject(
new Error('getOrCreateAndWait: did not get conversation')
);
},
getAllGroupsInvolvingId: function(id) {
var groups = new Whisper.GroupCollection();
@ -126,7 +135,7 @@
});
},
updateInbox: function() {
return conversations.fetchActive();
return conversations.fetch();
}
};
})();

View File

@ -13,13 +13,13 @@
}
signalProtocolStore.on('keychange', function(id) {
var conversation = ConversationController.add({id: id});
conversation.fetch().then(function() {
ConversationController.getOrCreateAndWait(id, 'private').then(function(conversation) {
conversation.addKeyChange(id);
});
ConversationController.getAllGroupsInvolvingId(id).then(function(groups) {
_.forEach(groups, function(group) {
group.addKeyChange(id);
ConversationController.getAllGroupsInvolvingId(id).then(function(groups) {
_.forEach(groups, function(group) {
group.addKeyChange(id);
});
});
});
});

View File

@ -59,6 +59,10 @@
this.ourNumber = textsecure.storage.user.getNumber();
this.verifiedEnum = textsecure.storage.protocol.VerifiedStatus;
// This may be overridden by ConversationController.getOrCreate, and signify
// our first save to the database. Or first fetch from the database.
this.initialPromise = Promise.resolve();
this.contactCollection = new Backbone.Collection();
this.messageCollection = new Whisper.MessageCollection([], {
conversation: this
@ -87,7 +91,7 @@
if (this.isPrivate()) {
return Promise.all([
this.safeGetVerified(),
this.safeFetch()
this.initialPromise,
]).then(function(results) {
var trust = results[0];
// we don't return here because we don't need to wait for this to finish
@ -103,12 +107,6 @@
}.bind(this)).then(this.onMemberVerifiedChange.bind(this));
}
},
safeFetch: function() {
// new Promise necessary because a fetch will fail if convo not in db yet
return new Promise(function(resolve) {
this.fetch().always(resolve);
}.bind(this));
},
setVerifiedDefault: function(options) {
var DEFAULT = this.verifiedEnum.DEFAULT;
return this.queueJob(function() {
@ -801,28 +799,23 @@
return _.contains(this.get('members'), number);
},
fetchContacts: function(options) {
return new Promise(function(resolve) {
if (this.isPrivate()) {
this.contactCollection.reset([this]);
resolve();
} else {
var promises = [];
var members = this.get('members') || [];
if (this.isPrivate()) {
this.contactCollection.reset([this]);
return Promise.resolve();
} else {
var members = this.get('members') || [];
var promises = members.map(function(number) {
return ConversationController.getOrCreateAndWait(number, 'private');
});
this.contactCollection.reset(
members.map(function(number) {
var c = ConversationController.create({
id : number,
type : 'private'
});
this.listenTo(c, 'change:verified', this.onMemberVerifiedChange);
promises.push(c.safeFetch());
return c;
}.bind(this))
);
resolve(Promise.all(promises));
}
}.bind(this));
return Promise.all(promises).then(function(contacts) {
_.forEach(contacts, function(contact) {
this.listenTo(contact, 'change:verified', this.onMemberVerifiedChange);
}.bind(this));
this.contactCollection.reset(contacts);
}.bind(this));
}
},
destroyMessages: function() {
@ -956,14 +949,11 @@
}
window.drawAttention();
var sender = ConversationController.create({
id: message.get('source'), type: 'private'
});
var conversationId = this.id;
return new Promise(function(resolve, reject) {
sender.fetch().then(function() {
sender.getNotificationIcon().then(function(iconUrl) {
ConversationController.getOrCreateAndWait(message.get('source'), 'private')
.then(function(sender) {
return sender.getNotificationIcon().then(function(iconUrl) {
console.log('adding notification');
Whisper.Notifications.add({
title : sender.getTitle(),
@ -973,11 +963,8 @@
conversationId : conversationId,
messageId : message.id
});
return resolve();
}, reject);
}, reject);
});
});
});
},
hashCode: function() {
if (this.hash === undefined) {
@ -1054,20 +1041,6 @@
}
}).always(resolve);
}.bind(this));
},
fetchActive: function() {
// Ensures all active conversations are included in this collection,
// and updates their attributes, but removes nothing.
return this.fetch({
index: {
name: 'inbox', // 'inbox' index on active_at
order: 'desc' // ORDER timestamp DESC
// TODO pagination/infinite scroll
// limit: 10, offset: page*10,
},
remove: false
});
}
});

View File

@ -147,19 +147,12 @@
return this.imageUrl;
},
getConversation: function() {
return ConversationController.add({
id: this.get('conversationId')
});
return ConversationController.get(this.get('conversationId'));
},
getExpirationTimerUpdateSource: function() {
if (this.isExpirationTimerUpdate()) {
var conversationId = this.get('expirationTimerUpdate').source;
var c = ConversationController.get(conversationId);
if (!c) {
c = ConversationController.create({id: conversationId, type: 'private'});
c.fetch();
}
return c;
return ConversationController.getOrCreate(conversationId, 'private');
}
},
getContact: function() {
@ -167,21 +160,12 @@
if (!this.isIncoming()) {
conversationId = textsecure.storage.user.getNumber();
}
var c = ConversationController.get(conversationId);
if (!c) {
c = ConversationController.create({id: conversationId, type: 'private'});
c.fetch();
}
return c;
return ConversationController.getOrCreate(conversationId, 'private');
},
getModelForKeyChange: function() {
var id = this.get('key_changed');
if (!this.modelForKeyChange) {
var c = ConversationController.get(id);
if (!c) {
c = ConversationController.create({ id: id, type: 'private' });
c.fetch();
}
var c = ConversationController.getOrCreate(id, 'private');
this.modelForKeyChange = c;
}
return this.modelForKeyChange;
@ -189,11 +173,7 @@
getModelForVerifiedChange: function() {
var id = this.get('verifiedChanged');
if (!this.modelForVerifiedChange) {
var c = ConversationController.get(id);
if (!c) {
c = ConversationController.create({ id: id, type: 'private' });
c.fetch();
}
var c = ConversationController.getOrCreate(id, 'private');
this.modelForVerifiedChange = c;
}
return this.modelForVerifiedChange;
@ -347,9 +327,11 @@
options = options || {};
_.defaults(options, {initialLoadComplete: true});
// This function can be called from the background script on an
// incoming message or from the frontend after the user accepts an
// identity key change.
// This function is called from the background script in a few scenarios:
// 1. on an incoming message
// 2. on a sent message sync'd from another device
// 3. in rare cases, an incoming message can be retried, though it will
// still through one of the previous two codepaths.
var message = this;
var source = message.get('source');
var type = message.get('type');
@ -360,176 +342,174 @@
}
console.log('queuing handleDataMessage', message.idForLogging());
var conversation = ConversationController.create({id: conversationId});
var conversation = ConversationController.get(conversationId);
return conversation.queueJob(function() {
return new Promise(function(resolve) {
conversation.fetch().always(function() {
console.log('starting handleDataMessage', message.idForLogging());
console.log('starting handleDataMessage', message.idForLogging());
var now = new Date().getTime();
var attributes = { type: 'private' };
if (dataMessage.group) {
var group_update = null;
attributes = {
type: 'group',
groupId: dataMessage.group.id,
};
if (dataMessage.group.type === textsecure.protobuf.GroupContext.Type.UPDATE) {
attributes = {
type : 'group',
groupId : dataMessage.group.id,
name : dataMessage.group.name,
avatar : dataMessage.group.avatar,
members : _.union(dataMessage.group.members, conversation.get('members')),
};
group_update = conversation.changedAttributes(_.pick(dataMessage.group, 'name', 'avatar')) || {};
var difference = _.difference(attributes.members, conversation.get('members'));
if (difference.length > 0) {
group_update.joined = difference;
}
if (conversation.get('left')) {
console.log('re-added to a left group');
attributes.left = false;
}
}
else if (dataMessage.group.type === textsecure.protobuf.GroupContext.Type.QUIT) {
if (source == textsecure.storage.user.getNumber()) {
attributes.left = true;
group_update = { left: "You" };
} else {
group_update = { left: source };
}
attributes.members = _.without(conversation.get('members'), source);
}
if (group_update !== null) {
message.set({group_update: group_update});
}
}
message.set({
body : dataMessage.body,
conversationId : conversation.id,
attachments : dataMessage.attachments,
decrypted_at : now,
flags : dataMessage.flags,
errors : []
});
if (type === 'outgoing') {
var receipts = Whisper.DeliveryReceipts.forMessage(conversation, message);
receipts.forEach(function(receipt) {
message.set({
delivered: (message.get('delivered') || 0) + 1
});
});
}
attributes.active_at = now;
conversation.set(attributes);
if (message.isExpirationTimerUpdate()) {
message.set({
expirationTimerUpdate: {
source : source,
expireTimer : dataMessage.expireTimer
}
});
conversation.set({expireTimer: dataMessage.expireTimer});
} else if (dataMessage.expireTimer) {
message.set({expireTimer: dataMessage.expireTimer});
}
if (!message.isEndSession() && !message.isGroupUpdate()) {
if (dataMessage.expireTimer) {
if (dataMessage.expireTimer !== conversation.get('expireTimer')) {
conversation.updateExpirationTimer(
dataMessage.expireTimer, source,
message.get('received_at'));
}
} else if (conversation.get('expireTimer')) {
conversation.updateExpirationTimer(null, source,
message.get('received_at'));
}
}
if (type === 'incoming') {
var readReceipt = Whisper.ReadReceipts.forMessage(message);
if (readReceipt) {
if (message.get('expireTimer') && !message.get('expirationStartTimestamp')) {
message.set('expirationStartTimestamp', readReceipt.get('read_at'));
}
}
if (readReceipt || message.isExpirationTimerUpdate()) {
message.unset('unread');
// This is primarily to allow the conversation to mark all older messages as
// read, as is done when we receive a read receipt for a message we already
// know about.
Whisper.ReadReceipts.notifyConversation(message);
} else {
conversation.set('unreadCount', conversation.get('unreadCount') + 1);
}
}
var conversation_timestamp = conversation.get('timestamp');
if (!conversation_timestamp || message.get('sent_at') > conversation_timestamp) {
conversation.set({
lastMessage : message.getNotificationText(),
timestamp: message.get('sent_at')
});
}
console.log('beginning saves in handleDataMessage', message.idForLogging());
var handleError = function(error) {
error = error && error.stack ? error.stack : error;
console.log('handleDataMessage', message.idForLogging(), 'error:', error);
return resolve();
var now = new Date().getTime();
var attributes = { type: 'private' };
if (dataMessage.group) {
var group_update = null;
attributes = {
type: 'group',
groupId: dataMessage.group.id,
};
if (dataMessage.group.type === textsecure.protobuf.GroupContext.Type.UPDATE) {
attributes = {
type : 'group',
groupId : dataMessage.group.id,
name : dataMessage.group.name,
avatar : dataMessage.group.avatar,
members : _.union(dataMessage.group.members, conversation.get('members')),
};
group_update = conversation.changedAttributes(_.pick(dataMessage.group, 'name', 'avatar')) || {};
var difference = _.difference(attributes.members, conversation.get('members'));
if (difference.length > 0) {
group_update.joined = difference;
}
if (conversation.get('left')) {
console.log('re-added to a left group');
attributes.left = false;
}
}
else if (dataMessage.group.type === textsecure.protobuf.GroupContext.Type.QUIT) {
if (source == textsecure.storage.user.getNumber()) {
attributes.left = true;
group_update = { left: "You" };
} else {
group_update = { left: source };
}
attributes.members = _.without(conversation.get('members'), source);
}
message.save().then(function() {
conversation.save().then(function() {
if (group_update !== null) {
message.set({group_update: group_update});
}
}
message.set({
body : dataMessage.body,
conversationId : conversation.id,
attachments : dataMessage.attachments,
decrypted_at : now,
flags : dataMessage.flags,
errors : []
});
if (type === 'outgoing') {
var receipts = Whisper.DeliveryReceipts.forMessage(conversation, message);
receipts.forEach(function(receipt) {
message.set({
delivered: (message.get('delivered') || 0) + 1
});
});
}
attributes.active_at = now;
conversation.set(attributes);
if (message.isExpirationTimerUpdate()) {
message.set({
expirationTimerUpdate: {
source : source,
expireTimer : dataMessage.expireTimer
}
});
conversation.set({expireTimer: dataMessage.expireTimer});
} else if (dataMessage.expireTimer) {
message.set({expireTimer: dataMessage.expireTimer});
}
if (!message.isEndSession() && !message.isGroupUpdate()) {
if (dataMessage.expireTimer) {
if (dataMessage.expireTimer !== conversation.get('expireTimer')) {
conversation.updateExpirationTimer(
dataMessage.expireTimer, source,
message.get('received_at'));
}
} else if (conversation.get('expireTimer')) {
conversation.updateExpirationTimer(null, source,
message.get('received_at'));
}
}
if (type === 'incoming') {
var readReceipt = Whisper.ReadReceipts.forMessage(message);
if (readReceipt) {
if (message.get('expireTimer') && !message.get('expirationStartTimestamp')) {
message.set('expirationStartTimestamp', readReceipt.get('read_at'));
}
}
if (readReceipt || message.isExpirationTimerUpdate()) {
message.unset('unread');
// This is primarily to allow the conversation to mark all older messages as
// read, as is done when we receive a read receipt for a message we already
// know about.
Whisper.ReadReceipts.notifyConversation(message);
} else {
conversation.set('unreadCount', conversation.get('unreadCount') + 1);
}
}
var conversation_timestamp = conversation.get('timestamp');
if (!conversation_timestamp || message.get('sent_at') > conversation_timestamp) {
conversation.set({
lastMessage : message.getNotificationText(),
timestamp: message.get('sent_at')
});
}
console.log('beginning saves in handleDataMessage', message.idForLogging());
var handleError = function(error) {
error = error && error.stack ? error.stack : error;
console.log('handleDataMessage', message.idForLogging(), 'error:', error);
return resolve();
};
message.save().then(function() {
conversation.save().then(function() {
try {
conversation.trigger('newmessage', message);
}
catch (e) {
return handleError(e);
}
// We fetch() here because, between the message.save() above and the previous
// line's trigger() call, we might have marked all messages unread in the
// database. This message might already be read!
var previousUnread = message.get('unread');
message.fetch().then(function() {
try {
conversation.trigger('newmessage', message);
if (previousUnread !== message.get('unread')) {
console.log('Caught race condition on new message read state! ' +
'Manually starting timers.');
// We call markRead() even though the message is already marked read
// because we need to start expiration timers, etc.
message.markRead();
}
if (message.get('unread') && options.initialLoadComplete) {
conversation.notify(message);
}
console.log('done with handleDataMessage', message.idForLogging());
confirm();
return resolve();
}
catch (e) {
return handleError(e);
handleError(e);
}
// We fetch() here because, between the message.save() above and the previous
// line's trigger() call, we might have marked all messages unread in the
// database. This message might already be read!
var previousUnread = message.get('unread');
message.fetch().then(function() {
try {
if (previousUnread !== message.get('unread')) {
console.log('Caught race condition on new message read state! ' +
'Manually starting timers.');
// We call markRead() even though the message is already marked read
// because we need to start expiration timers, etc.
message.markRead();
}
if (message.get('unread') && options.initialLoadComplete) {
conversation.notify(message);
}
}, function(error) {
try {
console.log('handleDataMessage: Message', message.idForLogging(), 'was deleted');
console.log('done with handleDataMessage', message.idForLogging());
confirm();
return resolve();
}
catch (e) {
handleError(e);
}
}, function(error) {
try {
console.log('handleDataMessage: Message', message.idForLogging(), 'was deleted');
confirm();
return resolve();
}
catch (e) {
handleError(e);
}
});
}, handleError);
confirm();
return resolve();
}
catch (e) {
handleError(e);
}
});
}, handleError);
});
}, handleError);
});
});
},

View File

@ -25,9 +25,7 @@
openInbox();
return;
}
var conversation = ConversationController.create({
id: last.get('conversationId')
});
var conversation = ConversationController.get(last.get('conversationId'));
openConversation(conversation);
this.clear();
},

View File

@ -88,7 +88,7 @@
// Creates a view to display a new contact
this.new_contact_view = new Whisper.NewContactView({
el: this.$new_contact,
model: ConversationController.create({
model: ConversationController.createTemporary({
type: 'private'
})
}).render();
@ -97,7 +97,7 @@
createConversation: function() {
var conversation = this.new_contact_view.model;
if (this.new_contact_view.model.isValid()) {
ConversationController.findOrCreateById(
ConversationController.getOrCreateAndWait(
this.new_contact_view.model.id,
'private'
).then(function(conversation) {

View File

@ -261,9 +261,11 @@
},
openConversation: function(e, conversation) {
this.searchView.hideHints();
conversation = ConversationController.create(conversation);
this.conversation_stack.open(conversation);
this.focusConversation();
if (conversation) {
conversation = ConversationController.get(conversation.id);
this.conversation_stack.open(conversation);
this.focusConversation();
}
},
toggleMenu: function() {
this.$('.global-menu .menu-list').toggle();

View File

@ -26,7 +26,7 @@ describe('KeyChangeListener', function() {
describe('When we have a conversation with this contact', function() {
var convo = new Whisper.Conversation({ id: phoneNumberWithKeyChange, type: 'private'});
before(function() {
ConversationController.add(convo);
ConversationController.createTemporary(convo);
return convo.save();
});
@ -51,7 +51,7 @@ describe('KeyChangeListener', function() {
describe('When we have a group with this contact', function() {
var convo = new Whisper.Conversation({ id: 'groupId', type: 'group', members: [phoneNumberWithKeyChange] });
before(function() {
ConversationController.add(convo);
ConversationController.createTemporary(convo);
return convo.save();
});
after(function() {

View File

@ -1,11 +1,9 @@
describe('MessageView', function() {
var conversations = new Whisper.ConversationCollection();
before(function(done) {
conversations.fetch().then(done);
storage.put('number_id', '+18088888888.1');
before(function() {
return storage.put('number_id', '+18088888888.1');
});
var convo = conversations.add({id: 'foo'});
var convo = ConversationController.createTemporary({id: 'foo'});
var message = convo.messageCollection.add({
conversationId: convo.id,
body: 'hello world',