[bugfix] Invalidate timeline entries for status when stats change (#1879)
This commit is contained in:
parent
84e1c7a7c4
commit
5e2897e35c
12 changed files with 531 additions and 130 deletions
|
@ -24,6 +24,7 @@ import (
|
|||
|
||||
"github.com/superseriousbusiness/gotosocial/internal/db"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/log"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/state"
|
||||
|
@ -145,6 +146,48 @@ func (s *statusFaveDB) GetStatusFavesForStatus(ctx context.Context, statusID str
|
|||
return faves, nil
|
||||
}
|
||||
|
||||
func (s *statusFaveDB) PopulateStatusFave(ctx context.Context, statusFave *gtsmodel.StatusFave) error {
|
||||
var (
|
||||
err error
|
||||
errs = make(gtserror.MultiError, 0, 3)
|
||||
)
|
||||
|
||||
if statusFave.Account == nil {
|
||||
// StatusFave author is not set, fetch from database.
|
||||
statusFave.Account, err = s.state.DB.GetAccountByID(
|
||||
gtscontext.SetBarebones(ctx),
|
||||
statusFave.AccountID,
|
||||
)
|
||||
if err != nil {
|
||||
errs.Append(fmt.Errorf("error populating status fave author: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
if statusFave.TargetAccount == nil {
|
||||
// StatusFave target account is not set, fetch from database.
|
||||
statusFave.TargetAccount, err = s.state.DB.GetAccountByID(
|
||||
gtscontext.SetBarebones(ctx),
|
||||
statusFave.TargetAccountID,
|
||||
)
|
||||
if err != nil {
|
||||
errs.Append(fmt.Errorf("error populating status fave target account: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
if statusFave.Status == nil {
|
||||
// StatusFave status is not set, fetch from database.
|
||||
statusFave.Status, err = s.state.DB.GetStatusByID(
|
||||
gtscontext.SetBarebones(ctx),
|
||||
statusFave.StatusID,
|
||||
)
|
||||
if err != nil {
|
||||
errs.Append(fmt.Errorf("error populating status fave status: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
return errs.Combine()
|
||||
}
|
||||
|
||||
func (s *statusFaveDB) PutStatusFave(ctx context.Context, fave *gtsmodel.StatusFave) db.Error {
|
||||
return s.state.Caches.GTS.StatusFave().Store(fave, func() error {
|
||||
_, err := s.conn.
|
||||
|
|
|
@ -35,6 +35,9 @@ type StatusFave interface {
|
|||
// This slice will be unfiltered, not taking account of blocks and whatnot, so filter it before serving it back to a user.
|
||||
GetStatusFavesForStatus(ctx context.Context, statusID string) ([]*gtsmodel.StatusFave, Error)
|
||||
|
||||
// PopulateStatusFave ensures that all sub-models of a fave are populated (account, status, etc).
|
||||
PopulateStatusFave(ctx context.Context, statusFave *gtsmodel.StatusFave) error
|
||||
|
||||
// PutStatusFave inserts the given statusFave into the database.
|
||||
PutStatusFave(ctx context.Context, statusFave *gtsmodel.StatusFave) Error
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
"github.com/superseriousbusiness/activity/pub"
|
||||
"github.com/superseriousbusiness/activity/streams"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/ap"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/log"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/messages"
|
||||
|
@ -157,14 +158,24 @@ func (p *Processor) processCreateAccountFromClientAPI(ctx context.Context, clien
|
|||
func (p *Processor) processCreateStatusFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error {
|
||||
status, ok := clientMsg.GTSModel.(*gtsmodel.Status)
|
||||
if !ok {
|
||||
return errors.New("note was not parseable as *gtsmodel.Status")
|
||||
return gtserror.New("status was not parseable as *gtsmodel.Status")
|
||||
}
|
||||
|
||||
if err := p.timelineAndNotifyStatus(ctx, status); err != nil {
|
||||
return err
|
||||
return gtserror.Newf("error timelining status: %w", err)
|
||||
}
|
||||
|
||||
return p.federateStatus(ctx, status)
|
||||
if status.InReplyToID != "" {
|
||||
// Interaction counts changed on the replied status;
|
||||
// uncache the prepared version from all timelines.
|
||||
p.invalidateStatusFromTimelines(ctx, status.InReplyToID)
|
||||
}
|
||||
|
||||
if err := p.federateStatus(ctx, status); err != nil {
|
||||
return gtserror.Newf("error federating status: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Processor) processCreateFollowRequestFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error {
|
||||
|
@ -181,33 +192,50 @@ func (p *Processor) processCreateFollowRequestFromClientAPI(ctx context.Context,
|
|||
}
|
||||
|
||||
func (p *Processor) processCreateFaveFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error {
|
||||
fave, ok := clientMsg.GTSModel.(*gtsmodel.StatusFave)
|
||||
statusFave, ok := clientMsg.GTSModel.(*gtsmodel.StatusFave)
|
||||
if !ok {
|
||||
return errors.New("fave was not parseable as *gtsmodel.StatusFave")
|
||||
return gtserror.New("statusFave was not parseable as *gtsmodel.StatusFave")
|
||||
}
|
||||
|
||||
if err := p.notifyFave(ctx, fave); err != nil {
|
||||
return err
|
||||
if err := p.notifyFave(ctx, statusFave); err != nil {
|
||||
return gtserror.Newf("error notifying status fave: %w", err)
|
||||
}
|
||||
|
||||
return p.federateFave(ctx, fave, clientMsg.OriginAccount, clientMsg.TargetAccount)
|
||||
// Interaction counts changed on the faved status;
|
||||
// uncache the prepared version from all timelines.
|
||||
p.invalidateStatusFromTimelines(ctx, statusFave.StatusID)
|
||||
|
||||
if err := p.federateFave(ctx, statusFave, clientMsg.OriginAccount, clientMsg.TargetAccount); err != nil {
|
||||
return gtserror.Newf("error federating status fave: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Processor) processCreateAnnounceFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error {
|
||||
boostWrapperStatus, ok := clientMsg.GTSModel.(*gtsmodel.Status)
|
||||
status, ok := clientMsg.GTSModel.(*gtsmodel.Status)
|
||||
if !ok {
|
||||
return errors.New("boost was not parseable as *gtsmodel.Status")
|
||||
}
|
||||
|
||||
if err := p.timelineAndNotifyStatus(ctx, boostWrapperStatus); err != nil {
|
||||
return err
|
||||
// Timeline and notify.
|
||||
if err := p.timelineAndNotifyStatus(ctx, status); err != nil {
|
||||
return gtserror.Newf("error timelining boost: %w", err)
|
||||
}
|
||||
|
||||
if err := p.notifyAnnounce(ctx, boostWrapperStatus); err != nil {
|
||||
return err
|
||||
if err := p.notifyAnnounce(ctx, status); err != nil {
|
||||
return gtserror.Newf("error notifying boost: %w", err)
|
||||
}
|
||||
|
||||
return p.federateAnnounce(ctx, boostWrapperStatus, clientMsg.OriginAccount, clientMsg.TargetAccount)
|
||||
// Interaction counts changed on the boosted status;
|
||||
// uncache the prepared version from all timelines.
|
||||
p.invalidateStatusFromTimelines(ctx, status.BoostOfID)
|
||||
|
||||
if err := p.federateAnnounce(ctx, status, clientMsg.OriginAccount, clientMsg.TargetAccount); err != nil {
|
||||
return gtserror.Newf("error federating boost: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Processor) processCreateBlockFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error {
|
||||
|
@ -293,50 +321,76 @@ func (p *Processor) processUndoBlockFromClientAPI(ctx context.Context, clientMsg
|
|||
}
|
||||
|
||||
func (p *Processor) processUndoFaveFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error {
|
||||
fave, ok := clientMsg.GTSModel.(*gtsmodel.StatusFave)
|
||||
statusFave, ok := clientMsg.GTSModel.(*gtsmodel.StatusFave)
|
||||
if !ok {
|
||||
return errors.New("undo was not parseable as *gtsmodel.StatusFave")
|
||||
return gtserror.New("statusFave was not parseable as *gtsmodel.StatusFave")
|
||||
}
|
||||
return p.federateUnfave(ctx, fave, clientMsg.OriginAccount, clientMsg.TargetAccount)
|
||||
|
||||
// Interaction counts changed on the faved status;
|
||||
// uncache the prepared version from all timelines.
|
||||
p.invalidateStatusFromTimelines(ctx, statusFave.StatusID)
|
||||
|
||||
if err := p.federateUnfave(ctx, statusFave, clientMsg.OriginAccount, clientMsg.TargetAccount); err != nil {
|
||||
return gtserror.Newf("error federating status unfave: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Processor) processUndoAnnounceFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error {
|
||||
boost, ok := clientMsg.GTSModel.(*gtsmodel.Status)
|
||||
status, ok := clientMsg.GTSModel.(*gtsmodel.Status)
|
||||
if !ok {
|
||||
return errors.New("undo was not parseable as *gtsmodel.Status")
|
||||
return errors.New("boost was not parseable as *gtsmodel.Status")
|
||||
}
|
||||
|
||||
if err := p.state.DB.DeleteStatusByID(ctx, boost.ID); err != nil {
|
||||
return err
|
||||
if err := p.state.DB.DeleteStatusByID(ctx, status.ID); err != nil {
|
||||
return gtserror.Newf("db error deleting boost: %w", err)
|
||||
}
|
||||
|
||||
if err := p.deleteStatusFromTimelines(ctx, boost); err != nil {
|
||||
return err
|
||||
if err := p.deleteStatusFromTimelines(ctx, status.ID); err != nil {
|
||||
return gtserror.Newf("error removing boost from timelines: %w", err)
|
||||
}
|
||||
|
||||
return p.federateUnannounce(ctx, boost, clientMsg.OriginAccount, clientMsg.TargetAccount)
|
||||
// Interaction counts changed on the boosted status;
|
||||
// uncache the prepared version from all timelines.
|
||||
p.invalidateStatusFromTimelines(ctx, status.BoostOfID)
|
||||
|
||||
if err := p.federateUnannounce(ctx, status, clientMsg.OriginAccount, clientMsg.TargetAccount); err != nil {
|
||||
return gtserror.Newf("error federating status unboost: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Processor) processDeleteStatusFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error {
|
||||
statusToDelete, ok := clientMsg.GTSModel.(*gtsmodel.Status)
|
||||
status, ok := clientMsg.GTSModel.(*gtsmodel.Status)
|
||||
if !ok {
|
||||
return errors.New("note was not parseable as *gtsmodel.Status")
|
||||
return gtserror.New("status was not parseable as *gtsmodel.Status")
|
||||
}
|
||||
|
||||
if statusToDelete.Account == nil {
|
||||
statusToDelete.Account = clientMsg.OriginAccount
|
||||
if err := p.state.DB.PopulateStatus(ctx, status); err != nil {
|
||||
return gtserror.Newf("db error populating status: %w", err)
|
||||
}
|
||||
|
||||
// don't delete attachments, just unattach them;
|
||||
// since this request comes from the client API
|
||||
// and the poster might want to use the attachments
|
||||
// again in a new post
|
||||
// Don't delete attachments, just unattach them: this
|
||||
// request comes from the client API and the poster
|
||||
// may want to use attachments again in a new post.
|
||||
deleteAttachments := false
|
||||
if err := p.wipeStatus(ctx, statusToDelete, deleteAttachments); err != nil {
|
||||
return err
|
||||
if err := p.wipeStatus(ctx, status, deleteAttachments); err != nil {
|
||||
return gtserror.Newf("error wiping status: %w", err)
|
||||
}
|
||||
|
||||
return p.federateStatusDelete(ctx, statusToDelete)
|
||||
if status.InReplyToID != "" {
|
||||
// Interaction counts changed on the replied status;
|
||||
// uncache the prepared version from all timelines.
|
||||
p.invalidateStatusFromTimelines(ctx, status.InReplyToID)
|
||||
}
|
||||
|
||||
if err := p.federateStatusDelete(ctx, status); err != nil {
|
||||
return gtserror.Newf("error federating status delete: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Processor) processDeleteAccountFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error {
|
||||
|
|
|
@ -29,6 +29,7 @@ import (
|
|||
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/id"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/log"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/stream"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/timeline"
|
||||
)
|
||||
|
@ -419,7 +420,7 @@ func (p *Processor) wipeStatus(ctx context.Context, statusToDelete *gtsmodel.Sta
|
|||
// delete all boosts for this status + remove them from timelines
|
||||
if boosts, err := p.state.DB.GetStatusReblogs(ctx, statusToDelete); err == nil {
|
||||
for _, b := range boosts {
|
||||
if err := p.deleteStatusFromTimelines(ctx, b); err != nil {
|
||||
if err := p.deleteStatusFromTimelines(ctx, b.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := p.state.DB.DeleteStatusByID(ctx, b.ID); err != nil {
|
||||
|
@ -429,7 +430,7 @@ func (p *Processor) wipeStatus(ctx context.Context, statusToDelete *gtsmodel.Sta
|
|||
}
|
||||
|
||||
// delete this status from any and all timelines
|
||||
if err := p.deleteStatusFromTimelines(ctx, statusToDelete); err != nil {
|
||||
if err := p.deleteStatusFromTimelines(ctx, statusToDelete.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -439,16 +440,36 @@ func (p *Processor) wipeStatus(ctx context.Context, statusToDelete *gtsmodel.Sta
|
|||
|
||||
// deleteStatusFromTimelines completely removes the given status from all timelines.
|
||||
// It will also stream deletion of the status to all open streams.
|
||||
func (p *Processor) deleteStatusFromTimelines(ctx context.Context, status *gtsmodel.Status) error {
|
||||
if err := p.state.Timelines.Home.WipeItemFromAllTimelines(ctx, status.ID); err != nil {
|
||||
func (p *Processor) deleteStatusFromTimelines(ctx context.Context, statusID string) error {
|
||||
if err := p.state.Timelines.Home.WipeItemFromAllTimelines(ctx, statusID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := p.state.Timelines.List.WipeItemFromAllTimelines(ctx, status.ID); err != nil {
|
||||
if err := p.state.Timelines.List.WipeItemFromAllTimelines(ctx, statusID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.stream.Delete(status.ID)
|
||||
return p.stream.Delete(statusID)
|
||||
}
|
||||
|
||||
// invalidateStatusFromTimelines does cache invalidation on the given status by
|
||||
// unpreparing it from all timelines, forcing it to be prepared again (with updated
|
||||
// stats, boost counts, etc) next time it's fetched by the timeline owner. This goes
|
||||
// both for the status itself, and for any boosts of the status.
|
||||
func (p *Processor) invalidateStatusFromTimelines(ctx context.Context, statusID string) {
|
||||
if err := p.state.Timelines.Home.UnprepareItemFromAllTimelines(ctx, statusID); err != nil {
|
||||
log.
|
||||
WithContext(ctx).
|
||||
WithField("statusID", statusID).
|
||||
Errorf("error unpreparing status from home timelines: %v", err)
|
||||
}
|
||||
|
||||
if err := p.state.Timelines.List.UnprepareItemFromAllTimelines(ctx, statusID); err != nil {
|
||||
log.
|
||||
WithContext(ctx).
|
||||
WithField("statusID", statusID).
|
||||
Errorf("error unpreparing status from list timelines: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"codeberg.org/gruf/go-kv"
|
||||
"codeberg.org/gruf/go-logger/v2/level"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/ap"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/id"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/log"
|
||||
|
@ -120,7 +121,7 @@ func (p *Processor) processCreateStatusFromFederator(ctx context.Context, federa
|
|||
|
||||
// there's a gts model already pinned to the message, it should be a status
|
||||
if status, ok = federatorMsg.GTSModel.(*gtsmodel.Status); !ok {
|
||||
return errors.New("ProcessFromFederator: note was not parseable as *gtsmodel.Status")
|
||||
return gtserror.New("Note was not parseable as *gtsmodel.Status")
|
||||
}
|
||||
|
||||
// Since this was a create originating AP object
|
||||
|
@ -140,7 +141,7 @@ func (p *Processor) processCreateStatusFromFederator(ctx context.Context, federa
|
|||
} else {
|
||||
// no model pinned, we need to dereference based on the IRI
|
||||
if federatorMsg.APIri == nil {
|
||||
return errors.New("ProcessFromFederator: status was not pinned to federatorMsg, and neither was an IRI for us to dereference")
|
||||
return gtserror.New("status was not pinned to federatorMsg, and neither was an IRI for us to dereference")
|
||||
}
|
||||
|
||||
status, _, err = p.federator.GetStatusByURI(ctx, federatorMsg.ReceivingAccount.Username, federatorMsg.APIri)
|
||||
|
@ -167,44 +168,35 @@ func (p *Processor) processCreateStatusFromFederator(ctx context.Context, federa
|
|||
}
|
||||
}
|
||||
|
||||
return p.timelineAndNotifyStatus(ctx, status)
|
||||
if status.InReplyToID != "" {
|
||||
// Interaction counts changed on the replied status;
|
||||
// uncache the prepared version from all timelines.
|
||||
p.invalidateStatusFromTimelines(ctx, status.InReplyToID)
|
||||
}
|
||||
|
||||
// processCreateFaveFromFederator handles Activity Create and Object Like
|
||||
if err := p.timelineAndNotifyStatus(ctx, status); err != nil {
|
||||
return gtserror.Newf("error timelining status: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// processCreateFaveFromFederator handles Activity Create with Object Like.
|
||||
func (p *Processor) processCreateFaveFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
|
||||
incomingFave, ok := federatorMsg.GTSModel.(*gtsmodel.StatusFave)
|
||||
statusFave, ok := federatorMsg.GTSModel.(*gtsmodel.StatusFave)
|
||||
if !ok {
|
||||
return errors.New("like was not parseable as *gtsmodel.StatusFave")
|
||||
return gtserror.New("Like was not parseable as *gtsmodel.StatusFave")
|
||||
}
|
||||
|
||||
// make sure the account is pinned
|
||||
if incomingFave.Account == nil {
|
||||
a, err := p.state.DB.GetAccountByID(ctx, incomingFave.AccountID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
incomingFave.Account = a
|
||||
if err := p.notifyFave(ctx, statusFave); err != nil {
|
||||
return gtserror.Newf("error notifying status fave: %w", err)
|
||||
}
|
||||
|
||||
// Get the remote account to make sure the avi and header are cached.
|
||||
if incomingFave.Account.Domain != "" {
|
||||
remoteAccountID, err := url.Parse(incomingFave.Account.URI)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Interaction counts changed on the faved status;
|
||||
// uncache the prepared version from all timelines.
|
||||
p.invalidateStatusFromTimelines(ctx, statusFave.StatusID)
|
||||
|
||||
a, _, err := p.federator.GetAccountByURI(ctx,
|
||||
federatorMsg.ReceivingAccount.Username,
|
||||
remoteAccountID,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
incomingFave.Account = a
|
||||
}
|
||||
|
||||
return p.notifyFave(ctx, incomingFave)
|
||||
return nil
|
||||
}
|
||||
|
||||
// processCreateFollowRequestFromFederator handles Activity Create and Object Follow
|
||||
|
@ -267,59 +259,43 @@ func (p *Processor) processCreateFollowRequestFromFederator(ctx context.Context,
|
|||
return p.notifyFollow(ctx, follow, followRequest.TargetAccount)
|
||||
}
|
||||
|
||||
// processCreateAnnounceFromFederator handles Activity Create and Object Announce
|
||||
// processCreateAnnounceFromFederator handles Activity Create with Object Announce.
|
||||
func (p *Processor) processCreateAnnounceFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
|
||||
incomingAnnounce, ok := federatorMsg.GTSModel.(*gtsmodel.Status)
|
||||
status, ok := federatorMsg.GTSModel.(*gtsmodel.Status)
|
||||
if !ok {
|
||||
return errors.New("announce was not parseable as *gtsmodel.Status")
|
||||
return gtserror.New("Announce was not parseable as *gtsmodel.Status")
|
||||
}
|
||||
|
||||
// make sure the account is pinned
|
||||
if incomingAnnounce.Account == nil {
|
||||
a, err := p.state.DB.GetAccountByID(ctx, incomingAnnounce.AccountID)
|
||||
// Dereference status that this status boosts.
|
||||
if err := p.federator.DereferenceAnnounce(ctx, status, federatorMsg.ReceivingAccount.Username); err != nil {
|
||||
return gtserror.Newf("error dereferencing announce: %w", err)
|
||||
}
|
||||
|
||||
// Generate an ID for the boost wrapper status.
|
||||
statusID, err := id.NewULIDFromTime(status.CreatedAt)
|
||||
if err != nil {
|
||||
return err
|
||||
return gtserror.Newf("error generating id: %w", err)
|
||||
}
|
||||
incomingAnnounce.Account = a
|
||||
status.ID = statusID
|
||||
|
||||
// Store, timeline, and notify.
|
||||
if err := p.state.DB.PutStatus(ctx, status); err != nil {
|
||||
return gtserror.Newf("db error inserting status: %w", err)
|
||||
}
|
||||
|
||||
// Get the remote account to make sure the avi and header are cached.
|
||||
if incomingAnnounce.Account.Domain != "" {
|
||||
remoteAccountID, err := url.Parse(incomingAnnounce.Account.URI)
|
||||
if err != nil {
|
||||
return err
|
||||
if err := p.timelineAndNotifyStatus(ctx, status); err != nil {
|
||||
return gtserror.Newf("error timelining status: %w", err)
|
||||
}
|
||||
|
||||
a, _, err := p.federator.GetAccountByURI(ctx,
|
||||
federatorMsg.ReceivingAccount.Username,
|
||||
remoteAccountID,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
if err := p.notifyAnnounce(ctx, status); err != nil {
|
||||
return gtserror.Newf("error notifying status: %w", err)
|
||||
}
|
||||
|
||||
incomingAnnounce.Account = a
|
||||
}
|
||||
// Interaction counts changed on the boosted status;
|
||||
// uncache the prepared version from all timelines.
|
||||
p.invalidateStatusFromTimelines(ctx, status.ID)
|
||||
|
||||
if err := p.federator.DereferenceAnnounce(ctx, incomingAnnounce, federatorMsg.ReceivingAccount.Username); err != nil {
|
||||
return fmt.Errorf("error dereferencing announce from federator: %s", err)
|
||||
}
|
||||
|
||||
incomingAnnounceID, err := id.NewULIDFromTime(incomingAnnounce.CreatedAt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
incomingAnnounce.ID = incomingAnnounceID
|
||||
|
||||
if err := p.state.DB.PutStatus(ctx, incomingAnnounce); err != nil {
|
||||
return fmt.Errorf("error adding dereferenced announce to the db: %s", err)
|
||||
}
|
||||
|
||||
if err := p.timelineAndNotifyStatus(ctx, incomingAnnounce); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.notifyAnnounce(ctx, incomingAnnounce)
|
||||
return nil
|
||||
}
|
||||
|
||||
// processCreateBlockFromFederator handles Activity Create and Object Block
|
||||
|
@ -384,16 +360,26 @@ func (p *Processor) processUpdateAccountFromFederator(ctx context.Context, feder
|
|||
|
||||
// processDeleteStatusFromFederator handles Activity Delete and Object Note
|
||||
func (p *Processor) processDeleteStatusFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
|
||||
statusToDelete, ok := federatorMsg.GTSModel.(*gtsmodel.Status)
|
||||
status, ok := federatorMsg.GTSModel.(*gtsmodel.Status)
|
||||
if !ok {
|
||||
return errors.New("note was not parseable as *gtsmodel.Status")
|
||||
return errors.New("Note was not parseable as *gtsmodel.Status")
|
||||
}
|
||||
|
||||
// delete attachments from this status since this request
|
||||
// Delete attachments from this status, since this request
|
||||
// comes from the federating API, and there's no way the
|
||||
// poster can do a delete + redraft for it on our instance
|
||||
// poster can do a delete + redraft for it on our instance.
|
||||
deleteAttachments := true
|
||||
return p.wipeStatus(ctx, statusToDelete, deleteAttachments)
|
||||
if err := p.wipeStatus(ctx, status, deleteAttachments); err != nil {
|
||||
return gtserror.Newf("error wiping status: %w", err)
|
||||
}
|
||||
|
||||
if status.InReplyToID != "" {
|
||||
// Interaction counts changed on the replied status;
|
||||
// uncache the prepared version from all timelines.
|
||||
p.invalidateStatusFromTimelines(ctx, status.InReplyToID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// processDeleteAccountFromFederator handles Activity Delete and Object Profile
|
||||
|
|
|
@ -53,7 +53,12 @@ func (p *Processor) BookmarkCreate(ctx context.Context, requestingAccount *gtsmo
|
|||
}
|
||||
|
||||
if err := p.state.DB.PutStatusBookmark(ctx, gtsBookmark); err != nil {
|
||||
err = fmt.Errorf("BookmarkCreate: error putting bookmark in database: %w", err)
|
||||
err = gtserror.Newf("error putting bookmark in database: %w", err)
|
||||
return nil, gtserror.NewErrorInternalError(err)
|
||||
}
|
||||
|
||||
if err := p.invalidateStatus(ctx, requestingAccount.ID, targetStatusID); err != nil {
|
||||
err = gtserror.Newf("error invalidating status from timelines: %w", err)
|
||||
return nil, gtserror.NewErrorInternalError(err)
|
||||
}
|
||||
|
||||
|
@ -74,7 +79,12 @@ func (p *Processor) BookmarkRemove(ctx context.Context, requestingAccount *gtsmo
|
|||
|
||||
// We have a bookmark to remove.
|
||||
if err := p.state.DB.DeleteStatusBookmark(ctx, existingBookmarkID); err != nil {
|
||||
err = fmt.Errorf("BookmarkRemove: error removing status bookmark: %w", err)
|
||||
err = gtserror.Newf("error removing status bookmark: %w", err)
|
||||
return nil, gtserror.NewErrorInternalError(err)
|
||||
}
|
||||
|
||||
if err := p.invalidateStatus(ctx, requestingAccount.ID, targetStatusID); err != nil {
|
||||
err = gtserror.Newf("error invalidating status from timelines: %w", err)
|
||||
return nil, gtserror.NewErrorInternalError(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -21,15 +21,17 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
|
||||
"codeberg.org/gruf/go-kv"
|
||||
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/log"
|
||||
)
|
||||
|
||||
func (p *Processor) apiStatus(ctx context.Context, targetStatus *gtsmodel.Status, requestingAccount *gtsmodel.Account) (*apimodel.Status, gtserror.WithCode) {
|
||||
apiStatus, err := p.tc.StatusToAPIStatus(ctx, targetStatus, requestingAccount)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error converting status %s to frontend representation: %w", targetStatus.ID, err)
|
||||
err = gtserror.Newf("error converting status %s to frontend representation: %w", targetStatus.ID, err)
|
||||
return nil, gtserror.NewErrorInternalError(err)
|
||||
}
|
||||
|
||||
|
@ -66,3 +68,36 @@ func (p *Processor) getVisibleStatus(ctx context.Context, requestingAccount *gts
|
|||
|
||||
return targetStatus, nil
|
||||
}
|
||||
|
||||
// invalidateStatus is a shortcut function for invalidating the prepared/cached
|
||||
// representation one status in the home timeline and all list timelines of the
|
||||
// given accountID. It should only be called in cases where a status update
|
||||
// does *not* need to be passed into the processor via the worker queue, since
|
||||
// such invalidation will, in that case, be handled by the processor instead.
|
||||
func (p *Processor) invalidateStatus(ctx context.Context, accountID string, statusID string) error {
|
||||
// Get lists first + bail if this fails.
|
||||
lists, err := p.state.DB.GetListsForAccountID(ctx, accountID)
|
||||
if err != nil {
|
||||
return gtserror.Newf("db error getting lists for account %s: %w", accountID, err)
|
||||
}
|
||||
|
||||
l := log.WithContext(ctx).WithFields(kv.Fields{
|
||||
{"accountID", accountID},
|
||||
{"statusID", statusID},
|
||||
}...)
|
||||
|
||||
// Unprepare item from home + list timelines, just log
|
||||
// if something goes wrong since this is not a showstopper.
|
||||
|
||||
if err := p.state.Timelines.Home.UnprepareItem(ctx, accountID, statusID); err != nil {
|
||||
l.Errorf("error unpreparing item from home timeline: %v", err)
|
||||
}
|
||||
|
||||
for _, list := range lists {
|
||||
if err := p.state.Timelines.List.UnprepareItem(ctx, list.ID, statusID); err != nil {
|
||||
l.Errorf("error unpreparing item from list timeline %s: %v", list.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -95,7 +95,13 @@ func (p *Processor) PinCreate(ctx context.Context, requestingAccount *gtsmodel.A
|
|||
|
||||
targetStatus.PinnedAt = time.Now()
|
||||
if err := p.state.DB.UpdateStatus(ctx, targetStatus, "pinned_at"); err != nil {
|
||||
return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error pinning status: %w", err))
|
||||
err = gtserror.Newf("db error pinning status: %w", err)
|
||||
return nil, gtserror.NewErrorInternalError(err)
|
||||
}
|
||||
|
||||
if err := p.invalidateStatus(ctx, requestingAccount.ID, targetStatusID); err != nil {
|
||||
err = gtserror.Newf("error invalidating status from timelines: %w", err)
|
||||
return nil, gtserror.NewErrorInternalError(err)
|
||||
}
|
||||
|
||||
return p.apiStatus(ctx, targetStatus, requestingAccount)
|
||||
|
@ -118,11 +124,19 @@ func (p *Processor) PinRemove(ctx context.Context, requestingAccount *gtsmodel.A
|
|||
return nil, errWithCode
|
||||
}
|
||||
|
||||
if !targetStatus.PinnedAt.IsZero() {
|
||||
if targetStatus.PinnedAt.IsZero() {
|
||||
return p.apiStatus(ctx, targetStatus, requestingAccount)
|
||||
}
|
||||
|
||||
targetStatus.PinnedAt = time.Time{}
|
||||
if err := p.state.DB.UpdateStatus(ctx, targetStatus, "pinned_at"); err != nil {
|
||||
return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error unpinning status: %w", err))
|
||||
err = gtserror.Newf("db error unpinning status: %w", err)
|
||||
return nil, gtserror.NewErrorInternalError(err)
|
||||
}
|
||||
|
||||
if err := p.invalidateStatus(ctx, requestingAccount.ID, targetStatusID); err != nil {
|
||||
err = gtserror.Newf("error invalidating status from timelines: %w", err)
|
||||
return nil, gtserror.NewErrorInternalError(err)
|
||||
}
|
||||
|
||||
return p.apiStatus(ctx, targetStatus, requestingAccount)
|
||||
|
|
|
@ -75,6 +75,14 @@ type Manager interface {
|
|||
// WipeStatusesFromAccountID removes all items by the given accountID from the given timeline.
|
||||
WipeItemsFromAccountID(ctx context.Context, timelineID string, accountID string) error
|
||||
|
||||
// UnprepareItem unprepares/uncaches the prepared version fo the given itemID from the given timelineID.
|
||||
// Use this for cache invalidation when the prepared representation of an item has changed.
|
||||
UnprepareItem(ctx context.Context, timelineID string, itemID string) error
|
||||
|
||||
// UnprepareItemFromAllTimelines unprepares/uncaches the prepared version of the given itemID from all timelines.
|
||||
// Use this for cache invalidation when the prepared representation of an item has changed.
|
||||
UnprepareItemFromAllTimelines(ctx context.Context, itemID string) error
|
||||
|
||||
// Prune manually triggers a prune operation for the given timelineID.
|
||||
Prune(ctx context.Context, timelineID string, desiredPreparedItemsLength int, desiredIndexedItemsLength int) (int, error)
|
||||
|
||||
|
@ -193,7 +201,7 @@ func (m *manager) WipeItemFromAllTimelines(ctx context.Context, itemID string) e
|
|||
})
|
||||
|
||||
if len(errors) > 0 {
|
||||
return gtserror.Newf("one or more errors wiping status %s: %w", itemID, errors.Combine())
|
||||
return gtserror.Newf("error(s) wiping status %s: %w", itemID, errors.Combine())
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -204,6 +212,31 @@ func (m *manager) WipeItemsFromAccountID(ctx context.Context, timelineID string,
|
|||
return err
|
||||
}
|
||||
|
||||
func (m *manager) UnprepareItemFromAllTimelines(ctx context.Context, itemID string) error {
|
||||
errors := gtserror.MultiError{}
|
||||
|
||||
// Work through all timelines held by this
|
||||
// manager, and call Unprepare for each.
|
||||
m.timelines.Range(func(_ any, v any) bool {
|
||||
// nolint:forcetypeassert
|
||||
if err := v.(Timeline).Unprepare(ctx, itemID); err != nil {
|
||||
errors.Append(err)
|
||||
}
|
||||
|
||||
return true // always continue range
|
||||
})
|
||||
|
||||
if len(errors) > 0 {
|
||||
return gtserror.Newf("error(s) unpreparing status %s: %w", itemID, errors.Combine())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *manager) UnprepareItem(ctx context.Context, timelineID string, itemID string) error {
|
||||
return m.getOrCreateTimeline(ctx, timelineID).Unprepare(ctx, itemID)
|
||||
}
|
||||
|
||||
func (m *manager) Prune(ctx context.Context, timelineID string, desiredPreparedItemsLength int, desiredIndexedItemsLength int) (int, error) {
|
||||
return m.getOrCreateTimeline(ctx, timelineID).Prune(desiredPreparedItemsLength, desiredIndexedItemsLength), nil
|
||||
}
|
||||
|
|
|
@ -78,12 +78,22 @@ type Timeline interface {
|
|||
INDEXING + PREPARATION FUNCTIONS
|
||||
*/
|
||||
|
||||
// IndexAndPrepareOne puts a item into the timeline at the appropriate place according to its id, and then immediately prepares it.
|
||||
// IndexAndPrepareOne puts a item into the timeline at the appropriate place
|
||||
// according to its id, and then immediately prepares it.
|
||||
//
|
||||
// The returned bool indicates whether or not the item was actually inserted into the timeline. This will be false
|
||||
// if the item is a boost and the original item or another boost of it already exists < boostReinsertionDepth back in the timeline.
|
||||
// The returned bool indicates whether or not the item was actually inserted
|
||||
// into the timeline. This will be false if the item is a boost and the original
|
||||
// item, or a boost of it, already exists recently in the timeline.
|
||||
IndexAndPrepareOne(ctx context.Context, itemID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error)
|
||||
|
||||
// Unprepare clears the prepared version of the given item (and any boosts
|
||||
// thereof) from the timeline, but leaves the indexed version in place.
|
||||
//
|
||||
// This is useful for cache invalidation when the prepared version of the
|
||||
// item has changed for some reason (edits, updates, etc), but the item does
|
||||
// not need to be removed: it will be prepared again next time Get is called.
|
||||
Unprepare(ctx context.Context, itemID string) error
|
||||
|
||||
/*
|
||||
INFO FUNCTIONS
|
||||
*/
|
||||
|
|
50
internal/timeline/unprepare.go
Normal file
50
internal/timeline/unprepare.go
Normal file
|
@ -0,0 +1,50 @@
|
|||
// GoToSocial
|
||||
// Copyright (C) GoToSocial Authors admin@gotosocial.org
|
||||
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package timeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
func (t *timeline) Unprepare(ctx context.Context, itemID string) error {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
if t.items == nil || t.items.data == nil {
|
||||
// Nothing to do.
|
||||
return nil
|
||||
}
|
||||
|
||||
for e := t.items.data.Front(); e != nil; e = e.Next() {
|
||||
entry := e.Value.(*indexedItemsEntry) // nolint:forcetypeassert
|
||||
|
||||
if entry.itemID != itemID && entry.boostOfID != itemID {
|
||||
// Not relevant.
|
||||
continue
|
||||
}
|
||||
|
||||
if entry.prepared == nil {
|
||||
// It's already unprepared (mood).
|
||||
continue
|
||||
}
|
||||
|
||||
entry.prepared = nil // <- eat this up please garbage collector nom nom nom
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
142
internal/timeline/unprepare_test.go
Normal file
142
internal/timeline/unprepare_test.go
Normal file
|
@ -0,0 +1,142 @@
|
|||
// GoToSocial
|
||||
// Copyright (C) GoToSocial Authors admin@gotosocial.org
|
||||
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package timeline_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/id"
|
||||
)
|
||||
|
||||
type UnprepareTestSuite struct {
|
||||
TimelineStandardTestSuite
|
||||
}
|
||||
|
||||
func (suite *UnprepareTestSuite) TestUnprepareFromFave() {
|
||||
var (
|
||||
ctx = context.Background()
|
||||
testAccount = suite.testAccounts["local_account_1"]
|
||||
maxID = ""
|
||||
sinceID = ""
|
||||
minID = ""
|
||||
limit = 1
|
||||
local = false
|
||||
)
|
||||
|
||||
suite.fillTimeline(testAccount.ID)
|
||||
|
||||
// Get first status from the top (no params).
|
||||
statuses, err := suite.state.Timelines.Home.GetTimeline(
|
||||
ctx,
|
||||
testAccount.ID,
|
||||
maxID,
|
||||
sinceID,
|
||||
minID,
|
||||
limit,
|
||||
local,
|
||||
)
|
||||
if err != nil {
|
||||
suite.FailNow(err.Error())
|
||||
}
|
||||
|
||||
if len(statuses) != 1 {
|
||||
suite.FailNow("couldn't get top status")
|
||||
}
|
||||
|
||||
targetStatus := statuses[0].(*apimodel.Status)
|
||||
|
||||
// Check fave stats of the top status.
|
||||
suite.Equal(0, targetStatus.FavouritesCount)
|
||||
suite.False(targetStatus.Favourited)
|
||||
|
||||
// Fave the top status from testAccount.
|
||||
if err := suite.state.DB.PutStatusFave(ctx, >smodel.StatusFave{
|
||||
ID: id.NewULID(),
|
||||
AccountID: testAccount.ID,
|
||||
TargetAccountID: targetStatus.Account.ID,
|
||||
StatusID: targetStatus.ID,
|
||||
URI: "https://example.org/some/activity/path",
|
||||
}); err != nil {
|
||||
suite.FailNow(err.Error())
|
||||
}
|
||||
|
||||
// Repeat call to get first status from the top.
|
||||
// Get first status from the top (no params).
|
||||
statuses, err = suite.state.Timelines.Home.GetTimeline(
|
||||
ctx,
|
||||
testAccount.ID,
|
||||
maxID,
|
||||
sinceID,
|
||||
minID,
|
||||
limit,
|
||||
local,
|
||||
)
|
||||
if err != nil {
|
||||
suite.FailNow(err.Error())
|
||||
}
|
||||
|
||||
if len(statuses) != 1 {
|
||||
suite.FailNow("couldn't get top status")
|
||||
}
|
||||
|
||||
targetStatus = statuses[0].(*apimodel.Status)
|
||||
|
||||
// We haven't yet uncached/unprepared the status,
|
||||
// we've only inserted the fave, so counts should
|
||||
// stay the same...
|
||||
suite.Equal(0, targetStatus.FavouritesCount)
|
||||
suite.False(targetStatus.Favourited)
|
||||
|
||||
// Now call unprepare.
|
||||
suite.state.Timelines.Home.UnprepareItemFromAllTimelines(ctx, targetStatus.ID)
|
||||
|
||||
// Now a Get should trigger a fresh prepare of the
|
||||
// target status, and the counts should be updated.
|
||||
// Repeat call to get first status from the top.
|
||||
// Get first status from the top (no params).
|
||||
statuses, err = suite.state.Timelines.Home.GetTimeline(
|
||||
ctx,
|
||||
testAccount.ID,
|
||||
maxID,
|
||||
sinceID,
|
||||
minID,
|
||||
limit,
|
||||
local,
|
||||
)
|
||||
if err != nil {
|
||||
suite.FailNow(err.Error())
|
||||
}
|
||||
|
||||
if len(statuses) != 1 {
|
||||
suite.FailNow("couldn't get top status")
|
||||
}
|
||||
|
||||
targetStatus = statuses[0].(*apimodel.Status)
|
||||
|
||||
suite.Equal(1, targetStatus.FavouritesCount)
|
||||
suite.True(targetStatus.Favourited)
|
||||
}
|
||||
|
||||
func TestUnprepareTestSuite(t *testing.T) {
|
||||
suite.Run(t, new(UnprepareTestSuite))
|
||||
}
|
Loading…
Reference in a new issue