diff --git a/lua/lazy/manage/runner.lua b/lua/lazy/manage/runner.lua index 9b766df..31b194b 100644 --- a/lua/lazy/manage/runner.lua +++ b/lua/lazy/manage/runner.lua @@ -1,23 +1,24 @@ +local Async = require("lazy.async") local Config = require("lazy.core.config") local Task = require("lazy.manage.task") -local Util = require("lazy.util") ---@class RunnerOpts ---@field pipeline (string|{[1]:string, [string]:any})[] ---@field plugins? LazyPlugin[]|fun(plugin:LazyPlugin):any? ---@field concurrency? number ----@alias PipelineStep {task:string, opts?:TaskOptions} ----@alias LazyRunnerTask {co:thread, status: {task?:LazyTask, waiting?:boolean}, plugin: string} +---@class RunnerTask +---@field task? LazyTask +---@field step number + +---@alias PipelineStep {task:string, opts?:TaskOptions } ---@class Runner ---@field _plugins table ----@field _running LazyRunnerTask[] ---@field _pipeline PipelineStep[] ----@field _sync PipelineStep[] ---@field _on_done fun()[] ----@field _syncing boolean ---@field _opts RunnerOpts +---@field _running? Async local Runner = {} ---@param opts RunnerOpts @@ -37,7 +38,6 @@ function Runner.new(opts) for _, plugin in ipairs(pp) do self._plugins[plugin.name] = plugin end - self._running = {} self._on_done = {} ---@param step string|(TaskOptions|{[1]:string}) @@ -45,10 +45,6 @@ function Runner.new(opts) return type(step) == "string" and { task = step } or { task = step[1], opts = step } end, self._opts.pipeline) - self._sync = vim.tbl_filter(function(step) - return step.task == "wait" - end, self._pipeline) - return self end @@ -56,139 +52,107 @@ function Runner:plugin(name) return Config.plugins[name] or self._plugins[name] end ----@param entry LazyRunnerTask -function Runner:_resume(entry) - if entry.status.task and not entry.status.task:is_done() then - return true - end - local ok, status = coroutine.resume(entry.co) - if not ok then - Util.error("Could not resume a task\n" .. status) - end - entry.status = ok and status - return entry.status ~= nil -end - -function Runner:resume(waiting) - if self._syncing then - return true - end - if waiting then - local sync = self._sync[1] - table.remove(self._sync, 1) - if sync then - self._syncing = true - vim.schedule(function() - if sync.opts and type(sync.opts.sync) == "function" then - sync.opts.sync(self) - end - for _, entry in ipairs(self._running) do - if entry.status then - if entry.status.waiting then - entry.status.waiting = false - local plugin = self:plugin(entry.plugin) - if plugin then - plugin._.working = true - end - end - end - end - self._syncing = false - end) - end - end - local running = 0 - for _, entry in ipairs(self._running) do - if entry.status then - if not entry.status.waiting and self:_resume(entry) then - running = running + 1 - if self._opts.concurrency and running >= self._opts.concurrency then - break - end - end - end - end - return self._syncing or running > 0 or (not waiting and self:resume(true)) -end - function Runner:start() - ---@type string[] - local names = vim.tbl_keys(self._plugins) - table.sort(names) - for _, name in pairs(names) do - local co = coroutine.create(self.run_pipeline) - local ok, err = coroutine.resume(co, self, name) - if ok then - table.insert(self._running, { co = co, status = {}, plugin = name }) - else - Util.error("Could not start tasks for " .. name .. "\n" .. err) - end - end - - local check = vim.uv.new_check() - check:start(function() - if self:resume() then - return - end - check:stop() - self._running = {} - for _, cb in ipairs(self._on_done) do - vim.schedule(cb) - end - self._on_done = {} - end) + ---@async + self._running = Async.run(function() + self:_start() + end, { + on_done = function() + for _, cb in ipairs(self._on_done) do + cb() + end + end, + }) end ---@async ----@param name string -function Runner:run_pipeline(name) - local plugin = self:plugin(name) - plugin._.working = true - coroutine.yield() - for _, step in ipairs(self._pipeline) do - if step.task == "wait" then - plugin._.working = false - coroutine.yield({ waiting = true }) - plugin._.working = true - else - plugin = self:plugin(name) - local task = self:queue(plugin, step.task, step.opts) - if task then - coroutine.yield({ task = task }) - assert(task:is_done()) - if task.error then +function Runner:_start() + ---@type string[] + local names = vim.tbl_keys(self._plugins) + table.sort(names) + + ---@type table + local state = {} + + local active = 1 + local waiting = 0 + ---@type number? + local wait_step = nil + + ---@param resume? boolean + local function continue(resume) + active = #names + waiting = 0 + wait_step = nil + for _, name in ipairs(names) do + state[name] = state[name] or { step = 0 } + local s = state[name] + local running = s.task and s.task:is_running() + local step = self._pipeline[s.step] + + if step and step.task == "wait" and not resume then + waiting = waiting + 1 + active = active - 1 + wait_step = s.step + elseif not running then + local plugin = self:plugin(name) + if s.task and s.task.error then + active = active - 1 + elseif s.step == #self._pipeline then + s.task = nil + active = active - 1 plugin._.working = false - return + elseif s.step < #self._pipeline then + s.step = s.step + 1 + step = self._pipeline[s.step] + if step.task == "wait" then + plugin._.working = false + else + s.task = self:queue(plugin, step) + plugin._.working = not not s.task + end end end end end - plugin._.working = false + + while active > 0 do + continue() + if active == 0 and waiting > 0 then + local sync = self._pipeline[wait_step] + if sync and sync.opts and type(sync.opts.sync) == "function" then + sync.opts.sync(self) + end + continue(true) + end + coroutine.yield() + end end ---@param plugin LazyPlugin ----@param task_name string ----@param opts? TaskOptions +---@param step PipelineStep ---@return LazyTask? -function Runner:queue(plugin, task_name, opts) - assert(self._running) - local def = vim.split(task_name, ".", { plain = true }) +function Runner:queue(plugin, step) + assert(self._running and self._running:running(), "Runner is not running") + local def = vim.split(step.task, ".", { plain = true }) ---@type LazyTaskDef local task_def = require("lazy.manage.task." .. def[1])[def[2]] - assert(task_def) - opts = opts or {} + assert(task_def, "Task not found: " .. step.task) + local opts = step.opts or {} if not (task_def.skip and task_def.skip(plugin, opts)) then - local task = Task.new(plugin, def[2], task_def.run, opts) - task:start() - return task + return Task.new(plugin, def[2], task_def.run, opts) end end +function Runner:is_running() + return self._running and self._running:running() +end + -- Execute the callback async when done. -- When no callback is specified, this will wait sync ---@param cb? fun() function Runner:wait(cb) - if #self._running == 0 then + if not self:is_running() then if cb then cb() end @@ -199,7 +163,7 @@ function Runner:wait(cb) table.insert(self._on_done, cb) else -- sync wait - while #self._running > 0 do + while self:is_running() do vim.wait(10) end end diff --git a/lua/lazy/manage/task/init.lua b/lua/lazy/manage/task/init.lua index 5a9079d..98c7636 100644 --- a/lua/lazy/manage/task/init.lua +++ b/lua/lazy/manage/task/init.lua @@ -5,7 +5,7 @@ local Process = require("lazy.manage.process") ---@field skip? fun(plugin:LazyPlugin, opts?:TaskOptions):any? ---@field run fun(task:LazyTask, opts:TaskOptions) ----@alias LazyTaskState {task:LazyTask, thread:thread} +---@alias LazyTaskFn async fun(task:LazyTask, opts:TaskOptions) ---@class LazyTask ---@field plugin LazyPlugin @@ -14,11 +14,10 @@ local Process = require("lazy.manage.process") ---@field status string ---@field error? string ---@field warn? string ----@field private _task fun(task:LazyTask, opts:TaskOptions) ---@field private _started? number ---@field private _ended? number ---@field private _opts TaskOptions ----@field private _running Async[] +---@field private _running Async local Task = {} ---@class TaskOptions: {[string]:any} @@ -27,15 +26,10 @@ local Task = {} ---@param plugin LazyPlugin ---@param name string ---@param opts? TaskOptions ----@param task fun(task:LazyTask) +---@param task LazyTaskFn function Task.new(plugin, name, task, opts) - local self = setmetatable({}, { - __index = Task, - }) + local self = setmetatable({}, { __index = Task }) self._opts = opts or {} - self._running = {} - self._task = task - self._started = nil self.plugin = plugin self.name = name self.output = "" @@ -45,6 +39,7 @@ function Task.new(plugin, name, task, opts) return other.name ~= name or other:is_running() end, plugin._.tasks or {}) table.insert(plugin._.tasks, self) + self:_start(task) return self end @@ -56,22 +51,31 @@ function Task:has_ended() return self._ended ~= nil end -function Task:is_done() - return self:has_started() and self:has_ended() -end - function Task:is_running() - return self:has_started() and not self:has_ended() + return not self:has_ended() end -function Task:start() +---@private +---@param task LazyTaskFn +function Task:_start(task) assert(not self:has_started(), "task already started") assert(not self:has_ended(), "task already done") self._started = vim.uv.hrtime() - self:async(function() - self._task(self, self._opts) - end) + ---@async + self._running = Async.run(function() + task(self, self._opts) + end, { + on_done = function() + self:_done() + end, + on_error = function(err) + self:notify_error(err) + end, + on_yield = function(res) + self:notify(res) + end, + }) end ---@param msg string|string[] @@ -98,31 +102,13 @@ function Task:notify_warn(msg) self:notify(msg, vim.diagnostic.severity.WARN) end ----@param fn async fun() -function Task:async(fn) - local async = Async.run(fn, { - on_done = function() - self:_done() - end, - on_error = function(err) - self:notify_error(err) - end, - on_yield = function(res) - self:notify(res) - end, - }) - table.insert(self._running, async) -end - ---@private function Task:_done() assert(self:has_started(), "task not started") assert(not self:has_ended(), "task already done") - for _, t in ipairs(self._running) do - if t:running() then - return - end + if self._running and self._running:running() then + return end self._ended = vim.uv.hrtime() @@ -180,16 +166,6 @@ function Task:spawn(cmd, opts) end end ----@param tasks (LazyTask?)[] -function Task.all_done(tasks) - for _, task in ipairs(tasks) do - if task and not task:is_done() then - return false - end - end - return true -end - function Task:wait() while self:is_running() do vim.wait(10) diff --git a/lua/lazy/manage/task/plugin.lua b/lua/lazy/manage/task/plugin.lua index cb426ec..ddf53a1 100644 --- a/lua/lazy/manage/task/plugin.lua +++ b/lua/lazy/manage/task/plugin.lua @@ -65,9 +65,7 @@ M.build = { ---@cast builders (string|fun(LazyPlugin))[] for _, build in ipairs(builders) do if type(build) == "function" then - self:async(function() - build(self.plugin) - end) + build(self.plugin) elseif build == "rockspec" then Rocks.build(self) elseif build:sub(1, 1) == ":" then @@ -78,7 +76,7 @@ M.build = { if not chunk or err then error(err) end - self:async(chunk) + chunk() else B.shell(self, build) end diff --git a/tests/manage/runner_spec.lua b/tests/manage/runner_spec.lua index ba940e2..2eb1ab1 100644 --- a/tests/manage/runner_spec.lua +++ b/tests/manage/runner_spec.lua @@ -30,12 +30,11 @@ describe("runner", function() end, } package.loaded["lazy.manage.task.test"]["async" .. i] = { + ---@async ---@param task LazyTask run = function(task) - task:async(function() - coroutine.yield() - table.insert(runs, { plugin = task.plugin.name, task = task.name }) - end) + coroutine.yield() + table.insert(runs, { plugin = task.plugin.name, task = task.name }) end, } end diff --git a/tests/manage/task_spec.lua b/tests/manage/task_spec.lua index 546bc34..11487c5 100644 --- a/tests/manage/task_spec.lua +++ b/tests/manage/task_spec.lua @@ -1,3 +1,4 @@ +---@module 'luassert' --# selene:allow(incorrect_standard_library_use) local Task = require("lazy.manage.task") @@ -20,12 +21,10 @@ describe("task", function() it("simple function", function() local task = Task.new(plugin, "test", function() end, opts) - assert(not task:has_started()) - assert(not task:is_running()) - task:start() + assert(task:has_started()) + assert(task:is_running()) task:wait() assert(not task:is_running()) - assert(task:is_done()) assert(task_result.done) end) @@ -33,11 +32,9 @@ describe("task", function() local task = Task.new(plugin, "test", function() error("test") end, opts) - assert(not task:has_started()) - assert(not task:is_running()) - task:start() + assert(task:has_started()) + assert(task:is_running()) task:wait() - assert(task:is_done()) assert(not task:is_running()) assert(task_result.done) assert(task_result.error) @@ -46,21 +43,17 @@ describe("task", function() it("async", function() local running = true - local task = Task.new(plugin, "test", function(task) - task:async(function() - coroutine.yield() - running = false - end) + ---@async + local task = Task.new(plugin, "test", function() + coroutine.yield() + running = false end, opts) - assert(not task:is_running()) - assert(not task:has_started()) - task:start() + assert(task:has_started()) + assert(task:is_running()) assert(running) assert(task:is_running()) - assert(not task:is_done()) task:wait() assert(not running) - assert(task:is_done()) assert(not task:is_running()) assert(task_result.done) assert(not task.error) @@ -70,8 +63,8 @@ describe("task", function() local task = Task.new(plugin, "spawn_errors", function(task) task:spawn("foobar") end, opts) - assert(not task:is_running()) - task:start() + assert(task:has_started()) + assert(task:is_running()) task:wait() assert(not task:is_running()) assert(task_result.done) @@ -82,13 +75,11 @@ describe("task", function() local task = Task.new(plugin, "test", function(task) task:spawn("echo", { args = { "foo" } }) end, opts) - assert(not task:is_running()) - assert(not task:has_started()) - task:start() + assert(task:has_started()) + assert(task:is_running()) assert(task:has_started()) assert(task:is_running()) task:wait() - assert(task:is_done()) assert.same(task.output, "foo\n") assert(task_result.done) assert(not task.error) @@ -99,8 +90,8 @@ describe("task", function() task:spawn("echo", { args = { "foo" } }) task:spawn("echo", { args = { "bar" } }) end, opts) - assert(not task:is_running()) - task:start() + assert(task:has_started()) + assert(task:is_running()) assert(task:is_running()) task:wait() assert(task.output == "foo\nbar\n" or task.output == "bar\nfoo\n", task.output)