Updated the JobRunner to have multiple job queues (needs more testing)

Added a backoff to the Poller retry
Updated the "blocking" behaviour of the JobRunner
Tweaked the Job dependency handling to better handle orphaned dependencies
Fixed an issue where the Conversation screen wasn't observing database changes
This commit is contained in:
Morgan Pretty 2022-05-28 17:25:38 +10:00
parent 45d0faee6a
commit 3514ed4f50
27 changed files with 930 additions and 912 deletions

View File

@ -430,7 +430,6 @@
C38EF2B3255B6D9C007E1867 /* UIViewController+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = C38EF2B1255B6D9C007E1867 /* UIViewController+Utilities.swift */; };
C38EF2B4255B6D9C007E1867 /* UIView+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = C38EF2B2255B6D9C007E1867 /* UIView+Utilities.swift */; };
C38EF30C255B6DBF007E1867 /* OWSScreenLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = C38EF2E2255B6DB9007E1867 /* OWSScreenLock.swift */; };
C38EF317255B6DBF007E1867 /* DisplayableText.swift in Sources */ = {isa = PBXBuildFile; fileRef = C38EF2ED255B6DBB007E1867 /* DisplayableText.swift */; };
C38EF31A255B6DBF007E1867 /* OWSAnyTouchGestureRecognizer.m in Sources */ = {isa = PBXBuildFile; fileRef = C38EF2F0255B6DBB007E1867 /* OWSAnyTouchGestureRecognizer.m */; };
C38EF31C255B6DBF007E1867 /* Searcher.swift in Sources */ = {isa = PBXBuildFile; fileRef = C38EF2F2255B6DBC007E1867 /* Searcher.swift */; };
C38EF31D255B6DBF007E1867 /* UIImage+OWS.swift in Sources */ = {isa = PBXBuildFile; fileRef = C38EF2F3255B6DBC007E1867 /* UIImage+OWS.swift */; };
@ -498,7 +497,6 @@
C3A01E05261D24C400290BEB /* public-loki-foundation.der in Resources */ = {isa = PBXBuildFile; fileRef = C3A01E02261D24C400290BEB /* public-loki-foundation.der */; };
C3A01E06261D24C400290BEB /* storage-seed-1.der in Resources */ = {isa = PBXBuildFile; fileRef = C3A01E03261D24C400290BEB /* storage-seed-1.der */; };
C3A01E07261D24C400290BEB /* storage-seed-3.der in Resources */ = {isa = PBXBuildFile; fileRef = C3A01E04261D24C400290BEB /* storage-seed-3.der */; };
C3A3A08F256E1728004D228D /* FullTextSearchFinder.swift in Sources */ = {isa = PBXBuildFile; fileRef = C33FDB7F255A581100E217F9 /* FullTextSearchFinder.swift */; };
C3A3A0FE256E1A3C004D228D /* TSDatabaseSecondaryIndexes.m in Sources */ = {isa = PBXBuildFile; fileRef = C33FDB20255A580900E217F9 /* TSDatabaseSecondaryIndexes.m */; };
C3A3A12B256E1AD5004D228D /* TSDatabaseSecondaryIndexes.h in Headers */ = {isa = PBXBuildFile; fileRef = C33FDB25255A580900E217F9 /* TSDatabaseSecondaryIndexes.h */; settings = {ATTRIBUTES = (Public, ); }; };
C3A3A171256E1D25004D228D /* SSKReachabilityManager.swift in Sources */ = {isa = PBXBuildFile; fileRef = C3A3A170256E1D25004D228D /* SSKReachabilityManager.swift */; };
@ -681,6 +679,7 @@
FD848B8B283DC509000E298B /* PagedDatabaseObserver.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD848B8A283DC509000E298B /* PagedDatabaseObserver.swift */; };
FD848B8D283E0B26000E298B /* MessageInputTypes.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD848B8C283E0B26000E298B /* MessageInputTypes.swift */; };
FD848B8F283EF2A8000E298B /* UIScrollView+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD848B8E283EF2A8000E298B /* UIScrollView+Utilities.swift */; };
FD848B9328420164000E298B /* UnicodeScalar+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD848B9228420164000E298B /* UnicodeScalar+Utilities.swift */; };
FD859F0027C4691300510D0C /* MockDataGenerator.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD859EFF27C4691300510D0C /* MockDataGenerator.swift */; };
FD88BAD927A7439C00BBC442 /* MessageRequestsCell.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD88BAD827A7439C00BBC442 /* MessageRequestsCell.swift */; };
FD88BADB27A750F200BBC442 /* MessageRequestsMigration.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD88BADA27A750F200BBC442 /* MessageRequestsMigration.swift */; };
@ -1287,7 +1286,6 @@
C33FDB75255A581000E217F9 /* AppReadiness.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = AppReadiness.m; sourceTree = "<group>"; };
C33FDB77255A581000E217F9 /* NSUserDefaults+OWS.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = "NSUserDefaults+OWS.m"; sourceTree = "<group>"; };
C33FDB78255A581000E217F9 /* OWSOperation.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = OWSOperation.m; sourceTree = "<group>"; };
C33FDB7F255A581100E217F9 /* FullTextSearchFinder.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = FullTextSearchFinder.swift; sourceTree = "<group>"; };
C33FDB80255A581100E217F9 /* Notification+Loki.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Notification+Loki.swift"; sourceTree = "<group>"; };
C33FDB81255A581100E217F9 /* UIImage+OWS.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = "UIImage+OWS.m"; sourceTree = "<group>"; };
C33FDB85255A581100E217F9 /* AppContext.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = AppContext.m; sourceTree = "<group>"; };
@ -1379,7 +1377,6 @@
C38EF2B2255B6D9C007E1867 /* UIView+Utilities.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = "UIView+Utilities.swift"; path = "SignalUtilitiesKit/Utilities/UIView+Utilities.swift"; sourceTree = SOURCE_ROOT; };
C38EF2E2255B6DB9007E1867 /* OWSScreenLock.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = OWSScreenLock.swift; path = "SignalUtilitiesKit/Screen Lock/OWSScreenLock.swift"; sourceTree = SOURCE_ROOT; };
C38EF2EC255B6DBA007E1867 /* ProximityMonitoringManager.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = ProximityMonitoringManager.swift; path = SessionMessagingKit/Utilities/ProximityMonitoringManager.swift; sourceTree = SOURCE_ROOT; };
C38EF2ED255B6DBB007E1867 /* DisplayableText.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = DisplayableText.swift; path = SignalUtilitiesKit/Utilities/DisplayableText.swift; sourceTree = SOURCE_ROOT; };
C38EF2EF255B6DBB007E1867 /* Weak.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = Weak.swift; path = SessionUtilitiesKit/General/Weak.swift; sourceTree = SOURCE_ROOT; };
C38EF2F0255B6DBB007E1867 /* OWSAnyTouchGestureRecognizer.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; name = OWSAnyTouchGestureRecognizer.m; path = SignalUtilitiesKit/Utilities/OWSAnyTouchGestureRecognizer.m; sourceTree = SOURCE_ROOT; };
C38EF2F1255B6DBB007E1867 /* OWSPreferences.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = OWSPreferences.h; path = SessionMessagingKit/Utilities/OWSPreferences.h; sourceTree = SOURCE_ROOT; };
@ -1651,6 +1648,7 @@
FD848B8A283DC509000E298B /* PagedDatabaseObserver.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PagedDatabaseObserver.swift; sourceTree = "<group>"; };
FD848B8C283E0B26000E298B /* MessageInputTypes.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MessageInputTypes.swift; sourceTree = "<group>"; };
FD848B8E283EF2A8000E298B /* UIScrollView+Utilities.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "UIScrollView+Utilities.swift"; sourceTree = "<group>"; };
FD848B9228420164000E298B /* UnicodeScalar+Utilities.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "UnicodeScalar+Utilities.swift"; sourceTree = "<group>"; };
FD859EFF27C4691300510D0C /* MockDataGenerator.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MockDataGenerator.swift; sourceTree = "<group>"; };
FD88BAD827A7439C00BBC442 /* MessageRequestsCell.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MessageRequestsCell.swift; sourceTree = "<group>"; };
FD88BADA27A750F200BBC442 /* MessageRequestsMigration.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MessageRequestsMigration.swift; sourceTree = "<group>"; };
@ -2227,6 +2225,7 @@
C33FDB51255A580D00E217F9 /* NSUserDefaults+OWS.h */,
C33FDB77255A581000E217F9 /* NSUserDefaults+OWS.m */,
C33FDB14255A580800E217F9 /* OWSMath.h */,
FD705A91278D051200F16121 /* ReusableView.swift */,
FD17D7AF27F4225C00122BE0 /* Set+Utilities.swift */,
C33FDB6B255A580F00E217F9 /* SNUserDefaults.swift */,
C33FDB3F255A580C00E217F9 /* String+SSK.swift */,
@ -2234,11 +2233,11 @@
FD705A8D278CE29800F16121 /* String+Utilities.swift */,
C38EF237255B6D65007E1867 /* UIDevice+featureSupport.swift */,
C35D0DB425AE5F1200B6BF49 /* UIEdgeInsets.swift */,
FD705A91278D051200F16121 /* ReusableView.swift */,
FDED2E3B282E1B5D00B2CD2A /* UICollectionView+ReusableView.swift */,
FD705A93278D052B00F16121 /* UITableView+ReusableView.swift */,
C38EF23D255B6D66007E1867 /* UIView+OWS.h */,
C38EF23E255B6D66007E1867 /* UIView+OWS.m */,
FD848B9228420164000E298B /* UnicodeScalar+Utilities.swift */,
C38EF2EF255B6DBB007E1867 /* Weak.swift */,
);
path = General;
@ -2879,7 +2878,6 @@
C37F53E8255BA9BB002AEA92 /* Environment.h */,
C37F5402255BA9ED002AEA92 /* Environment.m */,
C3BBE0C62554F1570050F1E3 /* FixedWidthInteger+BigEndian.swift */,
C33FDB7F255A581100E217F9 /* FullTextSearchFinder.swift */,
C3A71D0A2558989C0043A11F /* MessageWrapper.swift */,
C3A71D4E25589FF30043A11F /* NSData+messagePadding.h */,
C3A71D4825589FF20043A11F /* NSData+messagePadding.m */,
@ -3041,7 +3039,6 @@
C38EF302255B6DBE007E1867 /* OWSAnyTouchGestureRecognizer.h */,
C38EF2F0255B6DBB007E1867 /* OWSAnyTouchGestureRecognizer.m */,
FDCDB8DD2810F73B00352A0C /* Differentiable+Utilities.swift */,
C38EF2ED255B6DBB007E1867 /* DisplayableText.swift */,
C38EF3DC255B6DF1007E1867 /* DirectionalPanGestureRecognizer.swift */,
C38EF240255B6D67007E1867 /* UIView+OWS.swift */,
C38EF236255B6D65007E1867 /* UIViewController+OWS.h */,
@ -4305,7 +4302,6 @@
files = (
C38EF3FD255B6DF7007E1867 /* OWSTextView.m in Sources */,
C38EF3C6255B6DE7007E1867 /* ImageEditorModel.swift in Sources */,
C38EF317255B6DBF007E1867 /* DisplayableText.swift in Sources */,
C38EF3C3255B6DE7007E1867 /* ImageEditorTextItem.swift in Sources */,
FD28A4F227E990E800FF65E7 /* BlockingManagerRemovalMigration.swift in Sources */,
C33FDC7D255A582000E217F9 /* OWSDispatch.m in Sources */,
@ -4497,6 +4493,7 @@
FD17D7C727F5207C00122BE0 /* DatabaseMigrator+Utilities.swift in Sources */,
C3D9E3C925676AF30040E4F3 /* TSYapDatabaseObject.m in Sources */,
C352A3A62557B60D00338F3E /* TSRequest.m in Sources */,
FD848B9328420164000E298B /* UnicodeScalar+Utilities.swift in Sources */,
FD09796B27F6C67500936362 /* Failable.swift in Sources */,
FD705A92278D051200F16121 /* ReusableView.swift in Sources */,
FD17D7BA27F51F2100122BE0 /* TargetMigrations.swift in Sources */,
@ -4574,7 +4571,6 @@
C32C5A76256DBBCF003C73A2 /* SignalAttachment.swift in Sources */,
FDA8EB00280E8D58002B68E5 /* FailedAttachmentDownloadsJob.swift in Sources */,
FD09798927FD1C5A00936362 /* OpenGroup.swift in Sources */,
C3A3A08F256E1728004D228D /* FullTextSearchFinder.swift in Sources */,
FDF0B7472804F0CE004C14C5 /* DisappearingMessagesJob.swift in Sources */,
B8856D1A256F114D001CE70E /* ProximityMonitoringManager.swift in Sources */,
C3D9E52725677DF20040E4F3 /* ThumbnailService.swift in Sources */,

View File

@ -712,7 +712,6 @@ extension ConversationVC:
let locationInAlbumView: CGPoint = cell.convert(locationInCell, to: albumView)
guard let mediaView = albumView.mediaView(forLocation: locationInAlbumView) else { return }
switch mediaView.attachment.state {
case .pendingDownload, .downloading, .uploading:
// TODO: Tapped a failed incoming attachment
@ -779,14 +778,26 @@ extension ConversationVC:
navigationController?.present(shareVC, animated: true, completion: nil)
case .textOnlyMessage:
if let reply = viewItem.quotedReply {
// Scroll to the source of the reply
guard let indexPath = viewModel.ensureLoadWindowContainsQuotedReply(reply) else { return }
messagesTableView.scrollToRow(at: indexPath, at: UITableView.ScrollPosition.middle, animated: true)
} else if let message = viewItem.interaction as? TSIncomingMessage, let name = message.openGroupInvitationName,
let url = message.openGroupInvitationURL {
joinOpenGroup(name: name, url: url)
if let quote: Quote = cellViewModel.quote {
// Scroll to the original quoted message
let maybeOriginalInteractionId: Int64? = GRDBStorage.shared.read { db in
try quote.originalInteraction
.select(.id)
.asRequest(of: Int64.self)
.fetchOne(db)
}
guard let interactionId: Int64 = maybeOriginalInteractionId else { return }
self.scrollToInteractionIfNeeded(with: interactionId, highlight: true)
}
else if let linkPreview: LinkPreview = cellViewModel.linkPreview {
switch linkPreview.variant {
case .standard: openUrl(linkPreview.url)
case .openGroupInvitation: joinOpenGroup(name: linkPreview.title, url: linkPreview.url)
}
}
default: break
}
}

View File

@ -444,8 +444,17 @@ final class ConversationVC: BaseVC, OWSConversationSettingsViewDelegate, Convers
override func viewDidAppear(_ animated: Bool) {
super.viewDidAppear(animated)
// Perform the initial scroll and highlight if needed (if we started with a focused message
// this will have already been called to instantly snap to the destination but we don't
// trigger the highlight until after the screen has appeared to make it more obvious)
performInitialScrollIfNeeded()
// Flag that the initial layout has been completed (the flag blocks and unblocks a number
// of different behaviours)
//
// Note: This MUST be set after the above 'performInitialScrollIfNeeded' is called as it
// won't run if this flag is set to true
didFinishInitialLayout = true
viewModel.markAllAsRead()
if delayFirstResponder || isShowingSearchUI {
delayFirstResponder = false
@ -457,6 +466,8 @@ final class ConversationVC: BaseVC, OWSConversationSettingsViewDelegate, Convers
)?.becomeFirstResponder()
}
}
viewModel.markAllAsRead()
}
override func viewWillDisappear(_ animated: Bool) {
@ -1252,7 +1263,16 @@ final class ConversationVC: BaseVC, OWSConversationSettingsViewDelegate, Convers
// If we aren't animating or aren't highlighting then everything can be run immediately
guard isAnimated && highlight else {
self.tableView.scrollToRow(at: targetIndexPath, at: position, animated: isAnimated)
self.tableView.scrollToRow(
at: targetIndexPath,
at: position,
animated: (self.didFinishInitialLayout && isAnimated)
)
// Don't clear these values if we have't done the initial layout (we will call this
// method a second time to trigger the highlight after the screen appears)
guard self.didFinishInitialLayout else { return }
self.focusedInteractionId = nil
self.shouldHighlightNextScrollToInteraction = false
@ -1286,7 +1306,7 @@ final class ConversationVC: BaseVC, OWSConversationSettingsViewDelegate, Convers
.visibleCells
.first(where: { ($0 as? VisibleMessageCell)?.viewModel?.id == interactionId })
.asType(VisibleMessageCell.self)?
.highlight(interactionId: interactionId)
.highlight()
}
}
}

View File

@ -51,48 +51,65 @@ public class ConversationViewModel: OWSAudioPlayerDelegate {
self.focusedInteractionId = focusedInteractionId
self.pagedDataObserver = nil
DispatchQueue.global(qos: .default).async { [weak self] in
self?.pagedDataObserver = PagedDatabaseObserver(
pagedTable: Interaction.self,
pageSize: ConversationViewModel.pageSize,
idColumn: .id,
initialFocusedId: focusedInteractionId,
observedChanges: [
PagedData.ObservedChanges(
table: Interaction.self,
columns: Interaction.Columns
.allCases
.filter { $0 != .wasRead }
)
],
filterSQL: MessageCell.ViewModel.filterSQL(threadId: threadId),
orderSQL: MessageCell.ViewModel.orderSQL,
dataQuery: MessageCell.ViewModel.baseQuery(
orderSQL: MessageCell.ViewModel.orderSQL,
baseFilterSQL: MessageCell.ViewModel.filterSQL(threadId: threadId)
// Note: Since this references self we need to finish initializing before setting it, we
// also want to skip the initial query and trigger it async so that the push animation
// doesn't stutter (it should load basically immediately but without this there is a
// distinct stutter)
self.pagedDataObserver = PagedDatabaseObserver(
pagedTable: Interaction.self,
pageSize: ConversationViewModel.pageSize,
idColumn: .id,
observedChanges: [
PagedData.ObservedChanges(
table: Interaction.self,
columns: Interaction.Columns
.allCases
.filter { $0 != .wasRead }
),
associatedRecords: [
AssociatedRecord<MessageCell.AttachmentInteractionInfo, MessageCell.ViewModel>(
trackedAgainst: Attachment.self,
observedChanges: [
PagedData.ObservedChanges(
table: Attachment.self,
columns: [.state]
)
],
dataQuery: MessageCell.AttachmentInteractionInfo.baseQuery,
joinToPagedType: MessageCell.AttachmentInteractionInfo.joinToViewModelQuerySQL,
associateData: MessageCell.AttachmentInteractionInfo.createAssociateDataClosure()
)
],
onChangeUnsorted: { [weak self] updatedData, updatedPageInfo in
guard let updatedInteractionData: [SectionModel] = self?.process(data: updatedData, for: updatedPageInfo) else {
return
}
self?.onInteractionChange?(updatedInteractionData)
PagedData.ObservedChanges(
table: ThreadTypingIndicator.self,
columns: ThreadTypingIndicator.Columns.allCases
)
],
filterSQL: MessageCell.ViewModel.filterSQL(threadId: threadId),
orderSQL: MessageCell.ViewModel.orderSQL,
dataQuery: MessageCell.ViewModel.baseQuery(
orderSQL: MessageCell.ViewModel.orderSQL,
baseFilterSQL: MessageCell.ViewModel.filterSQL(threadId: threadId)
),
associatedRecords: [
AssociatedRecord<MessageCell.AttachmentInteractionInfo, MessageCell.ViewModel>(
trackedAgainst: Attachment.self,
observedChanges: [
PagedData.ObservedChanges(
table: Attachment.self,
columns: [.state]
)
],
dataQuery: MessageCell.AttachmentInteractionInfo.baseQuery,
joinToPagedType: MessageCell.AttachmentInteractionInfo.joinToViewModelQuerySQL,
associateData: MessageCell.AttachmentInteractionInfo.createAssociateDataClosure()
)
],
onChangeUnsorted: { [weak self] updatedData, updatedPageInfo in
guard let updatedInteractionData: [SectionModel] = self?.process(data: updatedData, for: updatedPageInfo) else {
return
}
)
self?.onInteractionChange?(updatedInteractionData)
}
)
// Run the initial query on a backgorund thread so we don't block the push transition
DispatchQueue.global(qos: .default).async { [weak self] in
// If we don't have a `initialFocusedId` then default to `.pageBefore` (it'll query
// from a `0` offset)
guard let initialFocusedId: Int64 = focusedInteractionId else {
self?.pagedDataObserver?.load(.pageBefore)
return
}
self?.pagedDataObserver?.load(.initialPageAround(id: initialFocusedId))
}
}

View File

@ -105,6 +105,12 @@ extension MessageCell {
/// This value will be used to populate the date header, if it's null then the header will be hidden
let dateForUI: Date?
/// This value specifies whether the body contains only emoji characters
let containsOnlyEmoji: Bool?
/// This value specifies the number of emoji characters the body contains
let glyphCount: Int?
/// This value indicates the variant of the previous ViewModel item, if it's null then there is no previous item
let previousVariant: Interaction.Variant?
@ -149,6 +155,8 @@ extension MessageCell {
senderName: self.senderName,
shouldShowProfile: self.shouldShowProfile,
dateForUI: self.dateForUI,
containsOnlyEmoji: self.containsOnlyEmoji,
glyphCount: self.glyphCount,
previousVariant: self.previousVariant,
positionInCluster: self.positionInCluster,
isOnlyMessageInCluster: self.isOnlyMessageInCluster,
@ -339,6 +347,8 @@ extension MessageCell {
Date(timeIntervalSince1970: (TimeInterval(self.timestampMs) / 1000)) :
nil
),
containsOnlyEmoji: self.body?.containsOnlyEmoji,
glyphCount: self.body?.glyphCount,
previousVariant: prevModel?.variant,
positionInCluster: positionInCluster,
isOnlyMessageInCluster: isOnlyMessageInCluster,
@ -413,6 +423,8 @@ public extension MessageCell.ViewModel {
self.senderName = nil
self.shouldShowProfile = false
self.dateForUI = nil
self.containsOnlyEmoji = nil
self.glyphCount = nil
self.previousVariant = nil
self.positionInCluster = .middle
self.isOnlyMessageInCluster = true

View File

@ -47,16 +47,21 @@ final class VisibleMessageCell: MessageCell, UITextViewDelegate, BodyTextViewDel
}()
private lazy var moderatorIconImageView = UIImageView(image: #imageLiteral(resourceName: "Crown"))
lazy var bubbleBackgroundView: UIView = {
let result = UIView()
result.layer.cornerRadius = VisibleMessageCell.largeCornerRadius
return result
}()
lazy var bubbleView: UIView = {
let result = UIView()
result.clipsToBounds = true
result.layer.cornerRadius = VisibleMessageCell.largeCornerRadius
result.set(.width, greaterThanOrEqualTo: VisibleMessageCell.largeCornerRadius * 2)
return result
}()
private let bubbleViewMaskLayer = CAShapeLayer()
private lazy var headerView = UIView()
private lazy var authorLabel: UILabel = {
@ -147,11 +152,15 @@ final class VisibleMessageCell: MessageCell, UITextViewDelegate, BodyTextViewDel
moderatorIconImageView.pin(.trailing, to: .trailing, of: profilePictureView, withInset: 1)
moderatorIconImageView.pin(.bottom, to: .bottom, of: profilePictureView, withInset: 4.5)
// Bubble background view (used for the 'highlighted' animation)
addSubview(bubbleBackgroundView)
// Bubble view
addSubview(bubbleView)
bubbleViewLeftConstraint1.isActive = true
bubbleViewTopConstraint.isActive = true
bubbleViewRightConstraint1.isActive = true
bubbleBackgroundView.pin(to: bubbleView)
// Timer view
addSubview(timerView)
@ -242,10 +251,16 @@ final class VisibleMessageCell: MessageCell, UITextViewDelegate, BodyTextViewDel
cellViewModel.variant == .standardIncoming ||
cellViewModel.variant == .standardIncomingDeleted
) ? Colors.receivedMessageBackground : Colors.sentMessageBackground)
bubbleBackgroundView.backgroundColor = bubbleView.backgroundColor
updateBubbleViewCorners()
// Content view
populateContentView(for: cellViewModel, mediaCache: mediaCache, playbackInfo: playbackInfo, lastSearchText: lastSearchText)
populateContentView(
for: cellViewModel,
mediaCache: mediaCache,
playbackInfo: playbackInfo,
lastSearchText: lastSearchText
)
// Date break
headerViewTopConstraint.constant = (shouldInsetHeader ? Values.mediumSpacing : 1)
@ -399,7 +414,6 @@ final class VisibleMessageCell: MessageCell, UITextViewDelegate, BodyTextViewDel
)
snContentView.addSubview(linkPreviewView)
linkPreviewView.pin(to: snContentView)
linkPreviewView.layer.mask = bubbleViewMaskLayer
self.bodyTextView = linkPreviewView.bodyTextView
case .openGroupInvitation:
@ -412,7 +426,6 @@ final class VisibleMessageCell: MessageCell, UITextViewDelegate, BodyTextViewDel
snContentView.addSubview(openGroupInvitationView)
openGroupInvitationView.pin(to: snContentView)
openGroupInvitationView.layer.mask = bubbleViewMaskLayer
}
}
else {
@ -478,7 +491,6 @@ final class VisibleMessageCell: MessageCell, UITextViewDelegate, BodyTextViewDel
albumView.set(.width, to: size.width)
albumView.set(.height, to: size.height)
albumView.loadMedia()
albumView.layer.mask = bubbleViewMaskLayer
stackView.addArrangedSubview(albumView)
// Body text view
@ -517,7 +529,6 @@ final class VisibleMessageCell: MessageCell, UITextViewDelegate, BodyTextViewDel
snContentView.addSubview(voiceMessageView)
voiceMessageView.pin(to: snContentView)
voiceMessageView.layer.mask = bubbleViewMaskLayer
self.voiceMessageView = voiceMessageView
case .genericAttachment:
@ -561,16 +572,9 @@ final class VisibleMessageCell: MessageCell, UITextViewDelegate, BodyTextViewDel
private func updateBubbleViewCorners() {
let cornersToRound: UIRectCorner = getCornersToRound()
let maskPath: UIBezierPath = UIBezierPath(
roundedRect: bubbleView.bounds,
byRoundingCorners: cornersToRound,
cornerRadii: CGSize(
width: VisibleMessageCell.largeCornerRadius,
height: VisibleMessageCell.largeCornerRadius
)
)
bubbleViewMaskLayer.path = maskPath.cgPath
bubbleBackgroundView.layer.cornerRadius = VisibleMessageCell.largeCornerRadius
bubbleBackgroundView.layer.maskedCorners = getCornerMask(from: cornersToRound)
bubbleView.layer.cornerRadius = VisibleMessageCell.largeCornerRadius
bubbleView.layer.maskedCorners = getCornerMask(from: cornersToRound)
}
@ -644,12 +648,23 @@ final class VisibleMessageCell: MessageCell, UITextViewDelegate, BodyTextViewDel
// FIXME: This will have issues with themes
let shawdowColour = (isLightMode ? UIColor.black.cgColor : Colors.accent.cgColor)
let opacity: Float = (isLightMode ? 0.5 : 1)
bubbleView.setShadow(radius: 10, opacity: opacity, offset: .zero, color: shawdowColour)
DispatchQueue.main.async {
UIView.animate(withDuration: 1.6) {
self.bubbleView.setShadow(radius: 0, opacity: 0, offset: .zero, color: UIColor.clear.cgColor)
}
DispatchQueue.main.async { [weak self] in
let oldMasksToBounds: Bool = (self?.layer.masksToBounds ?? false)
self?.layer.masksToBounds = false
self?.bubbleBackgroundView.setShadow(radius: 10, opacity: opacity, offset: .zero, color: shawdowColour)
UIView.animate(
withDuration: 1.6,
delay: 0,
options: .curveEaseInOut,
animations: {
self?.bubbleBackgroundView.setShadow(radius: 0, opacity: 0, offset: .zero, color: UIColor.clear.cgColor)
},
completion: { _ in
self?.layer.masksToBounds = oldMasksToBounds
}
)
}
}
@ -784,11 +799,14 @@ final class VisibleMessageCell: MessageCell, UITextViewDelegate, BodyTextViewDel
private static func getFontSize(for cellViewModel: MessageCell.ViewModel) -> CGFloat {
let baselineFontSize = Values.mediumFontSize
switch viewItem.displayableBodyText?.jumbomojiCount {
case 1: return baselineFontSize + 30
case 2: return baselineFontSize + 24
case 3, 4, 5: return baselineFontSize + 18
default: return baselineFontSize
guard cellViewModel.containsOnlyEmoji == true else { return baselineFontSize }
switch (cellViewModel.glyphCount ?? 0) {
case 1: return baselineFontSize + 30
case 2: return baselineFontSize + 24
case 3, 4, 5: return baselineFontSize + 18
default: return baselineFontSize
}
}
@ -915,19 +933,34 @@ final class VisibleMessageCell: MessageCell, UITextViewDelegate, BodyTextViewDel
]
)
)
if let searchText = searchText, searchText.count >= ConversationSearchController.kMinimumSearchTextLength {
let normalizedSearchText = FullTextSearchFinder.normalize(text: searchText)
do {
let regex = try NSRegularExpression(pattern: NSRegularExpression.escapedPattern(for: normalizedSearchText), options: .caseInsensitive)
let matches = regex.matches(in: attributedText.string, options: .withoutAnchoringBounds, range: NSRange(location: 0, length: (attributedText.string as NSString).length))
for match in matches {
guard match.range.location + match.range.length < attributedText.length else { continue }
attributedText.addAttribute(.backgroundColor, value: UIColor.white, range: match.range)
attributedText.addAttribute(.foregroundColor, value: UIColor.black, range: match.range)
// If there is a valid search term then highlight each part that matched
if let searchText = searchText, searchText.count >= ConversationSearchController.minimumSearchTextLength {
let normalizedBody: String = attributedText.string.lowercased()
ConversationCell.ViewModel.searchTermParts(searchText)
.map { part -> String in
guard part.hasPrefix("\"") && part.hasSuffix("\"") else { return part }
return String(part[part.index(after: part.startIndex)..<part.endIndex])
}
.forEach { part in
// Highlight all ranges of the text (Note: The search logic only finds results that start
// with the term so we use the regex below to ensure we only highlight those cases)
normalizedBody
.ranges(
of: (CurrentAppContext().isRTL ?
"\(part.lowercased())(^|[ ])" :
"(^|[ ])\(part.lowercased())"
),
options: [.regularExpression]
)
.forEach { range in
let legacyRange: NSRange = NSRange(range, in: normalizedBody)
attributedText.addAttribute(.backgroundColor, value: UIColor.white, range: legacyRange)
attributedText.addAttribute(.foregroundColor, value: UIColor.black, range: legacyRange)
}
}
} catch {
// Do nothing
}
}
result.attributedText = attributedText

View File

@ -54,13 +54,15 @@ public class MediaGalleryViewModel {
guard isPagedData else { return }
var hasSavedIntialUpdate: Bool = false
// Note: Since this references self we need to finish initializing before setting it, we
// also want to skip the initial query and trigger it async so that the push animation
// doesn't stutter (it should load basically immediately but without this there is a
// distinct stutter)
let filterSQL: SQL = Item.filterSQL(threadId: threadId)
self.pagedDataObserver = PagedDatabaseObserver(
pagedTable: Attachment.self,
pageSize: pageSize,
idColumn: .id,
initialFocusedId: focusedAttachmentId,
observedChanges: [
PagedData.ObservedChanges(
table: Attachment.self,
@ -76,17 +78,21 @@ public class MediaGalleryViewModel {
return
}
// If we haven't stored the data for the initial fetch then do so now (no need
// to call 'onGalleryChange' in this case as it will always be null)
guard hasSavedIntialUpdate else {
self?.updateGalleryData(updatedGalleryData)
hasSavedIntialUpdate = true
return
}
self?.onGalleryChange?(updatedGalleryData)
}
)
// Run the initial query on a backgorund thread so we don't block the push transition
DispatchQueue.global(qos: .default).async { [weak self] in
// If we don't have a `initialFocusedId` then default to `.pageBefore` (it'll query
// from a `0` offset)
guard let initialFocusedId: String = focusedAttachmentId else {
self?.pagedDataObserver?.load(.pageBefore)
return
}
self?.pagedDataObserver?.load(.initialPageAround(id: initialFocusedId))
}
}
// MARK: - Data

View File

@ -19,6 +19,7 @@ public class MediaTileViewController: UIViewController, UICollectionViewDataSour
private let viewModel: MediaGalleryViewModel
private var hasLoadedInitialData: Bool = false
private var didFinishInitialLayout: Bool = false
private var isAutoLoadingNextPage: Bool = false
private var currentTargetOffset: CGPoint?
@ -155,7 +156,12 @@ public class MediaTileViewController: UIViewController, UICollectionViewDataSour
super.viewWillAppear(animated)
startObservingChanges()
triggerInitialDataLoadIfNeeded()
}
public override func viewDidAppear(_ animated: Bool) {
super.viewDidAppear(animated)
self.didFinishInitialLayout = true
}
public override func viewWillDisappear(_ animated: Bool) {
@ -184,25 +190,18 @@ public class MediaTileViewController: UIViewController, UICollectionViewDataSour
// MARK: - Updating
private func triggerInitialDataLoadIfNeeded() {
private func performInitialScrollIfNeeded() {
// Ensure this hasn't run before and that we have data (The 'galleryData' will always
// contain something as the 'empty' state is a section within 'galleryData')
guard !self.hasLoadedInitialData && !self.viewModel.galleryData.isEmpty else { return }
guard !self.didFinishInitialLayout && self.hasLoadedInitialData else { return }
// If we have a focused item then we want to scroll to it
guard let focusedIndexPath: IndexPath = self.viewModel.focusedIndexPath else {
self.hasLoadedInitialData = true
return
}
guard let focusedIndexPath: IndexPath = self.viewModel.focusedIndexPath else { return }
Logger.debug("scrolling to focused item at indexPath: \(focusedIndexPath)")
self.view.layoutIfNeeded()
self.collectionView.scrollToItem(at: focusedIndexPath, at: .centeredVertically, animated: false)
// Note: If we have a 'focusedIndexPath' then we want to leave this until last so we can avoid
// triggering page loads due to default content offsets
self.hasLoadedInitialData = true
// Now that the data has loaded we need to check if either of the "load more" sections are
// visible and trigger them if so
//
@ -261,9 +260,12 @@ public class MediaTileViewController: UIViewController, UICollectionViewDataSour
// Ensure the first load runs without animations (if we don't do this the cells will animate
// in from a frame of CGRect.zero)
guard hasLoadedInitialData else {
self.hasLoadedInitialData = true
self.viewModel.updateGalleryData(updatedGalleryData)
UIView.performWithoutAnimation {
handleUpdates(updatedGalleryData)
triggerInitialDataLoadIfNeeded()
self.collectionView.reloadData()
self.performInitialScrollIfNeeded()
}
return
}
@ -406,7 +408,7 @@ public class MediaTileViewController: UIViewController, UICollectionViewDataSour
public func collectionView(_ collectionView: UICollectionView, willDisplaySupplementaryView view: UICollectionReusableView, forElementKind elementKind: String, at indexPath: IndexPath) {
// Want to ensure the initial content load has completed before we try to load any more data
guard self.hasLoadedInitialData else { return }
guard self.didFinishInitialLayout else { return }
let section: MediaGalleryViewModel.SectionModel = self.viewModel.galleryData[indexPath.section]

View File

@ -184,7 +184,7 @@ public class NotificationPresenter: NSObject, NotificationsProtocol {
// it must be escaped.
// see https://developer.apple.com/documentation/uikit/uilocalnotification/1616646-alertbody
// for more details.
let messageText: String? = DisplayableText.filterNotificationText(rawMessageText)
let messageText: String? = String.filterNotificationText(rawMessageText)
// Don't fire the notification if the current user isn't mentioned
// and isOnlyNotifyingForMentions is on.

View File

@ -3,6 +3,7 @@
import UIKit
import PromiseKit
import SessionUtilitiesKit
import SessionSnodeKit
final class LinkDeviceVC : BaseVC, UIPageViewControllerDataSource, UIPageViewControllerDelegate, OWSQRScannerDelegate {
private let pageVC = UIPageViewController(transitionStyle: .scroll, navigationOrientation: .horizontal, options: nil)
@ -136,7 +137,12 @@ final class LinkDeviceVC : BaseVC, UIPageViewControllerDataSource, UIPageViewCon
}
let (ed25519KeyPair, x25519KeyPair) = try! Identity.generate(from: seed)
Onboarding.Flow.link.preregister(with: seed, ed25519KeyPair: ed25519KeyPair, x25519KeyPair: x25519KeyPair)
Identity.didRegister()
// Now that we have registered get the Snode pool
GetSnodePoolJob.run()
NotificationCenter.default.addObserver(self, selector: #selector(handleInitialConfigurationMessageReceived), name: .initialConfigurationMessageReceived, object: nil)
ModalActivityIndicatorViewController.present(fromViewController: navigationController!) { [weak self] modal in
self?.activityIndicatorModal = modal

View File

@ -1,6 +1,9 @@
// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import UIKit
import PromiseKit
import SessionMessagingKit
import SessionSnodeKit
final class PNModeVC : BaseVC, OptionViewDelegate {
@ -96,10 +99,15 @@ final class PNModeVC : BaseVC, OptionViewDelegate {
return present(alert, animated: true, completion: nil)
}
UserDefaults.standard[.isUsingFullAPNs] = (selectedOptionView == apnsOptionView)
Identity.didRegister()
let homeVC = HomeVC()
navigationController!.setViewControllers([ homeVC ], animated: true)
Identity.didRegister()
// Go to the home screen
let homeVC: HomeVC = HomeVC()
self.navigationController?.setViewControllers([ homeVC ], animated: true)
// Now that we have registered get the Snode pool and sync push tokens
GetSnodePoolJob.run()
SyncPushTokensJob.run(uploadOnlyIfStale: false)
}
}

View File

@ -15,20 +15,22 @@ enum _002_SetupStandardJobs: Migration {
// Start by adding the jobs that don't have collections (in the jobs like these
// will be added via migrations)
try autoreleasepool {
// TODO: Add additional jobs from the AppDelegate
_ = try Job(
variant: .disappearingMessages,
behaviour: .recurringOnLaunchBlockingOncePerSession
behaviour: .recurringOnLaunch,
shouldBlockFirstRunEachSession: true
).inserted(db)
_ = try Job(
variant: .failedMessages,
behaviour: .recurringOnLaunchBlocking
behaviour: .recurringOnLaunch,
shouldBlockFirstRunEachSession: true
).inserted(db)
_ = try Job(
variant: .failedAttachmentDownloads,
behaviour: .recurringOnLaunchBlocking
behaviour: .recurringOnLaunch,
shouldBlockFirstRunEachSession: true
).inserted(db)
_ = try Job(

View File

@ -8,9 +8,9 @@ import SessionUtilitiesKit
import SessionSnodeKit
public enum GarbageCollectionJob: JobExecutor {
public static var maxFailureCount: Int = 10
public static var requiresThreadId: Bool = true
public static let requiresInteractionId: Bool = false // Some messages don't have interactions
public static var maxFailureCount: Int = -1
public static var requiresThreadId: Bool = false
public static let requiresInteractionId: Bool = false
public static func run(
_ job: Job,

View File

@ -17,6 +17,8 @@ public final class Poller : NSObject {
private static let pollInterval: TimeInterval = 1.5
private static let retryInterval: TimeInterval = 0.25
private static let maxRetryInterval: TimeInterval = 15
/// After polling a given snode this many times we always switch to a new one.
///
/// The reason for doing this is that sometimes a snode will be giving us successful responses while
@ -53,7 +55,7 @@ public final class Poller : NSObject {
// MARK: - Private API
private func setUpPolling() {
private func setUpPolling(delay: TimeInterval = Poller.retryInterval) {
guard isPolling.wrappedValue else { return }
Threading.pollerQueue.async {
@ -66,13 +68,21 @@ public final class Poller : NSObject {
return promise
}
.ensure(on: Threading.pollerQueue) { [weak self] in // Timers don't do well on background queues
.done(on: Threading.pollerQueue) { [weak self] in
guard self?.isPolling.wrappedValue == true else { return }
Timer.scheduledTimerOnMainThread(withTimeInterval: Poller.retryInterval, repeats: false) { _ in
self?.setUpPolling()
}
}
.catch(on: Threading.pollerQueue) { [weak self] _ in
guard self?.isPolling.wrappedValue == true else { return }
let nextDelay: TimeInterval = min(Poller.maxRetryInterval, (delay * 1.2))
Timer.scheduledTimerOnMainThread(withTimeInterval: nextDelay, repeats: false) { _ in
self?.setUpPolling()
}
}
}
}

View File

@ -12,26 +12,6 @@ public protocol SessionMessagingKitStorageProtocol {
func write(with block: @escaping (Any) -> Void, completion: @escaping () -> Void) -> Promise<Void>
func writeSync(with block: @escaping (Any) -> Void)
// MARK: - Closed Groups
func getUserClosedGroupPublicKeys() -> Set<String>
func getUserClosedGroupPublicKeys(using transaction: YapDatabaseReadTransaction) -> Set<String>
func getZombieMembers(for groupPublicKey: String) -> Set<String>
func setZombieMembers(for groupPublicKey: String, to zombies: Set<String>, using transaction: Any)
func isClosedGroup(_ publicKey: String) -> Bool
func isClosedGroup(_ publicKey: String, using transaction: YapDatabaseReadTransaction) -> Bool
// MARK: - Jobs
func persist(_ job: Job, using transaction: Any)
func markJobAsSucceeded(_ job: Job, using transaction: Any)
func markJobAsFailed(_ job: Job, using transaction: Any)
func getAllPendingJobs(of type: Job.Type) -> [Job]
func getAttachmentUploadJob(for attachmentID: String) -> AttachmentUploadJob?
func getMessageSendJob(for messageSendJobID: String) -> MessageSendJob?
func resumeMessageSendJobIfNeeded(_ messageSendJobID: String)
func isJobCanceled(_ job: Job) -> Bool
// MARK: - Authorization
func getAuthToken(for room: String, on server: String) -> String?
@ -71,21 +51,6 @@ public protocol SessionMessagingKitStorageProtocol {
// MARK: - Open Group Metadata
func setUserCount(to newValue: UInt64, forV2OpenGroupWithID openGroupID: String, using transaction: Any)
// MARK: - Message Handling
func getReceivedMessageTimestamps(using transaction: Any) -> [UInt64]
func addReceivedMessageTimestamp(_ timestamp: UInt64, using transaction: Any)
/// Returns the ID of the thread.
func getOrCreateThread(for publicKey: String, groupPublicKey: String?, openGroupID: String?, using transaction: Any) -> String?
/// Returns the ID of the `TSIncomingMessage` that was constructed.
func persist(_ message: VisibleMessage, quotedMessage: TSQuotedMessage?, linkPreview: OWSLinkPreview?, groupPublicKey: String?, openGroupID: String?, using transaction: Any) -> String?
/// Returns the IDs of the saved attachments.
func persist(_ attachments: [VisibleMessage.Attachment], using transaction: Any) -> [String]
/// Also touches the associated message.
func setAttachmentState(to state: TSAttachmentPointerState, for pointer: TSAttachmentPointer, associatedWith tsIncomingMessageID: String, using transaction: Any)
/// Also touches the associated message.
func persist(_ stream: TSAttachmentStream, associatedWith tsIncomingMessageID: String, using transaction: Any)
}
extension Storage: SessionMessagingKitStorageProtocol {}

View File

@ -13,7 +13,8 @@ enum _002_SetupStandardJobs: Migration {
try autoreleasepool {
_ = try Job(
variant: .getSnodePool,
behaviour: .recurringOnActiveBlocking
behaviour: .recurringOnActive,
shouldBlockFirstRunEachSession: true
).inserted(db)
}
}

View File

@ -16,9 +16,35 @@ public enum GetSnodePoolJob: JobExecutor {
failure: @escaping (Job, Error?, Bool) -> (),
deferred: @escaping (Job) -> ()
) {
// If the user doesn't exist then don't do anything (when the user registers we run this
// job directly)
guard Identity.userExists() else {
deferred(job)
return
}
// If we already have cached Snodes then we still want to trigger the 'SnodeAPI.getSnodePool'
// but we want to succeed this job immediately (since it's marked as blocking), this allows us
// to block if we have no Snode pool and prevent other jobs from failing but avoids having to
// wait if we already have a potentially valid snode pool
guard !SnodeAPI.hasCachedSnodesInclusingExpired() else {
SnodeAPI.getSnodePool().retainUntilComplete()
success(job, false)
return
}
SnodeAPI.getSnodePool()
.done { _ in success(job, false) }
.catch { error in failure(job, error, false) }
.retainUntilComplete()
}
public static func run() {
GetSnodePoolJob.run(
Job(variant: .getSnodePool),
success: { _, _ in },
failure: { _, _, _ in },
deferred: { _ in }
)
}
}

View File

@ -38,6 +38,9 @@ public final class SnodeAPI : NSObject {
public typealias RawResponsePromise = Promise<RawResponse>
// MARK: Snode Pool Interaction
private static var hasInsufficientSnodes: Bool { snodePool.count < minSnodePoolCount }
private static func loadSnodePoolIfNeeded() {
guard !hasLoadedSnodePool else { return }
@ -250,9 +253,10 @@ public final class SnodeAPI : NSObject {
// MARK: Public API
@objc(getSnodePool)
public static func objc_getSnodePool() -> AnyPromise {
AnyPromise.from(getSnodePool())
public static func hasCachedSnodesInclusingExpired() -> Bool {
loadSnodePoolIfNeeded()
return !hasInsufficientSnodes
}
public static func getSnodePool() -> Promise<Set<Snode>> {
@ -261,8 +265,7 @@ public final class SnodeAPI : NSObject {
let hasSnodePoolExpired = given(GRDBStorage.shared[.lastSnodePoolRefreshDate]) {
now.timeIntervalSince($0) > 2 * 60 * 60
}.defaulting(to: true)
let snodePool = SnodeAPI.snodePool
let hasInsufficientSnodes = (snodePool.count < minSnodePoolCount)
let snodePool: Set<Snode> = SnodeAPI.snodePool
if hasInsufficientSnodes || hasSnodePoolExpired {
if let getSnodePoolPromise = getSnodePoolPromise { return getSnodePoolPromise }

View File

@ -28,6 +28,10 @@ enum _001_InitialSetupMigration: Migration {
t.column(.behaviour, .integer)
.notNull()
.indexed() // Quicker querying
t.column(.shouldBlockFirstRunEachSession, .boolean)
.notNull()
.indexed() // Quicker querying
.defaults(to: false)
t.column(.nextRunTimestamp, .double)
.notNull()
.indexed() // Quicker querying
@ -44,9 +48,8 @@ enum _001_InitialSetupMigration: Migration {
.notNull()
.references(Job.self, onDelete: .cascade) // Delete if Job deleted
t.column(.dependantId, .integer)
.notNull()
.indexed() // Quicker querying
.references(Job.self, onDelete: .cascade) // Delete if Job deleted
.references(Job.self, onDelete: .setNull) // Delete if Job deleted
t.primaryKey([.jobId, .dependantId])
}

View File

@ -16,18 +16,19 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
case failureCount
case variant
case behaviour
case shouldBlockFirstRunEachSession
case nextRunTimestamp
case threadId
case interactionId
case details
}
public enum Variant: Int, Codable, DatabaseValueConvertible {
public enum Variant: Int, Codable, DatabaseValueConvertible, CaseIterable {
/// This is a recurring job that handles the removal of disappearing messages and is triggered
/// at the timestamp of the next disappearing message
case disappearingMessages
/// This is a recurring job that ensures the app retrieves a service node pool on active
/// This is a recurring job that ensures the app retrieves a service node pool on become active
///
/// **Note:** This is a blocking job so it will run before any other jobs and prevent them from
/// running until it's complete
@ -87,7 +88,7 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
case attachmentDownload
}
public enum Behaviour: Int, Codable, DatabaseValueConvertible {
public enum Behaviour: Int, Codable, DatabaseValueConvertible, CaseIterable {
/// This job will run once and then be removed from the jobs table
case runOnce
@ -102,22 +103,9 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
/// gets set
case recurringOnLaunch
/// This job will run once each launch and may run again during the same session if `nextRunTimestamp`
/// gets set, it also must complete before any other jobs can run
case recurringOnLaunchBlocking
/// This job will run once each launch and may run again during the same session if `nextRunTimestamp`
/// gets set, it also must complete before any other jobs can run
case recurringOnLaunchBlockingOncePerSession
/// This job will run once each whenever the app becomes active (launch and return from background) and
/// may run again during the same session if `nextRunTimestamp` gets set
case recurringOnActive
/// This job will run once each whenever the app becomes active (launch and return from background) and
/// may run again during the same session if `nextRunTimestamp` gets set, it also must complete before
/// any other jobs can run
case recurringOnActiveBlocking
}
/// The `id` value is auto incremented by the database, if the `Job` hasn't been inserted into
@ -130,9 +118,16 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
/// The type of job
public let variant: Variant
/// The type of job
/// How the job should behave
public let behaviour: Behaviour
/// When the app starts or returns from the background this flag controls whether the job should prevent other
/// jobs from starting until after it completes
///
/// **Note:** `OnLaunch` blocking jobs will be started on launch and all others will be triggered when becoming
/// active but the "blocking" behaviour will only occur if there are no other jobs already running
public let shouldBlockFirstRunEachSession: Bool
/// Seconds since epoch to indicate the next datetime that this job should run
public let nextRunTimestamp: TimeInterval
@ -174,6 +169,7 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
failureCount: UInt,
variant: Variant,
behaviour: Behaviour,
shouldBlockFirstRunEachSession: Bool,
nextRunTimestamp: TimeInterval,
threadId: String?,
interactionId: Int64?,
@ -183,6 +179,7 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
self.failureCount = failureCount
self.variant = variant
self.behaviour = behaviour
self.shouldBlockFirstRunEachSession = shouldBlockFirstRunEachSession
self.nextRunTimestamp = nextRunTimestamp
self.threadId = threadId
self.interactionId = interactionId
@ -193,6 +190,7 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
failureCount: UInt = 0,
variant: Variant,
behaviour: Behaviour = .runOnce,
shouldBlockFirstRunEachSession: Bool = false,
nextRunTimestamp: TimeInterval = 0,
threadId: String? = nil,
interactionId: Int64? = nil
@ -200,6 +198,7 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
self.failureCount = failureCount
self.variant = variant
self.behaviour = behaviour
self.shouldBlockFirstRunEachSession = shouldBlockFirstRunEachSession
self.nextRunTimestamp = nextRunTimestamp
self.threadId = threadId
self.interactionId = interactionId
@ -210,6 +209,7 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
failureCount: UInt = 0,
variant: Variant,
behaviour: Behaviour = .runOnce,
shouldBlockFirstRunEachSession: Bool = false,
nextRunTimestamp: TimeInterval = 0,
threadId: String? = nil,
interactionId: Int64? = nil,
@ -225,6 +225,7 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
self.failureCount = failureCount
self.variant = variant
self.behaviour = behaviour
self.shouldBlockFirstRunEachSession = shouldBlockFirstRunEachSession
self.nextRunTimestamp = nextRunTimestamp
self.threadId = threadId
self.interactionId = interactionId
@ -236,23 +237,14 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
public mutating func didInsert(with rowID: Int64, for column: String?) {
self.id = rowID
}
public func delete(_ db: Database) throws -> Bool {
// Delete any dependencies
try dependantJobs
.deleteAll(db)
return try performDelete(db)
}
}
// MARK: - GRDB Interactions
extension Job {
internal static func filterPendingJobs(excludeFutureJobs: Bool = true) -> QueryInterfaceRequest<Job> {
internal static func filterPendingJobs(variants: [Variant], excludeFutureJobs: Bool = true) -> QueryInterfaceRequest<Job> {
let query: QueryInterfaceRequest<Job> = Job
.filter(
// TODO: Should this include other behaviours? (what happens if one of the other types fails???? Just leave it until the next launch/active???) Set a 'failureCount' and use that to determine if it should run? (reset on success)
// Retrieve all 'runOnce' and 'recurring' jobs
[
Job.Behaviour.runOnce,
@ -262,13 +254,12 @@ extension Job {
// 'nextRunTimestamp'
[
Job.Behaviour.recurringOnLaunch,
Job.Behaviour.recurringOnLaunchBlocking,
Job.Behaviour.recurringOnActive,
Job.Behaviour.recurringOnActiveBlocking
Job.Behaviour.recurringOnActive
].contains(Job.Columns.behaviour) &&
Job.Columns.nextRunTimestamp > 0
)
)
.filter(variants.contains(Job.Columns.variant))
.order(Job.Columns.nextRunTimestamp)
.order(Job.Columns.id)
@ -284,30 +275,20 @@ extension Job {
// MARK: - Convenience
public extension Job {
var isBlocking: Bool {
switch self.behaviour {
case .recurringOnLaunchBlocking,
.recurringOnLaunchBlockingOncePerSession,
.recurringOnActiveBlocking:
return true
default: return false
}
}
func with(
failureCount: UInt = 0,
nextRunTimestamp: TimeInterval
) -> Job {
return Job(
id: id,
id: self.id,
failureCount: failureCount,
variant: variant,
behaviour: behaviour,
variant: self.variant,
behaviour: self.behaviour,
shouldBlockFirstRunEachSession: self.shouldBlockFirstRunEachSession,
nextRunTimestamp: nextRunTimestamp,
threadId: threadId,
interactionId: interactionId,
details: details
threadId: self.threadId,
interactionId: self.interactionId,
details: self.details
)
}
@ -315,13 +296,14 @@ public extension Job {
guard let detailsData: Data = try? JSONEncoder().encode(details) else { return nil }
return Job(
id: id,
failureCount: failureCount,
variant: variant,
behaviour: behaviour,
nextRunTimestamp: nextRunTimestamp,
threadId: threadId,
interactionId: interactionId,
id: self.id,
failureCount: self.failureCount,
variant: self.variant,
behaviour: self.behaviour,
shouldBlockFirstRunEachSession: self.shouldBlockFirstRunEachSession,
nextRunTimestamp: self.nextRunTimestamp,
threadId: self.threadId,
interactionId: self.interactionId,
details: detailsData
)
}

View File

@ -15,8 +15,16 @@ public struct JobDependencies: Codable, FetchableRecord, PersistableRecord, Tabl
case dependantId
}
/// The is the id of the main job
public let jobId: Int64
public let dependantId: Int64
/// The is the id of the job that the main job is dependant on
///
/// **Note:** If this is `null` it means the dependant job has been deleted (but the dependency wasn't
/// removed) this generally means a job has been directly deleted without it's dependencies getting cleaned
/// up - If we find a job that has a dependency with no `dependantId` then it's likely an invalid job and
/// should be removed
public let dependantId: Int64?
// MARK: - Initialization

View File

@ -33,7 +33,7 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
// MARK: - Initialization
fileprivate init(
public init(
pagedTable: ObservedTable.Type,
pageSize: Int,
idColumn: ObservedTable.Columns,
@ -43,8 +43,7 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
orderSQL: SQL,
dataQuery: @escaping (SQL?, SQL?) -> AdaptedFetchRequest<SQLRequest<T>>,
associatedRecords: [ErasedAssociatedRecord] = [],
onChangeUnsorted: @escaping ([T], PagedData.PageInfo) -> (),
initialQueryTarget: PagedData.PageInfo.InternalTarget?
onChangeUnsorted: @escaping ([T], PagedData.PageInfo) -> ()
) {
let associatedTables: Set<String> = associatedRecords.map { $0.databaseTableName }.asSet()
assert(!associatedTables.contains(pagedTable.databaseTableName), "The paged table cannot also exist as an associatedRecord")
@ -80,11 +79,6 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
.filter { $0.events.contains(.delete) }
.map { $0.databaseTableName }
.asSet()
// Run the initial query if there is one
guard let initialQueryTarget: PagedData.PageInfo.InternalTarget = initialQueryTarget else { return }
self.load(initialQueryTarget)
}
// MARK: - TransactionObserver
@ -483,69 +477,18 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
// MARK: - Convenience
public extension PagedDatabaseObserver {
fileprivate static func initialQueryTarget<ID: SQLExpressible>(
for initialFocusedId: ID?,
skipInitialQuery: Bool
) -> PagedData.PageInfo.InternalTarget? {
// Determine if we want to laod the first page immediately (this is generally needed
// to prevent transitions from looking buggy)
guard !skipInitialQuery else { return nil }
switch initialFocusedId {
case .some(let targetId): return .initialPageAround(id: targetId.sqlExpression)
// If we don't have a `initialFocusedId` then default to `.pageBefore` (it'll query
// from a `0` offset
case .none: return .pageBefore
}
}
convenience init(
pagedTable: ObservedTable.Type,
pageSize: Int,
idColumn: ObservedTable.Columns,
initialFocusedId: ObservedTable.ID? = nil,
observedChanges: [PagedData.ObservedChanges],
joinSQL: SQL? = nil,
filterSQL: SQL,
orderSQL: SQL,
dataQuery: @escaping (SQL?, SQL?) -> AdaptedFetchRequest<SQLRequest<T>>,
associatedRecords: [ErasedAssociatedRecord] = [],
onChangeUnsorted: @escaping ([T], PagedData.PageInfo) -> (),
skipInitialQuery: Bool = false
) where ObservedTable.ID: SQLExpressible {
self.init(
pagedTable: pagedTable,
pageSize: pageSize,
idColumn: idColumn,
observedChanges: observedChanges,
joinSQL: joinSQL,
filterSQL: filterSQL,
orderSQL: orderSQL,
dataQuery: dataQuery,
associatedRecords: associatedRecords,
onChangeUnsorted: onChangeUnsorted,
initialQueryTarget: PagedDatabaseObserver.initialQueryTarget(
for: initialFocusedId,
skipInitialQuery: skipInitialQuery
)
)
}
convenience init(
pagedTable: ObservedTable.Type,
pageSize: Int,
idColumn: ObservedTable.Columns,
initialFocusedId: ObservedTable.ID? = nil,
observedChanges: [PagedData.ObservedChanges],
joinSQL: SQL? = nil,
filterSQL: SQL,
orderSQL: SQL,
dataQuery: @escaping (SQL?, SQL?) -> SQLRequest<T>,
associatedRecords: [ErasedAssociatedRecord] = [],
onChangeUnsorted: @escaping ([T], PagedData.PageInfo) -> (),
skipInitialQuery: Bool = false
) where ObservedTable.ID: SQLExpressible {
onChangeUnsorted: @escaping ([T], PagedData.PageInfo) -> ()
) {
self.init(
pagedTable: pagedTable,
pageSize: pageSize,
@ -558,77 +501,7 @@ public extension PagedDatabaseObserver {
dataQuery(additionalFilters, limit).adapted { _ in ScopeAdapter([:]) }
},
associatedRecords: associatedRecords,
onChangeUnsorted: onChangeUnsorted,
initialQueryTarget: PagedDatabaseObserver.initialQueryTarget(
for: initialFocusedId,
skipInitialQuery: skipInitialQuery
)
)
}
convenience init<ID>(
pagedTable: ObservedTable.Type,
pageSize: Int,
idColumn: ObservedTable.Columns,
initialFocusedId: ID? = nil,
observedChanges: [PagedData.ObservedChanges],
joinSQL: SQL? = nil,
filterSQL: SQL,
orderSQL: SQL,
dataQuery: @escaping (SQL?, SQL?) -> AdaptedFetchRequest<SQLRequest<T>>,
associatedRecords: [ErasedAssociatedRecord] = [],
onChangeUnsorted: @escaping ([T], PagedData.PageInfo) -> (),
skipInitialQuery: Bool = false
) where ObservedTable.ID == Optional<ID>, ID: SQLExpressible {
self.init(
pagedTable: pagedTable,
pageSize: pageSize,
idColumn: idColumn,
observedChanges: observedChanges,
joinSQL: joinSQL,
filterSQL: filterSQL,
orderSQL: orderSQL,
dataQuery: dataQuery,
associatedRecords: associatedRecords,
onChangeUnsorted: onChangeUnsorted,
initialQueryTarget: PagedDatabaseObserver.initialQueryTarget(
for: initialFocusedId,
skipInitialQuery: skipInitialQuery
)
)
}
convenience init<ID>(
pagedTable: ObservedTable.Type,
pageSize: Int,
idColumn: ObservedTable.Columns,
initialFocusedId: ID? = nil,
observedChanges: [PagedData.ObservedChanges],
joinSQL: SQL? = nil,
filterSQL: SQL,
orderSQL: SQL,
dataQuery: @escaping (SQL?, SQL?) -> SQLRequest<T>,
associatedRecords: [ErasedAssociatedRecord] = [],
onChangeUnsorted: @escaping ([T], PagedData.PageInfo) -> (),
skipInitialQuery: Bool = false
) where ObservedTable.ID == Optional<ID>, ID: SQLExpressible {
self.init(
pagedTable: pagedTable,
pageSize: pageSize,
idColumn: idColumn,
observedChanges: observedChanges,
joinSQL: joinSQL,
filterSQL: filterSQL,
orderSQL: orderSQL,
dataQuery: { additionalFilters, limit in
dataQuery(additionalFilters, limit).adapted { _ in ScopeAdapter([:]) }
},
associatedRecords: associatedRecords,
onChangeUnsorted: onChangeUnsorted,
initialQueryTarget: PagedDatabaseObserver.initialQueryTarget(
for: initialFocusedId,
skipInitialQuery: skipInitialQuery
)
onChangeUnsorted: onChangeUnsorted
)
}

View File

@ -3,6 +3,31 @@
import SignalCoreKit
public extension String {
var glyphCount: Int {
let richText = NSAttributedString(string: self)
let line = CTLineCreateWithAttributedString(richText)
return CTLineGetGlyphCount(line)
}
var isSingleEmoji: Bool {
return (glyphCount == 1 && containsEmoji)
}
var containsEmoji: Bool {
return unicodeScalars.contains { $0.isEmoji }
}
var containsOnlyEmoji: Bool {
return (
!isEmpty &&
!unicodeScalars.contains(where: {
!$0.isEmoji &&
!$0.isZeroWidthJoiner
})
)
}
func localized() -> String {
// If the localized string matches the key provided then the localisation failed
let localizedString = NSLocalizedString(self, comment: "")
@ -28,4 +53,15 @@ public extension String {
return ranges
}
static func filterNotificationText(_ text: String?) -> String? {
guard let text = text?.filterStringForDisplay() else { return nil }
// iOS strips anything that looks like a printf formatting character from
// the notification body, so if we want to dispay a literal "%" in a notification
// it must be escaped.
// see https://developer.apple.com/documentation/uikit/uilocalnotification/1616646-alertbody
// for more details.
return text.replacingOccurrences(of: "%", with: "%%")
}
}

View File

@ -0,0 +1,121 @@
// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import Foundation
public extension UnicodeScalar {
class EmojiRange {
// rangeStart and rangeEnd are inclusive.
let rangeStart: UInt32
let rangeEnd: UInt32
// MARK: - Initializers
init(rangeStart: UInt32, rangeEnd: UInt32) {
self.rangeStart = rangeStart
self.rangeEnd = rangeEnd
}
}
// From:
// https://www.unicode.org/Public/emoji/
// Current Version:
// https://www.unicode.org/Public/emoji/6.0/emoji-data.txt
//
// These ranges can be code-generated using:
//
// * Scripts/emoji-data.txt
// * Scripts/emoji_ranges.py
static let kEmojiRanges = [
// NOTE: Don't treat Pound Sign # as Jumbomoji.
// EmojiRange(rangeStart:0x23, rangeEnd:0x23),
// NOTE: Don't treat Asterisk * as Jumbomoji.
// EmojiRange(rangeStart:0x2A, rangeEnd:0x2A),
// NOTE: Don't treat Digits 0..9 as Jumbomoji.
// EmojiRange(rangeStart:0x30, rangeEnd:0x39),
// NOTE: Don't treat Copyright Symbol © as Jumbomoji.
// EmojiRange(rangeStart:0xA9, rangeEnd:0xA9),
// NOTE: Don't treat Trademark Sign ® as Jumbomoji.
// EmojiRange(rangeStart:0xAE, rangeEnd:0xAE),
EmojiRange(rangeStart: 0x200D, rangeEnd: 0x200D),
EmojiRange(rangeStart: 0x203C, rangeEnd: 0x203C),
EmojiRange(rangeStart: 0x2049, rangeEnd: 0x2049),
EmojiRange(rangeStart: 0x20D0, rangeEnd: 0x20FF),
EmojiRange(rangeStart: 0x2122, rangeEnd: 0x2122),
EmojiRange(rangeStart: 0x2139, rangeEnd: 0x2139),
EmojiRange(rangeStart: 0x2194, rangeEnd: 0x2199),
EmojiRange(rangeStart: 0x21A9, rangeEnd: 0x21AA),
EmojiRange(rangeStart: 0x231A, rangeEnd: 0x231B),
EmojiRange(rangeStart: 0x2328, rangeEnd: 0x2328),
EmojiRange(rangeStart: 0x2388, rangeEnd: 0x2388),
EmojiRange(rangeStart: 0x23CF, rangeEnd: 0x23CF),
EmojiRange(rangeStart: 0x23E9, rangeEnd: 0x23F3),
EmojiRange(rangeStart: 0x23F8, rangeEnd: 0x23FA),
EmojiRange(rangeStart: 0x24C2, rangeEnd: 0x24C2),
EmojiRange(rangeStart: 0x25AA, rangeEnd: 0x25AB),
EmojiRange(rangeStart: 0x25B6, rangeEnd: 0x25B6),
EmojiRange(rangeStart: 0x25C0, rangeEnd: 0x25C0),
EmojiRange(rangeStart: 0x25FB, rangeEnd: 0x25FE),
EmojiRange(rangeStart: 0x2600, rangeEnd: 0x27BF),
EmojiRange(rangeStart: 0x2934, rangeEnd: 0x2935),
EmojiRange(rangeStart: 0x2B05, rangeEnd: 0x2B07),
EmojiRange(rangeStart: 0x2B1B, rangeEnd: 0x2B1C),
EmojiRange(rangeStart: 0x2B50, rangeEnd: 0x2B50),
EmojiRange(rangeStart: 0x2B55, rangeEnd: 0x2B55),
EmojiRange(rangeStart: 0x3030, rangeEnd: 0x3030),
EmojiRange(rangeStart: 0x303D, rangeEnd: 0x303D),
EmojiRange(rangeStart: 0x3297, rangeEnd: 0x3297),
EmojiRange(rangeStart: 0x3299, rangeEnd: 0x3299),
EmojiRange(rangeStart: 0xFE00, rangeEnd: 0xFE0F),
EmojiRange(rangeStart: 0x1F000, rangeEnd: 0x1F0FF),
EmojiRange(rangeStart: 0x1F10D, rangeEnd: 0x1F10F),
EmojiRange(rangeStart: 0x1F12F, rangeEnd: 0x1F12F),
EmojiRange(rangeStart: 0x1F16C, rangeEnd: 0x1F171),
EmojiRange(rangeStart: 0x1F17E, rangeEnd: 0x1F17F),
EmojiRange(rangeStart: 0x1F18E, rangeEnd: 0x1F18E),
EmojiRange(rangeStart: 0x1F191, rangeEnd: 0x1F19A),
EmojiRange(rangeStart: 0x1F1AD, rangeEnd: 0x1F1FF),
EmojiRange(rangeStart: 0x1F201, rangeEnd: 0x1F20F),
EmojiRange(rangeStart: 0x1F21A, rangeEnd: 0x1F21A),
EmojiRange(rangeStart: 0x1F22F, rangeEnd: 0x1F22F),
EmojiRange(rangeStart: 0x1F232, rangeEnd: 0x1F23A),
EmojiRange(rangeStart: 0x1F23C, rangeEnd: 0x1F23F),
EmojiRange(rangeStart: 0x1F249, rangeEnd: 0x1F64F),
EmojiRange(rangeStart: 0x1F680, rangeEnd: 0x1F6FF),
EmojiRange(rangeStart: 0x1F774, rangeEnd: 0x1F77F),
EmojiRange(rangeStart: 0x1F7D5, rangeEnd: 0x1F7FF),
EmojiRange(rangeStart: 0x1F80C, rangeEnd: 0x1F80F),
EmojiRange(rangeStart: 0x1F848, rangeEnd: 0x1F84F),
EmojiRange(rangeStart: 0x1F85A, rangeEnd: 0x1F85F),
EmojiRange(rangeStart: 0x1F888, rangeEnd: 0x1F88F),
EmojiRange(rangeStart: 0x1F8AE, rangeEnd: 0x1FFFD),
EmojiRange(rangeStart: 0xE0020, rangeEnd: 0xE007F)
]
var isEmoji: Bool {
// Binary search
var left: Int = 0
var right = Int(UnicodeScalar.kEmojiRanges.count - 1)
while true {
let mid = (left + right) / 2
let midRange = UnicodeScalar.kEmojiRanges[mid]
if value < midRange.rangeStart {
if mid == left {
return false
}
right = mid - 1
} else if value > midRange.rangeEnd {
if mid == right {
return false
}
left = mid + 1
} else {
return true
}
}
}
var isZeroWidthJoiner: Bool {
return value == 8205
}
}

View File

@ -35,48 +35,64 @@ public protocol JobExecutor {
}
public final class JobRunner {
private class Trigger {
private var timer: Timer?
private static let blockingQueue: Atomic<JobQueue?> = Atomic(
JobQueue(
type: .blocking,
qos: .userInitiated,
jobVariants: [],
onQueueDrained: {
// Once all blocking jobs have been completed we want to start running
// the remaining job queues
queues.wrappedValue.forEach { _, queue in queue.start() }
}
)
)
private static let queues: Atomic<[Job.Variant: JobQueue]> = {
var jobVariants: Set<Job.Variant> = Job.Variant.allCases.asSet()
static func create(timestamp: TimeInterval) -> Trigger? {
// Setup the trigger (wait at least 1 second before triggering)
let trigger: Trigger = Trigger()
trigger.timer = Timer.scheduledTimer(
timeInterval: max(1, (timestamp - Date().timeIntervalSince1970)),
target: self,
selector: #selector(start),
userInfo: nil,
repeats: false
)
return trigger
}
let messageSendQueue: JobQueue = JobQueue(
type: .messageSend,
qos: .default,
jobVariants: [
jobVariants.remove(.attachmentUpload),
jobVariants.remove(.messageSend),
jobVariants.remove(.notifyPushServer)// TODO: Read receipts
].compactMap { $0 }
)
let messageReceiveQueue: JobQueue = JobQueue(
type: .messageReceive,
qos: .default,
jobVariants: [
jobVariants.remove(.messageReceive)
].compactMap { $0 }
)
let attachmentDownloadQueue: JobQueue = JobQueue(
type: .attachmentDownload,
qos: .utility,
jobVariants: [
jobVariants.remove(.attachmentDownload)
].compactMap { $0 }
)
let generalQueue: JobQueue = JobQueue(
type: .general(number: 0),
qos: .utility,
jobVariants: Array(jobVariants)
)
deinit { timer?.invalidate() }
@objc func start() {
JobRunner.start()
}
}
// TODO: Could this be a bottleneck? (single serial queue to process all these jobs? Group by thread?).
// TODO: Multi-thread support.
private static let queueKey: DispatchSpecificKey = DispatchSpecificKey<String>()
private static let queueContext: String = "JobRunner"
private static let internalQueue: DispatchQueue = {
let result: DispatchQueue = DispatchQueue(label: queueContext)
result.setSpecific(key: queueKey, value: queueContext)
return result
return Atomic([
messageSendQueue,
messageReceiveQueue,
attachmentDownloadQueue,
generalQueue
].reduce(into: [:]) { prev, next in
next.jobVariants.forEach { variant in
prev[variant] = next
}
})
}()
internal static var executorMap: Atomic<[Job.Variant: JobExecutor.Type]> = Atomic([:])
private static var nextTrigger: Atomic<Trigger?> = Atomic(nil)
private static var isRunning: Atomic<Bool> = Atomic(false)
private static var jobQueue: Atomic<[Job]> = Atomic([])
private static var jobsCurrentlyRunning: Atomic<Set<Int64>> = Atomic([])
private static var perSessionJobsCompleted: Atomic<Set<Int64>> = Atomic([])
fileprivate static var perSessionJobsCompleted: Atomic<Set<Int64>> = Atomic([])
// MARK: - Configuration
@ -98,20 +114,11 @@ public final class JobRunner {
return
}
// Check if the job should be added to the queue
guard
canStartJob,
updatedJob.behaviour != .runOnceNextLaunch,
updatedJob.nextRunTimestamp <= Date().timeIntervalSince1970
else { return }
jobQueue.mutate { $0.append(updatedJob) }
queues.mutate { $0[updatedJob.variant]?.add(updatedJob, canStartJob: canStartJob) }
// Start the job runner if needed
db.afterNextTransactionCommit { _ in
if !isRunning.wrappedValue {
start()
}
queues.wrappedValue[updatedJob.variant]?.start()
}
}
@ -122,29 +129,8 @@ public final class JobRunner {
/// is in the future then the job won't be started
public static func upsert(_ db: Database, job: Job?, canStartJob: Bool = true) {
guard let job: Job = job else { return } // Ignore null jobs
guard let jobId: Int64 = job.id else {
add(db, job: job, canStartJob: canStartJob)
return
}
// Lock the queue while checking the index and inserting to ensure we don't run into
// any multi-threading shenanigans
//
// Note: currently running jobs are removed from the queue so we don't need to check
// the 'jobsCurrentlyRunning' set
var didUpdateExistingJob: Bool = false
jobQueue.mutate { queue in
if let jobIndex: Array<Job>.Index = queue.firstIndex(where: { $0.id == jobId }) {
queue[jobIndex] = job
didUpdateExistingJob = true
}
}
// If we didn't update an existing job then we need to add it to the queue
guard !didUpdateExistingJob else { return }
add(db, job: job, canStartJob: canStartJob)
queues.wrappedValue[job.variant]?.upsert(job, canStartJob: canStartJob)
}
@discardableResult public static func insert(_ db: Database, job: Job?, before otherJob: Job) -> Job? {
@ -162,18 +148,7 @@ public final class JobRunner {
return nil
}
// Insert the job before the current job (re-adding the current job to
// the start of the queue if it's not in there) - this will mean the new
// job will run and then the otherJob will run (or run again) once it's
// done
jobQueue.mutate {
guard let otherJobIndex: Int = $0.firstIndex(of: otherJob) else {
$0.insert(contentsOf: [updatedJob, otherJob], at: 0)
return
}
$0.insert(updatedJob, at: otherJobIndex)
}
queues.wrappedValue[updatedJob.variant]?.insert(updatedJob, before: otherJob)
return updatedJob
}
@ -181,85 +156,303 @@ public final class JobRunner {
public static func appDidFinishLaunching() {
// Note: 'appDidBecomeActive' will run on first launch anyway so we can
// leave those jobs out and can wait until then to start the JobRunner
let maybeJobsToRun: [Job]? = GRDBStorage.shared.read { db in
try Job
.filter(
[
Job.Behaviour.recurringOnLaunch,
Job.Behaviour.recurringOnLaunchBlocking,
Job.Behaviour.recurringOnLaunchBlockingOncePerSession,
Job.Behaviour.runOnceNextLaunch
].contains(Job.Columns.behaviour)
)
.order(Job.Columns.id)
.fetchAll(db)
}
let jobsToRun: (blocking: [Job], nonBlocking: [Job]) = GRDBStorage.shared
.read { db in
let blockingJobs: [Job] = try Job
.filter(
[
Job.Behaviour.recurringOnLaunch,
Job.Behaviour.runOnceNextLaunch
].contains(Job.Columns.behaviour)
)
.filter(Job.Columns.shouldBlockFirstRunEachSession == true)
.order(Job.Columns.id)
.fetchAll(db)
let nonblockingJobs: [Job] = try Job
.filter(
[
Job.Behaviour.recurringOnLaunch,
Job.Behaviour.runOnceNextLaunch
].contains(Job.Columns.behaviour)
)
.filter(Job.Columns.shouldBlockFirstRunEachSession == false)
.order(Job.Columns.id)
.fetchAll(db)
return (blockingJobs, nonblockingJobs)
}
.defaulting(to: ([], []))
guard let jobsToRun: [Job] = maybeJobsToRun else { return }
guard !jobsToRun.blocking.isEmpty || !jobsToRun.nonBlocking.isEmpty else { return }
jobQueue.mutate {
// Insert any blocking jobs after any existing blocking jobs then add
// the remaining jobs to the end of the queue
let lastBlockingIndex = $0.lastIndex(where: { $0.isBlocking })
.defaulting(to: $0.startIndex.advanced(by: -1))
.advanced(by: 1)
$0.insert(
contentsOf: jobsToRun.filter { $0.isBlocking },
at: lastBlockingIndex
)
$0.append(
contentsOf: jobsToRun.filter { !$0.isBlocking }
)
// Add and start any blocking jobs
blockingQueue.wrappedValue?.appDidFinishLaunching(with: jobsToRun.blocking, canStart: true)
// Add any non-blocking jobs (we don't start these incase there are blocking "on active"
// jobs as well)
let jobsByVariant: [Job.Variant: [Job]] = jobsToRun.nonBlocking.grouped(by: \.variant)
let jobQueues: [Job.Variant: JobQueue] = queues.wrappedValue
jobsByVariant.forEach { variant, jobs in
jobQueues[variant]?.appDidFinishLaunching(with: jobs, canStart: false)
}
}
public static func appDidBecomeActive() {
let maybeJobsToRun: [Job]? = GRDBStorage.shared.read { db in
try Job
.filter(
[
Job.Behaviour.recurringOnActive,
Job.Behaviour.recurringOnActiveBlocking
].contains(Job.Columns.behaviour)
)
.order(Job.Columns.id)
.fetchAll(db)
}
// Note: When becoming active we want to start all non-on-launch blocking jobs as
// long as there are no other jobs already running
let alreadyRunningOtherJobs: Bool = queues.wrappedValue
.contains(where: { _, queue -> Bool in queue.isRunning.wrappedValue })
let jobsToRun: (blocking: [Job], nonBlocking: [Job]) = GRDBStorage.shared
.read { db in
guard !alreadyRunningOtherJobs else {
let onActiveJobs: [Job] = try Job
.filter(Job.Columns.behaviour == Job.Behaviour.recurringOnActive)
.order(Job.Columns.id)
.fetchAll(db)
return ([], onActiveJobs)
}
let blockingJobs: [Job] = try Job
.filter(
Job.Behaviour.allCases
.filter {
$0 != .recurringOnLaunch &&
$0 != .runOnceNextLaunch
}
.contains(Job.Columns.behaviour)
)
.filter(Job.Columns.shouldBlockFirstRunEachSession == true)
.order(Job.Columns.id)
.fetchAll(db)
let nonBlockingJobs: [Job] = try Job
.filter(Job.Columns.behaviour == Job.Behaviour.recurringOnActive)
.filter(Job.Columns.shouldBlockFirstRunEachSession == false)
.order(Job.Columns.id)
.fetchAll(db)
return (blockingJobs, nonBlockingJobs)
}
.defaulting(to: ([], []))
guard let jobsToRun: [Job] = maybeJobsToRun else { return }
guard !jobsToRun.blocking.isEmpty || !jobsToRun.nonBlocking.isEmpty else { return }
jobQueue.mutate {
// Insert any blocking jobs after any existing blocking jobs then add
// the remaining jobs to the end of the queue
let lastBlockingIndex = $0.lastIndex(where: { $0.isBlocking })
.defaulting(to: $0.startIndex.advanced(by: -1))
.advanced(by: 1)
$0.insert(
contentsOf: jobsToRun.filter { $0.isBlocking },
at: lastBlockingIndex
// Add and start any blocking jobs
blockingQueue.wrappedValue?.appDidFinishLaunching(with: jobsToRun.blocking, canStart: true)
let blockingQueueIsRunning: Bool = (blockingQueue.wrappedValue?.isRunning.wrappedValue == true)
let jobsByVariant: [Job.Variant: [Job]] = jobsToRun.nonBlocking.grouped(by: \.variant)
let jobQueues: [Job.Variant: JobQueue] = queues.wrappedValue
jobsByVariant.forEach { variant, jobs in
jobQueues[variant]?.appDidBecomeActive(
with: jobs,
canStart: !blockingQueueIsRunning
)
$0.append(
contentsOf: jobsToRun.filter { !$0.isBlocking }
)
}
// Start the job runner if needed
if !isRunning.wrappedValue {
start()
}
}
public static func isCurrentlyRunning(_ job: Job?) -> Bool {
guard let job: Job = job, let jobId: Int64 = job.id else { return false }
return (queues.wrappedValue[job.variant]?.isCurrentlyRunning(jobId) == true)
}
// MARK: - Convenience
fileprivate static func getRetryInterval(for job: Job) -> TimeInterval {
// Arbitrary backoff factor...
// try 1 delay: 0.5s
// try 2 delay: 1s
// ...
// try 5 delay: 16s
// ...
// try 11 delay: 512s
let maxBackoff: Double = 10 * 60 // 10 minutes
return 0.25 * min(maxBackoff, pow(2, Double(job.failureCount)))
}
}
// MARK: - JobQueue
private final class JobQueue {
fileprivate enum QueueType: Hashable {
case blocking
case general(number: Int)
case messageSend
case messageReceive
case attachmentDownload
var name: String {
switch self {
case .blocking: return "Blocking"
case .general(let number): return "General-\(number)"
case .messageSend: return "MessageSend"
case .messageReceive: return "MessageReceive"
case .attachmentDownload: return "AttachmentDownload"
}
}
}
private class Trigger {
private weak var queue: JobQueue?
private var timer: Timer?
static func create(queue: JobQueue, timestamp: TimeInterval) -> Trigger? {
// Setup the trigger (wait at least 1 second before triggering)
let trigger: Trigger = Trigger()
trigger.queue = queue
trigger.timer = Timer.scheduledTimer(
timeInterval: max(1, (timestamp - Date().timeIntervalSince1970)),
target: self,
selector: #selector(start),
userInfo: nil,
repeats: false
)
return trigger
}
deinit { timer?.invalidate() }
@objc func start() {
queue?.start()
}
}
private let type: QueueType
private let qosClass: DispatchQoS
private let queueKey: DispatchSpecificKey = DispatchSpecificKey<String>()
private let queueContext: String
/// The specific types of jobs this queue manages, if this is left empty it will handle all jobs not handled by other queues
fileprivate let jobVariants: [Job.Variant]
private let onQueueDrained: (() -> ())?
private lazy var internalQueue: DispatchQueue = {
let result: DispatchQueue = DispatchQueue(
label: self.queueContext,
qos: self.qosClass,
attributes: [],
autoreleaseFrequency: .inherit,
target: nil
)
result.setSpecific(key: queueKey, value: queueContext)
return result
}()
private var nextTrigger: Atomic<Trigger?> = Atomic(nil)
fileprivate var isRunning: Atomic<Bool> = Atomic(false)
private var queue: Atomic<[Job]> = Atomic([])
private var jobsCurrentlyRunning: Atomic<Set<Int64>> = Atomic([])
fileprivate var hasPendingJobs: Bool { !queue.wrappedValue.isEmpty }
// MARK: - Initialization
init(type: QueueType, qos: DispatchQoS, jobVariants: [Job.Variant], onQueueDrained: (() -> ())? = nil) {
self.type = type
self.queueContext = "JobQueue-\(type.name)"
self.qosClass = qos
self.jobVariants = jobVariants
self.onQueueDrained = onQueueDrained
}
// MARK: - Execution
fileprivate func add(_ job: Job, canStartJob: Bool = true) {
// Check if the job should be added to the queue
guard
canStartJob,
job.behaviour != .runOnceNextLaunch,
job.nextRunTimestamp <= Date().timeIntervalSince1970
else { return }
queue.mutate { $0.append(job) }
}
/// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start
/// the JobRunner
///
/// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp`
/// is in the future then the job won't be started
fileprivate func upsert(_ job: Job, canStartJob: Bool = true) {
guard let jobId: Int64 = job.id else {
add(job, canStartJob: canStartJob)
return
}
// Lock the queue while checking the index and inserting to ensure we don't run into
// any multi-threading shenanigans
//
// Note: currently running jobs are removed from the queue so we don't need to check
// the 'jobsCurrentlyRunning' set
var didUpdateExistingJob: Bool = false
queue.mutate { queue in
if let jobIndex: Array<Job>.Index = queue.firstIndex(where: { $0.id == jobId }) {
queue[jobIndex] = job
didUpdateExistingJob = true
}
}
// If we didn't update an existing job then we need to add it to the queue
guard !didUpdateExistingJob else { return }
add(job, canStartJob: canStartJob)
}
fileprivate func insert(_ job: Job, before otherJob: Job) {
// Insert the job before the current job (re-adding the current job to
// the start of the queue if it's not in there) - this will mean the new
// job will run and then the otherJob will run (or run again) once it's
// done
queue.mutate {
guard let otherJobIndex: Int = $0.firstIndex(of: otherJob) else {
$0.insert(contentsOf: [job, otherJob], at: 0)
return
}
$0.insert(job, at: otherJobIndex)
}
}
fileprivate func appDidFinishLaunching(with jobs: [Job], canStart: Bool) {
queue.mutate { $0.append(contentsOf: jobs) }
// Start the job runner if needed
if canStart && !isRunning.wrappedValue {
start()
}
}
fileprivate func appDidBecomeActive(with jobs: [Job], canStart: Bool) {
queue.mutate { queue in
// Avoid re-adding jobs to the queue that are already in it (this can
// happen if the user sends the app to the background before the 'onActive'
// jobs and then brings it back to the foreground)
let jobsNotAlreadyInQueue: [Job] = jobs
.filter { job in !queue.contains(where: { $0.id == job.id }) }
queue.append(contentsOf: jobsNotAlreadyInQueue)
}
// Start the job runner if needed
if canStart && !isRunning.wrappedValue {
start()
}
}
fileprivate func isCurrentlyRunning(_ jobId: Int64) -> Bool {
return jobsCurrentlyRunning.wrappedValue.contains(jobId)
}
// MARK: - Job Running
public static func start() {
fileprivate func start() {
// We only want the JobRunner to run in the main app
guard CurrentAppContext().isMainApp else { return }
guard !isRunning.wrappedValue else { return }
@ -267,25 +460,29 @@ public final class JobRunner {
// The JobRunner runs synchronously we need to ensure this doesn't start
// on the main thread (if it is on the main thread then swap to a different thread)
guard DispatchQueue.getSpecific(key: queueKey) == queueContext else {
internalQueue.async {
start()
}// TODO: Want to have multiple threads for this (attachment download should be separate - do we even use attachment upload anymore???)
internalQueue.async { [weak self] in
self?.start()
}
return
}
// Get any pending jobs
let maybeJobsToRun: [Job]? = GRDBStorage.shared.read { db in
try Job// TODO: Test this
.filterPendingJobs()
let jobsToRun: [Job] = GRDBStorage.shared.read { db in
try Job.filterPendingJobs(variants: jobVariants)
.fetchAll(db)
}
.defaulting(to: [])
// Determine the number of jobs to run
var jobCount: Int = 0
jobQueue.mutate { queue in
queue.mutate { queue in
// Avoid re-adding jobs to the queue that are already in it
let jobsNotAlreadyInQueue: [Job] = jobsToRun
.filter { job in !queue.contains(where: { $0.id == job.id }) }
// Add the jobs to the queue
if let jobsToRun: [Job] = maybeJobsToRun {
if !jobsNotAlreadyInQueue.isEmpty {
queue.append(contentsOf: jobsToRun)
}
@ -301,35 +498,35 @@ public final class JobRunner {
}
// Run the first job in the queue
SNLog("[JobRunner] Starting with (\(jobCount) job\(jobCount != 1 ? "s" : ""))")
SNLog("[JobRunner] Starting \(queueContext) with (\(jobCount) job\(jobCount != 1 ? "s" : ""))")
runNextJob()
}
private static func runNextJob() {
private func runNextJob() {
// Ensure this is running on the correct queue
guard DispatchQueue.getSpecific(key: queueKey) == queueContext else {
internalQueue.async {
runNextJob()
internalQueue.async { [weak self] in
self?.runNextJob()
}
return
}
guard let (nextJob, numJobsRemaining): (Job, Int) = jobQueue.mutate({ queue in queue.popFirst().map { ($0, queue.count) } }) else {
guard let (nextJob, numJobsRemaining): (Job, Int) = queue.mutate({ queue in queue.popFirst().map { ($0, queue.count) } }) else {
isRunning.mutate { $0 = false }
scheduleNextSoonestJob()
return
}
guard let jobExecutor: JobExecutor.Type = executorMap.wrappedValue[nextJob.variant] else {
SNLog("[JobRunner] Unable to run \(nextJob.variant) job due to missing executor")
guard let jobExecutor: JobExecutor.Type = JobRunner.executorMap.wrappedValue[nextJob.variant] else {
SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing executor")
handleJobFailed(nextJob, error: JobRunnerError.executorMissing, permanentFailure: true)
return
}
guard !jobExecutor.requiresThreadId || nextJob.threadId != nil else {
SNLog("[JobRunner] Unable to run \(nextJob.variant) job due to missing required threadId")
SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing required threadId")
handleJobFailed(nextJob, error: JobRunnerError.requiredThreadIdMissing, permanentFailure: true)
return
}
guard !jobExecutor.requiresInteractionId || nextJob.interactionId != nil else {
SNLog("[JobRunner] Unable to run \(nextJob.variant) job due to missing required interactionId")
SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing required interactionId")
handleJobFailed(nextJob, error: JobRunnerError.requiredInteractionIdMissing, permanentFailure: true)
return
}
@ -341,24 +538,35 @@ public final class JobRunner {
}
// Check if the next job has any dependencies
let jobDependencies: [Job] = GRDBStorage.shared
.read { db in try nextJob.dependencies.fetchAll(db) }
.defaulting(to: [])
guard jobDependencies.isEmpty else {
SNLog("[JobRunner] Found job with \(jobDependencies.count) dependencies, running those first")
let dependencyInfo: (expectedCount: Int, jobs: [Job]) = GRDBStorage.shared.read { db in
let numExpectedDependencies: Int = try JobDependencies
.filter(JobDependencies.Columns.jobId == nextJob.id)
.fetchCount(db)
let jobDependencies: [Job] = try nextJob.dependencies.fetchAll(db)
let jobDependencyIds: [Int64] = jobDependencies
return (numExpectedDependencies, jobDependencies)
}
.defaulting(to: (0, []))
guard dependencyInfo.jobs.count == dependencyInfo.expectedCount else {
SNLog("[JobRunner] \(queueContext) found job with missing dependencies, removing the job")
handleJobFailed(nextJob, error: JobRunnerError.missingDependencies, permanentFailure: true)
return
}
guard dependencyInfo.jobs.isEmpty else {
SNLog("[JobRunner] \(queueContext) found job with \(dependencyInfo.jobs.count) dependencies, running those first")
let jobDependencyIds: [Int64] = dependencyInfo.jobs
.compactMap { $0.id }
let jobIdsNotInQueue: Set<Int64> = jobDependencyIds
.asSet()
.subtracting(jobQueue.wrappedValue.compactMap { $0.id })
.subtracting(queue.wrappedValue.compactMap { $0.id })
// If there are dependencies which aren't in the queue we should just append them
guard !jobIdsNotInQueue.isEmpty else {
jobQueue.mutate { queue in
queue.mutate { queue in
queue.append(
contentsOf: jobDependencies
contentsOf: dependencyInfo.jobs
.filter { jobIdsNotInQueue.contains($0.id ?? -1) }
)
queue.append(nextJob)
@ -368,7 +576,7 @@ public final class JobRunner {
}
// Otherwise re-add the current job after it's dependencies
jobQueue.mutate { queue in
queue.mutate { queue in
guard let lastDependencyIndex: Int = queue.lastIndex(where: { jobDependencyIds.contains($0.id ?? -1) }) else {
queue.append(nextJob)
return
@ -388,7 +596,7 @@ public final class JobRunner {
nextTrigger.mutate { $0 = nil }
isRunning.mutate { $0 = true }
jobsCurrentlyRunning.mutate { $0 = $0.inserting(nextJob.id) }
SNLog("[JobRunner] Start job (\(numJobsRemaining) remaining)")
SNLog("[JobRunner] \(queueContext) started job (\(numJobsRemaining) remaining)")
jobExecutor.run(
nextJob,
@ -398,41 +606,41 @@ public final class JobRunner {
)
}
private static func scheduleNextSoonestJob() {
let nextJobTimestamp: TimeInterval? = GRDBStorage.shared
.read { db in
try TimeInterval
.fetchOne(
db,
Job
.filterPendingJobs(excludeFutureJobs: false)
.select(.nextRunTimestamp)
)
}
private func scheduleNextSoonestJob() {
let nextJobTimestamp: TimeInterval? = GRDBStorage.shared.read { db in
try Job.filterPendingJobs(variants: jobVariants, excludeFutureJobs: false)
.select(.nextRunTimestamp)
.asRequest(of: TimeInterval.self)
.fetchOne(db)
}
guard let nextJobTimestamp: TimeInterval = nextJobTimestamp else { return }
// If there are no remaining jobs the trigger the 'onQueueDrained' callback and stop
guard let nextJobTimestamp: TimeInterval = nextJobTimestamp else {
self.onQueueDrained?()
return
}
// If the next job isn't scheduled in the future then just restart the JobRunner immediately
let secondsUntilNextJob: TimeInterval = (nextJobTimestamp - Date().timeIntervalSince1970)
guard secondsUntilNextJob > 0 else {
SNLog("[JobRunner] Restarting immediately for job scheduled \(Int(ceil(abs(secondsUntilNextJob)))) second\(Int(ceil(abs(secondsUntilNextJob))) == 1 ? "" : "s")) ago")
SNLog("[JobRunner] Restarting \(queueContext) immediately for job scheduled \(Int(ceil(abs(secondsUntilNextJob)))) second\(Int(ceil(abs(secondsUntilNextJob))) == 1 ? "" : "s")) ago")
internalQueue.async {
JobRunner.start()
internalQueue.async { [weak self] in
self?.start()
}
return
}
// Setup a trigger
SNLog("[JobRunner] Stopping until next job in \(Int(ceil(abs(secondsUntilNextJob)))) second\(Int(ceil(abs(secondsUntilNextJob))) == 1 ? "" : "s"))")
nextTrigger.mutate { $0 = Trigger.create(timestamp: nextJobTimestamp) }
SNLog("[JobRunner] Stopping \(queueContext) until next job in \(Int(ceil(abs(secondsUntilNextJob)))) second\(Int(ceil(abs(secondsUntilNextJob))) == 1 ? "" : "s"))")
nextTrigger.mutate { $0 = Trigger.create(queue: self, timestamp: nextJobTimestamp) }
}
// MARK: - Handling Results
/// This function is called when a job succeeds
private static func handleJobSucceeded(_ job: Job, shouldStop: Bool) {
private func handleJobSucceeded(_ job: Job, shouldStop: Bool) {
switch job.behaviour {
case .runOnce, .runOnceNextLaunch:
GRDBStorage.shared.write { db in
@ -465,73 +673,53 @@ public final class JobRunner {
.with(nextRunTimestamp: (Date().timeIntervalSince1970 + 1))
.saved(db)
}
case .recurringOnLaunchBlockingOncePerSession:
perSessionJobsCompleted.mutate { $0 = $0.inserting(job.id) }
default: break
}
// The job is removed from the queue before it runs so all we need to to is remove it
// from the 'currentlyRunning' set and start the next one
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
internalQueue.async {
runNextJob()
internalQueue.async { [weak self] in
self?.runNextJob()
}
}
/// This function is called when a job fails, if it's wasn't a permanent failure then the 'failureCount' for the job will be incremented and it'll
/// be re-run after a retry interval has passed
private static func handleJobFailed(_ job: Job, error: Error?, permanentFailure: Bool) {
private func handleJobFailed(_ job: Job, error: Error?, permanentFailure: Bool) {
guard GRDBStorage.shared.read({ db in try Job.exists(db, id: job.id ?? -1) }) == true else {
SNLog("[JobRunner] \(job.variant) job canceled")
SNLog("[JobRunner] \(queueContext) \(job.variant) job canceled")
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
internalQueue.async {
runNextJob()
internalQueue.async { [weak self] in
self?.runNextJob()
}
return
}
switch job.behaviour {
// If a "blocking" job failed then rerun it immediately
case .recurringOnLaunchBlocking, .recurringOnActiveBlocking:
SNLog("[JobRunner] blocking \(job.variant) job failed; retrying immediately")
jobQueue.mutate({ $0.insert(job, at: 0) })
internalQueue.async {
runNextJob()
}
return
// If this is the blocking queue and a "blocking" job failed then rerun it immediately
if self.type == .blocking && job.shouldBlockFirstRunEachSession {
SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; retrying immediately")
queue.mutate { $0.insert(job, at: 0) }
// For "blocking once per session" jobs only rerun it immediately if it hasn't already
// run this session
case .recurringOnLaunchBlockingOncePerSession:
guard !perSessionJobsCompleted.wrappedValue.contains(job.id ?? -1) else { break }
SNLog("[JobRunner] blocking \(job.variant) job failed; retrying immediately")
perSessionJobsCompleted.mutate { $0 = $0.inserting(job.id) }
jobQueue.mutate({ $0.insert(job, at: 0) })
internalQueue.async {
runNextJob()
}
return
default: break
internalQueue.async { [weak self] in
self?.runNextJob()
}
return
}
// Get the max failure count for the job (a value of '-1' means it will retry indefinitely)
let maxFailureCount: Int = (JobRunner.executorMap.wrappedValue[job.variant]?.maxFailureCount ?? 0)
let nextRunTimestamp: TimeInterval = (Date().timeIntervalSince1970 + JobRunner.getRetryInterval(for: job))
GRDBStorage.shared.write { db in
// Get the max failure count for the job (a value of '-1' means it will retry indefinitely)
let maxFailureCount: Int = (executorMap.wrappedValue[job.variant]?.maxFailureCount ?? 0)
let nextRunTimestamp: TimeInterval = (Date().timeIntervalSince1970 + getRetryInterval(for: job))
guard
!permanentFailure &&
maxFailureCount >= 0 &&
job.failureCount + 1 < maxFailureCount
else {
SNLog("[JobRunner] \(job.variant) failed permanently\(maxFailureCount >= 0 ? "; too many retries" : "")")
SNLog("[JobRunner] \(queueContext) \(job.variant) failed permanently\(maxFailureCount >= 0 ? "; too many retries" : "")")
// If the job permanently failed or we have performed all of our retry attempts
// then delete the job (it'll probably never succeed)
@ -539,7 +727,7 @@ public final class JobRunner {
return
}
SNLog("[JobRunner] \(job.variant) job failed; scheduling retry (failure count is \(job.failureCount + 1))")
SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; scheduling retry (failure count is \(job.failureCount + 1))")
_ = try job
.with(
@ -566,38 +754,24 @@ public final class JobRunner {
// Remove the dependant jobs from the queue (so we don't get stuck in a loop of trying
// to run dependecies indefinitely
if !dependantJobIds.isEmpty {
jobQueue.mutate { queue in
queue.mutate { queue in
queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) }
}
}
}
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
internalQueue.async {
runNextJob()
internalQueue.async { [weak self] in
self?.runNextJob()
}
}
/// This function is called when a job neither succeeds or fails (this should only occur if the job has specific logic that makes it dependant
/// on other jobs, and it should automatically manage those dependencies)
private static func handleJobDeferred(_ job: Job) {
private func handleJobDeferred(_ job: Job) {
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
internalQueue.async {
runNextJob()
internalQueue.async { [weak self] in
self?.runNextJob()
}
}
// MARK: - Convenience
private static func getRetryInterval(for job: Job) -> TimeInterval {
// Arbitrary backoff factor...
// try 1 delay: 0.5s
// try 2 delay: 1s
// ...
// try 5 delay: 16s
// ...
// try 11 delay: 512s
let maxBackoff: Double = 10 * 60 // 10 minutes
return 0.25 * min(maxBackoff, pow(2, Double(job.failureCount)))
}
}

View File

@ -10,4 +10,5 @@ public enum JobRunnerError: Error {
case requiredInteractionIdMissing
case missingRequiredDetails
case missingDependencies
}

View File

@ -1,298 +0,0 @@
//
// Copyright (c) 2019 Open Whisper Systems. All rights reserved.
//
import Foundation
extension UnicodeScalar {
class EmojiRange {
// rangeStart and rangeEnd are inclusive.
let rangeStart: UInt32
let rangeEnd: UInt32
// MARK: Initializers
init(rangeStart: UInt32, rangeEnd: UInt32) {
self.rangeStart = rangeStart
self.rangeEnd = rangeEnd
}
}
// From:
// https://www.unicode.org/Public/emoji/
// Current Version:
// https://www.unicode.org/Public/emoji/6.0/emoji-data.txt
//
// These ranges can be code-generated using:
//
// * Scripts/emoji-data.txt
// * Scripts/emoji_ranges.py
static let kEmojiRanges = [
// NOTE: Don't treat Pound Sign # as Jumbomoji.
// EmojiRange(rangeStart:0x23, rangeEnd:0x23),
// NOTE: Don't treat Asterisk * as Jumbomoji.
// EmojiRange(rangeStart:0x2A, rangeEnd:0x2A),
// NOTE: Don't treat Digits 0..9 as Jumbomoji.
// EmojiRange(rangeStart:0x30, rangeEnd:0x39),
// NOTE: Don't treat Copyright Symbol © as Jumbomoji.
// EmojiRange(rangeStart:0xA9, rangeEnd:0xA9),
// NOTE: Don't treat Trademark Sign ® as Jumbomoji.
// EmojiRange(rangeStart:0xAE, rangeEnd:0xAE),
EmojiRange(rangeStart: 0x200D, rangeEnd: 0x200D),
EmojiRange(rangeStart: 0x203C, rangeEnd: 0x203C),
EmojiRange(rangeStart: 0x2049, rangeEnd: 0x2049),
EmojiRange(rangeStart: 0x20D0, rangeEnd: 0x20FF),
EmojiRange(rangeStart: 0x2122, rangeEnd: 0x2122),
EmojiRange(rangeStart: 0x2139, rangeEnd: 0x2139),
EmojiRange(rangeStart: 0x2194, rangeEnd: 0x2199),
EmojiRange(rangeStart: 0x21A9, rangeEnd: 0x21AA),
EmojiRange(rangeStart: 0x231A, rangeEnd: 0x231B),
EmojiRange(rangeStart: 0x2328, rangeEnd: 0x2328),
EmojiRange(rangeStart: 0x2388, rangeEnd: 0x2388),
EmojiRange(rangeStart: 0x23CF, rangeEnd: 0x23CF),
EmojiRange(rangeStart: 0x23E9, rangeEnd: 0x23F3),
EmojiRange(rangeStart: 0x23F8, rangeEnd: 0x23FA),
EmojiRange(rangeStart: 0x24C2, rangeEnd: 0x24C2),
EmojiRange(rangeStart: 0x25AA, rangeEnd: 0x25AB),
EmojiRange(rangeStart: 0x25B6, rangeEnd: 0x25B6),
EmojiRange(rangeStart: 0x25C0, rangeEnd: 0x25C0),
EmojiRange(rangeStart: 0x25FB, rangeEnd: 0x25FE),
EmojiRange(rangeStart: 0x2600, rangeEnd: 0x27BF),
EmojiRange(rangeStart: 0x2934, rangeEnd: 0x2935),
EmojiRange(rangeStart: 0x2B05, rangeEnd: 0x2B07),
EmojiRange(rangeStart: 0x2B1B, rangeEnd: 0x2B1C),
EmojiRange(rangeStart: 0x2B50, rangeEnd: 0x2B50),
EmojiRange(rangeStart: 0x2B55, rangeEnd: 0x2B55),
EmojiRange(rangeStart: 0x3030, rangeEnd: 0x3030),
EmojiRange(rangeStart: 0x303D, rangeEnd: 0x303D),
EmojiRange(rangeStart: 0x3297, rangeEnd: 0x3297),
EmojiRange(rangeStart: 0x3299, rangeEnd: 0x3299),
EmojiRange(rangeStart: 0xFE00, rangeEnd: 0xFE0F),
EmojiRange(rangeStart: 0x1F000, rangeEnd: 0x1F0FF),
EmojiRange(rangeStart: 0x1F10D, rangeEnd: 0x1F10F),
EmojiRange(rangeStart: 0x1F12F, rangeEnd: 0x1F12F),
EmojiRange(rangeStart: 0x1F16C, rangeEnd: 0x1F171),
EmojiRange(rangeStart: 0x1F17E, rangeEnd: 0x1F17F),
EmojiRange(rangeStart: 0x1F18E, rangeEnd: 0x1F18E),
EmojiRange(rangeStart: 0x1F191, rangeEnd: 0x1F19A),
EmojiRange(rangeStart: 0x1F1AD, rangeEnd: 0x1F1FF),
EmojiRange(rangeStart: 0x1F201, rangeEnd: 0x1F20F),
EmojiRange(rangeStart: 0x1F21A, rangeEnd: 0x1F21A),
EmojiRange(rangeStart: 0x1F22F, rangeEnd: 0x1F22F),
EmojiRange(rangeStart: 0x1F232, rangeEnd: 0x1F23A),
EmojiRange(rangeStart: 0x1F23C, rangeEnd: 0x1F23F),
EmojiRange(rangeStart: 0x1F249, rangeEnd: 0x1F64F),
EmojiRange(rangeStart: 0x1F680, rangeEnd: 0x1F6FF),
EmojiRange(rangeStart: 0x1F774, rangeEnd: 0x1F77F),
EmojiRange(rangeStart: 0x1F7D5, rangeEnd: 0x1F7FF),
EmojiRange(rangeStart: 0x1F80C, rangeEnd: 0x1F80F),
EmojiRange(rangeStart: 0x1F848, rangeEnd: 0x1F84F),
EmojiRange(rangeStart: 0x1F85A, rangeEnd: 0x1F85F),
EmojiRange(rangeStart: 0x1F888, rangeEnd: 0x1F88F),
EmojiRange(rangeStart: 0x1F8AE, rangeEnd: 0x1FFFD),
EmojiRange(rangeStart: 0xE0020, rangeEnd: 0xE007F)
]
var isEmoji: Bool {
// Binary search.
var left: Int = 0
var right = Int(UnicodeScalar.kEmojiRanges.count - 1)
while true {
let mid = (left + right) / 2
let midRange = UnicodeScalar.kEmojiRanges[mid]
if value < midRange.rangeStart {
if mid == left {
return false
}
right = mid - 1
} else if value > midRange.rangeEnd {
if mid == right {
return false
}
left = mid + 1
} else {
return true
}
}
}
var isZeroWidthJoiner: Bool {
return value == 8205
}
}
extension String {
var glyphCount: Int {
let richText = NSAttributedString(string: self)
let line = CTLineCreateWithAttributedString(richText)
return CTLineGetGlyphCount(line)
}
var isSingleEmoji: Bool {
return glyphCount == 1 && containsEmoji
}
var containsEmoji: Bool {
return unicodeScalars.contains { $0.isEmoji }
}
var containsOnlyEmoji: Bool {
return !isEmpty
&& !unicodeScalars.contains(where: {
!$0.isEmoji
&& !$0.isZeroWidthJoiner
})
}
}
@objc public class DisplayableText: NSObject {
@objc public let fullText: String
@objc public let displayText: String
@objc public let isTextTruncated: Bool
@objc public let jumbomojiCount: UInt
@objc
public static let kMaxJumbomojiCount: UInt = 5
// This value is a bit arbitrary since we don't need to be 100% correct about
// rendering "Jumbomoji". It allows us to place an upper bound on worst-case
// performacne.
@objc
public static let kMaxCharactersPerEmojiCount: UInt = 10
// MARK: Initializers
@objc
public init(fullText: String, displayText: String, isTextTruncated: Bool) {
self.fullText = fullText
self.displayText = displayText
self.isTextTruncated = isTextTruncated
self.jumbomojiCount = DisplayableText.jumbomojiCount(in: fullText)
}
// MARK: Emoji
// If the string is...
//
// * Non-empty
// * Only contains emoji
// * Contains <= kMaxJumbomojiCount emoji
//
// ...return the number of emoji (to be treated as "Jumbomoji") in the string.
private class func jumbomojiCount(in string: String) -> UInt {
if string == "" {
return 0
}
if string.count > Int(kMaxJumbomojiCount * kMaxCharactersPerEmojiCount) {
return 0
}
guard string.containsOnlyEmoji else {
return 0
}
let emojiCount = string.glyphCount
if UInt(emojiCount) > kMaxJumbomojiCount {
return 0
}
return UInt(emojiCount)
}
// For perf we use a static linkDetector. It doesn't change and building DataDetectors is
// surprisingly expensive. This should be fine, since NSDataDetector is an NSRegularExpression
// and NSRegularExpressions are thread safe.
private static let linkDetector: NSDataDetector? = {
return try? NSDataDetector(types: NSTextCheckingResult.CheckingType.link.rawValue)
}()
private static let hostRegex: NSRegularExpression? = {
let pattern = "^(?:https?:\\/\\/)?([^:\\/\\s]+)(.*)?$"
return try? NSRegularExpression(pattern: pattern)
}()
@objc
public lazy var shouldAllowLinkification: Bool = {
guard let linkDetector: NSDataDetector = DisplayableText.linkDetector else {
owsFailDebug("linkDetector was unexpectedly nil")
return false
}
func isValidLink(linkText: String) -> Bool {
guard let hostRegex = DisplayableText.hostRegex else {
owsFailDebug("hostRegex was unexpectedly nil")
return false
}
guard let hostText = hostRegex.parseFirstMatch(inText: linkText) else {
owsFailDebug("hostText was unexpectedly nil")
return false
}
let strippedHost = hostText.replacingOccurrences(of: ".", with: "") as NSString
if strippedHost.isOnlyASCII {
return true
} else if strippedHost.hasAnyASCII {
// mix of ascii and non-ascii is invalid
return false
} else {
// IDN
return true
}
}
for match in linkDetector.matches(in: fullText, options: [], range: NSRange(location: 0, length: fullText.utf16.count)) {
guard let matchURL: URL = match.url else {
continue
}
// We extract the exact text from the `fullText` rather than use match.url.host
// because match.url.host actually escapes non-ascii domains into puny-code.
//
// But what we really want is to check the text which will ultimately be presented to
// the user.
let rawTextOfMatch = (fullText as NSString).substring(with: match.range)
guard isValidLink(linkText: rawTextOfMatch) else {
return false
}
}
return true
}()
// MARK: Filter Methods
@objc
public class func filterNotificationText(_ text: String?) -> String? {
guard let text = text?.filterStringForDisplay() else {
return nil
}
// iOS strips anything that looks like a printf formatting character from
// the notification body, so if we want to dispay a literal "%" in a notification
// it must be escaped.
// see https://developer.apple.com/documentation/uikit/uilocalnotification/1616646-alertbody
// for more details.
return text.replacingOccurrences(of: "%", with: "%%")
}
@objc
public class func displayableText(_ rawText: String) -> DisplayableText {
// Only show up to N characters of text.
let kMaxTextDisplayLength = 512
let fullText = rawText.filterStringForDisplay()
var isTextTruncated = false
var displayText = fullText
if displayText.count > kMaxTextDisplayLength {
// Trim whitespace before _AND_ after slicing the snipper from the string.
let snippet = String(displayText.prefix(kMaxTextDisplayLength)).ows_stripped()
displayText = String(format: NSLocalizedString("OVERSIZE_TEXT_DISPLAY_FORMAT", comment:
"A display format for oversize text messages."),
snippet)
isTextTruncated = true
}
let displayableText = DisplayableText(fullText: fullText, displayText: displayText, isTextTruncated: isTextTruncated)
return displayableText
}
}