diff --git a/Project.toml b/Project.toml index 71af462..817296a 100644 --- a/Project.toml +++ b/Project.toml @@ -1,11 +1,12 @@ name = "MultistartOptimization" uuid = "3933049c-43be-478e-a8bb-6e0f7fd53575" authors = ["Tamas K. Papp "] -version = "0.3.1" +version = "0.4" [deps] ArgCheck = "dce04be8-c92d-5529-be00-80e4d2c0e197" DocStringExtensions = "ffbed154-4ef7-542d-bbb7-c09d3a79fcae" +OhMyThreads = "67456a42-1dca-4109-a031-0a68de7e3ad5" Sobol = "ed01d8cd-4d21-5b2a-85b4-cc3bdc58bad4" [weakdeps] @@ -18,5 +19,6 @@ MultistartOptimizationNLoptExt = "NLopt" ArgCheck = "1, 2" DocStringExtensions = "0.8, 0.9" NLopt = "0.5, 0.6, 1" +OhMyThreads = "0.8" Sobol = "1.3" julia = "1.10" diff --git a/src/MultistartOptimization.jl b/src/MultistartOptimization.jl index 2aa6169..142dbe6 100644 --- a/src/MultistartOptimization.jl +++ b/src/MultistartOptimization.jl @@ -3,9 +3,9 @@ module MultistartOptimization # NOTE: exports in included files using ArgCheck: @argcheck -using Base.Threads: @spawn, fetch using DocStringExtensions: FIELDS, FUNCTIONNAME, SIGNATURES, TYPEDEF using Sobol: SobolSeq, Sobol +using OhMyThreads include("generic_api.jl") include("tiktak.jl") diff --git a/src/generic_api.jl b/src/generic_api.jl index 3b90602..4176f06 100644 --- a/src/generic_api.jl +++ b/src/generic_api.jl @@ -58,9 +58,13 @@ function local_minimization(local_method, minimization_problem::MinimizationProb end """ -`$(FUNCTIONNAME)(multistart_method, local_method, minimization_problem)` +`$(FUNCTIONNAME)(multistart_method, local_method, minimization_problem, [scheduler]; + scheduler_options...)` -Multistart minimization using the given multistart and local methods. +Multistart minimization using the given multistart and local methods. The individual +minimizations are carried out using the `scheduler` from `OhMyThreads`. Instead of +passing a `Scheduler`, it is also possible to use `scheduler_options`, which will +automatically construct the most suitable `Scheduler`. """ function multistart_minimization end diff --git a/src/tiktak.jl b/src/tiktak.jl index 3f31a0a..c61fe0c 100644 --- a/src/tiktak.jl +++ b/src/tiktak.jl @@ -4,6 +4,8 @@ export TikTak +VERSION ≥ v"1.11.0-DEV.469" && eval(Expr(:public, :default_initial_point_scheduler, :default_local_scheduler)) + struct TikTak quasirandom_N::Int initial_N::Int @@ -72,20 +74,15 @@ $(SIGNATURES) Evaluate and return points of an `N`-element Sobol sequence. -When `use_threads`, execution is parallelized using `Threads.@spawn`. +Execution is potentially parallelized using the strategy specified by `scheduler`. """ -function sobol_starting_points(minimization_problem::MinimizationProblem, N::Integer, - use_threads::Bool) +function sobol_starting_points(minimization_problem::MinimizationProblem, N::Integer, scheduler::Scheduler) (; objective, lower_bounds, upper_bounds) = minimization_problem s = SobolSeq(lower_bounds, upper_bounds) skip(s, N) # better uniformity - points = Iterators.take(s, N) + points = collect(Iterators.take(s, N)) _initial(x) = _objective_at_location(objective, x) - if use_threads - map(fetch, map(x -> @spawn(_initial(x)), points)) - else - map(_initial, points) - end + tmap(_initial, Base.promote_op(_initial, eltype(points)), points; scheduler) end """ @@ -93,9 +90,77 @@ $(SIGNATURES) Helper function to keep the `N` points with the lowest `value`. """ -function _keep_lowest(xs, N) +function _keep_lowest!(xs, N) @argcheck 1 ≤ N ≤ length(xs) - partialsort(xs, 1:N, by = p -> p.value) + partialsort!(xs, 1:N, by = p -> p.value) +end + +""" +$(SIGNATURES) + +Constructs the scheduler used to compute the function values at the initial points. If +multiple threads are used, this is a `StaticScheduler`, to which arguments may be passed. +For a single-threaded run, this is a `SerialScheduler`. + +This function is not exported. +""" +default_initial_point_scheduler(; ntasks=Threads.nthreads(), chunksize=nothing, minchunksize=nothing, + split::Union{OhMyThreads.Split,Symbol}=OhMyThreads.Consecutive(), chunking::Bool=true) = + isone(Threads.nthreads()) || isone(ntasks) ? SerialScheduler() : + StaticScheduler(; ntasks, chunksize, minchunksize, split, chunking) + +""" +$(SIGNATURES) + +Constructs the scheduler used to invoke the local minimizer on the objective from various +initial points. If multiple threads are used, this is a `GreedyScheduler`, to which +arguments may be passed. For a single-threaded run, this is a `SerialScheduler`. + +This function is not exported. + +!!! warning + It is highly recommended to only use the `RoundRobin` splitting strategy in order to + resemble most closely the steps a serial execution would take. +""" +default_local_scheduler(; ntasks=Threads.nthreads(), nchunks=nothing, chunksize=nothing, minchunksize=nothing, + split::Union{OhMyThreads.Split,Symbol}=OhMyThreads.RoundRobin(), chunking::Bool=false) = + isone(Threads.nthreads()) || isone(ntasks) ? SerialScheduler() : + GreedyScheduler(; ntasks, nchunks, chunksize, minchunksize, split, chunking) + +# We need an indexable enumerate() and also want to iterate over the concatenation of two vectors without having to concatenate +# them. Define this simple struct to achieve both things. +struct EnumeratedCatVector{T,X<:Tuple{AbstractVector{T},Vararg{AbstractVector{T}}}} <: AbstractVector{Tuple{Int,T}} + v::X +end + +Base.size(e::EnumeratedCatVector) = (sum(length, e.v, init=0),) +@generated function Base.getindex(e::EnumeratedCatVector, i::Int) + result = Expr(:block, Expr(:meta, :inline), Expr(:meta, :propagate_inbounds), + :(@boundscheck i ≤ 0 && throw(BoundsError(e, i))), + :(i_ = i) + ) + jmax = fieldcount(fieldtype(e, :v)) + for j in 1:fieldcount(fieldtype(e, :v))-1 + push!(result.args, :(len = length(e.v[$j])), :(if i ≤ len + return i_, @inbounds e.v[$j][begin+i-1] + else + i -= len + end)) + end + push!(result.args, :(return i_, e.v[$jmax][i])) + return result +end +Base.IndexStyle(::Type{X}) where {X<:EnumeratedCatVector} = IndexLinear() + +struct NoLock <: Base.AbstractLock end +Base.lock(::NoLock) = nothing +Base.trylock(::NoLock) = true +Base.unlock(::NoLock) = nothing +Base.islocked(::NoLock) = false + +mutable struct VisitedMinimum{V,T} + location::V + value::T end """ @@ -103,31 +168,74 @@ $(SIGNATURES) Solve `minimization_problem` by using `local_method` within `multistart_method`. -When `use_threads`, initial point search is parallelized using `Threads.@spawn`. +Initial point search and subsequent minimizations are executed according to the scheduler +policy. The argument `use_threads` enables default parallelization settings; for more +fine-grained control, explicitly specify the schedulers for the two stages. `prepend_points` should contain a vector of initial starting points that are prepended to the Sobol sequence. These are useful if a guess is available for the vicinity of the optimum. + +!!! warning + If the local optimization method is deterministic, so is the multistart optimization, + provided the `local_scheduler` is serial (multi-threading initial point generation is + fine). For a multi-threaded `local_scheduler`, no guarantees can be made either to + visit the same starting points in each run. + While chunking can be enabled, it is strongly recommended to use the `RoundRobin` + method is to be preferred over the `Consecutive` splitting strategy for + `local_scheduler`, so that it is more likely to be mixing initial points from + probably better starting points. """ function multistart_minimization(multistart_method::TikTak, local_method, minimization_problem; - use_threads = true, - prepend_points = Vector{Vector{Float64}}()) + prepend_points = Vector{Vector{Float64}}(), + use_threads::Bool=false, + initial_point_scheduler::Scheduler=use_threads ? default_initial_point_scheduler() : + SerialScheduler(), + local_scheduler::Scheduler=use_threads ? default_local_scheduler() : SerialScheduler()) + # We now have at most two choices for the schedulers, so union splitting will work - this is type stable (; quasirandom_N, initial_N, θ_min, θ_max, θ_pow) = multistart_method (; objective, lower_bounds, upper_bounds) = minimization_problem - @argcheck(all(x -> all(lower_bounds .≤ x .≤ upper_bounds), prepend_points), - "prepend_points outside problem bounds") - quasirandom_points = sobol_starting_points(minimization_problem, quasirandom_N, - use_threads) - initial_points = _keep_lowest(quasirandom_points, initial_N) - all_points = vcat(map(x -> _objective_at_location(objective, x), prepend_points), - initial_points) - function _step(visited_minimum, (i, initial_point)) - θ = _weight_parameter(multistart_method, i) - x = @. (1 - θ) * initial_point.location + θ * visited_minimum.location - local_minimum = local_minimization(local_method, minimization_problem, x) - local_minimum ≡ nothing && return visited_minimum - local_minimum.value < visited_minimum.value ? local_minimum : visited_minimum + @argcheck(all(x -> begin + length(lower_bounds) == length(x) || throw(DimensionMismatch("prepend_points with invalid length")) + all(issorted, zip(lower_bounds, x, upper_bounds)) + end, prepend_points), "prepend_points outside problem bounds") + quasirandom_points = sobol_starting_points(minimization_problem, quasirandom_N, initial_point_scheduler) + initial_points = _keep_lowest!(quasirandom_points, initial_N) + all_points = EnumeratedCatVector((map(Base.Fix1(_objective_at_location, objective), prepend_points), initial_points)) + + init = first(all_points)[2] + if local_scheduler isa SerialScheduler + tlv = Ref(similar(init.location)) + l = NoLock() + else + tlv = OhMyThreads.TaskLocalValue{Base.RefValue{typeof(init.location)}}(let x=init.location; () -> Ref(similar(x)) end) + l = Threads.SpinLock() + end + visited_minimum = VisitedMinimum(copy(init.location), init.value) + _step = let tlv=tlv, l=l, visited_minimum=visited_minimum + function _step((i, initial_point)::Tuple{Int,NamedTuple}) + θ = _weight_parameter(multistart_method, i) + xref = local_scheduler isa SerialScheduler ? tlv : tlv[] + x = xref[] + @. x = (1 - θ) * initial_point.location + θ * visited_minimum.location + local_minimum = local_minimization(local_method, minimization_problem, x) + local_minimum ≡ nothing && return visited_minimum + if local_minimum.value < visited_minimum.value + lock(l) + if local_minimum.value < visited_minimum.value + # If local_minimization works in-place, local_minimum ≡ x - so the next iteration in this task would + # overwrite the minimum position. Hence, we swap our temporary with the previous minimum, which has now + # become obsolete. + xref[] = visited_minimum.location + visited_minimum.location = local_minimum.location + visited_minimum.value = local_minimum.value + end + unlock(l) + end + return + end end - foldl(_step, enumerate(Iterators.drop(all_points, 1)); init = first(all_points)) + tforeach(_step, all_points; scheduler=local_scheduler) + return (; location=visited_minimum.location, value=visited_minimum.value) end