Other formats: This can also be accessed as a Jupyter notebook or a plain script.

Parallelization example

Note: The packages for this example are documented in the Project.toml, except for some unregistered packages.

First, we'll set up the environment, which entails installing and importing packages.

# Install unregistered packages.
using Pkg: Pkg

using Ensembles
try
    using Lorenz63: Lorenz63
catch
    Ensembles.install(:Lorenz63)
end

# Define a macro for doing imports to avoid duplicating it for remote processes later on.
macro initial_imports()
    return esc(
        quote
            using Ensembles
            using LinearAlgebra: norm
            using Distributed: addprocs, rmprocs, @everywhere, remotecall, fetch, WorkerPool
            using Test: @test
            using Random: Random

            using Lorenz63: Lorenz63
            ext = Base.get_extension(Ensembles, :Lorenz63Ext)
            using .ext
        end
    )
end

@initial_imports
worker_initial_imports = @macroexpand1 @initial_imports

include("../_utils/time.jl")
@time_msg (macro with 1 method)

Define parameters.

params = Dict(
    "format" => "v0.1",
    "transition" => Dict(
        "sigma" => 10,
        "rho" => 28,
        "beta" => 8 / 3,
        "scaling" => 1,
        "ministep_nt" => missing,
        "ministep_dt" => 0.05,
    ),
    "observation" => Dict("noise_scale" => 2, "timestep_size" => 0.1, "num_timesteps" => 5),
    "ensemble" => Dict(
        "size" => 5,
        "seed" => 9347215,
        "prior" => "gaussian",
        "prior_params" => [0.0, 1.0],
    ),
    "spinup" => Dict("num_timesteps" => 5, "transition_noise_scale" => 0.0),
);

Seed for reproducibility.

Random.seed!(1983745)
Random.TaskLocalRNG()

Make operators.

transitioner = Lorenz63Model(; params)
observer = NoisyObserver(get_state_keys(transitioner); params);

Set seed for ground-truth simulation.

Random.seed!(0xfee55e45)
xor_seed!(observer, UInt64(0x243ecae5))
Random.MersenneTwister(0xcdc5cd3801285adb)

Define observation times

observation_times = let
    step = params["observation"]["timestep_size"]
    length = params["observation"]["num_timesteps"]
    range(; start=0, length, step)
end
0.0:0.1:0.4

Generate synthetic ground-truth observations.

if !(@isdefined ground_truth) || isnothing(ground_truth)
    ground_truth = @time let
        state0 = Dict{Symbol,Any}(:state => randn(3))

        # Set seed for ground-truth simulation.
        Random.seed!(0xfee55e45)
        xor_seed!(observer, UInt64(0x243ecae5))

        # Generate states and observations.
        t0 = 0.0
        states = Vector{Dict{Symbol,Any}}(undef, length(observation_times))
        observations = Vector{Dict{Symbol,Any}}(undef, length(observation_times))
        let state = state0
            for (i, t) in enumerate(observation_times)
                state = transitioner(state, t0, t)
                obs = observer(state)
                states[i] = state
                observations[i] = split_clean_noisy(observer, obs)[2]
                t0 = t
            end
        end
        (; states, observations)
    end
    println("  ^ timing for making ground truth observations")
    ground_truth_states_vec = get_ensemble_matrix([:state], ground_truth.states)
    ground_truth_obs_vec = get_ensemble_matrix([:state], ground_truth.observations)
end;
  0.000271 seconds (720 allocations: 39.727 KiB)
  ^ timing for making ground truth observations

Ensembles

Make initial ensemble.

function generate_ensemble(params::Dict)
    seed = params["ensemble"]["seed"]
    ensemble_size = params["ensemble"]["size"]
    prior_type = params["ensemble"]["prior"]

    members = Vector{Dict{Symbol,Any}}(undef, ensemble_size)
    if prior_type == "gaussian"
        rng = Random.MersenneTwister(seed)
        prior_mean, prior_std = params["ensemble"]["prior_params"]
        for i in 1:ensemble_size
            data = prior_mean .+ prior_std .* randn(rng, 3)
            state = Dict{Symbol,Any}(:state => data)
            members[i] = state
        end
    else
        throw(ArgumentError("Invalid prior type: $prior_type"))
    end

    ensemble = Ensemble(members)
    return ensemble
end;

if !(@isdefined ensemble_initial0) || isnothing(ensemble_initial0)
    ensemble_initial0 = generate_ensemble(params)
end

ensemble_initial0 = generate_ensemble(params);
ensemble_initial = Ensemble(ensemble_initial0, ensemble_initial0.members, [:state]);

t_index_end = params["spinup"]["num_timesteps"]
observation_times = observation_times[1:t_index_end]
ground_truth_observations = ground_truth.observations[1:t_index_end]
transition_noise = params["spinup"]["transition_noise_scale"]
0.0

Data assimilation

Choose filtering algorithm.

filter = nothing

Run sequential algorithm.

if !(@isdefined ensembles_sequential) || isnothing(ensembles_sequential)
    ensembles_sequential =
        let t_index_end = params["spinup"]["num_timesteps"],
            observation_times = observation_times[1:t_index_end],
            ground_truth_observations = ground_truth.observations[1:t_index_end],
            ensemble = ensemble_initial0,
            t0 = 0.0,
            transition_noise = params["spinup"]["transition_noise_scale"]

            Random.seed!(0x3289745)
            xor_seed!(observer, UInt64(0x375ef928))

            logs = []
            ensembles = []
            @time begin
                push!(ensembles, (; ensemble, t=t0))
                for (t, y_obs) in zip(observation_times, ground_truth_observations)
                    # Advance ensemble to time t.
                    ensemble = transitioner(ensemble, t0, t; inplace=false)

                    # Keep ensemble separated.
                    if transition_noise != 0
                        for em in ensemble.members
                            em[:state] .+= transition_noise .* Random.randn(3)
                        end
                    end

                    # Take observation at time t.
                    ensemble_obs = observer(ensemble)
                    ensemble_obs_clean, ensemble_obs_noisy = split_clean_noisy(
                        observer, ensemble_obs
                    )

                    # Record.
                    push!(
                        ensembles, (; ensemble, ensemble_obs_clean, ensemble_obs_noisy, t)
                    )

                    # Assimilate observation
                    log_data = Dict{Symbol,Any}()
                    (posterior, timing...) = @timed assimilate_data(
                        filter,
                        ensemble,
                        ensemble_obs_clean,
                        ensemble_obs_noisy,
                        y_obs,
                        log_data,
                    )
                    log_data[:timing] = timing
                    ensemble = posterior

                    # Record.
                    push!(ensembles, (; ensemble, t))
                    push!(logs, log_data)

                    # Let time pass.
                    t0 = t
                end
            end
            println("  ^ timing for making initial ensemble")
            ensembles
        end
end
11-element Vector{Any}:
 (ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-1.736339381960826, -1.8784568482587118, 1.8459595556154285]), Dict(:state => [-0.4140897015364231, -0.5702717353555934, 0.26195575184977354]), Dict(:state => [0.17866055001613776, 0.32981925832349, 0.09156408660254897]), Dict(:state => [-0.017837671414905457, -2.0503450137478305, 2.1972392296768795]), Dict(:state => [0.25904583722558483, -0.93970105340835, -0.24903577911250885])], [:state], true), t = 0.0)
 (ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-1.736339381960826, -1.8784568482587118, 1.8459595556154285]), Dict(:state => [-0.4140897015364231, -0.5702717353555934, 0.26195575184977354]), Dict(:state => [0.17866055001613776, 0.32981925832349, 0.09156408660254897]), Dict(:state => [-0.017837671414905457, -2.0503450137478305, 2.1972392296768795]), Dict(:state => [0.25904583722558483, -0.93970105340835, -0.24903577911250885])], [:state], true), ensemble_obs_clean = Ensemble(Dict{Symbol, Any}[Dict(:state => [-1.736339381960826, -1.8784568482587118, 1.8459595556154285]), Dict(:state => [-0.4140897015364231, -0.5702717353555934, 0.26195575184977354]), Dict(:state => [0.17866055001613776, 0.32981925832349, 0.09156408660254897]), Dict(:state => [-0.017837671414905457, -2.0503450137478305, 2.1972392296768795]), Dict(:state => [0.25904583722558483, -0.93970105340835, -0.24903577911250885])], [:state], true), ensemble_obs_noisy = Ensemble(Dict{Symbol, Any}[Dict(:state => [0.04603820840751882, -1.3995079064480316, 3.1064010668168405]), Dict(:state => [0.6387341207031547, -0.0745257051949395, -2.0722870493214565]), Dict(:state => [4.45600019715385, 4.125025252008753, -2.4551226991712434]), Dict(:state => [4.389604011170066, -2.901761742663583, 2.24532999576864]), Dict(:state => [0.030306789729348138, 0.5132203910313592, -2.088199932824817])], [:state], true), t = 0.0)
 (ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-1.736339381960826, -1.8784568482587118, 1.8459595556154285]), Dict(:state => [-0.4140897015364231, -0.5702717353555934, 0.26195575184977354]), Dict(:state => [0.17866055001613776, 0.32981925832349, 0.09156408660254897]), Dict(:state => [-0.017837671414905457, -2.0503450137478305, 2.1972392296768795]), Dict(:state => [0.25904583722558483, -0.93970105340835, -0.24903577911250885])], [:state], true), t = 0.0)
 (ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-3.7490930573924404, -7.7372305685584974, 2.493379490937681]), Dict(:state => [-1.0410628825132424, -2.2276418145893184, 0.2837674887331014]), Dict(:state => [0.5269424262050609, 1.1422348705162078, 0.09151590588883647]), Dict(:state => [-1.833894124222056, -4.132177007609303, 1.9391933645569985]), Dict(:state => [-0.5236530452177508, -1.3121240329126123, -0.17359625340823812])], [:state], true), ensemble_obs_clean = Ensemble(Dict{Symbol, Any}[Dict(:state => [-3.7490930573924404, -7.7372305685584974, 2.493379490937681]), Dict(:state => [-1.0410628825132424, -2.2276418145893184, 0.2837674887331014]), Dict(:state => [0.5269424262050609, 1.1422348705162078, 0.09151590588883647]), Dict(:state => [-1.833894124222056, -4.132177007609303, 1.9391933645569985]), Dict(:state => [-0.5236530452177508, -1.3121240329126123, -0.17359625340823812])], [:state], true), ensemble_obs_noisy = Ensemble(Dict{Symbol, Any}[Dict(:state => [-4.845823883362617, -6.740747078305999, 4.896686998829272]), Dict(:state => [-2.4866847524973013, -3.2307253486030625, -2.4653437662059767]), Dict(:state => [-0.37704949460747994, -0.5794202086845686, -2.1898990495012196]), Dict(:state => [-1.2454472997273869, -3.4518522381898364, 0.9719789016195703]), Dict(:state => [-0.023503811697042853, -2.095001087169096, -1.51795729836139])], [:state], true), t = 0.1)
 (ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-3.7490930573924404, -7.7372305685584974, 2.493379490937681]), Dict(:state => [-1.0410628825132424, -2.2276418145893184, 0.2837674887331014]), Dict(:state => [0.5269424262050609, 1.1422348705162078, 0.09151590588883647]), Dict(:state => [-1.833894124222056, -4.132177007609303, 1.9391933645569985]), Dict(:state => [-0.5236530452177508, -1.3121240329126123, -0.17359625340823812])], [:state], true), t = 0.1)
 (ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-10.745227305455062, -21.05343582123514, 11.044681336574117]), Dict(:state => [-3.323185978531821, -7.1807515726935875, 1.0708515410018824]), Dict(:state => [1.7064300552657459, 3.7140529594083396, 0.2947862000388703]), Dict(:state => [-5.806243310198203, -12.170791387607581, 4.115887181098772]), Dict(:state => [-1.864968574548437, -4.088790871003409, 0.13541808673726347])], [:state], true), ensemble_obs_clean = Ensemble(Dict{Symbol, Any}[Dict(:state => [-10.745227305455062, -21.05343582123514, 11.044681336574117]), Dict(:state => [-3.323185978531821, -7.1807515726935875, 1.0708515410018824]), Dict(:state => [1.7064300552657459, 3.7140529594083396, 0.2947862000388703]), Dict(:state => [-5.806243310198203, -12.170791387607581, 4.115887181098772]), Dict(:state => [-1.864968574548437, -4.088790871003409, 0.13541808673726347])], [:state], true), ensemble_obs_noisy = Ensemble(Dict{Symbol, Any}[Dict(:state => [-13.733800750620576, -23.27577240052977, 10.680949837348987]), Dict(:state => [-5.178743142257742, -4.873524931989422, 2.026544230158968]), Dict(:state => [2.8343717541774494, 3.3955955658720205, -1.7772854626910217]), Dict(:state => [-7.952979433071585, -15.192456317794521, 3.6538489542267816]), Dict(:state => [-1.1781142523264245, -2.4624681857570634, -3.0080783793092367])], [:state], true), t = 0.2)
 (ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-10.745227305455062, -21.05343582123514, 11.044681336574117]), Dict(:state => [-3.323185978531821, -7.1807515726935875, 1.0708515410018824]), Dict(:state => [1.7064300552657459, 3.7140529594083396, 0.2947862000388703]), Dict(:state => [-5.806243310198203, -12.170791387607581, 4.115887181098772]), Dict(:state => [-1.864968574548437, -4.088790871003409, 0.13541808673726347])], [:state], true), t = 0.2)
 (ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-19.211467562434432, -19.31952964709035, 42.57587716913618]), Dict(:state => [-10.13990965703818, -20.473880678696002, 8.89161025955345]), Dict(:state => [5.464807908134707, 11.681983543353084, 2.539670197949465]), Dict(:state => [-15.254763535219917, -26.21722434018124, 22.15547925020126]), Dict(:state => [-6.003581619242793, -12.811519454462218, 2.8970143817459286])], [:state], true), ensemble_obs_clean = Ensemble(Dict{Symbol, Any}[Dict(:state => [-19.211467562434432, -19.31952964709035, 42.57587716913618]), Dict(:state => [-10.13990965703818, -20.473880678696002, 8.89161025955345]), Dict(:state => [5.464807908134707, 11.681983543353084, 2.539670197949465]), Dict(:state => [-15.254763535219917, -26.21722434018124, 22.15547925020126]), Dict(:state => [-6.003581619242793, -12.811519454462218, 2.8970143817459286])], [:state], true), ensemble_obs_noisy = Ensemble(Dict{Symbol, Any}[Dict(:state => [-20.514527403795405, -20.567850255497483, 43.9858016954963]), Dict(:state => [-9.180411181801038, -20.326064684157796, 8.541376817627015]), Dict(:state => [4.338717703246138, 15.662419456666674, 3.453241966592169]), Dict(:state => [-12.96418204337131, -25.04221924503085, 22.492283535359167]), Dict(:state => [-6.733558885243093, -11.263998276796784, 5.510643972051675])], [:state], true), t = 0.3)
 (ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-19.211467562434432, -19.31952964709035, 42.57587716913618]), Dict(:state => [-10.13990965703818, -20.473880678696002, 8.89161025955345]), Dict(:state => [5.464807908134707, 11.681983543353084, 2.539670197949465]), Dict(:state => [-15.254763535219917, -26.21722434018124, 22.15547925020126]), Dict(:state => [-6.003581619242793, -12.811519454462218, 2.8970143817459286])], [:state], true), t = 0.3)
 (ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-8.239275168852691, 6.5918135288015325, 39.31309588366235]), Dict(:state => [-19.699050910444964, -21.50037827832076, 41.90983422111753]), Dict(:state => [15.107370704192729, 26.90565870943407, 20.395859406766434]), Dict(:state => [-16.770587848273813, -5.274913180791662, 47.10543204904237]), Dict(:state => [-16.137354407049678, -27.607885483019007, 23.471361995084457])], [:state], true), ensemble_obs_clean = Ensemble(Dict{Symbol, Any}[Dict(:state => [-8.239275168852691, 6.5918135288015325, 39.31309588366235]), Dict(:state => [-19.699050910444964, -21.50037827832076, 41.90983422111753]), Dict(:state => [15.107370704192729, 26.90565870943407, 20.395859406766434]), Dict(:state => [-16.770587848273813, -5.274913180791662, 47.10543204904237]), Dict(:state => [-16.137354407049678, -27.607885483019007, 23.471361995084457])], [:state], true), ensemble_obs_noisy = Ensemble(Dict{Symbol, Any}[Dict(:state => [-12.762769302036986, 6.205844927525672, 37.556836446505926]), Dict(:state => [-13.470954149382568, -20.679763679241084, 38.651355768958695]), Dict(:state => [16.50149305867902, 23.465322886668933, 21.20681405584041]), Dict(:state => [-19.18221088362932, -5.347099604958038, 49.03058203353198]), Dict(:state => [-12.260496022701869, -26.038698125255028, 21.496989669871404])], [:state], true), t = 0.4)
 (ensemble = Ensemble(Dict{Symbol, Any}[Dict(:state => [-8.239275168852691, 6.5918135288015325, 39.31309588366235]), Dict(:state => [-19.699050910444964, -21.50037827832076, 41.90983422111753]), Dict(:state => [15.107370704192729, 26.90565870943407, 20.395859406766434]), Dict(:state => [-16.770587848273813, -5.274913180791662, 47.10543204904237]), Dict(:state => [-16.137354407049678, -27.607885483019007, 23.471361995084457])], [:state], true), t = 0.4)

File-based parallelism

Same assimilation algorithm, but now the transition and observations are done in parallel.

worker_ids = addprocs(4; exeflags="--project=$(Base.active_project())")
worker_ids_driver = vcat([1], worker_ids)
try
    @everywhere worker_ids_driver begin
        println("I am here")
        $worker_initial_imports
        function worker_transition(run_dir, k, t, t0, params, worker_id, num_workers)
            try
                local worker = ParallelWorker(num_workers, worker_id)
                local ensemble_path = joinpath(run_dir, "ensemble_$(k-1)_posterior")
                local ensemble = load_ensemble(ensemble_path)
                local tmp_filepath = joinpath(run_dir, "intermediate_trans_$(k-1)_to_$(k)")
                local output_filepath = joinpath(run_dir, "ensemble_$(k)_prior")
                local parallel_strategy = FileBasedPartial(output_filepath, tmp_filepath)
                local transitioner = Lorenz63Model(; params)
                local closer, num_completed = run_partial_operator(
                    parallel_strategy,
                    worker,
                    em -> transitioner(em, t, t0),
                    ensemble;
                    reset_state_keys=false,
                )
                if closer
                    @debug "Transition: closer did $num_completed"
                else
                    @debug "Transition: worker $worker_id did $num_completed"
                end
            catch e
                return e
            end
        end

        function worker_observer(run_dir, k, params, worker_id, num_workers)
            try
                local worker = ParallelWorker(num_workers, worker_id)
                local ensemble_path = joinpath(run_dir, "ensemble_$(k)_prior")
                local ensemble = load_ensemble(ensemble_path)
                local tmp_filepath = joinpath(run_dir, "intermediate_obs_$(k)")
                local output_filepath = joinpath(run_dir, "ensemble_$(k)_obs_prior")
                local parallel_strategy = FileBasedPartial(output_filepath, tmp_filepath)
                local transitioner = Lorenz63Model(; params)
                local observer = NoisyObserver(get_state_keys(transitioner); params)
                xor_seed!(observer, UInt64(worker_id * num_workers - 1))
                local closer, num_completed = run_partial_operator(
                    parallel_strategy, worker, observer, ensemble; reset_state_keys=false
                )
                if closer
                    @debug "Observer: closer did $num_completed"
                else
                    @debug "Observer: worker $worker_id did $num_completed"
                end
            catch e
                return e
            end
        end
    end

    for observer_type in [:sequential, :not_sequential]
        Random.seed!(0x3289745)
        xor_seed!(observer, UInt64(0x375ef928))

        run_dir = tempname("."; cleanup=false)
        @show run_dir
        mkpath(run_dir)
        save_ensemble(ensemble_initial, joinpath(run_dir, "ensemble_0_posterior"))

        logs = []
        ensembles = []
        @time_msg "Run ensemble parallel" let ensemble = ensemble_initial
            t0 = 0.0
            push!(ensembles, (; ensemble, t=t0))
            for (k, (t, y_obs)) in
                enumerate(zip(observation_times, ground_truth.observations))
                # Advance ensemble to time t.
                worker_tasks = []
                for (i, p) in enumerate(worker_ids)
                    task = remotecall(
                        Main.worker_transition,
                        p,
                        run_dir,
                        k,
                        t0,
                        t,
                        params,
                        i,
                        length(worker_ids),
                    )
                    push!(worker_tasks, task)
                end

                for task in worker_tasks
                    v = fetch(task)
                    isa(v, Exception) && throw(v)
                end
                Main.worker_transition(run_dir, k, t0, t, params, 1, 1)
                output_filepath = joinpath(run_dir, "ensemble_$(k)_prior")
                ensemble = load_ensemble(output_filepath)

                # Take observation at time t.
                if observer_type == :sequential
                    println("Time $t: observer rng $(observer.rng)")
                    ensemble_obs = observer(ensemble)
                    println("Time $t: observer rng $(observer.rng)")
                else
                    worker_tasks = []
                    for (i, p) in enumerate(worker_ids)
                        task = remotecall(
                            Main.worker_observer,
                            p,
                            run_dir,
                            k,
                            params,
                            i,
                            length(worker_ids),
                        )
                        push!(worker_tasks, task)
                    end

                    for task in worker_tasks
                        v = fetch(task)
                        isa(v, Exception) && throw(v)
                    end
                    Main.worker_observer(run_dir, k, params, 1, 1)
                    output_filepath = joinpath(run_dir, "ensemble_$(k)_obs_prior")
                    ensemble_obs = load_ensemble(output_filepath)
                end

                ensemble_obs_clean, ensemble_obs_noisy = split_clean_noisy(
                    observer, ensemble_obs
                )
                println(
                    "Time $t : noise norm $(norm(get_ensemble_matrix(ensemble_obs_clean) .- get_ensemble_matrix(ensemble_obs_noisy)))",
                )

                # Record.
                push!(ensembles, (; ensemble, ensemble_obs_clean, ensemble_obs_noisy, t))

                # Assimilate observation
                log_data = Dict{Symbol,Any}()
                (posterior, timing...) = @timed assimilate_data(
                    filter,
                    ensemble,
                    ensemble_obs_clean,
                    ensemble_obs_noisy,
                    y_obs,
                    log_data,
                )
                log_data[:timing] = timing
                println(
                    "Time $t : posterior norm $(norm(get_ensemble_matrix(ensemble) .- get_ensemble_matrix(posterior)))",
                )
                ensemble = posterior
                save_ensemble(ensemble, joinpath(run_dir, "ensemble_$(k)_posterior"))

                # Record.
                push!(ensembles, (; ensemble, t))
                push!(logs, log_data)

                # Let time pass.
                t0 = t
            end
        end
        ensembles_parallel = ensembles

        # Print out some info to make sure the results are the same.
        let
            ensemble_obs_noisy_diffs = []
            ensemble_obs_clean_diffs = []
            ensemble_diffs = []
            for (i, (e, ep)) in enumerate(zip(ensembles_sequential, ensembles_parallel))
                em = get_ensemble_matrix(e.ensemble)
                epm = get_ensemble_matrix(ep.ensemble)
                push!(ensemble_diffs, norm(em .- epm))
                println("Index $i, t $(e.t) $(ep.t):")
                println("    ensemble: $(ensemble_diffs[end])")
                if hasfield(typeof(e), :ensemble_obs_noisy)
                    em_on = get_ensemble_matrix(e.ensemble_obs_noisy)
                    epm_on = get_ensemble_matrix(ep.ensemble_obs_noisy)
                    push!(ensemble_obs_noisy_diffs, norm(em_on .- epm_on))
                    println("    ensemble_obs_noisy: $(ensemble_obs_noisy_diffs[end])")

                    em_oc = get_ensemble_matrix(e.ensemble_obs_clean)
                    epm_oc = get_ensemble_matrix(ep.ensemble_obs_clean)
                    push!(ensemble_obs_clean_diffs, norm(em_oc .- epm_oc))
                    println("    ensemble_obs_clean: $(ensemble_obs_clean_diffs[end])")

                    println("    noise norm S: $(norm(em_oc .- em_on))")
                    println("    noise norm P: $(norm(epm_oc .- epm_on))")
                end
            end
            if observer_type == :sequential
                @test all(ensemble_diffs .== 0)
                @test all(ensemble_obs_noisy_diffs .== 0)
                @test all(ensemble_obs_clean_diffs .== 0)
            else
                @test ensemble_diffs[1] == 0
                @test ensemble_obs_clean_diffs[1] == 0
            end
        end
    end
finally
    rmprocs(worker_ids)
end;
I am here
      From worker 3:	I am here
      From worker 4:	I am here
      From worker 5:	I am here
      From worker 2:	I am here
run_dir = "./jl_iJKJupS1wz"
Run ensemble parallel: Time 0.0: observer rng Random.MersenneTwister(0xcdc5cd3812486916)
Time 0.0: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 15))
Time 0.0 : noise norm 8.75678567387657
Time 0.0 : posterior norm 0.0
Time 0.1: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 15))
Time 0.1: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 30))
Time 0.1 : noise norm 5.659188645248907
Time 0.1 : posterior norm 0.0
Time 0.2: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 30))
Time 0.2: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 45))
Time 0.2 : noise norm 7.503346891502936
Time 0.2 : posterior norm 0.0
Time 0.3: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 45))
Time 0.3: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 60))
Time 0.3 : noise norm 6.383999946279442
Time 0.3 : posterior norm 0.0
Time 0.4: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 60))
Time 0.4: observer rng Random.MersenneTwister(0xcdc5cd3812486916, (0, 1002, 0, 75))
Time 0.4 : noise norm 10.91470098952389
Time 0.4 : posterior norm 0.0
 12.528420 seconds (14.91 M allocations: 910.650 MiB, 2.13% gc time, 38.75% compilation time: 11% of which was recompilation)
Index 1, t 0.0 0.0:
    ensemble: 0.0
Index 2, t 0.0 0.0:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 8.75678567387657
    noise norm P: 8.75678567387657
Index 3, t 0.0 0.0:
    ensemble: 0.0
Index 4, t 0.1 0.1:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 5.659188645248907
    noise norm P: 5.659188645248907
Index 5, t 0.1 0.1:
    ensemble: 0.0
Index 6, t 0.2 0.2:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 7.503346891502936
    noise norm P: 7.503346891502936
Index 7, t 0.2 0.2:
    ensemble: 0.0
Index 8, t 0.3 0.3:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 6.383999946279442
    noise norm P: 6.383999946279442
Index 9, t 0.3 0.3:
    ensemble: 0.0
Index 10, t 0.4 0.4:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 10.91470098952389
    noise norm P: 10.91470098952389
Index 11, t 0.4 0.4:
    ensemble: 0.0
run_dir = "./jl_wToRyH2ti9"
Run ensemble parallel: Time 0.0 : noise norm 7.3456497269816845
Time 0.0 : posterior norm 0.0
Time 0.1 : noise norm 6.290795297046686
Time 0.1 : posterior norm 0.0
Time 0.2 : noise norm 8.460909160858042
Time 0.2 : posterior norm 0.0
Time 0.3 : noise norm 5.967282285417883
Time 0.3 : posterior norm 0.0
Time 0.4 : noise norm 6.949697374131521
Time 0.4 : posterior norm 0.0
  1.757219 seconds (579.24 k allocations: 31.761 MiB, 14.40% compilation time)
Index 1, t 0.0 0.0:
    ensemble: 0.0
Index 2, t 0.0 0.0:
    ensemble: 0.0
    ensemble_obs_noisy: 10.77955745270178
    ensemble_obs_clean: 0.0
    noise norm S: 8.75678567387657
    noise norm P: 7.3456497269816845
Index 3, t 0.0 0.0:
    ensemble: 0.0
Index 4, t 0.1 0.1:
    ensemble: 0.0
    ensemble_obs_noisy: 8.412224414893354
    ensemble_obs_clean: 0.0
    noise norm S: 5.659188645248907
    noise norm P: 6.290795297046686
Index 5, t 0.1 0.1:
    ensemble: 0.0
Index 6, t 0.2 0.2:
    ensemble: 0.0
    ensemble_obs_noisy: 8.6170080073667
    ensemble_obs_clean: 0.0
    noise norm S: 7.503346891502936
    noise norm P: 8.460909160858042
Index 7, t 0.2 0.2:
    ensemble: 0.0
Index 8, t 0.3 0.3:
    ensemble: 0.0
    ensemble_obs_noisy: 9.902552624691458
    ensemble_obs_clean: 0.0
    noise norm S: 6.383999946279442
    noise norm P: 5.967282285417883
Index 9, t 0.3 0.3:
    ensemble: 0.0
Index 10, t 0.4 0.4:
    ensemble: 0.0
    ensemble_obs_noisy: 13.768784136081246
    ensemble_obs_clean: 0.0
    noise norm S: 10.91470098952389
    noise norm P: 6.949697374131521
Index 11, t 0.4 0.4:
    ensemble: 0.0

Pmap-based parallelism.

Similar, but only parallelizes transition operator to make it easier to read.

worker_ids = addprocs(4; exeflags="--project=$(Base.active_project())")
try
    @everywhere worker_ids $worker_initial_imports

    for distributed_type in [:pmap, :distributed_for, :asyncmap]
        Random.seed!(0x3289745)
        xor_seed!(observer, UInt64(0x375ef928))

        ensembles_parallel =
            let transitioner = DistributedOperator(transitioner, Val(:pmap))
                logs = []
                ensembles = []
                @time_msg "Run ensemble parallel" let ensemble = ensemble_initial
                    t0 = 0.0
                    push!(ensembles, (; ensemble, t=t0))
                    i_obs = 2
                    for (t, y_obs) in zip(observation_times, ground_truth.observations)
                        # Advance ensemble to time t.
                        ensemble = transitioner(ensemble, t0, t; inplace=false)

                        # Get observation at time t.
                        ensemble_obs_clean = ensembles_sequential[i_obs].ensemble_obs_clean
                        ensemble_obs_noisy = ensembles_sequential[i_obs].ensemble_obs_noisy
                        i_obs += 2
                        println(
                            "Time $t : noise norm $(norm(get_ensemble_matrix(ensemble_obs_clean) .- get_ensemble_matrix(ensemble_obs_noisy)))",
                        )

                        # Record.
                        push!(
                            ensembles,
                            (; ensemble, ensemble_obs_clean, ensemble_obs_noisy, t),
                        )

                        # Assimilate observation
                        log_data = Dict{Symbol,Any}()
                        (posterior, timing...) = @timed assimilate_data(
                            filter,
                            ensemble,
                            ensemble_obs_clean,
                            ensemble_obs_noisy,
                            y_obs,
                            log_data,
                        )
                        log_data[:timing] = timing
                        println(
                            "Time $t : posterior norm $(norm(get_ensemble_matrix(ensemble) .- get_ensemble_matrix(posterior)))",
                        )
                        ensemble = posterior

                        # Record.
                        push!(ensembles, (; ensemble, t))
                        push!(logs, log_data)

                        # Let time pass.
                        t0 = t
                    end
                end
                ensembles
            end

        # Print out some info to make sure the results are the same.
        let
            ensemble_obs_noisy_diffs = []
            ensemble_obs_clean_diffs = []
            ensemble_diffs = []
            for (i, (e, ep)) in enumerate(zip(ensembles_sequential, ensembles_parallel))
                em = get_ensemble_matrix(e.ensemble)
                epm = get_ensemble_matrix(ep.ensemble)
                push!(ensemble_diffs, norm(em .- epm))
                println("Index $i, t $(e.t) $(ep.t):")
                println("    ensemble: $(ensemble_diffs[end])")
                if hasfield(typeof(e), :ensemble_obs_noisy)
                    em_on = get_ensemble_matrix(e.ensemble_obs_noisy)
                    epm_on = get_ensemble_matrix(ep.ensemble_obs_noisy)
                    push!(ensemble_obs_noisy_diffs, norm(em_on .- epm_on))
                    println("    ensemble_obs_noisy: $(ensemble_obs_noisy_diffs[end])")

                    em_oc = get_ensemble_matrix(e.ensemble_obs_clean)
                    epm_oc = get_ensemble_matrix(ep.ensemble_obs_clean)
                    push!(ensemble_obs_clean_diffs, norm(em_oc .- epm_oc))
                    println("    ensemble_obs_clean: $(ensemble_obs_clean_diffs[end])")

                    println("    noise norm S: $(norm(em_oc .- em_on))")
                    println("    noise norm P: $(norm(epm_oc .- epm_on))")
                end
            end
            @test all(ensemble_diffs .== 0)
            @test all(ensemble_obs_noisy_diffs .== 0)
            @test all(ensemble_obs_clean_diffs .== 0)
        end
    end
finally
    rmprocs(worker_ids)
end;
Run ensemble parallel: Time 0.0 : noise norm 8.75678567387657
Time 0.0 : posterior norm 0.0
Time 0.1 : noise norm 5.659188645248907
Time 0.1 : posterior norm 0.0
Time 0.2 : noise norm 7.503346891502936
Time 0.2 : posterior norm 0.0
Time 0.3 : noise norm 6.383999946279442
Time 0.3 : posterior norm 0.0
Time 0.4 : noise norm 10.91470098952389
Time 0.4 : posterior norm 0.0
  3.795867 seconds (3.03 M allocations: 152.764 MiB, 1.72% gc time, 25.79% compilation time)
Index 1, t 0.0 0.0:
    ensemble: 0.0
Index 2, t 0.0 0.0:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 8.75678567387657
    noise norm P: 8.75678567387657
Index 3, t 0.0 0.0:
    ensemble: 0.0
Index 4, t 0.1 0.1:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 5.659188645248907
    noise norm P: 5.659188645248907
Index 5, t 0.1 0.1:
    ensemble: 0.0
Index 6, t 0.2 0.2:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 7.503346891502936
    noise norm P: 7.503346891502936
Index 7, t 0.2 0.2:
    ensemble: 0.0
Index 8, t 0.3 0.3:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 6.383999946279442
    noise norm P: 6.383999946279442
Index 9, t 0.3 0.3:
    ensemble: 0.0
Index 10, t 0.4 0.4:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 10.91470098952389
    noise norm P: 10.91470098952389
Index 11, t 0.4 0.4:
    ensemble: 0.0
Run ensemble parallel: Time 0.0 : noise norm 8.75678567387657
Time 0.0 : posterior norm 0.0
Time 0.1 : noise norm 5.659188645248907
Time 0.1 : posterior norm 0.0
Time 0.2 : noise norm 7.503346891502936
Time 0.2 : posterior norm 0.0
Time 0.3 : noise norm 6.383999946279442
Time 0.3 : posterior norm 0.0
Time 0.4 : noise norm 10.91470098952389
Time 0.4 : posterior norm 0.0
  0.003856 seconds (4.02 k allocations: 1.456 MiB)
Index 1, t 0.0 0.0:
    ensemble: 0.0
Index 2, t 0.0 0.0:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 8.75678567387657
    noise norm P: 8.75678567387657
Index 3, t 0.0 0.0:
    ensemble: 0.0
Index 4, t 0.1 0.1:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 5.659188645248907
    noise norm P: 5.659188645248907
Index 5, t 0.1 0.1:
    ensemble: 0.0
Index 6, t 0.2 0.2:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 7.503346891502936
    noise norm P: 7.503346891502936
Index 7, t 0.2 0.2:
    ensemble: 0.0
Index 8, t 0.3 0.3:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 6.383999946279442
    noise norm P: 6.383999946279442
Index 9, t 0.3 0.3:
    ensemble: 0.0
Index 10, t 0.4 0.4:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 10.91470098952389
    noise norm P: 10.91470098952389
Index 11, t 0.4 0.4:
    ensemble: 0.0
Run ensemble parallel: Time 0.0 : noise norm 8.75678567387657
Time 0.0 : posterior norm 0.0
Time 0.1 : noise norm 5.659188645248907
Time 0.1 : posterior norm 0.0
Time 0.2 : noise norm 7.503346891502936
Time 0.2 : posterior norm 0.0
Time 0.3 : noise norm 6.383999946279442
Time 0.3 : posterior norm 0.0
Time 0.4 : noise norm 10.91470098952389
Time 0.4 : posterior norm 0.0
  0.003333 seconds (3.94 k allocations: 1.447 MiB)
Index 1, t 0.0 0.0:
    ensemble: 0.0
Index 2, t 0.0 0.0:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 8.75678567387657
    noise norm P: 8.75678567387657
Index 3, t 0.0 0.0:
    ensemble: 0.0
Index 4, t 0.1 0.1:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 5.659188645248907
    noise norm P: 5.659188645248907
Index 5, t 0.1 0.1:
    ensemble: 0.0
Index 6, t 0.2 0.2:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 7.503346891502936
    noise norm P: 7.503346891502936
Index 7, t 0.2 0.2:
    ensemble: 0.0
Index 8, t 0.3 0.3:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 6.383999946279442
    noise norm P: 6.383999946279442
Index 9, t 0.3 0.3:
    ensemble: 0.0
Index 10, t 0.4 0.4:
    ensemble: 0.0
    ensemble_obs_noisy: 0.0
    ensemble_obs_clean: 0.0
    noise norm S: 10.91470098952389
    noise norm P: 10.91470098952389
Index 11, t 0.4 0.4:
    ensemble: 0.0

Now parallelizes both the transition operator and the observation operator.

worker_ids = addprocs(6; exeflags="--project=$(Base.active_project())")
ensembles_parallel = try
    worker_pool = WorkerPool(worker_ids[3:end])
    @everywhere worker_ids $worker_initial_imports

    Random.seed!(0x3289745)
    xor_seed!(observer, UInt64(0x375ef928))

    ensembles =
        let transitioner = DistributedOperator(transitioner, worker_pool, Val(:pmap)),
            observer = DistributedOperator(observer, worker_pool)

            logs = []
            ensembles = []
            @time_msg "Run ensemble parallel" let ensemble = ensemble_initial
                t0 = 0.0
                push!(ensembles, (; ensemble, t=t0))
                for (t, y_obs) in zip(observation_times, ground_truth.observations)
                    # Advance ensemble to time t.
                    ensemble = transitioner(ensemble, t0, t; inplace=false)

                    # Take observation at time t.
                    ensemble_obs = observer(ensemble)
                    ensemble_obs_clean, ensemble_obs_noisy = split_clean_noisy(
                        observer, ensemble_obs
                    )
                    println(
                        "Time $t : noise norm $(norm(get_ensemble_matrix(ensemble_obs_clean) .- get_ensemble_matrix(ensemble_obs_noisy)))",
                    )

                    # Record.
                    push!(
                        ensembles,
                        (; ensemble, ensemble_obs_clean, ensemble_obs_noisy, t),
                    )

                    # Assimilate observation
                    log_data = Dict{Symbol,Any}()
                    (posterior, timing...) = @timed assimilate_data(
                        filter,
                        ensemble,
                        ensemble_obs_clean,
                        ensemble_obs_noisy,
                        y_obs,
                        log_data,
                    )
                    log_data[:timing] = timing
                    println(
                        "Time $t : posterior norm $(norm(get_ensemble_matrix(ensemble) .- get_ensemble_matrix(posterior)))",
                    )
                    ensemble = posterior

                    # Record.
                    push!(ensembles, (; ensemble, t))
                    push!(logs, log_data)

                    # Let time pass.
                    t0 = t
                end
            end
            ensembles
        end
finally
    rmprocs(worker_ids)
end;
Run ensemble parallel: Time 0.0 : noise norm 8.055444785139724
Time 0.0 : posterior norm 0.0
Time 0.1 : noise norm 5.866912972393284
Time 0.1 : posterior norm 0.0
Time 0.2 : noise norm 10.651888064108782
Time 0.2 : posterior norm 0.0
Time 0.3 : noise norm 11.613376783455594
Time 0.3 : posterior norm 0.0
Time 0.4 : noise norm 9.29422701020639
Time 0.4 : posterior norm 0.0
  7.100976 seconds (2.56 M allocations: 130.748 MiB, 1 lock conflict, 26.38% compilation time)

Error check

Print out some info to make sure the results are the same, except the noise should be different.

for (i, (e, ep)) in enumerate(zip(ensembles_sequential, ensembles_parallel))
    em = get_ensemble_matrix(e.ensemble)
    epm = get_ensemble_matrix(ep.ensemble)
    println("Index $i, t $(e.t) $(ep.t):")
    println("    ensemble: $(norm(em .- epm))")
    if hasfield(typeof(e), :ensemble_obs_noisy)
        em_on = get_ensemble_matrix(e.ensemble_obs_noisy)
        epm_on = get_ensemble_matrix(ep.ensemble_obs_noisy)
        println("    ensemble_obs_noisy: $(norm(em_on .- epm_on))")

        em_oc = get_ensemble_matrix(e.ensemble_obs_clean)
        epm_oc = get_ensemble_matrix(ep.ensemble_obs_clean)
        println("    ensemble_obs_clean: $(norm(em_oc .- epm_oc))")

        println("    noise norm S: $(norm(em_oc .- em_on))")
        println("    noise norm P: $(norm(epm_oc .- epm_on))")
    end
end;
Index 1, t 0.0 0.0:
    ensemble: 0.0
Index 2, t 0.0 0.0:
    ensemble: 0.0
    ensemble_obs_noisy: 13.138243446144097
    ensemble_obs_clean: 0.0
    noise norm S: 8.75678567387657
    noise norm P: 8.055444785139724
Index 3, t 0.0 0.0:
    ensemble: 0.0
Index 4, t 0.1 0.1:
    ensemble: 0.0
    ensemble_obs_noisy: 8.46635635399443
    ensemble_obs_clean: 0.0
    noise norm S: 5.659188645248907
    noise norm P: 5.866912972393284
Index 5, t 0.1 0.1:
    ensemble: 0.0
Index 6, t 0.2 0.2:
    ensemble: 0.0
    ensemble_obs_noisy: 12.073673140448676
    ensemble_obs_clean: 0.0
    noise norm S: 7.503346891502936
    noise norm P: 10.651888064108782
Index 7, t 0.2 0.2:
    ensemble: 0.0
Index 8, t 0.3 0.3:
    ensemble: 0.0
    ensemble_obs_noisy: 12.430216886034222
    ensemble_obs_clean: 0.0
    noise norm S: 6.383999946279442
    noise norm P: 11.613376783455594
Index 9, t 0.3 0.3:
    ensemble: 0.0
Index 10, t 0.4 0.4:
    ensemble: 0.0
    ensemble_obs_noisy: 14.71019879474786
    ensemble_obs_clean: 0.0
    noise norm S: 10.91470098952389
    noise norm P: 9.29422701020639
Index 11, t 0.4 0.4:
    ensemble: 0.0

The end!

Note that this cleanup only needs to be run in automated contexts. The unregistered packages can cause issues.

try
    Pkg.rm("Lorenz63")
catch e
    @warn e
end
    Updating `~/work/Ensembles.jl/Ensembles.jl/examples/parallel/Project.toml`
  [72db8b11] - Lorenz63 v0.1.0 `https://github.com/milankl/Lorenz63.jl#15220a7#master`
    Updating `~/work/Ensembles.jl/Ensembles.jl/examples/parallel/Manifest.toml`
  [72db8b11] - Lorenz63 v0.1.0 `https://github.com/milankl/Lorenz63.jl#15220a7#master`

This page was generated using Literate.jl.