diff --git a/src/processor.jl b/src/processor.jl index 024c66c53..02efdca7a 100644 --- a/src/processor.jl +++ b/src/processor.jl @@ -157,6 +157,7 @@ iscompatible_arg(proc::ThreadProc, opts, x) = true function execute!(proc::ThreadProc, @nospecialize(f), @nospecialize(args...); @nospecialize(kwargs...)) tls = get_tls() task = Task() do + @nospecialize f args kwargs set_tls!(tls) TimespanLogging.prof_task_put!(tls.sch_handle.thunk_id.id) @invokelatest f(args...; kwargs...) diff --git a/src/sch/dynamic.jl b/src/sch/dynamic.jl index 679514b83..58cc5bdf6 100644 --- a/src/sch/dynamic.jl +++ b/src/sch/dynamic.jl @@ -64,6 +64,7 @@ function dynamic_listener!(ctx, state, wid) end res = try (false, lock(state.lock) do + @nospecialize f data Base.invokelatest(f, ctx, state, task, tid, data) end) catch err @@ -101,6 +102,7 @@ const DYNAMIC_EXEC_LOCK = Threads.ReentrantLock() "Executes an arbitrary function within the scheduler, returning the result." function exec!(f, h::SchedulerHandle, args...) + @nospecialize f args failed, res = lock(DYNAMIC_EXEC_LOCK) do put!(h.out_chan, (h.thunk_id.id, f, args)) take!(h.inp_chan) @@ -195,11 +197,18 @@ function _get_dag_ids(ctx, state, task, tid, _) end "Adds a new Thunk to the DAG." -add_thunk!(f, h::SchedulerHandle, args...; future=nothing, ref=nothing, options...) = +function add_thunk!(f, h::SchedulerHandle, args...; future=nothing, ref=nothing, options...) + @nospecialize f args exec!(_add_thunk!, h, f, args, options, future, ref) -function _add_thunk!(ctx, state, task, tid, (f, args, options, future, ref)) +end +function _add_thunk!(ctx, state, task, tid, payload) + @nospecialize payload + f, args, options, future, ref = payload timespan_start(ctx, :add_thunk, tid, 0) - _args = map(pos_arg->pos_arg[1] => (pos_arg[2] isa ThunkID ? state.thunk_dict[pos_arg[2].id] : pos_arg[2]), args) + _args = Pair{Union{Symbol,Nothing},Any}[] + for (pos, arg) in args + push!(_args, pos => (arg isa ThunkID ? state.thunk_dict[arg.id] : arg)) + end GC.@preserve _args begin thunk = Thunk(f, _args...; options...) # Create a `DRef` to `thunk` so that the caller can preserve it diff --git a/src/sch/eager.jl b/src/sch/eager.jl index 1bafc0874..a57c324f0 100644 --- a/src/sch/eager.jl +++ b/src/sch/eager.jl @@ -97,7 +97,7 @@ function eager_thunk() added_future, future, uid, ref, f, args, opts = take!(EAGER_THUNK_CHAN) # preserve inputs until they enter the scheduler tid = GC.@preserve args begin - _args = map(args) do pos_x + _args = Base.mapany(args) do pos_x pos, x = pos_x if x isa Dagger.EagerThunk return pos => ThunkID(EAGER_ID_MAP[x.uid], x.thunk_ref) diff --git a/src/thunk.jl b/src/thunk.jl index c89e89523..1edcdceeb 100644 --- a/src/thunk.jl +++ b/src/thunk.jl @@ -77,6 +77,7 @@ mutable struct Thunk propagates=(), kwargs... ) + @nospecialize f xs if !isa(f, Chunk) && (!isnothing(processor) || !isnothing(scope)) f = tochunk(f, something(processor, OSProc()), @@ -138,6 +139,7 @@ Options(;options...) = Options((;options...)) Options(options...) = Options((;options...)) function args_kwargs_to_pairs(args, kwargs) + @nospecialize args kwargs args_kwargs = Pair{Union{Symbol,Nothing},Any}[] for arg in args push!(args_kwargs, nothing => arg)