implement multithreaded runner for tidepool

This commit is contained in:
tali 2023-04-14 15:29:40 -04:00
parent 8b60c03232
commit d2e315d73b
4 changed files with 459 additions and 31 deletions

282
Cargo.lock generated
View File

@ -14,6 +14,15 @@ dependencies = [
"version_check",
]
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anstream"
version = "0.3.0"
@ -99,6 +108,21 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e3c5919066adf22df73762e50cffcde3a758f2a848b113b586d1f86728b673b"
dependencies = [
"iana-time-zone",
"js-sys",
"num-integer",
"num-traits",
"time",
"wasm-bindgen",
"winapi",
]
[[package]]
name = "clap"
version = "4.2.2"
@ -141,12 +165,28 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a2dd5a6fe8c6e3502f568a6353e5273bbb15193ad9a89e457b9970798efbea1"
[[package]]
name = "codespan-reporting"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e"
dependencies = [
"termcolor",
"unicode-width",
]
[[package]]
name = "colorchoice"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
name = "core-foundation-sys"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa"
[[package]]
name = "ctrlc"
version = "3.2.5"
@ -157,6 +197,50 @@ dependencies = [
"windows-sys 0.45.0",
]
[[package]]
name = "cxx"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f61f1b6389c3fe1c316bf8a4dccc90a38208354b330925bce1f74a6c4756eb93"
dependencies = [
"cc",
"cxxbridge-flags",
"cxxbridge-macro",
"link-cplusplus",
]
[[package]]
name = "cxx-build"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12cee708e8962df2aeb38f594aae5d827c022b6460ac71a7a3e2c3c2aae5a07b"
dependencies = [
"cc",
"codespan-reporting",
"once_cell",
"proc-macro2",
"quote",
"scratch",
"syn 2.0.15",
]
[[package]]
name = "cxxbridge-flags"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7944172ae7e4068c533afbb984114a56c46e9ccddda550499caa222902c7f7bb"
[[package]]
name = "cxxbridge-macro"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2345488264226bf682893e25de0769f3360aac9957980ec49361b083ddaa5bc5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.15",
]
[[package]]
name = "errno"
version = "0.3.1"
@ -197,7 +281,7 @@ checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4"
dependencies = [
"cfg-if",
"libc",
"wasi",
"wasi 0.11.0+wasi-snapshot-preview1",
]
[[package]]
@ -221,12 +305,45 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "hermit-abi"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
[[package]]
name = "iana-time-zone"
version = "0.1.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0722cd7114b7de04316e7ea5456a0bbb20e4adb46fd27a3697adb812cff0f37c"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0703ae284fc167426161c2e3f1da3ea71d94b21bedbcc9494e92b28e334e3dca"
dependencies = [
"cxx",
"cxx-build",
]
[[package]]
name = "indexmap"
version = "1.9.3"
@ -243,7 +360,7 @@ version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c66c74d2ae7e79a5a8f7ac924adbe38ee42a859c6539ad869eb51f0b52dc220"
dependencies = [
"hermit-abi",
"hermit-abi 0.3.1",
"libc",
"windows-sys 0.48.0",
]
@ -254,7 +371,7 @@ version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f"
dependencies = [
"hermit-abi",
"hermit-abi 0.3.1",
"io-lifetimes",
"rustix",
"windows-sys 0.48.0",
@ -266,6 +383,15 @@ version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6"
[[package]]
name = "js-sys"
version = "0.3.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "445dde2150c55e483f3d8416706b97ec8e8237c307e5b7b4b8dd15e6af2a0730"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -278,6 +404,15 @@ version = "0.2.141"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3304a64d199bb964be99741b7a14d26972741915b3649639149b2479bb46f4b5"
[[package]]
name = "link-cplusplus"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5"
dependencies = [
"cc",
]
[[package]]
name = "linux-raw-sys"
version = "0.3.1"
@ -346,6 +481,35 @@ dependencies = [
"winapi",
]
[[package]]
name = "num-integer"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b"
dependencies = [
"hermit-abi 0.2.6",
"libc",
]
[[package]]
name = "once_cell"
version = "1.17.1"
@ -471,6 +635,12 @@ version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
[[package]]
name = "scratch"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1792db035ce95be60c3f8853017b3999209281c24e2ba5bc8e59bf97a0c590c1"
[[package]]
name = "serde"
version = "1.0.160"
@ -560,6 +730,15 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "termcolor"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6"
dependencies = [
"winapi-util",
]
[[package]]
name = "thread_local"
version = "1.1.7"
@ -575,10 +754,12 @@ name = "tidepool"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"clap",
"ctrlc",
"fish",
"mino",
"num_cpus",
"rand",
"rand_xoshiro",
"serde",
@ -588,6 +769,17 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "time"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi",
]
[[package]]
name = "toml"
version = "0.7.3"
@ -690,6 +882,12 @@ version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4"
[[package]]
name = "unicode-width"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b"
[[package]]
name = "utf8parse"
version = "0.2.1"
@ -708,12 +906,72 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31f8dcbc21f30d9b8f2ea926ecb58f6b91192c17e9d33594b3df58b2007ca53b"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95ce90fd5bcc06af55a641a86428ee4229e44e07033963a2290a8e241607ccb9"
dependencies = [
"bumpalo",
"log",
"once_cell",
"proc-macro2",
"quote",
"syn 1.0.109",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c21f77c0bedc37fd5dc21f897894a5ca01e7bb159884559461862ae90c0b4c5"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d"
[[package]]
name = "winapi"
version = "0.3.9"
@ -730,12 +988,30 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f"
dependencies = [
"windows-targets 0.48.0",
]
[[package]]
name = "windows-sys"
version = "0.45.0"

View File

@ -21,3 +21,5 @@ serde_json = { version = "1.0" }
clap = { version = "4.0", features = ["derive"] }
anyhow = { version = "1.0" }
ctrlc = { version = "3.2" }
num_cpus = { version = "1.15" }
chrono = { version = "0.4" }

View File

@ -28,7 +28,7 @@ pub struct CliArgs {
pub output_dir: Option<PathBuf>,
/// Stop after this many runs (multi-run mode only).
#[arg(short = 'n', long, group = "multi")]
pub count: Option<usize>,
pub count: Option<u32>,
/// Run this many jobs in parallel (multi-run mode only).
#[arg(short = 'j', long, group = "multi")]
pub jobs: Option<usize>,
@ -40,7 +40,7 @@ pub struct CliArgs {
pub io: IoArgs,
}
#[derive(Debug, Args)]
#[derive(Clone, Debug, Args)]
pub struct OutputDataArgs {
/// Generate profiling data in the output.
#[arg(short = 'P', long)]
@ -50,7 +50,7 @@ pub struct OutputDataArgs {
pub list_moves: bool,
}
#[derive(Debug, Args)]
#[derive(Clone, Debug, Args)]
pub struct IoArgs {
/// Disable printing extraneous text like a progress bar.
#[arg(short = 'q', long)]
@ -79,7 +79,7 @@ pub struct SingleRun {
pub struct MultiRun {
pub config_file: PathBuf,
pub output_dir: PathBuf,
pub count: Option<usize>,
pub count: Option<u32>,
pub jobs: Option<usize>,
pub data: OutputDataArgs,
pub io: IoArgs,

View File

@ -1,9 +1,11 @@
use anyhow::{Context as _, Result};
use rand::RngCore as _;
use std::collections::HashMap;
use std::io::{Read, Write};
use std::path::Path;
use std::sync::{atomic, mpsc, Arc};
use std::time::{Duration, Instant};
use tidepool::output::SummaryStats;
use fish::bot;
use tidepool::{cli, config, output, sim};
@ -23,9 +25,10 @@ fn main() -> Result<()> {
#[derive(Debug)]
enum Msg {
Interrupt,
Heartbeat(u64),
Status(u64, Box<Status>),
Done(u64, Box<output::Output>),
Heartbeat(usize),
Shutdown(usize),
Status(usize, Box<Status>),
Output(usize, Box<output::Output>),
}
#[derive(Debug)]
@ -68,25 +71,27 @@ fn single(args: cli::SingleRun) -> Result<()> {
match msg {
Msg::Interrupt => {
eprintln!("\ncaught interrupt.");
tracing::debug!("interrupted");
exit_early.store(true, atomic::Ordering::Relaxed);
}
Msg::Heartbeat(_id) => {
tracing::debug!("heartbeat");
}
Msg::Status(_id, status) => {
if exit_early.load(atomic::Ordering::Relaxed) {
continue;
}
write_progress_bar(
&args.io,
status.pieces,
status.lines_left,
config.game.goal,
status.time,
)?;
Msg::Shutdown(_id) => {
// only used by `run_simulations()`
unreachable!()
}
Msg::Done(_id, output) => {
Msg::Status(_id, status) => {
if !exit_early.load(atomic::Ordering::Relaxed) {
print_single_progress(
&args.io,
status.pieces,
status.lines_left,
config.game.goal,
status.time,
)?;
}
}
Msg::Output(_id, output) => {
// dropping rx early does two things here: 1) removing any of the 'break's
// in this block become a compile error, 2) interrupts will not be able to
// be sent so the ctrl-c handler will abort the program
@ -108,8 +113,74 @@ fn single(args: cli::SingleRun) -> Result<()> {
}
fn multi(args: cli::MultiRun) -> Result<()> {
let _config = parse_config_file(&args.config_file)?;
panic!("multi-run mode not implemented yet")
let config = parse_config_file(&args.config_file)?;
let (tx, rx) = create_mailbox();
let tasks = args.count.unwrap_or(u32::MAX);
let tasks = Arc::new(atomic::AtomicI64::new(tasks as i64));
let exit_early = Arc::new(atomic::AtomicBool::new(false));
let n_jobs = args.jobs.unwrap_or_else(num_cpus::get);
let mut jobs = HashMap::with_capacity(n_jobs);
let mut completed = SummaryStats::<usize>::new();
tracing::debug!("spawning {n_jobs} jobs");
for id in 0..n_jobs {
jobs.insert(id, None);
let tx2 = tx.clone();
let tasks2 = tasks.clone();
let exit_early2 = exit_early.clone();
let config2 = config.clone();
let data_args = args.data.clone();
std::thread::spawn(move || {
run_simulations(&data_args, config2, id, tx2, tasks2, exit_early2);
});
}
while let Ok(msg) = rx.recv() {
tracing::trace!(msg = debug(&msg));
match msg {
Msg::Interrupt => {
eprintln!("\ncaught interrupt.");
tasks.store(0, atomic::Ordering::Relaxed);
exit_early.store(true, atomic::Ordering::Relaxed);
}
Msg::Heartbeat(id) => {
tracing::debug!("heartbeat {id}");
}
Msg::Shutdown(id) => {
tracing::debug!("shutdown {id}");
assert!(jobs.remove(&id).is_some(), "shutdown {id}");
if jobs.is_empty() {
break;
}
}
Msg::Status(id, status) => {
*jobs.get_mut(&id).unwrap() = Some(status);
if !exit_early.load(atomic::Ordering::Relaxed) {
print_multi_progress(&args.io, &jobs, &completed)?;
}
}
Msg::Output(_id, output) => {
if output.cleared == config.game.goal {
completed.insert(output.pieces);
}
write_output_in_dir(&args.output_dir, &output)?;
if !exit_early.load(atomic::Ordering::Relaxed) {
print_multi_progress(&args.io, &jobs, &completed)?;
}
}
}
}
if !args.io.quiet {
eprintln!();
}
Ok(())
}
fn parse_config_file(path: &Path) -> Result<config::Config> {
@ -151,7 +222,26 @@ fn write_output(io_args: &cli::IoArgs, path: Option<&Path>, output: &output::Out
}
}
fn write_progress_bar(
fn write_output_in_dir(dir_path: &Path, output: &output::Output) -> Result<()> {
let date: chrono::DateTime<chrono::Local> = std::time::SystemTime::now().into();
std::fs::create_dir_all(dir_path).context("error creating directory to store output")?;
let goal = output.config.goal;
let pieces = output.pieces;
let incomplete = if output.cleared < goal { "I-" } else { "" };
let date = date.format("%Y-%m-%d_%H-%M-%S");// "YYYYmmdd-HHMMSS";
let file_name = format!("{goal}L-{incomplete}{pieces}p-{date}.json");
let io_args = cli::IoArgs {
noninteractive: true,
quiet: true,
};
let path = dir_path.join(file_name);
write_output(&io_args, Some(&path), output)
}
fn print_single_progress(
io_args: &cli::IoArgs,
pieces: usize,
lines_left: usize,
@ -189,6 +279,43 @@ fn write_progress_bar(
}
}
fn print_multi_progress(
io_args: &cli::IoArgs,
jobs: &HashMap<usize, Option<Box<Status>>>,
completed: &SummaryStats<usize>,
) -> std::io::Result<()> {
if io_args.quiet {
return Ok(());
}
let writer = std::io::stderr().lock();
let mut writer = std::io::BufWriter::new(writer);
write!(writer, "\rjobs:{}", jobs.len())?;
let mut pps_stats = SummaryStats::new();
for status in jobs.values().filter_map(Option::as_deref) {
let pps = status.pieces as f64 / status.time.as_secs_f64().max(0.001);
pps_stats.insert_f64(pps);
}
if let Some(pps) = pps_stats.avg_f64() {
write!(writer, ", pps(avg):{pps:.2}")?;
}
if completed.count() > 0 {
let count = completed.count();
let min = completed.min().unwrap();
let max = completed.max().unwrap();
let avg = completed.avg().unwrap();
write!(
writer,
", completed:{count}, min:#{min}, max:#{max}, avg:#{avg}"
)?;
}
writer.flush()
}
fn prompt_yn(io_args: &cli::IoArgs, msg: &str) -> bool {
if io_args.noninteractive {
return true;
@ -205,7 +332,7 @@ fn run_simulation(
data_args: &cli::OutputDataArgs,
config: config::Config,
seed: Option<u64>,
id: u64,
id: usize,
tx: mpsc::SyncSender<Msg>,
exit_early: Arc<atomic::AtomicBool>,
) {
@ -265,10 +392,8 @@ fn run_simulation(
}
}
let time_end = Instant::now();
let profile = data_args.profile.then(|| output::Profile {
time: time_end - time_start,
time: Instant::now() - time_start,
});
let output = output::Output {
@ -279,7 +404,32 @@ fn run_simulation(
moves,
profile,
};
if tx.send(Msg::Done(id, output.into())).is_err() {
if tx.send(Msg::Output(id, output.into())).is_err() {
tracing::debug!("nobody received output id={id}");
}
}
fn run_simulations(
data_args: &cli::OutputDataArgs,
config: config::Config,
id: usize,
tx: mpsc::SyncSender<Msg>,
tasks: Arc<atomic::AtomicI64>,
exit_early: Arc<atomic::AtomicBool>,
) {
while tasks.fetch_sub(1, atomic::Ordering::Relaxed) > 0 {
if exit_early.load(atomic::Ordering::Relaxed) {
break;
}
let seed = None;
let config2 = config.clone();
let tx2 = tx.clone();
let exit_early2 = exit_early.clone();
run_simulation(&data_args, config2, seed, id, tx2, exit_early2);
}
if tx.send(Msg::Shutdown(id)).is_err() {
tracing::debug!("nobody received shutdown id={id}");
}
}