Other formats: This can also be accessed as a Jupyter notebook or a plain script.
Parallelization example
Note: The packages for this example are documented in the Project.toml, except for some unregistered packages.
First, we'll set up the environment, which entails installing and importing packages.
# Install unregistered packages.
using Pkg: Pkg
using Ensembles
try
using Lorenz63: Lorenz63
catch
Ensembles.install(:Lorenz63)
end
# Define a macro for doing imports to avoid duplicating it for remote processes later on.
macro initial_imports()
return esc(
quote
using Ensembles
using LinearAlgebra: norm
using Distributed: addprocs, rmprocs, @everywhere, remotecall, fetch, WorkerPool
using Test: @test
using Random: Random
using Lorenz63: Lorenz63
ext = Base.get_extension(Ensembles, :Lorenz63Ext)
using .ext
end
)
end
@initial_imports
worker_initial_imports = @macroexpand1 @initial_imports
include("../_utils/time.jl")@time_msg (macro with 1 method)Define parameters.
params = Dict(
"format" => "v0.1",
"transition" => Dict(
"sigma" => 10,
"rho" => 28,
"beta" => 8 / 3,
"scaling" => 1,
"ministep_nt" => missing,
"ministep_dt" => 0.05,
),
"observation" => Dict("noise_scale" => 2, "timestep_size" => 0.1, "num_timesteps" => 5),
"ensemble" => Dict(
"size" => 5,
"seed" => 9347215,
"prior" => "gaussian",
"prior_params" => [0.0, 1.0],
),
"spinup" => Dict("num_timesteps" => 5, "transition_noise_scale" => 0.0),
);Seed for reproducibility.
Random.seed!(1983745)Random.TaskLocalRNG()Make operators.
transitioner = Lorenz63Model(; params)
observer = NoisyObserver(get_state_keys(transitioner); params);Set seed for ground-truth simulation.
Random.seed!(0xfee55e45)
xor_seed!(observer, UInt64(0x243ecae5))Random.MersenneTwister(0xcdc5cd3801285adb)Define observation times
observation_times = let
step = params["observation"]["timestep_size"]
length = params["observation"]["num_timesteps"]
range(; start=0, length, step)
end0.0:0.1:0.4Generate synthetic ground-truth observations.
if !(@isdefined ground_truth) || isnothing(ground_truth)
ground_truth = @time let
state0 = Dict{Symbol,Any}(:state => randn(3))
# Set seed for ground-truth simulation.
Random.seed!(0xfee55e45)
xor_seed!(observer, UInt64(0x243ecae5))
# Generate states and observations.
t0 = 0.0
states = Vector{Dict{Symbol,Any}}(undef, length(observation_times))
observations = Vector{Dict{Symbol,Any}}(undef, length(observation_times))
let state = state0
for (i, t) in enumerate(observation_times)
state = transitioner(state, t0, t)
obs = observer(state)
states[i] = state
observations[i] = split_clean_noisy(observer, obs)[2]
t0 = t
end
end
(; states, observations)
end
println(" ^ timing for making ground truth observations")
ground_truth_states_vec = get_ensemble_matrix([:state], ground_truth.states)
ground_truth_obs_vec = get_ensemble_matrix([:state], ground_truth.observations)
end; 0.000271 seconds (720 allocations: 39.727 KiB)
^ timing for making ground truth observations
Ensembles
Make initial ensemble.
function generate_ensemble(params::Dict)
seed = params["ensemble"]["seed"]
ensemble_size = params["ensemble"]["size"]
prior_type = params["ensemble"]["prior"]
members = Vector{Dict{Symbol,Any}}(undef, ensemble_size)
if prior_type == "gaussian"
rng = Random.MersenneTwister(seed)
prior_mean, prior_std = params["ensemble"]["prior_params"]
for i in 1:ensemble_size
data = prior_mean .+ prior_std .* randn(rng, 3)
state = Dict{Symbol,Any}(:state => data)
members[i] = state
end
else
throw(ArgumentError("Invalid prior type: $prior_type"))
end
ensemble = Ensemble(members)
return ensemble
end;
if !(@isdefined ensemble_initial0) || isnothing(ensemble_initial0)
ensemble_initial0 = generate_ensemble(params)
end
ensemble_initial0 = generate_ensemble(params);
ensemble_initial = Ensemble(ensemble_initial0, ensemble_initial0.members, [:state]);
t_index_end = params["spinup"]["num_timesteps"]
observation_times = observation_times[1:t_index_end]
ground_truth_observations = ground_truth.observations[1:t_index_end]
transition_noise = params["spinup"]["transition_noise_scale"]0.0Data assimilation
Choose filtering algorithm.
filter = nothingRun sequential algorithm.
if !(@isdefined ensembles_sequential) || isnothing(ensembles_sequential)
ensembles_sequential =
let t_index_end = params["spinup"]["num_timesteps"],
observation_times = observation_times[1:t_index_end],
ground_truth_observations = ground_truth.observations[1:t_index_end],
ensemble = ensemble_initial0,
t0 = 0.0,
transition_noise = params["spinup"]["transition_noise_scale"]
Random.seed!(0x3289745)
xor_seed!(observer, UInt64(0x375ef928))
logs = []
ensembles = []
@time begin
push!(ensembles, (; ensemble, t=t0))
for (t, y_obs) in zip(observation_times, ground_truth_observations)
# Advance ensemble to time t.
ensemble = transitioner(ensemble, t0, t; inplace=false)
# Keep ensemble separated.
if transition_noise != 0
for em in ensemble.members
em[:state] .+= transition_noise .* Random.randn(3)
end
end
# Take observation at time t.
ensemble_obs = observer(ensemble)
ensemble_obs_clean, ensemble_obs_noisy = split_clean_noisy(
observer, ensemble_obs
)
# Record.
push!(
ensembles, (; ensemble, ensemble_obs_clean, ensemble_obs_noisy, t)
)
# Assimilate observation
log_data = Dict{Symbol,Any}()
(posterior, timing...) = @timed assimilate_data(
filter,
ensemble,
ensemble_obs_clean,
ensemble_obs_noisy,
y_obs,
log_data,
)
log_data[:timing] = timing
ensemble = posterior
# Record.
push!(ensembles, (; ensemble, t))
push!(logs, log_data)
# Let time pass.
t0 = t
end
end
println(" ^ timing for making initial ensemble")
ensembles
end
end11-element Vector{Any}:
(ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-1.736339381960826, -1.8784568482587118, 1.8459595556154285]), Dict(:state => [-0.4140897015364231, -0.5702717353555934, 0.26195575184977354]), Dict(:state => [0.17866055001613776, 0.32981925832349, 0.09156408660254897]), Dict(:state => [-0.017837671414905457, -2.0503450137478305, 2.1972392296768795]), Dict(:state => [0.25904583722558483, -0.93970105340835, -0.24903577911250885])], [:state], true), t = 0.0)
(ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-1.736339381960826, -1.8784568482587118, 1.8459595556154285]), Dict(:state => [-0.4140897015364231, -0.5702717353555934, 0.26195575184977354]), Dict(:state => [0.17866055001613776, 0.32981925832349, 0.09156408660254897]), Dict(:state => [-0.017837671414905457, -2.0503450137478305, 2.1972392296768795]), Dict(:state => [0.25904583722558483, -0.93970105340835, -0.24903577911250885])], [:state], true), ensemble_obs_clean = Ensemble(Dict{Symbol, Any}[Dict(:state => [-1.736339381960826, -1.8784568482587118, 1.8459595556154285]), Dict(:state => [-0.4140897015364231, -0.5702717353555934, 0.26195575184977354]), Dict(:state => [0.17866055001613776, 0.32981925832349, 0.09156408660254897]), Dict(:state => [-0.017837671414905457, -2.0503450137478305, 2.1972392296768795]), Dict(:state => [0.25904583722558483, -0.93970105340835, -0.24903577911250885])], [:state], true), ensemble_obs_noisy = Ensemble(Dict{Symbol, Any}[Dict(:state => [0.04603820840751882, -1.3995079064480316, 3.1064010668168405]), Dict(:state => [0.6387341207031547, -0.0745257051949395, -2.0722870493214565]), Dict(:state => [4.45600019715385, 4.125025252008753, -2.4551226991712434]), Dict(:state => [4.389604011170066, -2.901761742663583, 2.24532999576864]), Dict(:state => [0.030306789729348138, 0.5132203910313592, -2.088199932824817])], [:state], true), t = 0.0)
(ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-1.736339381960826, -1.8784568482587118, 1.8459595556154285]), Dict(:state => [-0.4140897015364231, -0.5702717353555934, 0.26195575184977354]), Dict(:state => [0.17866055001613776, 0.32981925832349, 0.09156408660254897]), Dict(:state => [-0.017837671414905457, -2.0503450137478305, 2.1972392296768795]), Dict(:state => [0.25904583722558483, -0.93970105340835, -0.24903577911250885])], [:state], true), t = 0.0)
(ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-3.7490930573924404, -7.7372305685584974, 2.493379490937681]), Dict(:state => [-1.0410628825132424, -2.2276418145893184, 0.2837674887331014]), Dict(:state => [0.5269424262050609, 1.1422348705162078, 0.09151590588883647]), Dict(:state => [-1.833894124222056, -4.132177007609303, 1.9391933645569985]), Dict(:state => [-0.5236530452177508, -1.3121240329126123, -0.17359625340823812])], [:state], true), ensemble_obs_clean = Ensemble(Dict{Symbol, Any}[Dict(:state => [-3.7490930573924404, -7.7372305685584974, 2.493379490937681]), Dict(:state => [-1.0410628825132424, -2.2276418145893184, 0.2837674887331014]), Dict(:state => [0.5269424262050609, 1.1422348705162078, 0.09151590588883647]), Dict(:state => [-1.833894124222056, -4.132177007609303, 1.9391933645569985]), Dict(:state => [-0.5236530452177508, -1.3121240329126123, -0.17359625340823812])], [:state], true), ensemble_obs_noisy = Ensemble(Dict{Symbol, Any}[Dict(:state => [-4.845823883362617, -6.740747078305999, 4.896686998829272]), Dict(:state => [-2.4866847524973013, -3.2307253486030625, -2.4653437662059767]), Dict(:state => [-0.37704949460747994, -0.5794202086845686, -2.1898990495012196]), Dict(:state => [-1.2454472997273869, -3.4518522381898364, 0.9719789016195703]), Dict(:state => [-0.023503811697042853, -2.095001087169096, -1.51795729836139])], [:state], true), t = 0.1)
(ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-3.7490930573924404, -7.7372305685584974, 2.493379490937681]), Dict(:state => [-1.0410628825132424, -2.2276418145893184, 0.2837674887331014]), Dict(:state => [0.5269424262050609, 1.1422348705162078, 0.09151590588883647]), Dict(:state => [-1.833894124222056, -4.132177007609303, 1.9391933645569985]), Dict(:state => [-0.5236530452177508, -1.3121240329126123, -0.17359625340823812])], [:state], true), t = 0.1)
(ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-10.745227305455062, -21.05343582123514, 11.044681336574117]), Dict(:state => [-3.323185978531821, -7.1807515726935875, 1.0708515410018824]), Dict(:state => [1.7064300552657459, 3.7140529594083396, 0.2947862000388703]), Dict(:state => [-5.806243310198203, -12.170791387607581, 4.115887181098772]), Dict(:state => [-1.864968574548437, -4.088790871003409, 0.13541808673726347])], [:state], true), ensemble_obs_clean = Ensemble(Dict{Symbol, Any}[Dict(:state => [-10.745227305455062, -21.05343582123514, 11.044681336574117]), Dict(:state => [-3.323185978531821, -7.1807515726935875, 1.0708515410018824]), Dict(:state => [1.7064300552657459, 3.7140529594083396, 0.2947862000388703]), Dict(:state => [-5.806243310198203, -12.170791387607581, 4.115887181098772]), Dict(:state => [-1.864968574548437, -4.088790871003409, 0.13541808673726347])], [:state], true), ensemble_obs_noisy = Ensemble(Dict{Symbol, Any}[Dict(:state => [-13.733800750620576, -23.27577240052977, 10.680949837348987]), Dict(:state => [-5.178743142257742, -4.873524931989422, 2.026544230158968]), Dict(:state => [2.8343717541774494, 3.3955955658720205, -1.7772854626910217]), Dict(:state => [-7.952979433071585, -15.192456317794521, 3.6538489542267816]), Dict(:state => [-1.1781142523264245, -2.4624681857570634, -3.0080783793092367])], [:state], true), t = 0.2)
(ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-10.745227305455062, -21.05343582123514, 11.044681336574117]), Dict(:state => [-3.323185978531821, -7.1807515726935875, 1.0708515410018824]), Dict(:state => [1.7064300552657459, 3.7140529594083396, 0.2947862000388703]), Dict(:state => [-5.806243310198203, -12.170791387607581, 4.115887181098772]), Dict(:state => [-1.864968574548437, -4.088790871003409, 0.13541808673726347])], [:state], true), t = 0.2)
(ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-19.211467562434432, -19.31952964709035, 42.57587716913618]), Dict(:state => [-10.13990965703818, -20.473880678696002, 8.89161025955345]), Dict(:state => [5.464807908134707, 11.681983543353084, 2.539670197949465]), Dict(:state => [-15.254763535219917, -26.21722434018124, 22.15547925020126]), Dict(:state => [-6.003581619242793, -12.811519454462218, 2.8970143817459286])], [:state], true), ensemble_obs_clean = Ensemble(Dict{Symbol, Any}[Dict(:state => [-19.211467562434432, -19.31952964709035, 42.57587716913618]), Dict(:state => [-10.13990965703818, -20.473880678696002, 8.89161025955345]), Dict(:state => [5.464807908134707, 11.681983543353084, 2.539670197949465]), Dict(:state => [-15.254763535219917, -26.21722434018124, 22.15547925020126]), Dict(:state => [-6.003581619242793, -12.811519454462218, 2.8970143817459286])], [:state], true), ensemble_obs_noisy = Ensemble(Dict{Symbol, Any}[Dict(:state => [-20.514527403795405, -20.567850255497483, 43.9858016954963]), Dict(:state => [-9.180411181801038, -20.326064684157796, 8.541376817627015]), Dict(:state => [4.338717703246138, 15.662419456666674, 3.453241966592169]), Dict(:state => [-12.96418204337131, -25.04221924503085, 22.492283535359167]), Dict(:state => [-6.733558885243093, -11.263998276796784, 5.510643972051675])], [:state], true), t = 0.3)
(ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-19.211467562434432, -19.31952964709035, 42.57587716913618]), Dict(:state => [-10.13990965703818, -20.473880678696002, 8.89161025955345]), Dict(:state => [5.464807908134707, 11.681983543353084, 2.539670197949465]), Dict(:state => [-15.254763535219917, -26.21722434018124, 22.15547925020126]), Dict(:state => [-6.003581619242793, -12.811519454462218, 2.8970143817459286])], [:state], true), t = 0.3)
(ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-8.239275168852691, 6.5918135288015325, 39.31309588366235]), Dict(:state => [-19.699050910444964, -21.50037827832076, 41.90983422111753]), Dict(:state => [15.107370704192729, 26.90565870943407, 20.395859406766434]), Dict(:state => [-16.770587848273813, -5.274913180791662, 47.10543204904237]), Dict(:state => [-16.137354407049678, -27.607885483019007, 23.471361995084457])], [:state], true), ensemble_obs_clean = Ensemble(Dict{Symbol, Any}[Dict(:state => [-8.239275168852691, 6.5918135288015325, 39.31309588366235]), Dict(:state => [-19.699050910444964, -21.50037827832076, 41.90983422111753]), Dict(:state => [15.107370704192729, 26.90565870943407, 20.395859406766434]), Dict(:state => [-16.770587848273813, -5.274913180791662, 47.10543204904237]), Dict(:state => [-16.137354407049678, -27.607885483019007, 23.471361995084457])], [:state], true), ensemble_obs_noisy = Ensemble(Dict{Symbol, Any}[Dict(:state => [-12.762769302036986, 6.205844927525672, 37.556836446505926]), Dict(:state => [-13.470954149382568, -20.679763679241084, 38.651355768958695]), Dict(:state => [16.50149305867902, 23.465322886668933, 21.20681405584041]), Dict(:state => [-19.18221088362932, -5.347099604958038, 49.03058203353198]), Dict(:state => [-12.260496022701869, -26.038698125255028, 21.496989669871404])], [:state], true), t = 0.4)
(ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-8.239275168852691, 6.5918135288015325, 39.31309588366235]), Dict(:state => [-19.699050910444964, -21.50037827832076, 41.90983422111753]), Dict(:state => [15.107370704192729, 26.90565870943407, 20.395859406766434]), Dict(:state => [-16.770587848273813, -5.274913180791662, 47.10543204904237]), Dict(:state => [-16.137354407049678, -27.607885483019007, 23.471361995084457])], [:state], true), t = 0.4)File-based parallelism
Same assimilation algorithm, but now the transition and observations are done in parallel.
worker_ids = addprocs(4; exeflags="--project=$(Base.active_project())")
worker_ids_driver = vcat([1], worker_ids)
try
@everywhere worker_ids_driver begin
println("I am here")
$worker_initial_imports
function worker_transition(run_dir, k, t, t0, params, worker_id, num_workers)
try
local worker = ParallelWorker(num_workers, worker_id)
local ensemble_path = joinpath(run_dir, "ensemble_$(k-1)_posterior")
local ensemble = load_ensemble(ensemble_path)
local tmp_filepath = joinpath(run_dir, "intermediate_trans_$(k-1)_to_$(k)")
local output_filepath = joinpath(run_dir, "ensemble_$(k)_prior")
local parallel_strategy = FileBasedPartial(output_filepath, tmp_filepath)
local transitioner = Lorenz63Model(; params)
local closer, num_completed = run_partial_operator(
parallel_strategy,
worker,
em -> transitioner(em, t, t0),
ensemble;
reset_state_keys=false,
)
if closer
@debug "Transition: closer did $num_completed"
else
@debug "Transition: worker $worker_id did $num_completed"
end
catch e
return e
end
end
function worker_observer(run_dir, k, params, worker_id, num_workers)
try
local worker = ParallelWorker(num_workers, worker_id)
local ensemble_path = joinpath(run_dir, "ensemble_$(k)_prior")
local ensemble = load_ensemble(ensemble_path)
local tmp_filepath = joinpath(run_dir, "intermediate_obs_$(k)")
local output_filepath = joinpath(run_dir, "ensemble_$(k)_obs_prior")
local parallel_strategy = FileBasedPartial(output_filepath, tmp_filepath)
local transitioner = Lorenz63Model(; params)
local observer = NoisyObserver(get_state_keys(transitioner); params)
xor_seed!(observer, UInt64(worker_id * num_workers - 1))
local closer, num_completed = run_partial_operator(
parallel_strategy, worker, observer, ensemble; reset_state_keys=false
)
if closer
@debug "Observer: closer did $num_completed"
else
@debug "Observer: worker $worker_id did $num_completed"
end
catch e
return e
end
end
end
for observer_type in [:sequential, :not_sequential]
Random.seed!(0x3289745)
xor_seed!(observer, UInt64(0x375ef928))
run_dir = tempname("."; cleanup=false)
@show run_dir
mkpath(run_dir)
save_ensemble(ensemble_initial, joinpath(run_dir, "ensemble_0_posterior"))
logs = []
ensembles = []
@time_msg "Run ensemble parallel" let ensemble = ensemble_initial
t0 = 0.0
push!(ensembles, (; ensemble, t=t0))
for (k, (t, y_obs)) in
enumerate(zip(observation_times, ground_truth.observations))
# Advance ensemble to time t.
worker_tasks = []
for (i, p) in enumerate(worker_ids)
task = remotecall(
Main.worker_transition,
p,
run_dir,
k,
t0,
t,
params,
i,
length(worker_ids),
)
push!(worker_tasks, task)
end
for task in worker_tasks
v = fetch(task)
isa(v, Exception) && throw(v)
end
Main.worker_transition(run_dir, k, t0, t, params, 1, 1)
output_filepath = joinpath(run_dir, "ensemble_$(k)_prior")
ensemble = load_ensemble(output_filepath)
# Take observation at time t.
if observer_type == :sequential
println("Time $t: observer rng $(observer.rng)")
ensemble_obs = observer(ensemble)
println("Time $t: observer rng $(observer.rng)")
else
worker_tasks = []
for (i, p) in enumerate(worker_ids)
task = remotecall(
Main.worker_observer,
p,
run_dir,
k,
params,
i,
length(worker_ids),
)
push!(worker_tasks, task)
end
for task in worker_tasks
v = fetch(task)
isa(v, Exception) && throw(v)
end
Main.worker_observer(run_dir, k, params, 1, 1)
output_filepath = joinpath(run_dir, "ensemble_$(k)_obs_prior")
ensemble_obs = load_ensemble(output_filepath)
end
ensemble_obs_clean, ensemble_obs_noisy = split_clean_noisy(
observer, ensemble_obs
)
println(
"Time $t : noise norm $(norm(get_ensemble_matrix(ensemble_obs_clean) .- get_ensemble_matrix(ensemble_obs_noisy)))",
)
# Record.
push!(ensembles, (; ensemble, ensemble_obs_clean, ensemble_obs_noisy, t))
# Assimilate observation
log_data = Dict{Symbol,Any}()
(posterior, timing...) = @timed assimilate_data(
filter,
ensemble,
ensemble_obs_clean,
ensemble_obs_noisy,
y_obs,
log_data,
)
log_data[:timing] = timing
println(
"Time $t : posterior norm $(norm(get_ensemble_matrix(ensemble) .- get_ensemble_matrix(posterior)))",
)
ensemble = posterior
save_ensemble(ensemble, joinpath(run_dir, "ensemble_$(k)_posterior"))
# Record.
push!(ensembles, (; ensemble, t))
push!(logs, log_data)
# Let time pass.
t0 = t
end
end
ensembles_parallel = ensembles
# Print out some info to make sure the results are the same.
let
ensemble_obs_noisy_diffs = []
ensemble_obs_clean_diffs = []
ensemble_diffs = []
for (i, (e, ep)) in enumerate(zip(ensembles_sequential, ensembles_parallel))
em = get_ensemble_matrix(e.ensemble)
epm = get_ensemble_matrix(ep.ensemble)
push!(ensemble_diffs, norm(em .- epm))
println("Index $i, t $(e.t) $(ep.t):")
println(" ensemble: $(ensemble_diffs[end])")
if hasfield(typeof(e), :ensemble_obs_noisy)
em_on = get_ensemble_matrix(e.ensemble_obs_noisy)
epm_on = get_ensemble_matrix(ep.ensemble_obs_noisy)
push!(ensemble_obs_noisy_diffs, norm(em_on .- epm_on))
println(" ensemble_obs_noisy: $(ensemble_obs_noisy_diffs[end])")
em_oc = get_ensemble_matrix(e.ensemble_obs_clean)
epm_oc = get_ensemble_matrix(ep.ensemble_obs_clean)
push!(ensemble_obs_clean_diffs, norm(em_oc .- epm_oc))
println(" ensemble_obs_clean: $(ensemble_obs_clean_diffs[end])")
println(" noise norm S: $(norm(em_oc .- em_on))")
println(" noise norm P: $(norm(epm_oc .- epm_on))")
end
end
if observer_type == :sequential
@test all(ensemble_diffs .== 0)
@test all(ensemble_obs_noisy_diffs .== 0)
@test all(ensemble_obs_clean_diffs .== 0)
else
@test ensemble_diffs[1] == 0
@test ensemble_obs_clean_diffs[1] == 0
end
end
end
finally
rmprocs(worker_ids)
end;I am here
From worker 3: I am here
From worker 4: I am here
From worker 5: I am here
From worker 2: I am here
run_dir = "./jl_iJKJupS1wz"
Run ensemble parallel: Time 0.0: observer rng Random.MersenneTwister(0xcdc5cd3812486916)
Time 0.0: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 15))
Time 0.0 : noise norm 8.75678567387657
Time 0.0 : posterior norm 0.0
Time 0.1: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 15))
Time 0.1: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 30))
Time 0.1 : noise norm 5.659188645248907
Time 0.1 : posterior norm 0.0
Time 0.2: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 30))
Time 0.2: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 45))
Time 0.2 : noise norm 7.503346891502936
Time 0.2 : posterior norm 0.0
Time 0.3: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 45))
Time 0.3: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 60))
Time 0.3 : noise norm 6.383999946279442
Time 0.3 : posterior norm 0.0
Time 0.4: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 60))
Time 0.4: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 75))
Time 0.4 : noise norm 10.91470098952389
Time 0.4 : posterior norm 0.0
12.528420 seconds (14.91 M allocations: 910.650 MiB, 2.13% gc time, 38.75% compilation time: 11% of which was recompilation)
Index 1, t 0.0 0.0:
ensemble: 0.0
Index 2, t 0.0 0.0:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 8.75678567387657
noise norm P: 8.75678567387657
Index 3, t 0.0 0.0:
ensemble: 0.0
Index 4, t 0.1 0.1:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 5.659188645248907
noise norm P: 5.659188645248907
Index 5, t 0.1 0.1:
ensemble: 0.0
Index 6, t 0.2 0.2:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 7.503346891502936
noise norm P: 7.503346891502936
Index 7, t 0.2 0.2:
ensemble: 0.0
Index 8, t 0.3 0.3:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 6.383999946279442
noise norm P: 6.383999946279442
Index 9, t 0.3 0.3:
ensemble: 0.0
Index 10, t 0.4 0.4:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 10.91470098952389
noise norm P: 10.91470098952389
Index 11, t 0.4 0.4:
ensemble: 0.0
run_dir = "./jl_wToRyH2ti9"
Run ensemble parallel: Time 0.0 : noise norm 7.3456497269816845
Time 0.0 : posterior norm 0.0
Time 0.1 : noise norm 6.290795297046686
Time 0.1 : posterior norm 0.0
Time 0.2 : noise norm 8.460909160858042
Time 0.2 : posterior norm 0.0
Time 0.3 : noise norm 5.967282285417883
Time 0.3 : posterior norm 0.0
Time 0.4 : noise norm 6.949697374131521
Time 0.4 : posterior norm 0.0
1.757219 seconds (579.24 k allocations: 31.761 MiB, 14.40% compilation time)
Index 1, t 0.0 0.0:
ensemble: 0.0
Index 2, t 0.0 0.0:
ensemble: 0.0
ensemble_obs_noisy: 10.77955745270178
ensemble_obs_clean: 0.0
noise norm S: 8.75678567387657
noise norm P: 7.3456497269816845
Index 3, t 0.0 0.0:
ensemble: 0.0
Index 4, t 0.1 0.1:
ensemble: 0.0
ensemble_obs_noisy: 8.412224414893354
ensemble_obs_clean: 0.0
noise norm S: 5.659188645248907
noise norm P: 6.290795297046686
Index 5, t 0.1 0.1:
ensemble: 0.0
Index 6, t 0.2 0.2:
ensemble: 0.0
ensemble_obs_noisy: 8.6170080073667
ensemble_obs_clean: 0.0
noise norm S: 7.503346891502936
noise norm P: 8.460909160858042
Index 7, t 0.2 0.2:
ensemble: 0.0
Index 8, t 0.3 0.3:
ensemble: 0.0
ensemble_obs_noisy: 9.902552624691458
ensemble_obs_clean: 0.0
noise norm S: 6.383999946279442
noise norm P: 5.967282285417883
Index 9, t 0.3 0.3:
ensemble: 0.0
Index 10, t 0.4 0.4:
ensemble: 0.0
ensemble_obs_noisy: 13.768784136081246
ensemble_obs_clean: 0.0
noise norm S: 10.91470098952389
noise norm P: 6.949697374131521
Index 11, t 0.4 0.4:
ensemble: 0.0
Pmap-based parallelism.
Similar, but only parallelizes transition operator to make it easier to read.
worker_ids = addprocs(4; exeflags="--project=$(Base.active_project())")
try
@everywhere worker_ids $worker_initial_imports
for distributed_type in [:pmap, :distributed_for, :asyncmap]
Random.seed!(0x3289745)
xor_seed!(observer, UInt64(0x375ef928))
ensembles_parallel =
let transitioner = DistributedOperator(transitioner, Val(:pmap))
logs = []
ensembles = []
@time_msg "Run ensemble parallel" let ensemble = ensemble_initial
t0 = 0.0
push!(ensembles, (; ensemble, t=t0))
i_obs = 2
for (t, y_obs) in zip(observation_times, ground_truth.observations)
# Advance ensemble to time t.
ensemble = transitioner(ensemble, t0, t; inplace=false)
# Get observation at time t.
ensemble_obs_clean = ensembles_sequential[i_obs].ensemble_obs_clean
ensemble_obs_noisy = ensembles_sequential[i_obs].ensemble_obs_noisy
i_obs += 2
println(
"Time $t : noise norm $(norm(get_ensemble_matrix(ensemble_obs_clean) .- get_ensemble_matrix(ensemble_obs_noisy)))",
)
# Record.
push!(
ensembles,
(; ensemble, ensemble_obs_clean, ensemble_obs_noisy, t),
)
# Assimilate observation
log_data = Dict{Symbol,Any}()
(posterior, timing...) = @timed assimilate_data(
filter,
ensemble,
ensemble_obs_clean,
ensemble_obs_noisy,
y_obs,
log_data,
)
log_data[:timing] = timing
println(
"Time $t : posterior norm $(norm(get_ensemble_matrix(ensemble) .- get_ensemble_matrix(posterior)))",
)
ensemble = posterior
# Record.
push!(ensembles, (; ensemble, t))
push!(logs, log_data)
# Let time pass.
t0 = t
end
end
ensembles
end
# Print out some info to make sure the results are the same.
let
ensemble_obs_noisy_diffs = []
ensemble_obs_clean_diffs = []
ensemble_diffs = []
for (i, (e, ep)) in enumerate(zip(ensembles_sequential, ensembles_parallel))
em = get_ensemble_matrix(e.ensemble)
epm = get_ensemble_matrix(ep.ensemble)
push!(ensemble_diffs, norm(em .- epm))
println("Index $i, t $(e.t) $(ep.t):")
println(" ensemble: $(ensemble_diffs[end])")
if hasfield(typeof(e), :ensemble_obs_noisy)
em_on = get_ensemble_matrix(e.ensemble_obs_noisy)
epm_on = get_ensemble_matrix(ep.ensemble_obs_noisy)
push!(ensemble_obs_noisy_diffs, norm(em_on .- epm_on))
println(" ensemble_obs_noisy: $(ensemble_obs_noisy_diffs[end])")
em_oc = get_ensemble_matrix(e.ensemble_obs_clean)
epm_oc = get_ensemble_matrix(ep.ensemble_obs_clean)
push!(ensemble_obs_clean_diffs, norm(em_oc .- epm_oc))
println(" ensemble_obs_clean: $(ensemble_obs_clean_diffs[end])")
println(" noise norm S: $(norm(em_oc .- em_on))")
println(" noise norm P: $(norm(epm_oc .- epm_on))")
end
end
@test all(ensemble_diffs .== 0)
@test all(ensemble_obs_noisy_diffs .== 0)
@test all(ensemble_obs_clean_diffs .== 0)
end
end
finally
rmprocs(worker_ids)
end;Run ensemble parallel: Time 0.0 : noise norm 8.75678567387657
Time 0.0 : posterior norm 0.0
Time 0.1 : noise norm 5.659188645248907
Time 0.1 : posterior norm 0.0
Time 0.2 : noise norm 7.503346891502936
Time 0.2 : posterior norm 0.0
Time 0.3 : noise norm 6.383999946279442
Time 0.3 : posterior norm 0.0
Time 0.4 : noise norm 10.91470098952389
Time 0.4 : posterior norm 0.0
3.795867 seconds (3.03 M allocations: 152.764 MiB, 1.72% gc time, 25.79% compilation time)
Index 1, t 0.0 0.0:
ensemble: 0.0
Index 2, t 0.0 0.0:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 8.75678567387657
noise norm P: 8.75678567387657
Index 3, t 0.0 0.0:
ensemble: 0.0
Index 4, t 0.1 0.1:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 5.659188645248907
noise norm P: 5.659188645248907
Index 5, t 0.1 0.1:
ensemble: 0.0
Index 6, t 0.2 0.2:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 7.503346891502936
noise norm P: 7.503346891502936
Index 7, t 0.2 0.2:
ensemble: 0.0
Index 8, t 0.3 0.3:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 6.383999946279442
noise norm P: 6.383999946279442
Index 9, t 0.3 0.3:
ensemble: 0.0
Index 10, t 0.4 0.4:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 10.91470098952389
noise norm P: 10.91470098952389
Index 11, t 0.4 0.4:
ensemble: 0.0
Run ensemble parallel: Time 0.0 : noise norm 8.75678567387657
Time 0.0 : posterior norm 0.0
Time 0.1 : noise norm 5.659188645248907
Time 0.1 : posterior norm 0.0
Time 0.2 : noise norm 7.503346891502936
Time 0.2 : posterior norm 0.0
Time 0.3 : noise norm 6.383999946279442
Time 0.3 : posterior norm 0.0
Time 0.4 : noise norm 10.91470098952389
Time 0.4 : posterior norm 0.0
0.003856 seconds (4.02 k allocations: 1.456 MiB)
Index 1, t 0.0 0.0:
ensemble: 0.0
Index 2, t 0.0 0.0:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 8.75678567387657
noise norm P: 8.75678567387657
Index 3, t 0.0 0.0:
ensemble: 0.0
Index 4, t 0.1 0.1:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 5.659188645248907
noise norm P: 5.659188645248907
Index 5, t 0.1 0.1:
ensemble: 0.0
Index 6, t 0.2 0.2:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 7.503346891502936
noise norm P: 7.503346891502936
Index 7, t 0.2 0.2:
ensemble: 0.0
Index 8, t 0.3 0.3:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 6.383999946279442
noise norm P: 6.383999946279442
Index 9, t 0.3 0.3:
ensemble: 0.0
Index 10, t 0.4 0.4:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 10.91470098952389
noise norm P: 10.91470098952389
Index 11, t 0.4 0.4:
ensemble: 0.0
Run ensemble parallel: Time 0.0 : noise norm 8.75678567387657
Time 0.0 : posterior norm 0.0
Time 0.1 : noise norm 5.659188645248907
Time 0.1 : posterior norm 0.0
Time 0.2 : noise norm 7.503346891502936
Time 0.2 : posterior norm 0.0
Time 0.3 : noise norm 6.383999946279442
Time 0.3 : posterior norm 0.0
Time 0.4 : noise norm 10.91470098952389
Time 0.4 : posterior norm 0.0
0.003333 seconds (3.94 k allocations: 1.447 MiB)
Index 1, t 0.0 0.0:
ensemble: 0.0
Index 2, t 0.0 0.0:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 8.75678567387657
noise norm P: 8.75678567387657
Index 3, t 0.0 0.0:
ensemble: 0.0
Index 4, t 0.1 0.1:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 5.659188645248907
noise norm P: 5.659188645248907
Index 5, t 0.1 0.1:
ensemble: 0.0
Index 6, t 0.2 0.2:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 7.503346891502936
noise norm P: 7.503346891502936
Index 7, t 0.2 0.2:
ensemble: 0.0
Index 8, t 0.3 0.3:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 6.383999946279442
noise norm P: 6.383999946279442
Index 9, t 0.3 0.3:
ensemble: 0.0
Index 10, t 0.4 0.4:
ensemble: 0.0
ensemble_obs_noisy: 0.0
ensemble_obs_clean: 0.0
noise norm S: 10.91470098952389
noise norm P: 10.91470098952389
Index 11, t 0.4 0.4:
ensemble: 0.0
Now parallelizes both the transition operator and the observation operator.
worker_ids = addprocs(6; exeflags="--project=$(Base.active_project())")
ensembles_parallel = try
worker_pool = WorkerPool(worker_ids[3:end])
@everywhere worker_ids $worker_initial_imports
Random.seed!(0x3289745)
xor_seed!(observer, UInt64(0x375ef928))
ensembles =
let transitioner = DistributedOperator(transitioner, worker_pool, Val(:pmap)),
observer = DistributedOperator(observer, worker_pool)
logs = []
ensembles = []
@time_msg "Run ensemble parallel" let ensemble = ensemble_initial
t0 = 0.0
push!(ensembles, (; ensemble, t=t0))
for (t, y_obs) in zip(observation_times, ground_truth.observations)
# Advance ensemble to time t.
ensemble = transitioner(ensemble, t0, t; inplace=false)
# Take observation at time t.
ensemble_obs = observer(ensemble)
ensemble_obs_clean, ensemble_obs_noisy = split_clean_noisy(
observer, ensemble_obs
)
println(
"Time $t : noise norm $(norm(get_ensemble_matrix(ensemble_obs_clean) .- get_ensemble_matrix(ensemble_obs_noisy)))",
)
# Record.
push!(
ensembles,
(; ensemble, ensemble_obs_clean, ensemble_obs_noisy, t),
)
# Assimilate observation
log_data = Dict{Symbol,Any}()
(posterior, timing...) = @timed assimilate_data(
filter,
ensemble,
ensemble_obs_clean,
ensemble_obs_noisy,
y_obs,
log_data,
)
log_data[:timing] = timing
println(
"Time $t : posterior norm $(norm(get_ensemble_matrix(ensemble) .- get_ensemble_matrix(posterior)))",
)
ensemble = posterior
# Record.
push!(ensembles, (; ensemble, t))
push!(logs, log_data)
# Let time pass.
t0 = t
end
end
ensembles
end
finally
rmprocs(worker_ids)
end;Run ensemble parallel: Time 0.0 : noise norm 8.055444785139724
Time 0.0 : posterior norm 0.0
Time 0.1 : noise norm 5.866912972393284
Time 0.1 : posterior norm 0.0
Time 0.2 : noise norm 10.651888064108782
Time 0.2 : posterior norm 0.0
Time 0.3 : noise norm 11.613376783455594
Time 0.3 : posterior norm 0.0
Time 0.4 : noise norm 9.29422701020639
Time 0.4 : posterior norm 0.0
7.100976 seconds (2.56 M allocations: 130.748 MiB, 1 lock conflict, 26.38% compilation time)
Error check
Print out some info to make sure the results are the same, except the noise should be different.
for (i, (e, ep)) in enumerate(zip(ensembles_sequential, ensembles_parallel))
em = get_ensemble_matrix(e.ensemble)
epm = get_ensemble_matrix(ep.ensemble)
println("Index $i, t $(e.t) $(ep.t):")
println(" ensemble: $(norm(em .- epm))")
if hasfield(typeof(e), :ensemble_obs_noisy)
em_on = get_ensemble_matrix(e.ensemble_obs_noisy)
epm_on = get_ensemble_matrix(ep.ensemble_obs_noisy)
println(" ensemble_obs_noisy: $(norm(em_on .- epm_on))")
em_oc = get_ensemble_matrix(e.ensemble_obs_clean)
epm_oc = get_ensemble_matrix(ep.ensemble_obs_clean)
println(" ensemble_obs_clean: $(norm(em_oc .- epm_oc))")
println(" noise norm S: $(norm(em_oc .- em_on))")
println(" noise norm P: $(norm(epm_oc .- epm_on))")
end
end;Index 1, t 0.0 0.0:
ensemble: 0.0
Index 2, t 0.0 0.0:
ensemble: 0.0
ensemble_obs_noisy: 13.138243446144097
ensemble_obs_clean: 0.0
noise norm S: 8.75678567387657
noise norm P: 8.055444785139724
Index 3, t 0.0 0.0:
ensemble: 0.0
Index 4, t 0.1 0.1:
ensemble: 0.0
ensemble_obs_noisy: 8.46635635399443
ensemble_obs_clean: 0.0
noise norm S: 5.659188645248907
noise norm P: 5.866912972393284
Index 5, t 0.1 0.1:
ensemble: 0.0
Index 6, t 0.2 0.2:
ensemble: 0.0
ensemble_obs_noisy: 12.073673140448676
ensemble_obs_clean: 0.0
noise norm S: 7.503346891502936
noise norm P: 10.651888064108782
Index 7, t 0.2 0.2:
ensemble: 0.0
Index 8, t 0.3 0.3:
ensemble: 0.0
ensemble_obs_noisy: 12.430216886034222
ensemble_obs_clean: 0.0
noise norm S: 6.383999946279442
noise norm P: 11.613376783455594
Index 9, t 0.3 0.3:
ensemble: 0.0
Index 10, t 0.4 0.4:
ensemble: 0.0
ensemble_obs_noisy: 14.71019879474786
ensemble_obs_clean: 0.0
noise norm S: 10.91470098952389
noise norm P: 9.29422701020639
Index 11, t 0.4 0.4:
ensemble: 0.0
The end!
Note that this cleanup only needs to be run in automated contexts. The unregistered packages can cause issues.
try
Pkg.rm("Lorenz63")
catch e
@warn e
end Updating `~/work/Ensembles.jl/Ensembles.jl/examples/parallel/Project.toml`
[72db8b11] - Lorenz63 v0.1.0 `https://github.com/milankl/Lorenz63.jl#15220a7#master`
Updating `~/work/Ensembles.jl/Ensembles.jl/examples/parallel/Manifest.toml`
[72db8b11] - Lorenz63 v0.1.0 `https://github.com/milankl/Lorenz63.jl#15220a7#master`
This page was generated using Literate.jl.