Add ActivityPub Person follow from distant (#8720)

This PR is part of #4767. It

1. adds the ability to follow a local person from a distant federation server (see tests/integration/api_activitypub_person_inbox_follow_test.go)
2. streamlines the router code (refactor the person conversion & handling of inbox requests in service direction, unifies service call signature & error handling)
3. introduces queues for decoupling outgoing communication (delivery retry to cope network issues or distant service downtimes) and
4. adds minor fixes to integration tests (test timeout & invalid inbox activities)

Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/8720
Reviewed-by: Earl Warren <earl-warren@noreply.codeberg.org>
Co-authored-by: Michael Jerger <michael.jerger@meissa-gmbh.de>
Co-committed-by: Michael Jerger <michael.jerger@meissa-gmbh.de>
This commit is contained in:
Michael Jerger 2025-08-03 11:55:01 +02:00 committed by Earl Warren
commit 7566ebfba7
13 changed files with 422 additions and 66 deletions

View file

@ -0,0 +1,76 @@
// Copyright 2024 The Forgejo Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package federation
import (
"fmt"
"io"
"forgejo.org/models/user"
"forgejo.org/modules/activitypub"
"forgejo.org/modules/graceful"
"forgejo.org/modules/log"
"forgejo.org/modules/process"
"forgejo.org/modules/queue"
)
type deliveryQueueItem struct {
Doer *user.User
InboxURL string
Payload []byte
DeliveryCount int
}
var deliveryQueue *queue.WorkerPoolQueue[deliveryQueueItem]
func initDeliveryQueue() error {
deliveryQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "activitypub_inbox_delivery", deliveryQueueHandler)
if deliveryQueue == nil {
return fmt.Errorf("unable to create activitypub_inbox_delivery queue")
}
go graceful.GetManager().RunWithCancel(deliveryQueue)
return nil
}
func deliveryQueueHandler(items ...deliveryQueueItem) (unhandled []deliveryQueueItem) {
for _, item := range items {
item.DeliveryCount++
err := deliverToInbox(item)
if err != nil && item.DeliveryCount < 10 {
unhandled = append(unhandled, item)
}
}
return unhandled
}
func deliverToInbox(item deliveryQueueItem) error {
ctx, _, finished := process.GetManager().AddContext(graceful.GetManager().HammerContext(),
fmt.Sprintf("Delivering an Activity via user[%d] (%s), to %s", item.Doer.ID, item.Doer.Name, item.InboxURL))
defer finished()
clientFactory, err := activitypub.GetClientFactory(ctx)
if err != nil {
return err
}
apclient, err := clientFactory.WithKeys(ctx, item.Doer, item.Doer.APActorID()+"#main-key")
if err != nil {
return err
}
log.Debug("Delivering %s to %s", item.Payload, item.InboxURL)
res, err := apclient.Post(item.Payload, item.InboxURL)
if err != nil {
return err
}
if res.StatusCode >= 400 {
defer res.Body.Close()
body, _ := io.ReadAll(io.LimitReader(res.Body, 16*1024))
log.Warn("Delivering to %s failed: %d %s, %v times", item.InboxURL, res.StatusCode, string(body), item.DeliveryCount)
return fmt.Errorf("delivery failed")
}
return nil
}

View file

@ -23,7 +23,10 @@ import (
)
func Init() error {
return nil
if !setting.Federation.Enabled {
return nil
}
return initDeliveryQueue()
}
func FindOrCreateFederationHost(ctx context.Context, actorURI string) (*forgefed.FederationHost, error) {

View file

@ -0,0 +1,73 @@
// Copyright 2024 The Forgejo Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package federation
import (
"context"
"fmt"
"net/http"
"forgejo.org/models/user"
"forgejo.org/modules/forgefed"
"forgejo.org/modules/log"
ap "github.com/go-ap/activitypub"
"github.com/go-ap/jsonld"
)
func processPersonFollow(ctx context.Context, ctxUser *user.User, activity *ap.Activity) (ServiceResult, error) {
follow, err := forgefed.NewForgeFollowFromAp(*activity)
if err != nil {
log.Error("Invalid follow activity: %s", err)
return ServiceResult{}, NewErrNotAcceptablef("Invalid follow activity: %v", err)
}
actorURI := follow.Actor.GetLink().String()
_, federatedUser, federationHost, err := FindOrCreateFederatedUser(ctx, actorURI)
if err != nil {
log.Error("Error finding or creating federated user (%s): %v", actorURI, err)
return ServiceResult{}, NewErrNotAcceptablef("Federated user not found: %v", err)
}
following, err := user.IsFollowingAp(ctx, ctxUser, federatedUser)
if err != nil {
log.Error("forgefed.IsFollowing: %v", err)
return ServiceResult{}, NewErrNotAcceptablef("forgefed.IsFollowing: %v", err)
}
if following {
// If the user is already following, we're good, nothing to do.
log.Trace("Local user[%d] is already following federated user[%d]", ctxUser.ID, federatedUser.ID)
return NewServiceResultStatusOnly(http.StatusNoContent), nil
}
follower, err := user.AddFollower(ctx, ctxUser, federatedUser)
if err != nil {
log.Error("Unable to add follower: %v", err)
return ServiceResult{}, NewErrNotAcceptablef("Unable to add follower: %v", err)
}
accept := ap.AcceptNew(ap.IRI(fmt.Sprintf(
"%s#accepts/follow/%d", ctxUser.APActorID(), follower.ID,
)), follow)
accept.Actor = ap.IRI(ctxUser.APActorID())
payload, err := jsonld.WithContext(jsonld.IRI(ap.ActivityBaseURI)).Marshal(accept)
if err != nil {
log.Error("Unable to Marshal JSON: %v", err)
return ServiceResult{}, NewErrInternalf("MarshalJSON: %v", err)
}
hostURL := federationHost.AsURL()
if err := deliveryQueue.Push(deliveryQueueItem{
InboxURL: hostURL.JoinPath(federatedUser.InboxPath).String(),
Doer: ctxUser,
Payload: payload,
}); err != nil {
log.Error("Unable to push to pending queue: %v", err)
return ServiceResult{}, NewErrInternalf("Unable to push to pending queue: %v", err)
}
// Respond back with an accept
result := NewServiceResultWithBytes(http.StatusAccepted, []byte(`{"status":"Accepted"}`))
return result, nil
}

View file

@ -0,0 +1,47 @@
// Copyright 2024 The Forgejo Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package federation
import (
"context"
"net/http"
"forgejo.org/models/user"
"forgejo.org/modules/log"
ap "github.com/go-ap/activitypub"
)
func processPersonInboxUndo(ctx context.Context, ctxUser *user.User, activity *ap.Activity) (ServiceResult, error) {
if activity.Object.GetType() != ap.FollowType {
log.Error("Invalid object type for Undo activity: %v", activity.Object.GetType())
return ServiceResult{}, NewErrNotAcceptablef("Invalid object type for Undo activity: %v", activity.Object.GetType())
}
actorURI := activity.Actor.GetLink().String()
_, federatedUser, _, err := findFederatedUser(ctx, actorURI)
if err != nil {
log.Error("User not found: %v", err)
return ServiceResult{}, NewErrInternalf("User not found: %v", err)
}
if federatedUser != nil {
following, err := user.IsFollowingAp(ctx, ctxUser, federatedUser)
if err != nil {
log.Error("forgefed.IsFollowing: %v", err)
return ServiceResult{}, NewErrInternalf("forgefed.IsFollowing: %v", err)
}
if !following {
// The local user is not following the federated one, nothing to do.
log.Trace("Local user[%d] is not following federated user[%d]", ctxUser.ID, federatedUser.ID)
return NewServiceResultStatusOnly(http.StatusNoContent), nil
}
if err := user.RemoveFollower(ctx, ctxUser, federatedUser); err != nil {
log.Error("Unable to remove follower", err)
return ServiceResult{}, NewErrInternalf("Unable to remove follower: %v", err)
}
}
return NewServiceResultStatusOnly(http.StatusNoContent), nil
}

View file

@ -0,0 +1,25 @@
// Copyright 2024 The Forgejo Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package federation
import (
"context"
"forgejo.org/models/user"
"forgejo.org/modules/log"
ap "github.com/go-ap/activitypub"
)
func ProcessPersonInbox(ctx context.Context, user *user.User, activity *ap.Activity) (ServiceResult, error) {
switch activity.Type {
case ap.FollowType:
return processPersonFollow(ctx, user, activity)
case ap.UndoType:
return processPersonInboxUndo(ctx, user, activity)
}
log.Error("Unsupported PersonInbox activity: %v", activity.Type)
return ServiceResult{}, NewErrNotAcceptablef("unsupported activity: %v", activity.Type)
}