diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 9700ce559..f9cf8928f 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -4139,21 +4139,22 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC progressUpdates := make(chan *discord.ImportProgress) done := make(chan struct{}) - cancel := make(chan string) + cancel := make(chan []string) var newChat *Chat - m.startPublishImportProgressInterval(progressUpdates, cancel, done) + m.startPublishImportChannelProgressInterval(progressUpdates, cancel, done) importProgress := &discord.ImportProgress{} importProgress.Init(totalImportChunkCount, []discord.ImportTask{ discord.ChannelsCreationTask, discord.ImportMessagesTask, discord.DownloadAssetsTask, + discord.InitCommunityTask, }) - importProgress.CommunityID = request.DiscordChannelID - importProgress.CommunityName = request.Name + importProgress.ChannelID = request.DiscordChannelID + importProgress.ChannelName = request.Name // initial progress immediately if err := request.Validate(); err != nil { @@ -4161,14 +4162,14 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg)) importProgress.StopTask(discord.ChannelsCreationTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID} return } // Here's 3 steps: Find the corrent channel in files, get the community and create the channel - progressValue := calculateProgress(3, 1, (float32(1) / 3)) + progressValue := float32(0.3) - m.publishImportProgress(importProgress) + m.publishChannelImportProgress(importProgress) community, err := m.GetCommunityByID(request.CommunityID) @@ -4177,7 +4178,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg)) importProgress.StopTask(discord.ChannelsCreationTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID} return } @@ -4186,7 +4187,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg)) importProgress.StopTask(discord.ChannelsCreationTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID} return } @@ -4203,7 +4204,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC } importProgress.StopTask(discord.ChannelsCreationTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID} return } @@ -4224,15 +4225,22 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg)) importProgress.StopTask(discord.ChannelsCreationTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID} return } } - progressValue := calculateProgress(3, 2, (float32(2) / 3)) + progressValue := float32(0.6) importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue) progressUpdates <- importProgress + if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { + importProgress.StopTask(discord.ChannelsCreationTask) + progressUpdates <- importProgress + cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID} + return + } + if len(channel.Channel.ID) == 0 { // skip this file and try to find in the next file continue @@ -4266,12 +4274,13 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC errmsg := err.Error() if errors.Is(err, communities.ErrInvalidCommunityDescriptionDuplicatedName) { errmsg = fmt.Sprintf("Couldn't create channel '%s': %s", communityChat.Identity.DisplayName, err.Error()) + fmt.Println(errmsg) } importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg)) importProgress.StopTask(discord.ChannelsCreationTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID} return } @@ -4280,16 +4289,23 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC newChat = CreateCommunityChat(request.CommunityID.String(), chatID, chat, m.getTimesource()) } - progressValue = calculateProgress(3, 3, (float32(3) / 3)) + progressValue = float32(1.0) importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue) progressUpdates <- importProgress + } else { + // When channel with current discord id already exist we should skip import + importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error("Channel already imported to this community")) + importProgress.StopTask(discord.ChannelsCreationTask) + progressUpdates <- importProgress + cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID} + return } if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { importProgress.StopTask(discord.ImportMessagesTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID} return } @@ -4316,15 +4332,14 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) importProgress.StopTask(discord.ImportMessagesTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID - + cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID} return } if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { importProgress.StopTask(discord.ImportMessagesTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID} return } @@ -4360,7 +4375,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) importProgress.StopTask(discord.ImportMessagesTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID} return } @@ -4368,7 +4383,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { importProgress.StopTask(discord.ImportMessagesTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID} return } @@ -4394,7 +4409,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) importProgress.StopTask(discord.ImportMessagesTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID} return } @@ -4402,9 +4417,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { importProgress.StopTask(discord.ImportMessagesTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID - cancel <- request.DiscordChannelID - + cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID} return } } @@ -4428,7 +4441,6 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC discord.Warning(errmsg), ) progressUpdates <- importProgress - cancel <- request.DiscordChannelID return } @@ -4437,7 +4449,6 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC if err != nil { importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Warning(err.Error())) progressUpdates <- importProgress - cancel <- request.DiscordChannelID return } @@ -4448,7 +4459,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC if m.DiscordImportMarkedAsCancelled(request.DiscordChannelID) { importProgress.StopTask(discord.DownloadAssetsTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID} return } @@ -4464,7 +4475,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { importProgress.StopTask(discord.DownloadAssetsTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID} return } @@ -4495,7 +4506,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { importProgress.StopTask(discord.DownloadAssetsTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID} return } @@ -4511,7 +4522,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { importProgress.StopTask(discord.DownloadAssetsTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID} return } @@ -4522,19 +4533,17 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments))) err := m.persistence.SaveDiscordMessageAttachments(attachments) if err != nil { - m.cleanUpImportChannel(request.CommunityID.String(), request.DiscordChannelID) - importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Error(err.Error())) + importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Warning(err.Error())) importProgress.Stop() progressUpdates <- importProgress - cancel <- request.DiscordChannelID - return + continue } if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { importProgress.StopTask(discord.DownloadAssetsTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID} return } @@ -4612,10 +4621,13 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC go m.communitiesManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval) } } + + importProgress.UpdateTaskProgress(discord.InitCommunityTask, float32(0.0)) + if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { importProgress.StopTask(discord.InitCommunityTask) progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID} return } @@ -4628,11 +4640,13 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC importProgress.AddTaskError(discord.InitCommunityTask, discord.Error(err.Error())) importProgress.Stop() progressUpdates <- importProgress - cancel <- request.DiscordChannelID + cancel <- []string{string(request.CommunityID), request.DiscordChannelID} return } - m.config.messengerSignalsHandler.DiscordCommunityImportFinished(request.DiscordChannelID) + importProgress.UpdateTaskProgress(discord.InitCommunityTask, float32(1.0)) + + m.config.messengerSignalsHandler.DiscordChannelImportFinished(string(request.CommunityID), newChat.ID) close(done) }() } @@ -5525,13 +5539,7 @@ func (m *Messenger) cleanUpImport(communityID string) { } func (m *Messenger) cleanUpImportChannel(communityID string, channelID string) { - community, err := m.communitiesManager.GetByIDString(communityID) - if err != nil { - m.logger.Error("clean up failed, couldn't delete community", zap.Error(err)) - return - } - - _, err = community.DeleteChat(channelID) + _, err := m.DeleteCommunityChat(types.HexBytes(communityID), channelID) if err != nil { m.logger.Error("clean up failed, couldn't delete community chat", zap.Error(err)) return @@ -5548,6 +5556,10 @@ func (m *Messenger) publishImportProgress(progress *discord.ImportProgress) { m.config.messengerSignalsHandler.DiscordCommunityImportProgress(progress) } +func (m *Messenger) publishChannelImportProgress(progress *discord.ImportProgress) { + m.config.messengerSignalsHandler.DiscordChannelImportProgress(progress) +} + func (m *Messenger) startPublishImportProgressInterval(c chan *discord.ImportProgress, cancel chan string, done chan struct{}) { var currentProgress *discord.ImportProgress @@ -5587,6 +5599,50 @@ func (m *Messenger) startPublishImportProgressInterval(c chan *discord.ImportPro }() } +func (m *Messenger) startPublishImportChannelProgressInterval(c chan *discord.ImportProgress, cancel chan []string, done chan struct{}) { + + var currentProgress *discord.ImportProgress + + go func() { + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if currentProgress != nil { + m.publishChannelImportProgress(currentProgress) + if currentProgress.Stopped { + return + } + } + case progressUpdate := <-c: + currentProgress = progressUpdate + case <-done: + if currentProgress != nil { + m.publishChannelImportProgress(currentProgress) + } + return + case ids := <-cancel: + if currentProgress != nil { + m.publishImportProgress(currentProgress) + } + if len(ids) > 0 { + communityID := ids[0] + channelID := ids[1] + discordChannelID := ids[2] + m.cleanUpImportChannel(communityID, channelID) + m.config.messengerSignalsHandler.DiscordChannelImportCancelled(discordChannelID) + } + return + case <-m.quit: + m.cleanUpImports() + return + } + } + }() +} + func (m *Messenger) pinMessagesToWakuMessages(pinMessages []*common.PinMessage, c *communities.Community) ([]*types.Message, error) { wakuMessages := make([]*types.Message, 0) for _, msg := range pinMessages { diff --git a/protocol/messenger_config.go b/protocol/messenger_config.go index f268f46e7..e63e6aee8 100644 --- a/protocol/messenger_config.go +++ b/protocol/messenger_config.go @@ -55,6 +55,9 @@ type MessengerSignalsHandler interface { DiscordCommunityImportProgress(importProgress *discord.ImportProgress) DiscordCommunityImportFinished(communityID string) DiscordCommunityImportCancelled(communityID string) + DiscordChannelImportProgress(importProgress *discord.ImportProgress) + DiscordChannelImportFinished(communityID string, channelID string) + DiscordChannelImportCancelled(channelID string) SendWakuFetchingBackupProgress(response *wakusync.WakuBackedUpDataResponse) SendWakuBackedUpProfile(response *wakusync.WakuBackedUpDataResponse) SendWakuBackedUpSettings(response *wakusync.WakuBackedUpDataResponse) diff --git a/services/ext/signal.go b/services/ext/signal.go index 31e1d8672..7ed07bc20 100644 --- a/services/ext/signal.go +++ b/services/ext/signal.go @@ -137,14 +137,26 @@ func (m *MessengerSignalsHandler) DiscordCommunityImportProgress(importProgress signal.SendDiscordCommunityImportProgress(importProgress) } +func (m *MessengerSignalsHandler) DiscordChannelImportProgress(importProgress *discord.ImportProgress) { + signal.SendDiscordChannelImportProgress(importProgress) +} + func (m *MessengerSignalsHandler) DiscordCommunityImportFinished(id string) { signal.SendDiscordCommunityImportFinished(id) } +func (m *MessengerSignalsHandler) DiscordChannelImportFinished(communityID string, channelID string) { + signal.SendDiscordChannelImportFinished(communityID, channelID) +} + func (m *MessengerSignalsHandler) DiscordCommunityImportCancelled(id string) { signal.SendDiscordCommunityImportCancelled(id) } +func (m *MessengerSignalsHandler) DiscordChannelImportCancelled(id string) { + signal.SendDiscordChannelImportCancelled(id) +} + func (m *MessengerSignalsHandler) SendWakuFetchingBackupProgress(response *wakusync.WakuBackedUpDataResponse) { signal.SendWakuFetchingBackupProgress(response) } diff --git a/signal/events_discord_import.go b/signal/events_discord_import.go index 92ce7e1b1..3ad5b7dbd 100644 --- a/signal/events_discord_import.go +++ b/signal/events_discord_import.go @@ -21,6 +21,18 @@ const ( // EventDiscordCommunityImportCancelled triggered when importing // the discord community was cancelled EventDiscordCommunityImportCancelled = "community.discordCommunityImportCancelled" + + // EventDiscordChannelImportProgress is triggered during the import + // of a discord community channel as it progresses + EventDiscordChannelImportProgress = "community.discordChannelImportProgress" + + // EventDiscordChannelImportFinished triggered when importing + // the discord community channel into status was successful + EventDiscordChannelImportFinished = "community.discordChannelImportFinished" + + // EventDiscordChannelImportCancelled triggered when importing + // the discord community channel was cancelled + EventDiscordChannelImportCancelled = "community.discordChannelImportCancelled" ) type DiscordCategoriesAndChannelsExtractedSignal struct { @@ -42,6 +54,19 @@ type DiscordCommunityImportCancelledSignal struct { CommunityID string `json:"communityId"` } +type DiscordChannelImportProgressSignal struct { + ImportProgress *discord.ImportProgress `json:"importProgress"` +} + +type DiscordChannelImportFinishedSignal struct { + CommunityID string `json:"communityId"` + ChannelID string `json:"channelId"` +} + +type DiscordChannelImportCancelledSignal struct { + ChannelID string `json:"channelId"` +} + func SendDiscordCategoriesAndChannelsExtracted(categories []*discord.Category, channels []*discord.Channel, oldestMessageTimestamp int64, errors map[string]*discord.ImportError) { send(EventDiscordCategoriesAndChannelsExtracted, DiscordCategoriesAndChannelsExtractedSignal{ Categories: categories, @@ -57,14 +82,33 @@ func SendDiscordCommunityImportProgress(importProgress *discord.ImportProgress) }) } +func SendDiscordChannelImportProgress(importProgress *discord.ImportProgress) { + send(EventDiscordChannelImportProgress, DiscordChannelImportProgressSignal{ + ImportProgress: importProgress, + }) +} + func SendDiscordCommunityImportFinished(communityID string) { send(EventDiscordCommunityImportFinished, DiscordCommunityImportFinishedSignal{ CommunityID: communityID, }) } +func SendDiscordChannelImportFinished(communityID string, channelID string) { + send(EventDiscordChannelImportFinished, DiscordChannelImportFinishedSignal{ + CommunityID: communityID, + ChannelID: channelID, + }) +} + func SendDiscordCommunityImportCancelled(communityID string) { send(EventDiscordCommunityImportCancelled, DiscordCommunityImportCancelledSignal{ CommunityID: communityID, }) } + +func SendDiscordChannelImportCancelled(channelID string) { + send(EventDiscordChannelImportCancelled, DiscordChannelImportCancelledSignal{ + ChannelID: channelID, + }) +}