forgejo/services/actions/schedule_tasks.go
Earl Warren b0b6bd3658 fix: a corrupted Forgejo Actions scheduled workflow is disabled (#8942)
The following errors are specific to the scheduled workflow being handled. They do not imply the remaining scheduled workflows cannot be scheduled successfully.

- Failure to create a scheduled task, which is most likely caused by an invalid YAML file.
- Failure to parse the scheduling specs, which can be caused by a number of formatting errors.

Instead of returning on error, the corrupted workflow is disabled.

Also display more informative error messages so that the failed workflow can be identified from the logged error.

## Checklist

The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org).

### Tests

- I added test coverage for Go changes...
  - [x] in their respective `*_test.go` for unit tests.
  - [ ] in the `tests/integration` directory if it involves interactions with a live Forgejo server.
- I added test coverage for JavaScript changes...
  - [ ] in `web_src/js/*.test.js` if it can be unit tested.
  - [ ] in `tests/e2e/*.test.e2e.js` if it requires interactions with a live Forgejo server (see also the [developer guide for JavaScript testing](https://codeberg.org/forgejo/forgejo/src/branch/forgejo/tests/e2e/README.md#end-to-end-tests)).

### Documentation

- [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change.
- [x] I did not document these changes and I do not expect someone else to do it.

### Release notes

- [ ] I do not want this change to show in the release notes.
- [x] I want the title to show in the release notes with a link to this pull request.
- [ ] I want the content of the `release-notes/<pull request number>.md` to be used for the release notes instead of the title.

<!--start release-notes-assistant-->

## Release notes
<!--URL:https://codeberg.org/forgejo/forgejo-->
- Bug fixes
  - [PR](https://codeberg.org/forgejo/forgejo/pulls/8942): <!--number 8942 --><!--line 0 --><!--description YSBjb3JydXB0ZWQgRm9yZ2VqbyBBY3Rpb25zIHNjaGVkdWxlZCB3b3JrZmxvdyBpcyBkaXNhYmxlZA==-->a corrupted Forgejo Actions scheduled workflow is disabled<!--description-->
<!--end release-notes-assistant-->

Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/8942
Reviewed-by: Michael Kriese <michael.kriese@gmx.de>
Co-authored-by: Earl Warren <contact@earl-warren.org>
Co-committed-by: Earl Warren <contact@earl-warren.org>
2025-08-18 22:45:10 +02:00

272 lines
8.2 KiB
Go

// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"bytes"
"context"
"errors"
"fmt"
"time"
actions_model "forgejo.org/models/actions"
"forgejo.org/models/db"
repo_model "forgejo.org/models/repo"
"forgejo.org/models/unit"
"forgejo.org/modules/log"
"forgejo.org/modules/timeutil"
webhook_module "forgejo.org/modules/webhook"
"code.forgejo.org/forgejo/runner/v9/act/jobparser"
act_model "code.forgejo.org/forgejo/runner/v9/act/model"
"github.com/robfig/cron/v3"
"xorm.io/builder"
)
// StartScheduleTasks creates the runs of the scheduled workflows selected by
// startTasks. It delegates to startTasks and returns its result unchanged.
func StartScheduleTasks(ctx context.Context) error {
return startTasks(ctx)
}
// startTasks retrieves schedule specs page by page, creates a run for each of
// them and advances the previous and next run times of the spec.
//
// A spec is skipped when its repository is archived, when the repository has
// no Actions unit, or when its workflow is disabled in the Actions unit
// configuration. A spec whose run cannot be created, or whose cron expression
// cannot be parsed, does not abort the loop: the error is logged, the workflow
// is disabled in the repository unit and processing continues with the next
// spec.
//
// An error is returned when the specs or their repositories cannot be loaded,
// when looking up the Actions unit fails for a reason other than the unit not
// existing, or when the repository unit or the spec cannot be updated.
func startTasks(ctx context.Context) error {
	// Number of specs fetched per query.
	const pageSize = 50

	// createAndSchedule creates the run of a spec and parses its cron
	// expression. Both failures are specific to the workflow being handled, so
	// the caller disables that workflow instead of giving up entirely.
	createAndSchedule := func(spec *actions_model.ActionScheduleSpec) (cron.Schedule, error) {
		if err := CreateScheduleTask(ctx, spec.Schedule); err != nil {
			return nil, fmt.Errorf("CreateScheduleTask: %w", err)
		}
		schedule, err := spec.Parse()
		if err != nil {
			return nil, fmt.Errorf("Parse(Spec=%v): %w", spec.Spec, err)
		}
		return schedule, nil
	}

	// The same instant is used for every page so that all specs are selected
	// and rescheduled relative to one reference time.
	now := time.Now()

	// NOTE(review): specs handled on a page receive a new Next value. If
	// FindSpecs filters on Next, offset-based paging may skip specs that still
	// match until the following invocation - confirm against FindSpecs.
	for page := 1; ; page++ {
		specs, _, err := actions_model.FindSpecs(ctx, actions_model.FindSpecOptions{
			ListOptions: db.ListOptions{
				Page:     page,
				PageSize: pageSize,
			},
			Next: now.Unix(),
		})
		if err != nil {
			return fmt.Errorf("find specs: %w", err)
		}

		if err := specs.LoadRepos(ctx); err != nil {
			return fmt.Errorf("LoadRepos: %w", err)
		}

		for _, row := range specs {
			// When the schedule was registered by a push event, cancel the
			// unfinished scheduled runs of the same workflow first. A failure
			// here is only logged: it must not prevent the new run.
			if row.Schedule.Event == webhook_module.HookEventPush {
				if err := CancelPreviousJobs(
					ctx,
					row.RepoID,
					row.Schedule.Ref,
					row.Schedule.WorkflowID,
					webhook_module.HookEventSchedule,
				); err != nil {
					log.Error("CancelPreviousJobs: %v", err)
				}
			}

			// Archived repositories do not get new runs.
			if row.Repo.IsArchived {
				continue
			}

			cfg, err := row.Repo.GetUnit(ctx, unit.TypeActions)
			if err != nil {
				if repo_model.IsErrUnitTypeNotExist(err) {
					// The Actions unit of this repository is disabled.
					continue
				}
				return fmt.Errorf("GetUnit: %w", err)
			}

			actionConfig := cfg.ActionsConfig()
			if actionConfig.IsWorkflowDisabled(row.Schedule.WorkflowID) {
				continue
			}

			schedule, err := createAndSchedule(row)
			if err != nil {
				// The workflow is corrupted: disable it so that it is not
				// retried, and carry on with the remaining specs.
				log.Error("RepoID=%v WorkflowID=%v: %v", row.Schedule.RepoID, row.Schedule.WorkflowID, err)
				actionConfig.DisableWorkflow(row.Schedule.WorkflowID)
				if err := repo_model.UpdateRepoUnit(ctx, cfg); err != nil {
					log.Error("RepoID=%v WorkflowID=%v: UpdateRepoUnit: %v", row.Schedule.RepoID, row.Schedule.WorkflowID, err)
					return err
				}
				continue
			}

			// Record the run time that was just handled and compute the next
			// one, starting the search one minute after the reference time.
			row.Prev = row.Next
			row.Next = timeutil.TimeStamp(schedule.Next(now.Add(1 * time.Minute)).Unix())
			if err := actions_model.UpdateScheduleSpec(ctx, row, "prev", "next"); err != nil {
				log.Error("UpdateScheduleSpec: %v", err)
				return err
			}
		}

		// A short page means there is nothing left to fetch.
		if len(specs) < pageSize {
			break
		}
	}

	return nil
}
// CreateScheduleTask turns a stored schedule into a new action run.
//
// The run copies the metadata of the schedule, is marked as triggered by the
// schedule event and starts in the waiting status. The notification setting
// read from the workflow content is stored on the run, which is then inserted
// together with the jobs parsed from that same content.
//
// The first error encountered is returned as is; nothing is inserted unless
// every preceding step succeeded.
func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) error {
	run := &actions_model.ActionRun{
		ScheduleID:    cron.ID,
		Title:         cron.Title,
		RepoID:        cron.RepoID,
		OwnerID:       cron.OwnerID,
		WorkflowID:    cron.WorkflowID,
		TriggerUserID: cron.TriggerUserID,
		Ref:           cron.Ref,
		CommitSHA:     cron.CommitSHA,
		Event:         cron.Event,
		EventPayload:  cron.EventPayload,
		TriggerEvent:  string(webhook_module.HookEventSchedule),
		Status:        actions_model.StatusWaiting,
	}

	// The variables are handed to the job parser below.
	variables, err := actions_model.GetVariablesOfRun(ctx, run)
	if err != nil {
		log.Error("GetVariablesOfRun: %v", err)
		return err
	}

	// Read the workflow once to extract its notification setting.
	parsed, err := act_model.ReadWorkflow(bytes.NewReader(cron.Content), false)
	if err != nil {
		return err
	}
	if run.NotifyEmail, err = parsed.Notifications(); err != nil {
		return err
	}

	// Parse the same content into the jobs that make up the run.
	jobs, err := jobParser(cron.Content, jobparser.WithVars(variables))
	if err != nil {
		return err
	}

	// Persist the run along with its jobs.
	return actions_model.InsertRun(ctx, run, jobs)
}
// CancelPreviousJobs cancels the unfinished jobs of every running, waiting or
// blocked run that matches the given repository, reference, workflow and
// trigger event. It is meant for the moment a new run supersedes older ones.
//
// Jobs that are already done are left alone. A job without a task is marked as
// cancelled directly in the database; a job with a task is cancelled by
// stopping that task. The first error stops the whole operation, including the
// case where a task-less job was modified concurrently.
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
	activeRuns, count, err := db.FindAndCount[actions_model.ActionRun](ctx, actions_model.FindRunOptions{
		RepoID:       repoID,
		Ref:          ref,
		WorkflowID:   workflowID,
		TriggerEvent: event,
		Status:       []actions_model.Status{actions_model.StatusRunning, actions_model.StatusWaiting, actions_model.StatusBlocked},
	})
	if err != nil {
		return err
	}
	// Nothing matched, so there is nothing to cancel.
	if count == 0 {
		return nil
	}

	for _, activeRun := range activeRuns {
		runJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
			RunID: activeRun.ID,
		})
		if err != nil {
			return err
		}

		for _, runJob := range runJobs {
			// Finished jobs need no cancellation.
			if runJob.Status.IsDone() {
				continue
			}

			// A job that owns a task is cancelled by stopping the task.
			if runJob.TaskID != 0 {
				if err := StopTask(ctx, runJob.TaskID, actions_model.StatusCancelled); err != nil {
					return err
				}
				continue
			}

			// No task is attached: flag the job as cancelled ourselves. The
			// update is conditioned on task_id still being 0.
			runJob.Status = actions_model.StatusCancelled
			runJob.Stopped = timeutil.TimeStampNow()
			updated, err := UpdateRunJob(ctx, runJob, builder.Eq{"task_id": 0}, "status", "stopped")
			if err != nil {
				return err
			}
			// Zero affected rows means the job changed in the meantime; the
			// caller has to retry.
			if updated == 0 {
				return errors.New("job has changed, try again")
			}
		}
	}

	return nil
}
// CleanRepoScheduleTasks deletes the schedule tasks of repo. When
// cancelPreviousJobs is true it additionally cancels the unfinished runs that
// were triggered by a schedule, looked up with the default branch of the
// repository as reference.
//
// Errors are wrapped with the name of the failing step and can be inspected
// with errors.Is / errors.As.
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository, cancelPreviousJobs bool) error {
	// If Actions got disabled while schedule tasks exist, this removes the
	// outdated schedule tasks. There is no other place to do it because
	// app.ini is changed manually.
	if err := actions_model.DeleteScheduleTaskByRepo(ctx, repo.ID); err != nil {
		return fmt.Errorf("DeleteScheduleTaskByRepo: %w", err)
	}

	if !cancelPreviousJobs {
		return nil
	}

	// Cancel the scheduled runs of this repository that are still unfinished.
	// The workflow ID is left empty - presumably this matches every workflow;
	// confirm against FindRunOptions.
	// NOTE(review): the reference passed is repo.DefaultBranch; confirm it has
	// the same form as the Ref stored on runs.
	if err := CancelPreviousJobs(
		ctx,
		repo.ID,
		repo.DefaultBranch,
		"",
		webhook_module.HookEventSchedule,
	); err != nil {
		return fmt.Errorf("CancelPreviousJobs: %w", err)
	}

	return nil
}