Skip to content

Some latency improvements #397

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/processor.jl
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ iscompatible_arg(proc::ThreadProc, opts, x) = true
function execute!(proc::ThreadProc, @nospecialize(f), @nospecialize(args...); @nospecialize(kwargs...))
tls = get_tls()
task = Task() do
@nospecialize f args kwargs
set_tls!(tls)
TimespanLogging.prof_task_put!(tls.sch_handle.thunk_id.id)
@invokelatest f(args...; kwargs...)
Expand Down
15 changes: 12 additions & 3 deletions src/sch/dynamic.jl
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ function dynamic_listener!(ctx, state, wid)
end
res = try
(false, lock(state.lock) do
@nospecialize f data
Base.invokelatest(f, ctx, state, task, tid, data)
end)
catch err
Expand Down Expand Up @@ -101,6 +102,7 @@ const DYNAMIC_EXEC_LOCK = Threads.ReentrantLock()

"Executes an arbitrary function within the scheduler, returning the result."
function exec!(f, h::SchedulerHandle, args...)
@nospecialize f args
failed, res = lock(DYNAMIC_EXEC_LOCK) do
put!(h.out_chan, (h.thunk_id.id, f, args))
take!(h.inp_chan)
Expand Down Expand Up @@ -195,11 +197,18 @@ function _get_dag_ids(ctx, state, task, tid, _)
end

"Adds a new Thunk to the DAG."
add_thunk!(f, h::SchedulerHandle, args...; future=nothing, ref=nothing, options...) =
function add_thunk!(f, h::SchedulerHandle, args...; future=nothing, ref=nothing, options...)
@nospecialize f args
exec!(_add_thunk!, h, f, args, options, future, ref)
function _add_thunk!(ctx, state, task, tid, (f, args, options, future, ref))
end
function _add_thunk!(ctx, state, task, tid, payload)
@nospecialize payload
f, args, options, future, ref = payload
timespan_start(ctx, :add_thunk, tid, 0)
_args = map(pos_arg->pos_arg[1] => (pos_arg[2] isa ThunkID ? state.thunk_dict[pos_arg[2].id] : pos_arg[2]), args)
_args = Pair{Union{Symbol,Nothing},Any}[]
for (pos, arg) in args
push!(_args, pos => (arg isa ThunkID ? state.thunk_dict[arg.id] : arg))
end
GC.@preserve _args begin
thunk = Thunk(f, _args...; options...)
# Create a `DRef` to `thunk` so that the caller can preserve it
Expand Down
2 changes: 1 addition & 1 deletion src/sch/eager.jl
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ function eager_thunk()
added_future, future, uid, ref, f, args, opts = take!(EAGER_THUNK_CHAN)
# preserve inputs until they enter the scheduler
tid = GC.@preserve args begin
_args = map(args) do pos_x
_args = Base.mapany(args) do pos_x
pos, x = pos_x
if x isa Dagger.EagerThunk
return pos => ThunkID(EAGER_ID_MAP[x.uid], x.thunk_ref)
Expand Down
2 changes: 2 additions & 0 deletions src/thunk.jl
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ mutable struct Thunk
propagates=(),
kwargs...
)
@nospecialize f xs
if !isa(f, Chunk) && (!isnothing(processor) || !isnothing(scope))
f = tochunk(f,
something(processor, OSProc()),
Expand Down Expand Up @@ -138,6 +139,7 @@ Options(;options...) = Options((;options...))
Options(options...) = Options((;options...))

function args_kwargs_to_pairs(args, kwargs)
@nospecialize args kwargs
args_kwargs = Pair{Union{Symbol,Nothing},Any}[]
for arg in args
push!(args_kwargs, nothing => arg)
Expand Down