feat(parallel): parallel execution of tasks

This commit is contained in:
Ray Sinurat 2026-02-06 03:13:06 -06:00
parent 9490328bfb
commit 19b82e6313
27 changed files with 1857 additions and 539 deletions

19
Cargo.lock generated
View file

@ -362,6 +362,17 @@ dependencies = [
"rustix 1.1.3",
]
[[package]]
name = "async-recursion"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]
name = "async-signal"
version = "0.2.13"
@ -1034,6 +1045,7 @@ dependencies = [
"dirs",
"doot-core",
"doot-lang",
"glob",
"indicatif",
"ratatui",
"serde",
@ -1050,7 +1062,6 @@ version = "0.1.0"
dependencies = [
"age",
"anyhow",
"async-fs",
"blake3",
"dirs",
"doot-lang",
@ -1059,11 +1070,11 @@ dependencies = [
"indicatif",
"minijinja",
"os_info",
"rayon",
"regex-lite",
"serde",
"serde_json",
"similar",
"smol",
"thiserror 2.0.18",
"toml 0.8.23",
"tracing",
@ -1078,11 +1089,11 @@ dependencies = [
"age",
"anyhow",
"ariadne",
"async-fs",
"async-net",
"async-recursion",
"blake3",
"chumsky",
"dirs",
"futures-lite 2.6.1",
"glob",
"hostname",
"indexmap",

View file

@ -19,8 +19,8 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
toml = "0.8"
smol = "2"
async-fs = "2"
async-net = "2"
async-recursion = "1"
futures-lite = "2"
surf = "2"
rayon = "1"
age = "0.10"
@ -36,3 +36,8 @@ thiserror = "2"
anyhow = "1"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "json"] }
[profile.release]
lto = true
codegen-units = 1
strip = true

View file

@ -21,5 +21,6 @@ thiserror.workspace = true
anyhow.workspace = true
dirs.workspace = true
blake3.workspace = true
glob = "0.3"
tracing.workspace = true
tracing-subscriber.workspace = true

View file

@ -2,15 +2,18 @@ use super::{find_config_file, parse_config, type_check};
use doot_core::state::{StateStore, SyncStatus};
use doot_core::{Config, Deployer};
use doot_lang::ast::HookStage;
use doot_lang::evaluator::{DotfileConfig, HookConfig};
use doot_lang::evaluator::{DotfileConfig, DotfilesPattern, DotfilesSource, HookConfig};
use doot_lang::{DotfileConflict, Evaluator, validate_dotfile_targets};
use indicatif::{ProgressBar, ProgressStyle};
use std::collections::HashSet;
use std::io::{self, Write};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::Instant;
#[tracing::instrument(skip_all, fields(dry_run, parallel))]
pub fn run(config_path: Option<PathBuf>, dry_run: bool, parallel: bool) -> anyhow::Result<()> {
#[tracing::instrument(skip_all, fields(dry_run, prune))]
pub fn run(config_path: Option<PathBuf>, dry_run: bool, prune: bool) -> anyhow::Result<()> {
let start = Instant::now();
let path = find_config_file(config_path)?;
let source = std::fs::read_to_string(&path)?;
@ -20,11 +23,22 @@ pub fn run(config_path: Option<PathBuf>, dry_run: bool, parallel: bool) -> anyho
type_check(&program, &source, &path.display().to_string())?;
let mut evaluator = Evaluator::new();
let result = evaluator.eval(&program)?;
let mut result = evaluator.eval_sync(&program)?;
// Get environment variables to expose to hook scripts
let hook_env = evaluator.get_hook_env();
let source_dir = path.parent().unwrap_or(&PathBuf::from(".")).to_path_buf();
// Expand glob patterns from dotfiles: blocks
let glob_count =
expand_dotfile_patterns(&mut result.dotfiles, &result.dotfile_patterns, &source_dir);
// Merge specializations (explicit dotfile blocks override glob-expanded entries)
if glob_count > 0 {
merge_specializations(&mut result.dotfiles, glob_count);
}
let _total_items = result.dotfiles.len() + result.packages.len();
println!(
"config parsed: {} dotfiles, {} packages",
@ -32,8 +46,6 @@ pub fn run(config_path: Option<PathBuf>, dry_run: bool, parallel: bool) -> anyho
result.packages.len()
);
let source_dir = path.parent().unwrap_or(&PathBuf::from(".")).to_path_buf();
// Validate dotfile targets and get proper execution order
let validation = validate_dotfile_targets(&result.dotfiles, &source_dir);
@ -81,32 +93,27 @@ pub fn run(config_path: Option<PathBuf>, dry_run: bool, parallel: bool) -> anyho
}
}
// Reorder dotfiles based on dependency analysis
let ordered_dotfiles: Vec<DotfileConfig> = validation
.ordered_indices
.iter()
.map(|&i| result.dotfiles[i].clone())
.collect();
let config = Config::new(source_dir.clone())
.dry_run(dry_run)
.parallel(parallel);
let config = Config::new(source_dir.clone()).dry_run(dry_run);
let state_file = config.state_file.clone();
let state = StateStore::new(&state_file);
// Check for conflicts before deploying
let mut to_deploy: Vec<&DotfileConfig> = Vec::new();
let mut conflicts: Vec<(&DotfileConfig, SyncStatus)> = Vec::new();
// Check for conflicts before deploying (track by original index)
let mut deploy_set: HashSet<usize> = HashSet::new();
let mut conflicts: Vec<(usize, SyncStatus)> = Vec::new();
// Track per-file conflicts for directories (file_path, source, target, status)
let mut file_conflicts: Vec<(PathBuf, PathBuf, PathBuf, SyncStatus)> = Vec::new();
for dotfile in &ordered_dotfiles {
for &idx in &validation.ordered_indices {
let dotfile = &result.dotfiles[idx];
let full_source = source_dir.join(&dotfile.source);
let status = state.check_sync_status_with_template(
let status = state.check_sync_status_with_config(
&full_source,
&dotfile.target,
Some(dotfile.template),
None,
Some(&dotfile.permissions),
dotfile.owner.as_deref(),
);
// For directories, check individual files for smarter merging
@ -126,6 +133,10 @@ pub fn run(config_path: Option<PathBuf>, dry_run: bool, parallel: bool) -> anyho
has_changes = true;
tracing::debug!(source = %src.display(), "source changed");
}
SyncStatus::PermissionsChanged => {
has_changes = true;
tracing::debug!(target = %tgt.display(), "permissions changed");
}
SyncStatus::TargetChanged => {
// Target changed but source didn't - keep target, will update state
has_changes = true;
@ -144,20 +155,20 @@ pub fn run(config_path: Option<PathBuf>, dry_run: bool, parallel: bool) -> anyho
}
if has_real_conflicts {
conflicts.push((dotfile, SyncStatus::Conflict));
conflicts.push((idx, SyncStatus::Conflict));
} else if has_changes {
to_deploy.push(dotfile);
deploy_set.insert(idx);
} else {
tracing::debug!(target = %dotfile.target.display(), "synced");
}
} else {
// Single file handling (unchanged)
// Single file handling
match status {
SyncStatus::Synced => {
tracing::debug!(target = %dotfile.target.display(), "synced");
}
SyncStatus::NotDeployed | SyncStatus::TargetMissing => {
to_deploy.push(dotfile);
deploy_set.insert(idx);
}
SyncStatus::SourceChanged => {
println!(
@ -165,13 +176,21 @@ pub fn run(config_path: Option<PathBuf>, dry_run: bool, parallel: bool) -> anyho
dotfile.source.display(),
dotfile.target.display()
);
to_deploy.push(dotfile);
deploy_set.insert(idx);
}
SyncStatus::PermissionsChanged => {
println!(
" [permissions changed] {} -> {}",
dotfile.source.display(),
dotfile.target.display()
);
deploy_set.insert(idx);
}
SyncStatus::TargetChanged => {
conflicts.push((dotfile, status));
conflicts.push((idx, status));
}
SyncStatus::Conflict => {
conflicts.push((dotfile, status));
conflicts.push((idx, status));
}
SyncStatus::SourceMissing => {
tracing::error!(source = %dotfile.source.display(), "source missing");
@ -183,7 +202,8 @@ pub fn run(config_path: Option<PathBuf>, dry_run: bool, parallel: bool) -> anyho
// Handle conflicts
if !conflicts.is_empty() {
println!("\nConflicts detected:");
for (dotfile, status) in &conflicts {
for &(idx, ref status) in &conflicts {
let dotfile = &result.dotfiles[idx];
let status_str = match status {
SyncStatus::TargetChanged => "target changed",
SyncStatus::Conflict => "both changed",
@ -221,15 +241,16 @@ pub fn run(config_path: Option<PathBuf>, dry_run: bool, parallel: bool) -> anyho
match input.trim().to_lowercase().as_str() {
"s" => {
for (dotfile, _) in conflicts {
to_deploy.push(dotfile);
for (idx, _) in conflicts {
deploy_set.insert(idx);
}
}
"t" => {
println!("Skipping conflicted files.");
}
"i" => {
for (dotfile, status) in conflicts {
for (idx, status) in conflicts {
let dotfile = &result.dotfiles[idx];
let status_str = match status {
SyncStatus::TargetChanged => "target changed",
SyncStatus::Conflict => "both changed",
@ -252,7 +273,7 @@ pub fn run(config_path: Option<PathBuf>, dry_run: bool, parallel: bool) -> anyho
match choice.trim().to_lowercase().as_str() {
"s" => {
to_deploy.push(dotfile);
deploy_set.insert(idx);
}
"d" => {
let full_source = source_dir.join(&dotfile.source);
@ -263,14 +284,14 @@ pub fn run(config_path: Option<PathBuf>, dry_run: bool, parallel: bool) -> anyho
let mut confirm = String::new();
io::stdin().read_line(&mut confirm)?;
if confirm.trim().to_lowercase() == "y" {
to_deploy.push(dotfile);
deploy_set.insert(idx);
}
}
"m" => {
let full_source = source_dir.join(&dotfile.source);
if merge_in_editor(&full_source, &dotfile.target)? {
// Source was updated with merged content, deploy it
to_deploy.push(dotfile);
deploy_set.insert(idx);
} else {
println!(" Merge cancelled, keeping target.");
}
@ -290,11 +311,12 @@ pub fn run(config_path: Option<PathBuf>, dry_run: bool, parallel: bool) -> anyho
// Dry-run: show what would be done and exit
if dry_run {
if to_deploy.is_empty() {
if deploy_set.is_empty() {
println!("\n[dry-run] all dotfiles synced, nothing to deploy");
} else {
println!("\n[dry-run] would deploy:");
for dotfile in &to_deploy {
for &idx in &deploy_set {
let dotfile = &result.dotfiles[idx];
println!(
" {} -> {}",
dotfile.source.display(),
@ -337,18 +359,53 @@ pub fn run(config_path: Option<PathBuf>, dry_run: bool, parallel: bool) -> anyho
}
}
// Show packages that would be pruned
{
let configured_names: std::collections::HashSet<String> = result
.packages
.iter()
.filter_map(|p| p.default.clone())
.collect();
let state_for_prune = StateStore::new(&state_file);
let to_prune: Vec<_> = state_for_prune
.get_all_packages()
.iter()
.filter(|(name, _)| !configured_names.contains(*name))
.collect();
if !to_prune.is_empty() {
println!("\n[dry-run] would uninstall removed packages:");
for (name, _) in &to_prune {
println!(" {}", name);
}
}
}
return Ok(());
}
// Run before_deploy hooks
run_hooks(&result.hooks, HookStage::BeforeDeploy, &hook_env)?;
if to_deploy.is_empty() {
if deploy_set.is_empty() {
println!("\nNothing to deploy (all files synced).");
} else {
let mut deployer = Deployer::new(config, result.sandbox);
// Filter parallel batches to only include items in deploy_set
let filtered_batches: Vec<Vec<usize>> = validation
.parallel_batches
.iter()
.map(|batch| {
batch
.iter()
.copied()
.filter(|i| deploy_set.contains(i))
.collect::<Vec<_>>()
})
.filter(|batch| !batch.is_empty())
.collect();
let pb = ProgressBar::new(to_deploy.len() as u64);
let deployer = Deployer::new(config, result.sandbox);
let pb = ProgressBar::new(deploy_set.len() as u64);
pb.set_style(
ProgressStyle::default_bar()
.template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} {msg}")
@ -358,9 +415,8 @@ pub fn run(config_path: Option<PathBuf>, dry_run: bool, parallel: bool) -> anyho
pb.set_message("deploying dotfiles");
// Convert to owned for deploy
let dotfiles_to_deploy: Vec<DotfileConfig> = to_deploy.into_iter().cloned().collect();
let deploy_result = deployer.deploy(&dotfiles_to_deploy)?;
let deploy_result =
deployer.deploy_batches(&result.dotfiles, &filtered_batches, Some(&pb))?;
pb.finish_with_message("done");
println!("\ndeployment complete:");
@ -447,9 +503,70 @@ pub fn run(config_path: Option<PathBuf>, dry_run: bool, parallel: bool) -> anyho
run_hooks(&result.hooks, HookStage::AfterPackage, &hook_env)?;
}
// Prune packages removed from config
{
let configured_names: std::collections::HashSet<String> = result
.packages
.iter()
.filter_map(|p| p.default.clone())
.collect();
let state_for_prune = StateStore::new(&state_file);
let to_prune: Vec<(String, String)> = state_for_prune
.get_all_packages()
.iter()
.filter(|(name, _)| !configured_names.contains(*name))
.map(|(name, rec)| (name.clone(), rec.manager.clone()))
.collect();
if !to_prune.is_empty() {
println!("\n{} package(s) removed from config:", to_prune.len());
for (name, _) in &to_prune {
println!(" {}", name);
}
let mut uninstalled = Vec::new();
for (name, mgr_name) in &to_prune {
let should_uninstall = if prune { true } else { prompt_uninstall(name)? };
if should_uninstall {
if let Some(mgr) = doot_core::package::get_package_manager(mgr_name) {
mgr.uninstall(std::slice::from_ref(name))?;
println!("uninstalled {}", name);
uninstalled.push(name.clone());
} else {
tracing::warn!(
package = %name, manager = %mgr_name,
"cannot uninstall: package manager not available"
);
}
}
}
if !uninstalled.is_empty() {
let mut state = StateStore::new(&state_file);
for name in &uninstalled {
state.remove_package(name);
}
state.save()?;
}
}
}
let elapsed = start.elapsed();
println!("\nfinished in {:.2?}", elapsed);
Ok(())
}
/// Asks the user whether the given package should be uninstalled.
/// Returns `Ok(true)` only on an explicit "y"/"yes" answer (case-insensitive).
fn prompt_uninstall(package: &str) -> anyhow::Result<bool> {
    // Flush so the prompt is visible before we block on stdin.
    print!("Uninstall {}? [y/N] ", package);
    io::stdout().flush()?;

    let mut answer = String::new();
    io::stdin().read_line(&mut answer)?;
    let answer = answer.trim();
    Ok(answer.eq_ignore_ascii_case("y") || answer.eq_ignore_ascii_case("yes"))
}
#[tracing::instrument(skip_all)]
fn show_diff(source: &PathBuf, target: &PathBuf) {
use std::process::Command;
@ -577,3 +694,132 @@ fn merge_in_editor(source: &PathBuf, target: &PathBuf) -> anyhow::Result<bool> {
Ok(false)
}
}
/// Extracts the directory prefix before any wildcard in a glob pattern.
/// "config/*" → "config", "a/b/**/*.rs" → "a/b", "*" → ""
fn glob_base_dir(pattern: &str) -> PathBuf {
    // Literal portion of the pattern: everything before the first glob
    // metacharacter, or the whole string when there is none.
    let literal = pattern
        .find(['*', '?', '['])
        .map_or(pattern, |pos| &pattern[..pos]);

    // Drop the trailing (possibly partial) path component after the last '/'.
    literal
        .rfind('/')
        .map_or_else(PathBuf::new, |pos| PathBuf::from(&literal[..pos]))
}
/// Finds the common path prefix of a list of paths.
fn common_path_prefix(paths: &[PathBuf]) -> PathBuf {
    let Some(first) = paths.first() else {
        return PathBuf::new();
    };
    // Start from the first path's directory and shrink it one component at a
    // time until every remaining path lives underneath it.
    let mut candidate = first.parent().unwrap_or(Path::new("")).to_path_buf();
    for other in paths.iter().skip(1) {
        while !other.starts_with(&candidate) {
            if !candidate.pop() {
                // Nothing left to strip: no common prefix at all.
                return PathBuf::new();
            }
        }
    }
    candidate
}
/// Expands dotfile patterns (from `dotfiles:` blocks) into individual DotfileConfig entries.
/// Returns the number of entries added.
fn expand_dotfile_patterns(
    dotfiles: &mut Vec<DotfileConfig>,
    patterns: &[DotfilesPattern],
    source_dir: &Path,
) -> usize {
    let initial_len = dotfiles.len();

    for pattern in patterns {
        // Resolve the concrete source files plus the base directory that the
        // relative target suffix is computed against.
        let (sources, base) = match &pattern.source {
            DotfilesSource::Pattern(pat) => {
                let full_pattern = source_dir.join(pat);
                // A pattern that fails to parse simply yields no matches.
                let mut matched: Vec<PathBuf> = Vec::new();
                if let Ok(entries) = glob::glob(&full_pattern.to_string_lossy()) {
                    matched.extend(entries.filter_map(Result::ok));
                }
                (matched, source_dir.join(glob_base_dir(pat)))
            }
            DotfilesSource::Paths(paths) => (paths.clone(), common_path_prefix(paths)),
        };

        for source_path in sources {
            // Store the source relative to the config dir when possible.
            let rel_to_source = source_path.strip_prefix(source_dir).unwrap_or(&source_path);
            // The target mirrors the layout below the pattern's base directory.
            let suffix = source_path.strip_prefix(&base).unwrap_or(&source_path);
            dotfiles.push(DotfileConfig {
                source: rel_to_source.to_path_buf(),
                target: pattern.target_base.join(suffix),
                template: pattern.template,
                permissions: pattern.permissions.clone(),
                owner: pattern.owner.clone(),
                deploy: pattern.deploy,
                link_patterns: pattern.link_patterns.clone(),
                copy_patterns: pattern.copy_patterns.clone(),
                exclude_paths: Vec::new(),
            });
        }
    }

    dotfiles.len() - initial_len
}
/// Merges explicit dotfile blocks into glob-expanded entries.
///
/// Two merge cases:
/// 1. Same target: explicit replaces glob-expanded entry entirely.
/// 2. File inside directory: adds the file's target to the directory entry's exclude_paths.
fn merge_specializations(dotfiles: &mut Vec<DotfileConfig>, glob_count: usize) {
    let total = dotfiles.len();
    // Layout invariant: explicit entries occupy [0, explicit_end),
    // glob-expanded entries occupy [explicit_end, total).
    let explicit_end = total - glob_count;
    let glob_start = explicit_end;

    let mut glob_to_remove: HashSet<usize> = HashSet::new();

    // First pass: an explicit entry with the same target fully replaces the
    // glob-expanded one, so mark the glob entry for removal.
    for exp_idx in 0..explicit_end {
        for glob_idx in glob_start..total {
            if glob_to_remove.contains(&glob_idx) {
                continue;
            }
            if dotfiles[exp_idx].target == dotfiles[glob_idx].target {
                glob_to_remove.insert(glob_idx);
            }
        }
    }

    // Second pass: an explicit target nested inside a surviving glob directory
    // entry becomes an exclude path on that entry (the explicit block wins for
    // that file). Clone the target once per explicit entry — not once per
    // pair — since we mutate `dotfiles[glob_idx]` while comparing against it.
    for exp_idx in 0..explicit_end {
        let exp_target = dotfiles[exp_idx].target.clone();
        for glob_idx in glob_start..total {
            if glob_to_remove.contains(&glob_idx) {
                continue;
            }
            if exp_target.starts_with(&dotfiles[glob_idx].target)
                && exp_target != dotfiles[glob_idx].target
            {
                dotfiles[glob_idx].exclude_paths.push(exp_target.clone());
            }
        }
    }

    // Drop replaced glob entries in a single O(n) pass; repeated `Vec::remove`
    // calls would be O(n) each (O(n²) total) for the same result.
    let mut idx = 0;
    dotfiles.retain(|_| {
        let keep = !glob_to_remove.contains(&idx);
        idx += 1;
        keep
    });
}

View file

@ -1,7 +1,9 @@
use super::{find_config_file, parse_config, type_check};
use doot_core::deploy::DiffDisplay;
use doot_core::state::{expected_mode_for_file, get_file_mode};
use doot_lang::Evaluator;
use std::path::PathBuf;
use doot_lang::evaluator::PermissionRule;
use std::path::{Path, PathBuf};
#[tracing::instrument(skip_all, fields(all))]
pub fn run(config_path: Option<PathBuf>, all: bool) -> anyhow::Result<()> {
@ -12,7 +14,7 @@ pub fn run(config_path: Option<PathBuf>, all: bool) -> anyhow::Result<()> {
type_check(&program, &source, &path.display().to_string())?;
let mut evaluator = Evaluator::new();
let result = evaluator.eval(&program)?;
let result = evaluator.eval_sync(&program)?;
let source_dir = path.parent().unwrap_or(&PathBuf::from(".")).to_path_buf();
@ -27,7 +29,9 @@ pub fn run(config_path: Option<PathBuf>, all: bool) -> anyhow::Result<()> {
continue;
}
let changed = DiffDisplay::has_changes(&source_path, target_path)?;
let content_changed = DiffDisplay::has_changes(&source_path, target_path)?;
let perms_changed = check_permissions_diff(target_path, &dotfile.permissions);
let changed = content_changed || perms_changed;
if changed || all {
has_changes = true;
@ -51,6 +55,11 @@ pub fn run(config_path: Option<PathBuf>, all: bool) -> anyhow::Result<()> {
if !diff.is_empty() {
println!("{}", diff);
}
// Show permission differences
if perms_changed {
show_permissions_diff(target_path, &dotfile.permissions);
}
} else {
println!(" [new file]");
}
@ -63,3 +72,30 @@ pub fn run(config_path: Option<PathBuf>, all: bool) -> anyhow::Result<()> {
Ok(())
}
/// Returns true when the target file's on-disk permission bits differ from the
/// mode the configured rules expect. Absent files or empty rule sets never diff.
fn check_permissions_diff(target: &Path, rules: &[PermissionRule]) -> bool {
    if rules.is_empty() || !target.exists() {
        return false;
    }
    // Only when both an expected and an actual mode can be determined is
    // there anything to compare; mask to the permission bits (rwx + special).
    if let Some(expected) = expected_mode_for_file(target, rules) {
        if let Some(actual) = get_file_mode(target) {
            return (actual & 0o7777) != (expected & 0o7777);
        }
    }
    false
}
/// Prints the target's permission mismatch (actual -> expected) when the
/// on-disk mode bits differ from what the rules require. Silent otherwise.
fn show_permissions_diff(target: &Path, rules: &[PermissionRule]) {
    let Some(expected) = expected_mode_for_file(target, rules) else {
        return;
    };
    let Some(actual) = get_file_mode(target) else {
        return;
    };
    // Compare only the permission bits (rwx + setuid/setgid/sticky).
    let actual_bits = actual & 0o7777;
    let expected_bits = expected & 0o7777;
    if actual_bits != expected_bits {
        println!(
            " permissions: {:#06o} -> {:#06o} (expected)",
            actual_bits, expected_bits
        );
    }
}

View file

@ -22,7 +22,7 @@ pub fn run(
type_check(&program, &source, &path.display().to_string())?;
let mut evaluator = Evaluator::new();
let result = evaluator.eval(&program)?;
let result = evaluator.eval_sync(&program)?;
let source_dir = path.parent().unwrap_or(&PathBuf::from(".")).to_path_buf();
let config = Config::default();

View file

@ -81,11 +81,11 @@ const EXAMPLE_CONFIG: &str = r#"# doot.doot
# Dotfiles
dotfile:
source = "config/nvim"
target = config_path("nvim")
target = config_dir() / "nvim"
dotfile:
source = "config/kitty"
target = config_path("kitty")
target = config_dir() / "kitty"
# Platform-specific
if os == Os::MacOS:
@ -113,11 +113,11 @@ package:
const EXAMPLE_CONFIG_BODY: &str = r#"# Dotfiles
dotfile:
source = "config/nvim"
target = config_path("nvim")
target = config_dir() / "nvim"
dotfile:
source = "config/kitty"
target = config_path("kitty")
target = config_dir() / "kitty"
# Platform-specific
if os == Os::MacOS:

View file

@ -11,7 +11,7 @@ pub fn install(config_path: Option<PathBuf>) -> anyhow::Result<()> {
type_check(&program, &source, &path.display().to_string())?;
let mut evaluator = Evaluator::new();
let result = evaluator.eval(&program)?;
let result = evaluator.eval_sync(&program)?;
if result.packages.is_empty() {
println!("no packages configured");
@ -73,7 +73,7 @@ pub fn list(config_path: Option<PathBuf>) -> anyhow::Result<()> {
type_check(&program, &source, &path.display().to_string())?;
let mut evaluator = Evaluator::new();
let result = evaluator.eval(&program)?;
let result = evaluator.eval_sync(&program)?;
if result.packages.is_empty() {
println!("no packages configured");

View file

@ -1,5 +1,5 @@
use super::{find_config_file, parse_config, type_check};
use doot_core::state::StateStore;
use doot_core::state::{StateStore, SyncStatus};
use doot_lang::Evaluator;
use std::path::PathBuf;
@ -12,7 +12,7 @@ pub fn run(config_path: Option<PathBuf>) -> anyhow::Result<()> {
type_check(&program, &source, &path.display().to_string())?;
let mut evaluator = Evaluator::new();
let result = evaluator.eval(&program)?;
let result = evaluator.eval_sync(&program)?;
let source_dir = path.parent().unwrap_or(&PathBuf::from(".")).to_path_buf();
let state_file = source_dir.join(".doot-state.json");
@ -24,38 +24,52 @@ pub fn run(config_path: Option<PathBuf>) -> anyhow::Result<()> {
println!("dotfiles ({}):", result.dotfiles.len());
for dotfile in &result.dotfiles {
let target = &dotfile.target;
let status = if target.is_symlink() {
let source_path = source_dir.join(&dotfile.source);
let full_source = source_dir.join(&dotfile.source);
let sync = state.check_sync_status_with_config(
&full_source,
target,
Some(dotfile.template),
None,
Some(&dotfile.permissions),
dotfile.owner.as_deref(),
);
let (status, extra) = if target.is_symlink() {
let link_target = std::fs::read_link(target).ok();
if link_target.as_ref() == Some(&source_path) {
"ok"
if link_target.as_ref() == Some(&full_source) {
("ok", "")
} else {
"mismatch"
}
} else if target.exists() {
if state.has_changed(&source_dir.join(&dotfile.source), target) {
"modified"
} else {
"deployed"
("mismatch", "")
}
} else {
"pending"
match sync {
SyncStatus::Synced => ("deployed", ""),
SyncStatus::NotDeployed => ("pending", ""),
SyncStatus::TargetMissing => ("pending", ""),
SyncStatus::SourceMissing => ("missing", " [source missing]"),
SyncStatus::SourceChanged => ("modified", " [source changed]"),
SyncStatus::TargetChanged => ("modified", " [target changed]"),
SyncStatus::Conflict => ("modified", " [conflict]"),
SyncStatus::PermissionsChanged => ("perms", " [permissions changed]"),
}
};
let marker = match status {
"ok" => "\x1b[32m✓\x1b[0m",
"deployed" => "\x1b[32m✓\x1b[0m",
"ok" | "deployed" => "\x1b[32m✓\x1b[0m",
"pending" => "\x1b[33m○\x1b[0m",
"modified" => "\x1b[33m~\x1b[0m",
"mismatch" => "\x1b[31m✗\x1b[0m",
"perms" => "\x1b[33m⚙\x1b[0m",
"mismatch" | "missing" => "\x1b[31m✗\x1b[0m",
_ => "?",
};
println!(
" {} {} -> {}",
" {} {} -> {}{}",
marker,
dotfile.source.display(),
target.display()
target.display(),
extra,
);
if status != "ok" && status != "deployed" {

View file

@ -117,7 +117,7 @@ impl App {
type_check(&program, &source, &path.display().to_string())?;
let mut evaluator = Evaluator::new();
let result = evaluator.eval(&program)?;
let result = evaluator.eval_sync(&program)?;
let source_dir = path.parent().unwrap_or(&PathBuf::from(".")).to_path_buf();
@ -142,6 +142,7 @@ impl App {
doot_core::state::SyncStatus::SourceChanged => FileStatus::Modified,
doot_core::state::SyncStatus::TargetChanged => FileStatus::Modified,
doot_core::state::SyncStatus::Conflict => FileStatus::Modified,
doot_core::state::SyncStatus::PermissionsChanged => FileStatus::Modified,
doot_core::state::SyncStatus::NotDeployed => FileStatus::Pending,
doot_core::state::SyncStatus::TargetMissing => FileStatus::Pending,
doot_core::state::SyncStatus::SourceMissing => FileStatus::Error,

View file

@ -5,7 +5,7 @@ use std::path::PathBuf;
use tracing_subscriber::EnvFilter;
#[derive(Parser)]
#[command(name = "doot")]
#[command(name = "doot", version)]
#[command(about = "A modern dotfiles manager with a typed DSL", long_about = None)]
struct Cli {
#[command(subcommand)]
@ -57,8 +57,9 @@ enum Commands {
#[arg(short = 'n', long)]
dry_run: bool,
#[arg(short, long)]
parallel: bool,
/// Auto-uninstall packages removed from config
#[arg(long)]
prune: bool,
},
Diff {
@ -187,9 +188,7 @@ fn main() -> anyhow::Result<()> {
match cli.command {
Commands::Init { path } => commands::init::run(path),
Commands::Apply { dry_run, parallel } => {
commands::apply::run(cli.config, dry_run, parallel)
}
Commands::Apply { dry_run, prune } => commands::apply::run(cli.config, dry_run, prune),
Commands::Diff { all } => commands::diff::run(cli.config, all),
Commands::Status => commands::status::run(cli.config),
Commands::Check => commands::check::run(cli.config),

View file

@ -8,8 +8,6 @@ doot-lang.workspace = true
serde.workspace = true
serde_json.workspace = true
toml.workspace = true
smol.workspace = true
async-fs.workspace = true
age.workspace = true
walkdir.workspace = true
dirs.workspace = true
@ -22,6 +20,7 @@ anyhow.workspace = true
hostname = "0.4"
regex-lite = "0.1"
glob = "0.3"
rayon.workspace = true
minijinja = { version = "2", features = ["builtins"] }
which = "7"
tracing.workspace = true

View file

@ -24,8 +24,6 @@ pub struct Config {
pub dry_run: bool,
/// Enable verbose output.
pub verbose: bool,
/// Enable parallel operations.
pub parallel: bool,
}
impl Config {
@ -45,7 +43,6 @@ impl Config {
identity_file: config_dir.join("identity.txt"),
dry_run: false,
verbose: false,
parallel: true,
}
}
@ -100,12 +97,6 @@ impl Config {
self
}
/// Sets parallel mode.
pub fn parallel(mut self, parallel: bool) -> Self {
self.parallel = parallel;
self
}
/// Creates all required directories.
#[tracing::instrument(skip(self))]
pub fn ensure_dirs(&self) -> std::io::Result<()> {

View file

@ -9,7 +9,10 @@ use crate::state::StateStore;
use crate::state::store::DeployMode;
use doot_lang::evaluator::DotfileConfig;
use glob::Pattern;
use indicatif::ProgressBar;
use rayon::prelude::*;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use thiserror::Error;
pub use diff::DiffDisplay;
@ -93,10 +96,10 @@ pub struct DeployErrorInfo {
/// Handles dotfile deployment.
pub struct Deployer {
config: Config,
linker: Linker,
template_engine: TemplateEngine,
state: StateStore,
config: Arc<Config>,
linker: Arc<Linker>,
template_engine: Arc<TemplateEngine>,
state: Arc<Mutex<StateStore>>,
sandbox: bool,
}
@ -105,11 +108,12 @@ impl Deployer {
#[tracing::instrument(skip_all)]
pub fn new(config: Config, sandbox: bool) -> Self {
let state = StateStore::new(&config.state_file);
let linker = Linker::new(config.clone());
Self {
linker: Linker::new(config.clone()),
template_engine: TemplateEngine::new(),
state,
config,
linker: Arc::new(linker),
template_engine: Arc::new(TemplateEngine::new()),
state: Arc::new(Mutex::new(state)),
config: Arc::new(config),
sandbox,
}
}
@ -134,7 +138,7 @@ impl Deployer {
/// Deploys all dotfiles.
#[tracing::instrument(skip_all)]
pub fn deploy(&mut self, dotfiles: &[DotfileConfig]) -> Result<DeployResult, DeployError> {
pub fn deploy(&self, dotfiles: &[DotfileConfig]) -> Result<DeployResult, DeployError> {
let mut result = DeployResult {
deployed: Vec::new(),
skipped: Vec::new(),
@ -161,12 +165,67 @@ impl Deployer {
}
}
self.state.save()?;
self.state.lock().unwrap().save()?;
Ok(result)
}
/// Deploy dotfiles using parallel batches from the DAG.
/// Items within each batch run concurrently. Batches run sequentially.
#[tracing::instrument(skip_all)]
pub fn deploy_batches(
    &self,
    dotfiles: &[DotfileConfig],
    batches: &[Vec<usize>],
    progress: Option<&ProgressBar>,
) -> Result<DeployResult, DeployError> {
    let mut result = DeployResult {
        deployed: Vec::new(),
        skipped: Vec::new(),
        errors: Vec::new(),
    };

    for batch in batches {
        // Deploy every item in this batch concurrently via rayon; the
        // progress bar is thread-safe, so ticking from workers is fine.
        let outcomes: Vec<_> = batch
            .par_iter()
            .map(|&i| {
                let dotfile = &dotfiles[i];
                let outcome = self
                    .deploy_single(dotfile)
                    .map_err(|e| (dotfile.clone(), e));
                if let Some(pb) = progress {
                    pb.inc(1);
                }
                outcome
            })
            .collect();

        // Categorize results sequentially so `result` needs no locking.
        for outcome in outcomes {
            match outcome {
                Ok(deployed) => result.deployed.push(deployed),
                Err((df, DeployError::TargetExists(p))) => result.skipped.push(SkippedFile {
                    source: df.source,
                    target: df.target,
                    reason: format!("target exists: {}", p.display()),
                }),
                Err((df, e)) => result.errors.push(DeployErrorInfo {
                    source: df.source,
                    target: df.target,
                    error: e.to_string(),
                }),
            }
        }
    }

    self.state.lock().unwrap().save()?;
    Ok(result)
}
#[tracing::instrument(skip(self), fields(source = %dotfile.source.display(), target = %dotfile.target.display()))]
fn deploy_single(&mut self, dotfile: &DotfileConfig) -> Result<DeployedFile, DeployError> {
fn deploy_single(&self, dotfile: &DotfileConfig) -> Result<DeployedFile, DeployError> {
let source = self.config.source_dir.join(&dotfile.source);
let target = &dotfile.target;
@ -214,8 +273,14 @@ impl Deployer {
set_owner(target, owner)?;
}
self.state
.record_deployment_with_template(&source, target, deploy_mode, dotfile.template);
self.state.lock().unwrap().record_deployment_full(
&source,
target,
deploy_mode,
dotfile.template,
dotfile.permissions.clone(),
dotfile.owner.clone(),
);
Ok(DeployedFile {
source: source.clone(),
@ -226,7 +291,7 @@ impl Deployer {
#[tracing::instrument(skip(self), fields(source = %source.display(), target = %target.display()))]
fn deploy_directory(
&mut self,
&self,
dotfile: &DotfileConfig,
source: &Path,
target: &Path,
@ -234,7 +299,11 @@ impl Deployer {
) -> Result<DeployedFile, DeployError> {
use crate::state::SyncStatus;
let changed_files = self.state.get_changed_files_in_dir(source, target);
let changed_files = self
.state
.lock()
.unwrap()
.get_changed_files_in_dir(source, target);
tracing::trace!(
changed_count = changed_files.len(),
"directory file changes"
@ -252,14 +321,28 @@ impl Deployer {
let mut any_created = false;
for (src_file, tgt_file, status) in changed_files {
// Skip files that have explicit specializations
if dotfile
.exclude_paths
.iter()
.any(|ex| tgt_file.starts_with(ex) || *ex == tgt_file)
{
continue;
}
match status {
SyncStatus::NotDeployed | SyncStatus::TargetMissing | SyncStatus::SourceChanged => {
SyncStatus::NotDeployed
| SyncStatus::TargetMissing
| SyncStatus::SourceChanged
| SyncStatus::PermissionsChanged => {
// Copy from source to target
if !self.config.dry_run {
if status != SyncStatus::PermissionsChanged {
if let Some(parent) = tgt_file.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::copy(&src_file, &tgt_file)?;
}
// Apply permissions if specified
if !dotfile.permissions.is_empty() {
@ -273,14 +356,26 @@ impl Deployer {
any_updated = true;
}
self.state
.record_deployment(&src_file, &tgt_file, deploy_mode);
self.state.lock().unwrap().record_deployment_full(
&src_file,
&tgt_file,
deploy_mode,
false,
dotfile.permissions.clone(),
dotfile.owner.clone(),
);
}
SyncStatus::TargetChanged => {
// Target changed but source didn't - keep target, just update state
// This is like keeping local changes in git
self.state
.record_deployment(&src_file, &tgt_file, deploy_mode);
self.state.lock().unwrap().record_deployment_full(
&src_file,
&tgt_file,
deploy_mode,
false,
dotfile.permissions.clone(),
dotfile.owner.clone(),
);
}
SyncStatus::Conflict => {
// Real conflict - user already chose "use source" at directory level
@ -295,15 +390,21 @@ impl Deployer {
}
}
any_updated = true;
self.state
.record_deployment(&src_file, &tgt_file, deploy_mode);
self.state.lock().unwrap().record_deployment_full(
&src_file,
&tgt_file,
deploy_mode,
false,
dotfile.permissions.clone(),
dotfile.owner.clone(),
);
}
SyncStatus::SourceMissing => {
// File was deleted from source, remove from target
if !self.config.dry_run && tgt_file.exists() {
std::fs::remove_file(&tgt_file)?;
}
self.state.remove_deployment(&tgt_file);
self.state.lock().unwrap().remove_deployment(&tgt_file);
any_updated = true;
}
SyncStatus::Synced => {
@ -320,7 +421,14 @@ impl Deployer {
}
// Also record the directory-level deployment for sync status checks
self.state.record_deployment(source, target, deploy_mode);
self.state.lock().unwrap().record_deployment_full(
source,
target,
deploy_mode,
false,
dotfile.permissions.clone(),
dotfile.owner.clone(),
);
let action = if any_created && !any_updated {
DeployAction::Created

View file

@ -4,4 +4,6 @@ pub mod snapshot;
pub mod store;
pub use snapshot::Snapshot;
pub use store::{DeployMode, DeploymentRecord, StateStore, SyncStatus};
pub use store::{
DeployMode, DeploymentRecord, StateStore, SyncStatus, expected_mode_for_file, get_file_mode,
};

View file

@ -1,5 +1,6 @@
//! State persistence for doot.
use doot_lang::evaluator::PermissionRule;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
@ -44,6 +45,12 @@ pub struct DeploymentRecord {
/// Whether this file was deployed as a template (source != target content).
#[serde(default)]
pub template: bool,
/// Permission rules configured at deploy time.
#[serde(default)]
pub permissions: Vec<PermissionRule>,
/// Owner configured at deploy time (e.g. "user:group").
#[serde(default)]
pub owner: Option<String>,
}
/// Sync status after comparing current hashes with recorded state.
@ -56,6 +63,7 @@ pub enum SyncStatus {
NotDeployed,
TargetMissing,
SourceMissing,
PermissionsChanged,
}
/// Record of an installed package.
@ -96,7 +104,7 @@ impl StateStore {
/// Records a deployment with both source and target hashes.
///
/// Thin wrapper over `record_deployment_full` with the template flag off and
/// no permission rules or owner recorded.
/// Fix: the body contained both the pre-refactor call
/// (`record_deployment_with_template`) and its replacement — a merge artifact
/// that recorded the deployment twice; only the full variant is kept.
#[tracing::instrument(skip(self), fields(source = %source.display(), target = %target.display()))]
pub fn record_deployment(&mut self, source: &Path, target: &Path, mode: DeployMode) {
    self.record_deployment_full(source, target, mode, false, vec![], None);
}
/// Records a deployment with template flag.
@ -107,6 +115,20 @@ impl StateStore {
target: &Path,
mode: DeployMode,
template: bool,
) {
self.record_deployment_full(source, target, mode, template, vec![], None);
}
/// Records a deployment with all configuration details.
#[tracing::instrument(skip(self, permissions, owner), fields(source = %source.display(), target = %target.display()))]
pub fn record_deployment_full(
&mut self,
source: &Path,
target: &Path,
mode: DeployMode,
template: bool,
permissions: Vec<PermissionRule>,
owner: Option<String>,
) {
let source_hash = hash_path(source);
let target_hash = hash_path(target);
@ -119,6 +141,8 @@ impl StateStore {
deployed_at: chrono_now(),
mode,
template,
permissions,
owner,
};
self.state
@ -130,7 +154,7 @@ impl StateStore {
/// Checks sync status by comparing current hashes with recorded state.
///
/// Thin wrapper over `check_sync_status_with_config` supplying no overrides:
/// template, mode, permissions and owner checks are all skipped (`None`).
/// Fix: the body contained both the old 4-argument delegation and the new
/// 6-argument one (merge artifact); only the current call is kept.
#[tracing::instrument(level = "trace", skip(self))]
pub fn check_sync_status(&self, source: &Path, target: &Path) -> SyncStatus {
    self.check_sync_status_with_config(source, target, None, None, None, None)
}
/// Checks sync status, also detecting if template flag changed in config.
@ -141,17 +165,19 @@ impl StateStore {
target: &Path,
current_template: Option<bool>,
) -> SyncStatus {
self.check_sync_status_with_config(source, target, current_template, None)
self.check_sync_status_with_config(source, target, current_template, None, None, None)
}
/// Checks sync status, also detecting if config flags changed.
#[tracing::instrument(level = "trace", skip(self))]
#[tracing::instrument(level = "trace", skip(self, current_permissions, current_owner))]
pub fn check_sync_status_with_config(
&self,
source: &Path,
target: &Path,
current_template: Option<bool>,
current_mode: Option<DeployMode>,
current_permissions: Option<&[PermissionRule]>,
current_owner: Option<&str>,
) -> SyncStatus {
let Some(record) = self.get_deployment(target) else {
return SyncStatus::NotDeployed;
@ -203,12 +229,43 @@ impl StateStore {
let source_changed = current_source_hash != record.source_hash;
let target_changed = current_target_hash != record.target_hash;
match (source_changed, target_changed) {
let content_status = match (source_changed, target_changed) {
(false, false) => SyncStatus::Synced,
(true, false) => SyncStatus::SourceChanged,
(false, true) => SyncStatus::TargetChanged,
(true, true) => SyncStatus::Conflict,
};
// If content is not synced, report that first (more important)
if content_status != SyncStatus::Synced {
return content_status;
}
// Content is synced — check if permissions config changed
if let Some(perms) = current_permissions
&& perms != record.permissions.as_slice()
{
return SyncStatus::PermissionsChanged;
}
// Check if owner config changed
if let Some(owner) = current_owner
&& record.owner.as_deref() != Some(owner)
{
return SyncStatus::PermissionsChanged;
}
// Check actual file permissions match expected
if let Some(perms) = current_permissions
&& !perms.is_empty()
&& let Some(expected) = expected_mode_for_file(target, perms)
&& let Some(actual) = get_file_mode(target)
&& (actual & 0o7777) != (expected & 0o7777)
{
return SyncStatus::PermissionsChanged;
}
SyncStatus::Synced
}
/// Records a package installation.
@ -224,6 +281,18 @@ impl StateStore {
self.dirty = true;
}
/// Returns all recorded package records, keyed by package name.
///
/// Read-only borrow of the in-memory state; does not touch disk.
pub fn get_all_packages(&self) -> &HashMap<String, PackageRecord> {
    &self.state.packages
}
/// Removes a package record from state.
///
/// No-op on the map if the package was never recorded, but the dirty flag is
/// set unconditionally, so the state file will be rewritten either way.
#[tracing::instrument(skip(self))]
pub fn remove_package(&mut self, name: &str) {
    self.state.packages.remove(name);
    self.dirty = true;
}
/// Gets a deployment record by target path.
pub fn get_deployment(&self, target: &Path) -> Option<&DeploymentRecord> {
self.state.deployments.get(&target.display().to_string())
@ -432,3 +501,33 @@ fn chrono_now() -> String {
.as_secs();
format!("{}", secs)
}
/// Returns the file mode bits (unix only).
///
/// `None` when the file's metadata cannot be read (e.g. the path does not
/// exist or is not accessible).
#[cfg(unix)]
pub fn get_file_mode(path: &Path) -> Option<u32> {
    use std::os::unix::fs::PermissionsExt;
    let metadata = std::fs::metadata(path).ok()?;
    Some(metadata.permissions().mode())
}

/// Returns the file mode bits; always `None` on non-unix targets, which have
/// no unix mode concept.
#[cfg(not(unix))]
pub fn get_file_mode(_path: &Path) -> Option<u32> {
    None
}
/// Resolves which permission rule applies to a file and returns the expected mode.
///
/// Rules are tried in order; the first match wins. A `Single` rule matches any
/// file unconditionally. A `Pattern` rule matches the file *name* (not the full
/// path) against its glob; rules with unparsable globs are skipped silently.
/// Returns `None` when no rule applies.
pub fn expected_mode_for_file(path: &Path, rules: &[PermissionRule]) -> Option<u32> {
    // The glob is matched against the final path component only.
    let file_name = path.file_name().unwrap_or_default().to_string_lossy();
    rules.iter().find_map(|rule| match rule {
        PermissionRule::Single(mode) => Some(*mode),
        PermissionRule::Pattern { pattern, mode } => glob::Pattern::new(pattern)
            .ok()
            .filter(|p| p.matches(&file_name))
            .map(|_| *mode),
    })
}

View file

@ -10,8 +10,8 @@ serde.workspace = true
serde_json.workspace = true
toml.workspace = true
smol.workspace = true
async-fs.workspace = true
async-net.workspace = true
async-recursion.workspace = true
futures-lite.workspace = true
surf.workspace = true
rayon.workspace = true
walkdir.workspace = true

View file

@ -1,19 +1,57 @@
use crate::evaluator::{EvalError, Value};
use crate::evaluator::{AsyncValue, EvalError, Value};
#[tracing::instrument(level = "trace", skip_all)]
pub fn all(args: &[Value]) -> Result<Value, EvalError> {
Ok(Value::List(args.to_vec()))
/// Awaits every argument and returns the results as a list.
///
/// Non-future values pass through unchanged (cloned). `Value::Future`
/// arguments are awaited in argument order; their underlying tasks were
/// spawned when the future was created (see the fetch/post builtins), so
/// presumably they make progress concurrently while earlier ones are awaited.
/// Taking the task out of its slot means each future can be consumed only
/// once — a second use fails with "future already consumed".
pub async fn all(args: &[Value]) -> Result<Value, EvalError> {
    let mut results = Vec::with_capacity(args.len());
    for arg in args {
        match arg {
            Value::Future(av) => {
                // Move the task out of the locked Option slot; a poisoned
                // lock or an already-emptied slot becomes an AsyncError.
                let task =
                    av.0.lock()
                        .map_err(|e| EvalError::AsyncError(e.to_string()))?
                        .take()
                        .ok_or_else(|| EvalError::AsyncError("future already consumed".into()))?;
                results.push(task.await?);
            }
            other => results.push(other.clone()),
        }
    }
    Ok(Value::List(results))
}
#[tracing::instrument(level = "trace", skip_all)]
pub fn race(args: &[Value]) -> Result<Value, EvalError> {
Ok(args.first().cloned().unwrap_or(Value::None))
/// Returns the first settled value among the arguments.
///
/// Any non-future argument wins immediately (checked in argument order)
/// without awaiting the futures at all. Otherwise the extracted tasks are
/// folded pairwise with `futures_lite::future::race`, each intermediate pair
/// re-spawned so every task keeps being polled.
/// NOTE(review): when the race resolves, losing tasks are dropped — confirm
/// that cancelling them is the intended semantics.
pub async fn race(args: &[Value]) -> Result<Value, EvalError> {
    let mut tasks = Vec::new();
    for arg in args {
        match arg {
            Value::Future(av) => {
                // Take ownership of the underlying task; racing consumes it,
                // so the future cannot be awaited again afterwards.
                let task =
                    av.0.lock()
                        .map_err(|e| EvalError::AsyncError(e.to_string()))?
                        .take()
                        .ok_or_else(|| EvalError::AsyncError("future already consumed".into()))?;
                tasks.push(task);
            }
            other => return Ok(other.clone()), // Non-future wins immediately
        }
    }
    match tasks.len() {
        0 => Ok(Value::None),
        1 => tasks.remove(0).await,
        _ => {
            // Fold all tasks into a single racing task.
            let mut combined = tasks.remove(0);
            for t in tasks {
                combined = smol::spawn(futures_lite::future::race(combined, t));
            }
            combined.await
        }
    }
}
#[tracing::instrument(level = "trace", skip_all)]
pub fn fetch(args: &[Value]) -> Result<Value, EvalError> {
pub async fn fetch(args: &[Value]) -> Result<Value, EvalError> {
let url = match args.first() {
Some(Value::Str(s)) => s,
Some(Value::Str(s)) => s.clone(),
_ => {
return Err(EvalError::TypeError(
"fetch expects a URL string".to_string(),
@ -21,8 +59,8 @@ pub fn fetch(args: &[Value]) -> Result<Value, EvalError> {
}
};
smol::block_on(async {
let mut response = surf::get(url)
let task = smol::spawn(async move {
let mut response = surf::get(&url)
.await
.map_err(|e| EvalError::AsyncError(e.to_string()))?;
@ -32,13 +70,14 @@ pub fn fetch(args: &[Value]) -> Result<Value, EvalError> {
.map_err(|e| EvalError::AsyncError(e.to_string()))?;
Ok(Value::Str(body))
})
});
Ok(Value::Future(AsyncValue::new(task)))
}
#[tracing::instrument(level = "trace", skip_all)]
pub fn fetch_json(args: &[Value]) -> Result<Value, EvalError> {
pub async fn fetch_json(args: &[Value]) -> Result<Value, EvalError> {
let url = match args.first() {
Some(Value::Str(s)) => s,
Some(Value::Str(s)) => s.clone(),
_ => {
return Err(EvalError::TypeError(
"fetch_json expects a URL string".to_string(),
@ -46,8 +85,8 @@ pub fn fetch_json(args: &[Value]) -> Result<Value, EvalError> {
}
};
smol::block_on(async {
let mut response = surf::get(url)
let task = smol::spawn(async move {
let mut response = surf::get(&url)
.await
.map_err(|e| EvalError::AsyncError(e.to_string()))?;
@ -57,13 +96,14 @@ pub fn fetch_json(args: &[Value]) -> Result<Value, EvalError> {
.map_err(|e| EvalError::AsyncError(e.to_string()))?;
Ok(json_to_value(&json))
})
});
Ok(Value::Future(AsyncValue::new(task)))
}
#[tracing::instrument(level = "trace", skip_all)]
pub fn fetch_bytes(args: &[Value]) -> Result<Value, EvalError> {
pub async fn fetch_bytes(args: &[Value]) -> Result<Value, EvalError> {
let url = match args.first() {
Some(Value::Str(s)) => s,
Some(Value::Str(s)) => s.clone(),
_ => {
return Err(EvalError::TypeError(
"fetch_bytes expects a URL string".to_string(),
@ -71,8 +111,8 @@ pub fn fetch_bytes(args: &[Value]) -> Result<Value, EvalError> {
}
};
smol::block_on(async {
let mut response = surf::get(url)
let task = smol::spawn(async move {
let mut response = surf::get(&url)
.await
.map_err(|e| EvalError::AsyncError(e.to_string()))?;
@ -83,13 +123,14 @@ pub fn fetch_bytes(args: &[Value]) -> Result<Value, EvalError> {
let values: Vec<Value> = bytes.iter().map(|b| Value::Int(*b as i64)).collect();
Ok(Value::List(values))
})
});
Ok(Value::Future(AsyncValue::new(task)))
}
#[tracing::instrument(level = "trace", skip_all)]
pub fn post(args: &[Value]) -> Result<Value, EvalError> {
pub async fn post(args: &[Value]) -> Result<Value, EvalError> {
let url = match args.first() {
Some(Value::Str(s)) => s,
Some(Value::Str(s)) => s.clone(),
_ => {
return Err(EvalError::TypeError(
"post expects a URL string".to_string(),
@ -102,8 +143,8 @@ pub fn post(args: &[Value]) -> Result<Value, EvalError> {
_ => String::new(),
};
smol::block_on(async {
let mut response = surf::post(url)
let task = smol::spawn(async move {
let mut response = surf::post(&url)
.body(body)
.await
.map_err(|e| EvalError::AsyncError(e.to_string()))?;
@ -114,13 +155,14 @@ pub fn post(args: &[Value]) -> Result<Value, EvalError> {
.map_err(|e| EvalError::AsyncError(e.to_string()))?;
Ok(Value::Str(result))
})
});
Ok(Value::Future(AsyncValue::new(task)))
}
#[tracing::instrument(level = "trace", skip_all)]
pub fn post_json(args: &[Value]) -> Result<Value, EvalError> {
pub async fn post_json(args: &[Value]) -> Result<Value, EvalError> {
let url = match args.first() {
Some(Value::Str(s)) => s,
Some(Value::Str(s)) => s.clone(),
_ => {
return Err(EvalError::TypeError(
"post_json expects a URL string".to_string(),
@ -131,8 +173,8 @@ pub fn post_json(args: &[Value]) -> Result<Value, EvalError> {
let data = args.get(1).unwrap_or(&Value::None);
let json = value_to_json(data);
smol::block_on(async {
let mut response = surf::post(url)
let task = smol::spawn(async move {
let mut response = surf::post(&url)
.body_json(&json)
.map_err(|e| EvalError::AsyncError(e.to_string()))?
.await
@ -144,13 +186,14 @@ pub fn post_json(args: &[Value]) -> Result<Value, EvalError> {
.map_err(|e| EvalError::AsyncError(e.to_string()))?;
Ok(json_to_value(&result))
})
});
Ok(Value::Future(AsyncValue::new(task)))
}
#[tracing::instrument(level = "trace", skip_all)]
pub fn download(args: &[Value]) -> Result<Value, EvalError> {
pub async fn download(args: &[Value]) -> Result<Value, EvalError> {
let url = match args.first() {
Some(Value::Str(s)) => s,
Some(Value::Str(s)) => s.clone(),
_ => {
return Err(EvalError::TypeError(
"download expects a URL string".to_string(),
@ -168,8 +211,8 @@ pub fn download(args: &[Value]) -> Result<Value, EvalError> {
}
};
smol::block_on(async {
let mut response = surf::get(url)
let task = smol::spawn(async move {
let mut response = surf::get(&url)
.await
.map_err(|e| EvalError::AsyncError(e.to_string()))?;
@ -180,7 +223,8 @@ pub fn download(args: &[Value]) -> Result<Value, EvalError> {
std::fs::write(&path, bytes)?;
Ok(Value::Bool(true))
})
});
Ok(Value::Future(AsyncValue::new(task)))
}
fn json_to_value(json: &serde_json::Value) -> Value {

View file

@ -1,8 +1,14 @@
use crate::ast::Expr;
use crate::evaluator::{EvalError, Evaluator, Value};
use async_recursion::async_recursion;
#[async_recursion(?Send)]
#[tracing::instrument(level = "trace", skip_all)]
pub fn map(eval: &mut Evaluator, args: &[Value], _arg_exprs: &[Expr]) -> Result<Value, EvalError> {
pub async fn map(
eval: &mut Evaluator,
args: &[Value],
_arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
let list = match args.first() {
Some(Value::List(items)) => items.clone(),
Some(v) => {
@ -27,7 +33,7 @@ pub fn map(eval: &mut Evaluator, args: &[Value], _arg_exprs: &[Expr]) -> Result<
if let Some(param) = params.first() {
local_env.define(param.name.clone(), item);
}
let result = eval.eval_in_env(body, local_env)?;
let result = eval.eval_in_env(body, local_env).await?;
results.push(result);
}
Ok(Value::List(results))
@ -35,7 +41,7 @@ pub fn map(eval: &mut Evaluator, args: &[Value], _arg_exprs: &[Expr]) -> Result<
Some(Value::Function(func, func_env)) => {
let mut results = Vec::new();
for item in list {
let result = eval.call_fn(func, func_env, &[item])?;
let result = eval.call_fn(func, func_env, &[item]).await?;
results.push(result);
}
Ok(Value::List(results))
@ -44,8 +50,9 @@ pub fn map(eval: &mut Evaluator, args: &[Value], _arg_exprs: &[Expr]) -> Result<
}
}
#[async_recursion(?Send)]
#[tracing::instrument(level = "trace", skip_all)]
pub fn filter(
pub async fn filter(
eval: &mut Evaluator,
args: &[Value],
_arg_exprs: &[Expr],
@ -74,7 +81,7 @@ pub fn filter(
if let Some(param) = params.first() {
local_env.define(param.name.clone(), item.clone());
}
let result = eval.eval_in_env(body, local_env)?;
let result = eval.eval_in_env(body, local_env).await?;
if result.is_truthy() {
results.push(item);
}
@ -84,7 +91,9 @@ pub fn filter(
Some(Value::Function(func, func_env)) => {
let mut results = Vec::new();
for item in list {
let result = eval.call_fn(func, func_env, std::slice::from_ref(&item))?;
let result = eval
.call_fn(func, func_env, std::slice::from_ref(&item))
.await?;
if result.is_truthy() {
results.push(item);
}
@ -97,8 +106,13 @@ pub fn filter(
}
}
#[async_recursion(?Send)]
#[tracing::instrument(level = "trace", skip_all)]
pub fn fold(eval: &mut Evaluator, args: &[Value], _arg_exprs: &[Expr]) -> Result<Value, EvalError> {
pub async fn fold(
eval: &mut Evaluator,
args: &[Value],
_arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
let list = match args.first() {
Some(Value::List(items)) => items.clone(),
Some(v) => {
@ -128,14 +142,14 @@ pub fn fold(eval: &mut Evaluator, args: &[Value], _arg_exprs: &[Expr]) -> Result
if let Some(item_param) = params.get(1) {
local_env.define(item_param.name.clone(), item);
}
acc = eval.eval_in_env(body, local_env)?;
acc = eval.eval_in_env(body, local_env).await?;
}
Ok(acc)
}
Some(Value::Function(func, func_env)) => {
let mut acc = init;
for item in list {
acc = eval.call_fn(func, func_env, &[acc, item])?;
acc = eval.call_fn(func, func_env, &[acc, item]).await?;
}
Ok(acc)
}
@ -298,8 +312,9 @@ pub fn sort(args: &[Value]) -> Result<Value, EvalError> {
Ok(Value::List(sortable.into_iter().map(|(v, _)| v).collect()))
}
#[async_recursion(?Send)]
#[tracing::instrument(level = "trace", skip_all)]
pub fn sort_by(
pub async fn sort_by(
eval: &mut Evaluator,
args: &[Value],
_arg_exprs: &[Expr],
@ -318,7 +333,7 @@ pub fn sort_by(
if let Some(param) = params.first() {
local_env.define(param.name.clone(), item.clone());
}
let key = eval.eval_in_env(body, local_env)?;
let key = eval.eval_in_env(body, local_env).await?;
keyed.push((item, key.to_string_repr()));
}
keyed.sort_by(|a, b| a.1.cmp(&b.1));
@ -342,8 +357,13 @@ pub fn reverse(args: &[Value]) -> Result<Value, EvalError> {
Ok(Value::List(reversed))
}
#[async_recursion(?Send)]
#[tracing::instrument(level = "trace", skip_all)]
pub fn seq(eval: &mut Evaluator, args: &[Value], _arg_exprs: &[Expr]) -> Result<Value, EvalError> {
pub async fn seq(
eval: &mut Evaluator,
args: &[Value],
_arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
let list = match args.first() {
Some(Value::List(items)) => items.clone(),
_ => return Err(EvalError::TypeError("seq expects a list".to_string())),
@ -358,7 +378,7 @@ pub fn seq(eval: &mut Evaluator, args: &[Value], _arg_exprs: &[Expr]) -> Result<
if let Some(param) = params.first() {
local_env.define(param.name.clone(), item);
}
let result = eval.eval_in_env(body, local_env)?;
let result = eval.eval_in_env(body, local_env).await?;
results.push(result);
}
Ok(Value::List(results))
@ -367,8 +387,9 @@ pub fn seq(eval: &mut Evaluator, args: &[Value], _arg_exprs: &[Expr]) -> Result<
}
}
#[async_recursion(?Send)]
#[tracing::instrument(level = "trace", skip_all)]
pub fn batch(
pub async fn batch(
eval: &mut Evaluator,
args: &[Value],
_arg_exprs: &[Expr],
@ -397,7 +418,7 @@ pub fn batch(
if let Some(param) = params.first() {
local_env.define(param.name.clone(), item.clone());
}
let result = eval.eval_in_env(body, local_env)?;
let result = eval.eval_in_env(body, local_env).await?;
results.push(result);
}
}

View file

@ -85,26 +85,6 @@ pub fn list_dir(args: &[Value]) -> Result<Value, EvalError> {
Ok(Value::List(entries))
}
/// Expands a glob pattern into a list of matching paths.
///
/// Errors with a TypeError when the first argument is not a string or when
/// the pattern itself fails to parse; individual matches that cannot be read
/// are skipped silently.
#[tracing::instrument(level = "trace", skip_all)]
pub fn glob_files(args: &[Value]) -> Result<Value, EvalError> {
    let Some(Value::Str(pattern)) = args.first() else {
        return Err(EvalError::TypeError(
            "glob expects a pattern string".to_string(),
        ));
    };
    let mut entries = Vec::new();
    for matched in glob::glob(pattern).map_err(|e| EvalError::TypeError(e.to_string()))? {
        if let Ok(path) = matched {
            entries.push(Value::Path(path));
        }
    }
    Ok(Value::List(entries))
}
#[tracing::instrument(level = "trace", skip_all)]
pub fn walk_dir(args: &[Value]) -> Result<Value, EvalError> {
let path = get_path(args)?;
@ -203,20 +183,6 @@ pub fn config_dir() -> Result<Value, EvalError> {
Ok(Value::Path(dirs::config_dir().unwrap_or_default()))
}
/// Returns the platform config directory joined with an application name.
///
/// Falls back to an empty base path when `dirs::config_dir()` is unavailable.
#[tracing::instrument(level = "trace", skip_all)]
pub fn config_path(args: &[Value]) -> Result<Value, EvalError> {
    let Some(Value::Str(app)) = args.first() else {
        return Err(EvalError::TypeError(
            "config_path expects an app name string".to_string(),
        ));
    };
    let base = dirs::config_dir().unwrap_or_default();
    Ok(Value::Path(base.join(app)))
}
#[tracing::instrument(level = "trace", skip_all)]
pub fn data_dir() -> Result<Value, EvalError> {
Ok(Value::Path(dirs::data_dir().unwrap_or_default()))

View file

@ -4,24 +4,32 @@ pub mod async_ops;
pub mod collections;
pub mod crypto;
pub mod io;
pub mod parallel;
pub mod strings;
use crate::ast::Expr;
use crate::evaluator::{EvalError, Evaluator, Value};
use async_recursion::async_recursion;
/// Dispatches a built-in function call.
#[async_recursion(?Send)]
#[tracing::instrument(level = "trace", skip_all, fields(name))]
pub fn call_builtin(
pub async fn call_builtin(
eval: &mut Evaluator,
name: &str,
args: &[Value],
arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
match name {
// Collections
"map" => collections::map(eval, args, arg_exprs),
"filter" => collections::filter(eval, args, arg_exprs),
"fold" => collections::fold(eval, args, arg_exprs),
// Collections (async - take &mut Evaluator)
"map" => collections::map(eval, args, arg_exprs).await,
"filter" => collections::filter(eval, args, arg_exprs).await,
"fold" => collections::fold(eval, args, arg_exprs).await,
"sort_by" => collections::sort_by(eval, args, arg_exprs).await,
"seq" => collections::seq(eval, args, arg_exprs).await,
"batch" => collections::batch(eval, args, arg_exprs).await,
// Collections (sync)
"flatten" => collections::flatten(args),
"concat" => collections::concat(args),
"zip" => collections::zip(args),
@ -32,10 +40,7 @@ pub fn call_builtin(
"contains" => collections::contains(args),
"unique" => collections::unique(args),
"sort" => collections::sort(args),
"sort_by" => collections::sort_by(eval, args, arg_exprs),
"reverse" => collections::reverse(args),
"seq" => collections::seq(eval, args, arg_exprs),
"batch" => collections::batch(eval, args, arg_exprs),
// Strings
"join" => strings::join(args),
@ -64,7 +69,6 @@ pub fn call_builtin(
"dir_exists" => io::dir_exists(args),
"create_dir_all" => io::create_dir_all(args),
"list_dir" => io::list_dir(args),
"glob" => io::glob_files(args),
"walk_dir" => io::walk_dir(args),
"temp_dir" => io::temp_dir(),
"temp_file" => io::temp_file(args),
@ -78,7 +82,6 @@ pub fn call_builtin(
"path_extension" => io::path_extension(args),
"home" => io::home(),
"config_dir" => io::config_dir(),
"config_path" => io::config_path(args),
"data_dir" => io::data_dir(),
"cache_dir" => io::cache_dir(),
@ -102,17 +105,32 @@ pub fn call_builtin(
"encrypt_age" => crypto::encrypt_age(args),
"decrypt_age" => crypto::decrypt_age(args),
// Parallel (rayon)
"par_map" => parallel::par_map(eval, args, arg_exprs),
"par_filter" => parallel::par_filter(eval, args, arg_exprs),
"par_sort_by" => parallel::par_sort_by(eval, args, arg_exprs),
"par_batch" => parallel::par_batch(eval, args, arg_exprs),
"par_flat_map" => parallel::par_flat_map(eval, args, arg_exprs),
"par_any" => parallel::par_any(eval, args, arg_exprs),
"par_all" => parallel::par_all(eval, args, arg_exprs),
"par_find" => parallel::par_find(eval, args, arg_exprs),
"par_partition" => parallel::par_partition(eval, args, arg_exprs),
"par_reduce" => parallel::par_reduce(eval, args, arg_exprs),
"par_min_by" => parallel::par_min_by(eval, args, arg_exprs),
"par_max_by" => parallel::par_max_by(eval, args, arg_exprs),
"par_for_each" => parallel::par_for_each(eval, args, arg_exprs),
// Async
"all" => async_ops::all(args),
"race" => async_ops::race(args),
"all" => async_ops::all(args).await,
"race" => async_ops::race(args).await,
// Network
"fetch" => async_ops::fetch(args),
"fetch_json" => async_ops::fetch_json(args),
"fetch_bytes" => async_ops::fetch_bytes(args),
"post" => async_ops::post(args),
"post_json" => async_ops::post_json(args),
"download" => async_ops::download(args),
"fetch" => async_ops::fetch(args).await,
"fetch_json" => async_ops::fetch_json(args).await,
"fetch_bytes" => async_ops::fetch_bytes(args).await,
"post" => async_ops::post(args).await,
"post_json" => async_ops::post_json(args).await,
"download" => async_ops::download(args).await,
// Environment
"env" => env_get(args),
@ -127,8 +145,9 @@ pub fn call_builtin(
}
/// Dispatches a method call on a value.
#[async_recursion(?Send)]
#[tracing::instrument(level = "trace", skip_all, fields(method))]
pub fn call_method(
pub async fn call_method(
eval: &mut Evaluator,
obj: &Value,
method: &str,
@ -151,19 +170,19 @@ pub fn call_method(
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
collections::map(eval, &all_args, arg_exprs)
collections::map(eval, &all_args, arg_exprs).await
}
"filter" => {
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
collections::filter(eval, &all_args, arg_exprs)
collections::filter(eval, &all_args, arg_exprs).await
}
"fold" => {
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
collections::fold(eval, &all_args, arg_exprs)
collections::fold(eval, &all_args, arg_exprs).await
}
"join" => {
let sep = args
@ -197,6 +216,84 @@ pub fn call_method(
.collect::<Vec<_>>();
collections::unique(&all_args)
}
"par_map" => {
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
parallel::par_map(eval, &all_args, arg_exprs)
}
"par_filter" => {
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
parallel::par_filter(eval, &all_args, arg_exprs)
}
"par_flat_map" => {
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
parallel::par_flat_map(eval, &all_args, arg_exprs)
}
"par_sort_by" => {
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
parallel::par_sort_by(eval, &all_args, arg_exprs)
}
"par_any" => {
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
parallel::par_any(eval, &all_args, arg_exprs)
}
"par_all" => {
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
parallel::par_all(eval, &all_args, arg_exprs)
}
"par_find" => {
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
parallel::par_find(eval, &all_args, arg_exprs)
}
"par_partition" => {
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
parallel::par_partition(eval, &all_args, arg_exprs)
}
"par_reduce" => {
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
parallel::par_reduce(eval, &all_args, arg_exprs)
}
"par_min_by" => {
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
parallel::par_min_by(eval, &all_args, arg_exprs)
}
"par_max_by" => {
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
parallel::par_max_by(eval, &all_args, arg_exprs)
}
"par_batch" => {
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
parallel::par_batch(eval, &all_args, arg_exprs)
}
"par_for_each" => {
let all_args = std::iter::once(obj.clone())
.chain(args.iter().cloned())
.collect::<Vec<_>>();
parallel::par_for_each(eval, &all_args, arg_exprs)
}
_ => Err(EvalError::UndefinedFunction(format!("list.{}", method))),
},
@ -284,14 +381,14 @@ pub fn call_method(
let mut method_args = vec![obj.clone()];
method_args.extend(args.iter().cloned());
let env_clone = eval.env().clone();
return eval.call_function(m, &env_clone, &method_args);
return eval.call_function(m, &env_clone, &method_args).await;
}
}
}
if let Some(field) = fields.get(method)
&& let Value::Function(func, env) = field
{
return eval.call_function(func, env, args);
return eval.call_function(func, env, args).await;
}
Err(EvalError::FieldNotFound {
ty: name.clone(),

View file

@ -0,0 +1,569 @@
//! Parallel collection builtins using rayon.
//!
//! Each function clones the Evaluator per rayon task. Side effects (env mutations)
//! inside parallel callbacks are isolated per clone and lost after execution.
//! I/O side effects (file writes, exec, etc.) still happen.
use crate::ast::Expr;
use crate::evaluator::{EvalError, Evaluator, Value};
use rayon::prelude::*;
/// Helper: evaluate a lambda body with one parameter bound, using a cloned evaluator.
///
/// Each rayon task gets its own evaluator and environment clone, so env
/// mutations made by the lambda are discarded after execution (see module doc).
fn eval_lambda_sync(
    eval: &Evaluator,
    params: &[crate::ast::FnParam],
    body: &Expr,
    env: &crate::evaluator::Env,
    item: Value,
) -> Result<Value, EvalError> {
    let mut worker = eval.clone();
    let mut scope = env.clone();
    scope.push_scope();
    // Bind the lambda's single parameter (if declared) to the item.
    if let Some(first_param) = params.first() {
        scope.define(first_param.name.clone(), item);
    }
    // Drive the async evaluator to completion on this rayon worker thread.
    smol::block_on(worker.eval_in_env(body, scope))
}
/// Helper: call a named function with args, using a cloned evaluator.
///
/// The clone isolates per-task state; side effects on the evaluator's env are
/// lost after the call (see module doc), while I/O side effects still happen.
fn call_fn_sync(
    eval: &Evaluator,
    func: &crate::ast::FnDecl,
    func_env: &crate::evaluator::Env,
    args: &[Value],
) -> Result<Value, EvalError> {
    let mut worker = eval.clone();
    // Block this rayon thread until the async call completes.
    smol::block_on(worker.call_fn(func, func_env, args))
}
/// Extract a list from the first argument, or return a TypeError.
///
/// The list is cloned so callers can consume it with `into_par_iter`.
fn extract_list(args: &[Value], fn_name: &str) -> Result<Vec<Value>, EvalError> {
    let first = args
        .first()
        .ok_or_else(|| EvalError::TypeError(format!("{} requires a list argument", fn_name)))?;
    if let Value::List(items) = first {
        Ok(items.clone())
    } else {
        Err(EvalError::TypeError(format!(
            "{} expects list, got {}",
            fn_name,
            first.type_name()
        )))
    }
}
// ---------------------------------------------------------------------------
// par_map
// ---------------------------------------------------------------------------
/// Maps a lambda or named function over a list, running items in parallel
/// via rayon. Errors from any item short-circuit the collect and are
/// propagated; result order matches input order.
#[tracing::instrument(level = "trace", skip_all)]
pub fn par_map(
    eval: &mut Evaluator,
    args: &[Value],
    _arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
    let list = extract_list(args, "par_map")?;
    let mapped: Result<Vec<Value>, EvalError> = match args.get(1) {
        Some(Value::Lambda(params, body, env)) => list
            .into_par_iter()
            .map(|item| eval_lambda_sync(eval, params, body, env, item))
            .collect(),
        Some(Value::Function(func, func_env)) => list
            .into_par_iter()
            .map(|item| call_fn_sync(eval, func, func_env, &[item]))
            .collect(),
        _ => Err(EvalError::TypeError(
            "par_map requires a function".to_string(),
        )),
    };
    Ok(Value::List(mapped?))
}
// ---------------------------------------------------------------------------
// par_filter
// ---------------------------------------------------------------------------
/// Filters a list by a lambda or named function predicate, evaluating the
/// predicate over items in parallel via rayon. Predicate errors short-circuit;
/// surviving items keep their input order.
#[tracing::instrument(level = "trace", skip_all)]
pub fn par_filter(
    eval: &mut Evaluator,
    args: &[Value],
    _arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
    let list = extract_list(args, "par_filter")?;
    // First pass (parallel): pair each item with its predicate verdict.
    let pairs: Result<Vec<(Value, bool)>, EvalError> = match args.get(1) {
        Some(Value::Lambda(params, body, env)) => list
            .into_par_iter()
            .map(|item| {
                let verdict = eval_lambda_sync(eval, params, body, env, item.clone())?;
                Ok((item, verdict.is_truthy()))
            })
            .collect(),
        Some(Value::Function(func, func_env)) => list
            .into_par_iter()
            .map(|item| {
                let verdict = call_fn_sync(eval, func, func_env, std::slice::from_ref(&item))?;
                Ok((item, verdict.is_truthy()))
            })
            .collect(),
        _ => Err(EvalError::TypeError(
            "par_filter requires a function".to_string(),
        )),
    };
    // Second pass (sequential): keep only items whose verdict was truthy.
    let kept: Vec<Value> = pairs?
        .into_iter()
        .filter_map(|(value, keep)| keep.then_some(value))
        .collect();
    Ok(Value::List(kept))
}
// ---------------------------------------------------------------------------
// par_sort_by
// ---------------------------------------------------------------------------
#[tracing::instrument(level = "trace", skip_all)]
pub fn par_sort_by(
eval: &mut Evaluator,
args: &[Value],
_arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
let list = extract_list(args, "par_sort_by")?;
match args.get(1) {
Some(Value::Lambda(params, body, env)) => {
// Compute sort keys in parallel
let keyed: Result<Vec<(Value, String)>, EvalError> = list
.into_par_iter()
.map(|item| {
let key = eval_lambda_sync(eval, params, body, env, item.clone())?;
Ok((item, key.to_string_repr()))
})
.collect();
let mut keyed = keyed?;
// Sort sequentially (fast, already have keys)
keyed.sort_by(|a, b| a.1.cmp(&b.1));
Ok(Value::List(keyed.into_iter().map(|(v, _)| v).collect()))
}
_ => Err(EvalError::TypeError(
"par_sort_by requires a function".to_string(),
)),
}
}
// ---------------------------------------------------------------------------
// par_batch
// ---------------------------------------------------------------------------
#[tracing::instrument(level = "trace", skip_all)]
pub fn par_batch(
eval: &mut Evaluator,
args: &[Value],
_arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
let list = extract_list(args, "par_batch")?;
let batch_size = match args.get(1) {
Some(Value::Int(n)) => *n as usize,
_ => {
return Err(EvalError::TypeError(
"par_batch requires batch size".to_string(),
));
}
};
match args.get(2) {
Some(Value::Lambda(params, body, env)) => {
let mut all_results = Vec::new();
// Process chunks sequentially, items within each chunk in parallel
for chunk in list.chunks(batch_size) {
let chunk_results: Result<Vec<Value>, EvalError> = chunk
.into_par_iter()
.map(|item| eval_lambda_sync(eval, params, body, env, item.clone()))
.collect();
all_results.extend(chunk_results?);
}
Ok(Value::List(all_results))
}
_ => Err(EvalError::TypeError(
"par_batch requires a function".to_string(),
)),
}
}
// ---------------------------------------------------------------------------
// par_flat_map
// ---------------------------------------------------------------------------
/// Maps a callable over every element in parallel and flattens the results.
///
/// A returned list contributes each of its elements to the output; any other
/// value contributes itself as a single element. A callable error aborts the
/// whole call.
#[tracing::instrument(level = "trace", skip_all)]
pub fn par_flat_map(
    eval: &mut Evaluator,
    args: &[Value],
    _arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
    let list = extract_list(args, "par_flat_map")?;
    match args.get(1) {
        Some(Value::Lambda(params, body, env)) => {
            let nested = list
                .into_par_iter()
                .map(|item| {
                    let produced = eval_lambda_sync(eval, params, body, env, item)?;
                    // Normalize every result to a Vec so flattening is uniform.
                    Ok(match produced {
                        Value::List(inner) => inner,
                        other => vec![other],
                    })
                })
                .collect::<Result<Vec<Vec<Value>>, EvalError>>()?;
            let flat: Vec<Value> = nested.into_iter().flatten().collect();
            Ok(Value::List(flat))
        }
        Some(Value::Function(func, func_env)) => {
            let nested = list
                .into_par_iter()
                .map(|item| {
                    let produced = call_fn_sync(eval, func, func_env, &[item])?;
                    Ok(match produced {
                        Value::List(inner) => inner,
                        other => vec![other],
                    })
                })
                .collect::<Result<Vec<Vec<Value>>, EvalError>>()?;
            let flat: Vec<Value> = nested.into_iter().flatten().collect();
            Ok(Value::List(flat))
        }
        _ => Err(EvalError::TypeError(
            "par_flat_map requires a function".to_string(),
        )),
    }
}
// ---------------------------------------------------------------------------
// par_any
// ---------------------------------------------------------------------------
/// Returns true when the predicate holds for at least one element.
///
/// Uses rayon's `find_any` so sibling workers stop once a match is found.
/// A predicate error is treated as "no match" rather than propagated.
#[tracing::instrument(level = "trace", skip_all)]
pub fn par_any(
    eval: &mut Evaluator,
    args: &[Value],
    _arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
    let list = extract_list(args, "par_any")?;
    match args.get(1) {
        Some(Value::Lambda(params, body, env)) => {
            let hit = list.into_par_iter().find_any(|item| {
                eval_lambda_sync(eval, params, body, env, item.clone())
                    .is_ok_and(|v| v.is_truthy())
            });
            Ok(Value::Bool(hit.is_some()))
        }
        Some(Value::Function(func, func_env)) => {
            let hit = list.into_par_iter().find_any(|item| {
                call_fn_sync(eval, func, func_env, std::slice::from_ref(item))
                    .is_ok_and(|v| v.is_truthy())
            });
            Ok(Value::Bool(hit.is_some()))
        }
        _ => Err(EvalError::TypeError(
            "par_any requires a function".to_string(),
        )),
    }
}
// ---------------------------------------------------------------------------
// par_all
// ---------------------------------------------------------------------------
/// Returns true when the predicate holds for every element.
///
/// Implemented as a parallel search for a counterexample: `find_any` stops
/// early on the first element that fails. A predicate error counts as a
/// failing element.
#[tracing::instrument(level = "trace", skip_all)]
pub fn par_all(
    eval: &mut Evaluator,
    args: &[Value],
    _arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
    let list = extract_list(args, "par_all")?;
    match args.get(1) {
        Some(Value::Lambda(params, body, env)) => {
            // No counterexample found ⇒ all elements satisfied the predicate.
            let counterexample = list.into_par_iter().find_any(|item| {
                !eval_lambda_sync(eval, params, body, env, item.clone())
                    .is_ok_and(|v| v.is_truthy())
            });
            Ok(Value::Bool(counterexample.is_none()))
        }
        Some(Value::Function(func, func_env)) => {
            let counterexample = list.into_par_iter().find_any(|item| {
                !call_fn_sync(eval, func, func_env, std::slice::from_ref(item))
                    .is_ok_and(|v| v.is_truthy())
            });
            Ok(Value::Bool(counterexample.is_none()))
        }
        _ => Err(EvalError::TypeError(
            "par_all requires a function".to_string(),
        )),
    }
}
// ---------------------------------------------------------------------------
// par_find
// ---------------------------------------------------------------------------
/// Returns the first element matching the predicate, or `none`.
///
/// `find_first` preserves list order: the match with the lowest index wins
/// even though candidates are tested in parallel. A predicate error is
/// treated as a non-match rather than propagated.
#[tracing::instrument(level = "trace", skip_all)]
pub fn par_find(
    eval: &mut Evaluator,
    args: &[Value],
    _arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
    let list = extract_list(args, "par_find")?;
    match args.get(1) {
        Some(Value::Lambda(params, body, env)) => {
            let hit = list.into_par_iter().find_first(|item| {
                eval_lambda_sync(eval, params, body, env, item.clone())
                    .is_ok_and(|v| v.is_truthy())
            });
            Ok(hit.unwrap_or(Value::None))
        }
        Some(Value::Function(func, func_env)) => {
            let hit = list.into_par_iter().find_first(|item| {
                call_fn_sync(eval, func, func_env, std::slice::from_ref(item))
                    .is_ok_and(|v| v.is_truthy())
            });
            Ok(hit.unwrap_or(Value::None))
        }
        _ => Err(EvalError::TypeError(
            "par_find requires a function".to_string(),
        )),
    }
}
// ---------------------------------------------------------------------------
// par_partition
// ---------------------------------------------------------------------------
/// Splits a list into `[matching, non_matching]` by a parallel predicate.
///
/// Returns a two-element list: elements for which the predicate was truthy,
/// then the rest. A predicate error routes the element into the
/// non-matching bucket rather than aborting.
#[tracing::instrument(level = "trace", skip_all)]
pub fn par_partition(
    eval: &mut Evaluator,
    args: &[Value],
    _arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
    let list = extract_list(args, "par_partition")?;
    match args.get(1) {
        Some(Value::Lambda(params, body, env)) => {
            // Decide each element's bucket directly inside partition_map.
            let (hits, misses): (Vec<Value>, Vec<Value>) =
                list.into_par_iter().partition_map(|item| {
                    let keep = eval_lambda_sync(eval, params, body, env, item.clone())
                        .map(|v| v.is_truthy())
                        .unwrap_or(false);
                    if keep {
                        rayon::iter::Either::Left(item)
                    } else {
                        rayon::iter::Either::Right(item)
                    }
                });
            Ok(Value::List(vec![Value::List(hits), Value::List(misses)]))
        }
        Some(Value::Function(func, func_env)) => {
            let (hits, misses): (Vec<Value>, Vec<Value>) =
                list.into_par_iter().partition_map(|item| {
                    let keep = call_fn_sync(eval, func, func_env, std::slice::from_ref(&item))
                        .map(|v| v.is_truthy())
                        .unwrap_or(false);
                    if keep {
                        rayon::iter::Either::Left(item)
                    } else {
                        rayon::iter::Either::Right(item)
                    }
                });
            Ok(Value::List(vec![Value::List(hits), Value::List(misses)]))
        }
        _ => Err(EvalError::TypeError(
            "par_partition requires a function".to_string(),
        )),
    }
}
// ---------------------------------------------------------------------------
// par_reduce
// ---------------------------------------------------------------------------
/// Parallel reduction of a list with a two-argument combiner.
///
/// NOTE(review): rayon's `reduce_with` pairs elements in an unspecified
/// order, so the combiner must be associative for the result to be
/// deterministic — presumably a documented contract of this builtin; confirm.
/// Combiner errors are swallowed and replaced with `Value::None` instead of
/// being propagated to the caller.
#[tracing::instrument(level = "trace", skip_all)]
pub fn par_reduce(
    eval: &mut Evaluator,
    args: &[Value],
    _arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
    let list = extract_list(args, "par_reduce")?;
    // Nothing to combine: mirror the "no elements" behavior with `none`.
    if list.is_empty() {
        return Ok(Value::None);
    }
    match args.get(1) {
        Some(Value::Lambda(params, body, env)) => {
            // rayon reduce requires the op to be associative.
            // We evaluate the lambda with (acc, item) params in parallel.
            let result = list.into_par_iter().reduce_with(|acc, item| {
                // Each combination may run on a different rayon worker, so it
                // gets its own evaluator/environment clone to avoid sharing
                // mutable state across threads.
                let mut local_eval = eval.clone();
                let mut local_env = env.clone();
                local_env.push_scope();
                // Bind the lambda's first two parameters to (accumulator, element);
                // missing parameters are simply left unbound.
                if let Some(acc_param) = params.first() {
                    local_env.define(acc_param.name.clone(), acc);
                }
                if let Some(item_param) = params.get(1) {
                    local_env.define(item_param.name.clone(), item);
                }
                // Drive the async evaluator to completion on this worker thread;
                // an evaluation error collapses to `none`.
                smol::block_on(local_eval.eval_in_env(body, local_env)).unwrap_or(Value::None)
            });
            Ok(result.unwrap_or(Value::None))
        }
        Some(Value::Function(func, func_env)) => {
            // Named functions go through the sync call helper; errors likewise
            // collapse to `none`.
            let result = list.into_par_iter().reduce_with(|acc, item| {
                call_fn_sync(eval, func, func_env, &[acc, item]).unwrap_or(Value::None)
            });
            Ok(result.unwrap_or(Value::None))
        }
        _ => Err(EvalError::TypeError(
            "par_reduce requires a function".to_string(),
        )),
    }
}
// ---------------------------------------------------------------------------
// par_min_by
// ---------------------------------------------------------------------------
#[tracing::instrument(level = "trace", skip_all)]
pub fn par_min_by(
eval: &mut Evaluator,
args: &[Value],
_arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
let list = extract_list(args, "par_min_by")?;
match args.get(1) {
Some(Value::Lambda(params, body, env)) => {
// Compute keys in parallel, then find min
let keyed: Result<Vec<(Value, String)>, EvalError> = list
.into_par_iter()
.map(|item| {
let key = eval_lambda_sync(eval, params, body, env, item.clone())?;
Ok((item, key.to_string_repr()))
})
.collect();
let keyed = keyed?;
let min = keyed.into_iter().min_by(|a, b| a.1.cmp(&b.1));
Ok(min.map(|(v, _)| v).unwrap_or(Value::None))
}
_ => Err(EvalError::TypeError(
"par_min_by requires a function".to_string(),
)),
}
}
// ---------------------------------------------------------------------------
// par_max_by
// ---------------------------------------------------------------------------
#[tracing::instrument(level = "trace", skip_all)]
pub fn par_max_by(
eval: &mut Evaluator,
args: &[Value],
_arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
let list = extract_list(args, "par_max_by")?;
match args.get(1) {
Some(Value::Lambda(params, body, env)) => {
// Compute keys in parallel, then find max
let keyed: Result<Vec<(Value, String)>, EvalError> = list
.into_par_iter()
.map(|item| {
let key = eval_lambda_sync(eval, params, body, env, item.clone())?;
Ok((item, key.to_string_repr()))
})
.collect();
let keyed = keyed?;
let max = keyed.into_iter().max_by(|a, b| a.1.cmp(&b.1));
Ok(max.map(|(v, _)| v).unwrap_or(Value::None))
}
_ => Err(EvalError::TypeError(
"par_max_by requires a function".to_string(),
)),
}
}
// ---------------------------------------------------------------------------
// par_for_each
// ---------------------------------------------------------------------------
/// Invokes a callable on every element in parallel purely for its side
/// effects; yields `none` on success, or one of the collected errors.
///
/// Every element is attempted even when some fail; which error is reported
/// is arbitrary because rayon's collection order is unspecified.
#[tracing::instrument(level = "trace", skip_all)]
pub fn par_for_each(
    eval: &mut Evaluator,
    args: &[Value],
    _arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
    let list = extract_list(args, "par_for_each")?;
    match args.get(1) {
        Some(Value::Lambda(params, body, env)) => {
            // Gather failures from the parallel run, then surface one.
            let failures: Vec<EvalError> = list
                .into_par_iter()
                .filter_map(|item| eval_lambda_sync(eval, params, body, env, item).err())
                .collect();
            match failures.into_iter().next() {
                Some(err) => Err(err),
                None => Ok(Value::None),
            }
        }
        Some(Value::Function(func, func_env)) => {
            let failures: Vec<EvalError> = list
                .into_par_iter()
                .filter_map(|item| call_fn_sync(eval, func, func_env, &[item]).err())
                .collect();
            match failures.into_iter().next() {
                Some(err) => Err(err),
                None => Ok(Value::None),
            }
        }
        _ => Err(EvalError::TypeError(
            "par_for_each requires a function".to_string(),
        )),
    }
}

View file

@ -2,9 +2,11 @@
use crate::ast::*;
use crate::builtins;
use async_recursion::async_recursion;
use indexmap::IndexMap;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::OnceLock;
use thiserror::Error;
@ -70,6 +72,18 @@ pub enum EvalError {
AsyncError(String),
}
/// Wrapper for an async task result. Cloning shares the same task handle.
/// The inner Option is taken on first await; subsequent awaits return an error.
#[derive(Clone, Debug)]
#[allow(clippy::type_complexity)]
pub struct AsyncValue(pub Arc<std::sync::Mutex<Option<smol::Task<Result<Value, EvalError>>>>>);
impl AsyncValue {
pub fn new(task: smol::Task<Result<Value, EvalError>>) -> Self {
Self(Arc::new(std::sync::Mutex::new(Some(task))))
}
}
/// Runtime value types.
#[derive(Clone, Debug)]
pub enum Value {
@ -83,6 +97,7 @@ pub enum Value {
Enum(String, String),
Function(FnDecl, Env),
Lambda(Vec<FnParam>, Expr, Env),
Future(AsyncValue),
None,
}
@ -100,6 +115,7 @@ impl Value {
Value::Enum(_, _) => "enum",
Value::Function(_, _) => "function",
Value::Lambda(_, _, _) => "lambda",
Value::Future(_) => "future",
Value::None => "none",
}
}
@ -134,6 +150,7 @@ impl Value {
.collect::<Vec<_>>()
.join(":")
}
Value::Future(_) => "<future>".to_string(),
Value::None => String::new(),
_ => self.to_string_repr(),
}
@ -160,6 +177,7 @@ impl Value {
Value::Enum(ty, variant) => format!("{}::{}", ty, variant),
Value::Function(f, _) => format!("<fn {}>", f.name),
Value::Lambda(_, _, _) => "<lambda>".to_string(),
Value::Future(_) => "<future>".to_string(),
Value::None => "none".to_string(),
}
}
@ -262,12 +280,34 @@ pub enum DeployMode {
}
/// Permission rule for deployed files.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
pub enum PermissionRule {
Single(u32),
Pattern { pattern: String, mode: u32 },
}
/// Source for a dotfiles glob block.
#[derive(Debug, Clone)]
pub enum DotfilesSource {
/// Glob pattern string to expand later (e.g. "config/*").
Pattern(String),
/// Pre-expanded list of paths (e.g. from glob() function call).
Paths(Vec<PathBuf>),
}
/// Unexpanded dotfiles pattern from a `dotfiles:` block.
#[derive(Debug, Clone)]
pub struct DotfilesPattern {
pub source: DotfilesSource,
pub target_base: PathBuf,
pub template: bool,
pub permissions: Vec<PermissionRule>,
pub owner: Option<String>,
pub deploy: DeployMode,
pub link_patterns: Vec<String>,
pub copy_patterns: Vec<String>,
}
/// Evaluated dotfile configuration.
#[derive(Clone, Debug)]
pub struct DotfileConfig {
@ -279,6 +319,8 @@ pub struct DotfileConfig {
pub deploy: DeployMode,
pub link_patterns: Vec<String>,
pub copy_patterns: Vec<String>,
/// Files to skip during directory deploy (specialized by explicit dotfile blocks).
pub exclude_paths: Vec<PathBuf>,
}
/// Evaluated package configuration.
@ -307,8 +349,10 @@ pub struct HookConfig {
}
/// Result of evaluating a doot program.
#[derive(Clone)]
pub struct EvalResult {
pub dotfiles: Vec<DotfileConfig>,
pub dotfile_patterns: Vec<DotfilesPattern>,
pub packages: Vec<PackageConfig>,
pub secrets: Vec<SecretConfig>,
pub hooks: Vec<HookConfig>,
@ -319,6 +363,7 @@ impl Default for EvalResult {
fn default() -> Self {
Self {
dotfiles: Vec::new(),
dotfile_patterns: Vec::new(),
packages: Vec::new(),
secrets: Vec::new(),
hooks: Vec::new(),
@ -328,6 +373,7 @@ impl Default for EvalResult {
}
/// Evaluates doot AST and collects configuration.
#[derive(Clone)]
pub struct Evaluator {
env: Env,
result: EvalResult,
@ -390,13 +436,18 @@ impl Evaluator {
/// Evaluates the program and returns collected configuration.
#[tracing::instrument(level = "trace", skip_all)]
pub fn eval(&mut self, program: &Program) -> Result<EvalResult, EvalError> {
pub async fn eval(&mut self, program: &Program) -> Result<EvalResult, EvalError> {
for stmt in &program.statements {
self.eval_statement(&stmt.node)?;
self.eval_statement(&stmt.node).await?;
}
Ok(std::mem::take(&mut self.result))
}
/// Synchronous entry point. Runs the async evaluator on smol's executor.
pub fn eval_sync(&mut self, program: &Program) -> Result<EvalResult, EvalError> {
smol::block_on(self.eval(program))
}
/// Returns all variables as environment variables for hooks.
#[tracing::instrument(level = "trace", skip(self))]
pub fn get_hook_env(&self) -> std::collections::HashMap<String, String> {
@ -421,12 +472,13 @@ impl Evaluator {
vars
}
#[async_recursion(?Send)]
#[tracing::instrument(level = "trace", skip_all)]
fn eval_statement(&mut self, stmt: &Statement) -> Result<Option<Value>, EvalError> {
async fn eval_statement(&mut self, stmt: &Statement) -> Result<Option<Value>, EvalError> {
match stmt {
Statement::VarDecl(decl) => {
tracing::trace!(name = %decl.name, "eval var declaration");
let value = self.eval_expr(&decl.value)?;
let value = self.eval_expr(&decl.value).await?;
// Handle special config variables
if decl.name == "sandbox"
@ -465,11 +517,11 @@ impl Evaluator {
if let Some(macro_decl) = self.env.get_macro(&call.name).cloned() {
self.env.push_scope();
for (param, arg) in macro_decl.params.iter().zip(call.args.iter()) {
let value = self.eval_expr(arg)?;
let value = self.eval_expr(arg).await?;
self.env.define(param.clone(), value);
}
for body_stmt in &macro_decl.body {
self.eval_statement(&body_stmt.node)?;
self.eval_statement(&body_stmt.node).await?;
}
self.env.pop_scope();
}
@ -477,7 +529,7 @@ impl Evaluator {
}
Statement::ForLoop(for_loop) => {
let iter_val = self.eval_expr(&for_loop.iter)?;
let iter_val = self.eval_expr(&for_loop.iter).await?;
let items = match iter_val {
Value::List(items) => items,
Value::Str(s) => s.chars().map(|c| Value::Str(c.to_string())).collect(),
@ -488,7 +540,7 @@ impl Evaluator {
self.env.push_scope();
self.env.define(for_loop.var.clone(), item);
for body_stmt in &for_loop.body {
if let Some(v) = self.eval_statement(&body_stmt.node)? {
if let Some(v) = self.eval_statement(&body_stmt.node).await? {
self.env.pop_scope();
return Ok(Some(v));
}
@ -499,11 +551,11 @@ impl Evaluator {
}
Statement::If(if_stmt) => {
let cond = self.eval_expr(&if_stmt.condition)?;
let cond = self.eval_expr(&if_stmt.condition).await?;
if cond.is_truthy() {
self.env.push_scope();
for body_stmt in &if_stmt.then_body {
if let Some(v) = self.eval_statement(&body_stmt.node)? {
if let Some(v) = self.eval_statement(&body_stmt.node).await? {
self.env.pop_scope();
return Ok(Some(v));
}
@ -512,7 +564,7 @@ impl Evaluator {
} else if let Some(ref else_body) = if_stmt.else_body {
self.env.push_scope();
for body_stmt in else_body {
if let Some(v) = self.eval_statement(&body_stmt.node)? {
if let Some(v) = self.eval_statement(&body_stmt.node).await? {
self.env.pop_scope();
return Ok(Some(v));
}
@ -523,10 +575,10 @@ impl Evaluator {
}
Statement::Match(match_stmt) => {
let value = self.eval_expr(&match_stmt.expr)?;
let value = self.eval_expr(&match_stmt.expr).await?;
for arm in &match_stmt.arms {
if self.pattern_matches(&arm.pattern, &value) {
let result = self.eval_expr(&arm.body)?;
let result = self.eval_expr(&arm.body).await?;
return Ok(Some(result));
}
}
@ -536,14 +588,13 @@ impl Evaluator {
Statement::Dotfile(dotfile) => {
tracing::trace!("eval dotfile");
if let Some(ref when) = dotfile.when {
let cond = self.eval_expr(when)?;
let cond = self.eval_expr(when).await?;
if !cond.is_truthy() {
return Ok(None);
}
}
let source = self.eval_to_path(&dotfile.source)?;
let target = self.eval_to_path(&dotfile.target)?;
let source_val = self.eval_expr(&dotfile.source).await?;
let deploy = match dotfile.deploy {
crate::ast::DeployMode::Copy => DeployMode::Copy,
@ -562,8 +613,62 @@ impl Evaluator {
}
}
})
.collect();
.collect::<Vec<_>>();
// Detect glob patterns or lists and store as DotfilesPattern
let is_glob = |s: &str| s.contains('*') || s.contains('?') || s.contains('[');
match &source_val {
Value::Str(s) if is_glob(s) => {
let target_base = self.eval_to_path(&dotfile.target).await?;
self.result.dotfile_patterns.push(DotfilesPattern {
source: DotfilesSource::Pattern(s.clone()),
target_base,
template: dotfile.template.unwrap_or(false),
permissions,
owner: dotfile.owner.clone(),
deploy,
link_patterns: dotfile.link_patterns.clone(),
copy_patterns: dotfile.copy_patterns.clone(),
});
}
Value::Path(p) if is_glob(&p.display().to_string()) => {
let target_base = self.eval_to_path(&dotfile.target).await?;
self.result.dotfile_patterns.push(DotfilesPattern {
source: DotfilesSource::Pattern(p.display().to_string()),
target_base,
template: dotfile.template.unwrap_or(false),
permissions,
owner: dotfile.owner.clone(),
deploy,
link_patterns: dotfile.link_patterns.clone(),
copy_patterns: dotfile.copy_patterns.clone(),
});
}
Value::List(items) => {
let paths = items
.iter()
.filter_map(|v| match v {
Value::Path(p) => Some(p.clone()),
Value::Str(s) => Some(PathBuf::from(s)),
_ => None,
})
.collect();
let target_base = self.eval_to_path(&dotfile.target).await?;
self.result.dotfile_patterns.push(DotfilesPattern {
source: DotfilesSource::Paths(paths),
target_base,
template: dotfile.template.unwrap_or(false),
permissions,
owner: dotfile.owner.clone(),
deploy,
link_patterns: dotfile.link_patterns.clone(),
copy_patterns: dotfile.copy_patterns.clone(),
});
}
_ => {
let source = Self::value_to_path(&source_val)?;
let target = self.eval_to_path(&dotfile.target).await?;
self.result.dotfiles.push(DotfileConfig {
source,
target,
@ -573,42 +678,45 @@ impl Evaluator {
deploy,
link_patterns: dotfile.link_patterns.clone(),
copy_patterns: dotfile.copy_patterns.clone(),
exclude_paths: vec![],
});
}
}
Ok(None)
}
Statement::Package(pkg) => {
tracing::trace!("eval package");
if let Some(ref when) = pkg.when {
let cond = self.eval_expr(when)?;
let cond = self.eval_expr(when).await?;
if !cond.is_truthy() {
return Ok(None);
}
}
let default = if let Some(ref d) = pkg.default {
Some(self.eval_to_string(d)?)
Some(self.eval_to_string(d).await?)
} else {
None
};
let brew = if let Some(ref s) = pkg.brew {
Some(self.eval_to_string(&s.name)?)
Some(self.eval_to_string(&s.name).await?)
} else {
None
};
let apt = if let Some(ref s) = pkg.apt {
Some(self.eval_to_string(&s.name)?)
Some(self.eval_to_string(&s.name).await?)
} else {
None
};
let pacman = if let Some(ref s) = pkg.pacman {
Some(self.eval_to_string(&s.name)?)
Some(self.eval_to_string(&s.name).await?)
} else {
None
};
let yay = if let Some(ref s) = pkg.yay {
Some(self.eval_to_string(&s.name)?)
Some(self.eval_to_string(&s.name).await?)
} else {
None
};
@ -624,8 +732,8 @@ impl Evaluator {
}
Statement::Secret(secret) => {
let source = self.eval_to_path(&secret.source)?;
let target = self.eval_to_path(&secret.target)?;
let source = self.eval_to_path(&secret.source).await?;
let target = self.eval_to_path(&secret.target).await?;
self.result.secrets.push(SecretConfig {
source,
@ -638,13 +746,13 @@ impl Evaluator {
Statement::Hook(hook) => {
tracing::trace!("eval hook");
if let Some(ref when) = hook.when {
let cond = self.eval_expr(when)?;
let cond = self.eval_expr(when).await?;
if !cond.is_truthy() {
return Ok(None);
}
}
let run = self.eval_to_string(&hook.run)?;
let run = self.eval_to_string(&hook.run).await?;
self.result.hooks.push(HookConfig {
stage: hook.stage.clone(),
run,
@ -654,7 +762,7 @@ impl Evaluator {
Statement::Return(expr) => {
let value = if let Some(e) = expr {
self.eval_expr(e)?
self.eval_expr(e).await?
} else {
Value::None
};
@ -662,7 +770,7 @@ impl Evaluator {
}
Statement::Expr(expr) => {
self.eval_expr(expr)?;
self.eval_expr(expr).await?;
Ok(None)
}
@ -670,8 +778,9 @@ impl Evaluator {
}
}
#[async_recursion(?Send)]
#[tracing::instrument(level = "trace", skip_all)]
fn eval_expr(&mut self, expr: &Expr) -> Result<Value, EvalError> {
async fn eval_expr(&mut self, expr: &Expr) -> Result<Value, EvalError> {
match expr {
Expr::Literal(lit) => Ok(match lit {
Literal::Int(n) => Value::Int(*n),
@ -693,13 +802,13 @@ impl Evaluator {
}
Expr::Binary(left, op, right) => {
let left_val = self.eval_expr(left)?;
let right_val = self.eval_expr(right)?;
let left_val = self.eval_expr(left).await?;
let right_val = self.eval_expr(right).await?;
self.eval_binary_op(&left_val, op, &right_val)
}
Expr::Unary(op, expr) => {
let val = self.eval_expr(expr)?;
let val = self.eval_expr(expr).await?;
match op {
UnaryOp::Neg => match val {
Value::Int(n) => Ok(Value::Int(-n)),
@ -719,13 +828,13 @@ impl Evaluator {
// First check if it's defined in the environment
if self.env.get(name).is_none() && self.env.get_function(name).is_none() {
// Not in env, try as a builtin
let arg_vals: Vec<Value> = args
.iter()
.map(|a| self.eval_expr(a))
.collect::<Result<_, _>>()?;
let mut arg_vals = Vec::with_capacity(args.len());
for a in args {
arg_vals.push(self.eval_expr(a).await?);
}
// Try calling as builtin - if it succeeds, return the result
match self.call_builtin(name, &arg_vals, args) {
match self.call_builtin(name, &arg_vals, args).await {
Ok(result) => return Ok(result),
Err(EvalError::UndefinedFunction(_)) => {
// Not a builtin either, fall through to report undefined variable
@ -736,18 +845,19 @@ impl Evaluator {
}
}
let callee_val = self.eval_expr(callee)?;
let arg_vals: Vec<Value> = args
.iter()
.map(|a| self.eval_expr(a))
.collect::<Result<_, _>>()?;
let callee_val = self.eval_expr(callee).await?;
let mut arg_vals = Vec::with_capacity(args.len());
for a in args {
arg_vals.push(self.eval_expr(a).await?);
}
match callee_val {
Value::Function(func, func_env) => {
self.call_function(&func, &func_env, &arg_vals)
self.call_function(&func, &func_env, &arg_vals).await
}
Value::Lambda(params, body, lambda_env) => {
self.call_lambda(&params, &body, &lambda_env, &arg_vals)
.await
}
_ => Err(EvalError::TypeError(format!(
"cannot call {}",
@ -757,17 +867,17 @@ impl Evaluator {
}
Expr::MethodCall(obj, method, args) => {
let obj_val = self.eval_expr(obj)?;
let arg_vals: Vec<Value> = args
.iter()
.map(|a| self.eval_expr(a))
.collect::<Result<_, _>>()?;
let obj_val = self.eval_expr(obj).await?;
let mut arg_vals = Vec::with_capacity(args.len());
for a in args {
arg_vals.push(self.eval_expr(a).await?);
}
self.call_method(&obj_val, method, &arg_vals, args)
self.call_method(&obj_val, method, &arg_vals, args).await
}
Expr::Field(obj, field) => {
let obj_val = self.eval_expr(obj)?;
let obj_val = self.eval_expr(obj).await?;
match obj_val {
Value::Struct(name, fields) => {
fields
@ -786,8 +896,8 @@ impl Evaluator {
}
Expr::Index(obj, idx) => {
let obj_val = self.eval_expr(obj)?;
let idx_val = self.eval_expr(idx)?;
let obj_val = self.eval_expr(obj).await?;
let idx_val = self.eval_expr(idx).await?;
match (obj_val, idx_val) {
(Value::List(items), Value::Int(i)) => {
@ -815,10 +925,10 @@ impl Evaluator {
}
Expr::List(items) => {
let values: Vec<Value> = items
.iter()
.map(|i| self.eval_expr(i))
.collect::<Result<_, _>>()?;
let mut values = Vec::with_capacity(items.len());
for i in items {
values.push(self.eval_expr(i).await?);
}
Ok(Value::List(values))
}
@ -828,14 +938,14 @@ impl Evaluator {
if let Some(decl) = self.env.get_struct(name).cloned() {
for field in &decl.fields {
if let Some(expr) = fields.get(&field.name) {
values.insert(field.name.clone(), self.eval_expr(expr)?);
values.insert(field.name.clone(), self.eval_expr(expr).await?);
} else if let Some(ref default) = field.default {
values.insert(field.name.clone(), self.eval_expr(default)?);
values.insert(field.name.clone(), self.eval_expr(default).await?);
}
}
} else {
for (k, v) in fields {
values.insert(k.clone(), self.eval_expr(v)?);
values.insert(k.clone(), self.eval_expr(v).await?);
}
}
@ -845,11 +955,11 @@ impl Evaluator {
Expr::EnumVariant(ty, variant) => Ok(Value::Enum(ty.clone(), variant.clone())),
Expr::If(cond, then_expr, else_expr) => {
let cond_val = self.eval_expr(cond)?;
let cond_val = self.eval_expr(cond).await?;
if cond_val.is_truthy() {
self.eval_expr(then_expr)
self.eval_expr(then_expr).await
} else if let Some(else_e) = else_expr {
self.eval_expr(else_e)
self.eval_expr(else_e).await
} else {
Ok(Value::None)
}
@ -861,17 +971,75 @@ impl Evaluator {
self.env.clone(),
)),
Expr::Await(expr) => self.eval_expr(expr),
Expr::Await(expr) => {
let val = self.eval_expr(expr).await?;
match val {
Value::Future(async_val) => {
let task = async_val
.0
.lock()
.map_err(|e| EvalError::AsyncError(e.to_string()))?
.take()
.ok_or_else(|| {
EvalError::AsyncError("future already consumed".into())
})?;
task.await
}
other => Ok(other), // Non-futures pass through
}
}
Expr::Path(left, right) => {
let left_path = self.eval_to_path(left)?;
let right_path = self.eval_to_path(right)?;
Ok(Value::Path(left_path.join(right_path)))
let left_val = self.eval_expr(left).await?;
let right_val = self.eval_expr(right).await?;
// If left is a list (from a previous glob), map over it
if let Value::List(items) = left_val {
let right_path = Self::value_to_path(&right_val)?;
let mut results = Vec::with_capacity(items.len());
for item in items {
let item_path = Self::value_to_path(&item)?;
let joined = item_path.join(&right_path);
let joined_str = joined.to_string_lossy();
if joined_str.contains('*')
|| joined_str.contains('?')
|| joined_str.contains('[')
{
for entry in glob::glob(&joined_str)
.map_err(|e| EvalError::TypeError(format!("invalid glob: {}", e)))?
.flatten()
{
results.push(Value::Path(entry));
}
} else {
results.push(Value::Path(joined));
}
}
return Ok(Value::List(results));
}
let left_path = Self::value_to_path(&left_val)?;
let right_path = Self::value_to_path(&right_val)?;
let joined = left_path.join(right_path);
// If the resulting path contains glob wildcards, expand as glob
let joined_str = joined.to_string_lossy();
if joined_str.contains('*') || joined_str.contains('?') || joined_str.contains('[')
{
let paths: Vec<Value> = glob::glob(&joined_str)
.map_err(|e| EvalError::TypeError(format!("invalid glob: {}", e)))?
.filter_map(|e| e.ok())
.map(Value::Path)
.collect();
Ok(Value::List(paths))
} else {
Ok(Value::Path(joined))
}
}
Expr::HomePath(path) => {
let home = Self::home_dir();
let path_val = self.eval_expr(path)?;
let path_val = self.eval_expr(path).await?;
match path_val {
Value::Str(s) if s.is_empty() => Ok(Value::Path(home)),
Value::Str(s) => Ok(Value::Path(home.join(s))),
@ -886,7 +1054,7 @@ impl Evaluator {
match part {
InterpolatedPart::Literal(s) => result.push_str(s),
InterpolatedPart::Expr(e) => {
let val = self.eval_expr(e)?;
let val = self.eval_expr(e).await?;
result.push_str(&val.to_string_repr());
}
}
@ -1081,8 +1249,9 @@ impl Evaluator {
}
}
#[async_recursion(?Send)]
#[tracing::instrument(level = "trace", skip_all, fields(name = %func.name))]
pub fn call_function(
pub async fn call_function(
&mut self,
func: &FnDecl,
func_env: &Env,
@ -1099,7 +1268,7 @@ impl Evaluator {
let mut result = Value::None;
for stmt in &func.body {
if let Some(v) = self.eval_statement(&stmt.node)? {
if let Some(v) = self.eval_statement(&stmt.node).await? {
result = v;
break;
}
@ -1109,8 +1278,9 @@ impl Evaluator {
Ok(result)
}
#[async_recursion(?Send)]
#[tracing::instrument(level = "trace", skip_all)]
fn call_lambda(
async fn call_lambda(
&mut self,
params: &[FnParam],
body: &Expr,
@ -1125,38 +1295,37 @@ impl Evaluator {
}
let old_env = std::mem::replace(&mut self.env, new_env);
let result = self.eval_expr(body)?;
let result = self.eval_expr(body).await?;
self.env = old_env;
Ok(result)
}
#[tracing::instrument(level = "trace", skip_all, fields(name))]
fn call_builtin(
async fn call_builtin(
&mut self,
name: &str,
args: &[Value],
arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
builtins::call_builtin(self, name, args, arg_exprs)
builtins::call_builtin(self, name, args, arg_exprs).await
}
#[tracing::instrument(level = "trace", skip_all, fields(method))]
fn call_method(
async fn call_method(
&mut self,
obj: &Value,
method: &str,
args: &[Value],
arg_exprs: &[Expr],
) -> Result<Value, EvalError> {
builtins::call_method(self, obj, method, args, arg_exprs)
builtins::call_method(self, obj, method, args, arg_exprs).await
}
#[tracing::instrument(level = "trace", skip_all)]
fn eval_to_path(&mut self, expr: &Expr) -> Result<PathBuf, EvalError> {
let val = self.eval_expr(expr)?;
/// Converts a Value to a PathBuf without needing an expression.
fn value_to_path(val: &Value) -> Result<PathBuf, EvalError> {
match val {
Value::Path(p) => Ok(p),
Value::Path(p) => Ok(p.clone()),
Value::Str(s) => {
if let Some(stripped) = s.strip_prefix('~') {
let home = Self::home_dir();
@ -1172,6 +1341,12 @@ impl Evaluator {
}
}
#[tracing::instrument(level = "trace", skip_all)]
async fn eval_to_path(&mut self, expr: &Expr) -> Result<PathBuf, EvalError> {
let val = self.eval_expr(expr).await?;
Self::value_to_path(&val)
}
/// Returns DOOT_HOME if set, otherwise the real home directory.
#[tracing::instrument(level = "trace")]
fn home_dir() -> PathBuf {
@ -1181,8 +1356,8 @@ impl Evaluator {
}
#[tracing::instrument(level = "trace", skip_all)]
fn eval_to_string(&mut self, expr: &Expr) -> Result<String, EvalError> {
let val = self.eval_expr(expr)?;
async fn eval_to_string(&mut self, expr: &Expr) -> Result<String, EvalError> {
let val = self.eval_expr(expr).await?;
Ok(val.to_string_repr())
}
@ -1202,22 +1377,24 @@ impl Default for Evaluator {
}
impl Evaluator {
#[async_recursion(?Send)]
#[tracing::instrument(level = "trace", skip_all)]
pub fn eval_in_env(&mut self, expr: &Expr, env: Env) -> Result<Value, EvalError> {
pub async fn eval_in_env(&mut self, expr: &Expr, env: Env) -> Result<Value, EvalError> {
let old_env = std::mem::replace(&mut self.env, env);
let result = self.eval_expr(expr);
let result = self.eval_expr(expr).await;
self.env = old_env;
result
}
#[async_recursion(?Send)]
#[tracing::instrument(level = "trace", skip_all, fields(name = %func.name))]
pub fn call_fn(
pub async fn call_fn(
&mut self,
func: &FnDecl,
func_env: &Env,
args: &[Value],
) -> Result<Value, EvalError> {
self.call_function(func, func_env, args)
self.call_function(func, func_env, args).await
}
}

View file

@ -1,190 +0,0 @@
//! Task execution engine.
use super::dag::{DependencyGraph, Node, TaskData};
use rayon::prelude::*;
use std::sync::{Arc, Mutex};
use thiserror::Error;
/// Errors that can occur while executing tasks from the dependency graph.
#[derive(Error, Debug)]
pub enum ExecutionError {
    /// A task's handler reported failure; carries the task id and a message.
    #[error("task failed: {task_id}: {message}")]
    TaskFailed { task_id: String, message: String },
    /// An underlying I/O operation failed (converted automatically via `From`).
    #[error("io error: {0}")]
    IoError(#[from] std::io::Error),
}
/// Result type returned by task handlers and by single-task execution.
pub type TaskResult = Result<(), ExecutionError>;
/// Pluggable handler invoked by the executor for each concrete task kind.
///
/// `Send + Sync` is required because the handler is shared across rayon
/// worker threads during parallel execution.
pub trait TaskHandler: Send + Sync {
    /// Handles dotfile deployment from `source` to `target`.
    /// `template` flags template processing for the source — presumably the
    /// file is rendered before deployment; confirm with the implementor.
    fn handle_dotfile(
        &self,
        source: &std::path::Path,
        target: &std::path::Path,
        template: bool,
    ) -> TaskResult;
    /// Handles installation of package `name` via the given package `manager`.
    fn handle_package(&self, name: &str, manager: &str) -> TaskResult;
    /// Handles secret decryption from `source` into `target`.
    fn handle_secret(&self, source: &std::path::Path, target: &std::path::Path) -> TaskResult;
    /// Handles execution of a hook shell `command`.
    fn handle_hook(&self, command: &str) -> TaskResult;
}
/// Executes tasks from a dependency graph, either sequentially or in
/// dependency-respecting parallel batches.
pub struct Executor<H: TaskHandler> {
    /// The resolved task graph; supplies ordering and batch information.
    graph: DependencyGraph,
    /// Shared handler that performs the actual work for each task kind.
    handler: Arc<H>,
    /// When true, tasks are scheduled but no handler is invoked.
    dry_run: bool,
}

impl<H: TaskHandler + 'static> Executor<H> {
    /// Creates a new executor over `graph` that dispatches work to `handler`.
    #[tracing::instrument(level = "trace", skip_all)]
    pub fn new(graph: DependencyGraph, handler: H) -> Self {
        Self {
            graph,
            handler: Arc::new(handler),
            dry_run: false,
        }
    }

    /// Sets dry run mode (builder-style). In dry run, nodes are visited but
    /// no handler is called.
    pub fn dry_run(mut self, dry_run: bool) -> Self {
        self.dry_run = dry_run;
        self
    }

    /// Executes tasks one at a time in topological order.
    ///
    /// Stops at the first failing task and returns its error; the partial
    /// report accumulated up to that point is discarded with the error.
    #[tracing::instrument(skip(self))]
    pub fn execute_sequential(&self) -> Result<ExecutionReport, ExecutionError> {
        let order = self
            .graph
            .topological_sort()
            .map_err(|e| ExecutionError::TaskFailed {
                task_id: "scheduler".to_string(),
                message: e,
            })?;
        let mut report = ExecutionReport::new();
        for task_id in order {
            if let Some(node) = self.graph.get_node(&task_id) {
                let result = self.execute_node(node);
                report.record(&task_id, result.is_ok());
                // Propagate the first failure; later tasks are not attempted.
                result?;
            }
        }
        Ok(report)
    }

    /// Executes tasks in parallel batches.
    ///
    /// Batches run in order; tasks within a batch run concurrently via rayon.
    /// A batch always runs to completion; if any of its tasks failed, the
    /// first error is returned and no later batch starts.
    #[tracing::instrument(skip(self))]
    pub fn execute_parallel(&self) -> Result<ExecutionReport, ExecutionError> {
        let batches =
            self.graph
                .get_parallel_batches()
                .map_err(|e| ExecutionError::TaskFailed {
                    task_id: "scheduler".to_string(),
                    message: e,
                })?;
        // A plain Mutex suffices here: rayon's par_iter borrows it from this
        // scope, so no Arc (and no panic-prone Arc::try_unwrap) is needed.
        let report = Mutex::new(ExecutionReport::new());
        for batch in batches {
            let batch_errors: Vec<ExecutionError> = batch
                .par_iter()
                .filter_map(|task_id| {
                    if let Some(node) = self.graph.get_node(task_id) {
                        let result = self.execute_node(node);
                        report.lock().unwrap().record(task_id, result.is_ok());
                        result.err()
                    } else {
                        None
                    }
                })
                .collect();
            // Fail fast after the batch completes: surface the first error
            // collected from this batch and skip all remaining batches.
            if let Some(err) = batch_errors.into_iter().next() {
                return Err(err);
            }
        }
        Ok(report.into_inner().unwrap())
    }

    /// Dispatches a single node to the matching handler method.
    ///
    /// In dry-run mode this is a no-op that reports success.
    #[tracing::instrument(level = "trace", skip(self))]
    fn execute_node(&self, node: &Node) -> TaskResult {
        if self.dry_run {
            return Ok(());
        }
        match &node.data {
            TaskData::Dotfile {
                source,
                target,
                template,
            } => self.handler.handle_dotfile(source, target, *template),
            TaskData::Package { name, manager } => self.handler.handle_package(name, manager),
            TaskData::Secret { source, target } => self.handler.handle_secret(source, target),
            TaskData::Hook { command } => self.handler.handle_hook(command),
            // Custom tasks have no built-in handler; treated as successful.
            TaskData::Custom(_) => Ok(()),
        }
    }
}
/// Summary of an execution run: which task ids succeeded and which failed.
#[derive(Debug, Default)]
pub struct ExecutionReport {
    pub succeeded: Vec<String>,
    pub failed: Vec<String>,
}

impl ExecutionReport {
    /// Creates an empty report with no recorded outcomes.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records the outcome of a single task under its id.
    pub fn record(&mut self, task_id: &str, success: bool) {
        let bucket = if success {
            &mut self.succeeded
        } else {
            &mut self.failed
        };
        bucket.push(task_id.to_owned());
    }

    /// Returns how many task outcomes have been recorded in total.
    pub fn total(&self) -> usize {
        self.succeeded.len() + self.failed.len()
    }

    /// Returns the fraction of recorded tasks that succeeded.
    ///
    /// An empty report counts as fully successful (1.0).
    pub fn success_rate(&self) -> f64 {
        match self.total() {
            0 => 1.0,
            n => self.succeeded.len() as f64 / n as f64,
        }
    }
}

View file

@ -1,11 +1,9 @@
//! Task planning and execution.
pub mod dag;
pub mod executor;
pub mod scheduler;
pub use dag::DependencyGraph;
pub use executor::Executor;
pub use scheduler::{
DotfileConflict, DotfileValidation, DotfileWarning, Scheduler, validate_dotfile_targets,
};

View file

@ -124,6 +124,8 @@ pub struct DotfileWarning {
pub struct DotfileValidation {
/// Indices in dependency order (respecting target relationships).
pub ordered_indices: Vec<usize>,
/// Batches of indices that can be deployed in parallel.
pub parallel_batches: Vec<Vec<usize>>,
/// Errors that prevent deployment.
pub errors: Vec<DotfileConflict>,
/// Warnings that should be shown to user.
@ -234,7 +236,7 @@ pub fn validate_dotfile_targets(
}
// Get execution order via topological sort
let ordered_indices = match graph.topological_sort() {
let ordered_indices: Vec<usize> = match graph.topological_sort() {
Ok(ids) => ids
.into_iter()
.filter_map(|id| id.strip_prefix("dotfile_").and_then(|s| s.parse().ok()))
@ -245,8 +247,23 @@ pub fn validate_dotfile_targets(
}
};
// Get parallel batches from the DAG
let parallel_batches = match graph.get_parallel_batches() {
Ok(batches) => batches
.into_iter()
.map(|batch| {
batch
.into_iter()
.filter_map(|id| id.strip_prefix("dotfile_").and_then(|s| s.parse().ok()))
.collect()
})
.collect(),
Err(_) => ordered_indices.iter().map(|&i| vec![i]).collect(),
};
DotfileValidation {
ordered_indices,
parallel_batches,
errors,
warnings,
}
@ -279,6 +296,7 @@ mod tests {
deploy: DeployMode::Copy,
link_patterns: Vec::new(),
copy_patterns: Vec::new(),
exclude_paths: Vec::new(),
}
}

View file

@ -38,6 +38,9 @@ pub enum TypeError {
got: usize,
span: std::ops::Range<usize>,
},
#[error("await can only be used inside async functions")]
AwaitOutsideAsync(std::ops::Range<usize>),
}
impl TypeError {
@ -70,6 +73,10 @@ impl TypeError {
format!("expected {} arguments, got {}", expected, got),
span.clone(),
),
TypeError::AwaitOutsideAsync(span) => (
"await can only be used inside async functions".to_string(),
span.clone(),
),
};
Report::build(ReportKind::Error, filename, span.start)
@ -89,6 +96,7 @@ impl TypeError {
pub struct TypeChecker {
env: TypeEnv,
errors: Vec<TypeError>,
in_async_context: bool,
}
impl TypeChecker {
@ -98,6 +106,7 @@ impl TypeChecker {
Self {
env: TypeEnv::new(),
errors: Vec::new(),
in_async_context: true, // top-level is implicitly async
}
}
@ -163,9 +172,12 @@ impl TypeChecker {
if decl.params.iter().any(|p| p.name == "self") {
// Method context
}
let old_async = self.in_async_context;
self.in_async_context = decl.is_async;
for body_stmt in &decl.body {
self.check_statement(body_stmt);
}
self.in_async_context = old_async;
self.env.pop_scope();
}
@ -273,8 +285,26 @@ impl TypeChecker {
}
Statement::Dotfile(dotfile) => {
self.infer_expr(&dotfile.source, &stmt.span);
self.infer_expr(&dotfile.target, &stmt.span);
let source_ty = self.infer_expr(&dotfile.source, &stmt.span);
// dotfile: source accepts path, str (pattern with wildcards), or list
if !matches!(
source_ty,
Type::Str | Type::Path | Type::List(_) | Type::Any | Type::Unknown
) {
self.errors.push(TypeError::TypeMismatch {
expected: "path, str, or [path]".to_string(),
got: source_ty.display(),
span: stmt.span.clone(),
});
}
let target_ty = self.infer_expr(&dotfile.target, &stmt.span);
if matches!(target_ty, Type::List(_)) {
self.errors.push(TypeError::TypeMismatch {
expected: "path".to_string(),
got: target_ty.display(),
span: stmt.span.clone(),
});
}
if let Some(ref when) = dotfile.when {
let when_ty = self.infer_expr(when, &stmt.span);
if !when_ty.is_compatible(&Type::Bool) {
@ -634,13 +664,29 @@ impl TypeChecker {
Type::Function(param_types, Box::new(return_ty))
}
Expr::Await(expr) => self.infer_expr(expr, span),
Expr::Await(expr) => {
if !self.in_async_context {
self.errors.push(TypeError::AwaitOutsideAsync(span.clone()));
}
self.infer_expr(expr, span)
}
Expr::Path(left, right) => {
self.infer_expr(left, span);
let left_ty = self.infer_expr(left, span);
self.infer_expr(right, span);
// If left is already a list (chained glob), result is a list
if matches!(left_ty, Type::List(_)) {
return Type::List(Box::new(Type::Path));
}
// Check if either operand has literal wildcards
if Self::expr_has_glob_wildcards(left) || Self::expr_has_glob_wildcards(right) {
Type::List(Box::new(Type::Path))
} else {
Type::Path
}
}
Expr::HomePath(_) => Type::Path,
@ -692,9 +738,10 @@ impl TypeChecker {
"starts_with" | "ends_with" => Type::Bool,
"read_file" | "read_file_lines" => Type::Str,
"file_exists" | "dir_exists" | "is_symlink" => Type::Bool,
"list_dir" | "glob" | "walk_dir" => Type::List(Box::new(Type::Path)),
"home" | "config_dir" | "config_path" | "data_dir" | "cache_dir" | "temp_dir"
| "temp_file" => Type::Path,
"list_dir" | "walk_dir" => Type::List(Box::new(Type::Path)),
"home" | "config_dir" | "data_dir" | "cache_dir" | "temp_dir" | "temp_file" => {
Type::Path
}
"path_join" | "path_parent" | "path_filename" | "path_extension" | "read_link" => {
Type::Path
}
@ -748,6 +795,49 @@ impl TypeChecker {
Type::List(Box::new(Type::Any))
}
}
// Parallel builtins
"par_map" | "par_filter" | "par_flat_map" | "par_sort_by" | "par_batch" => {
if !args.is_empty() {
let list_ty = self.infer_expr(&args[0], span);
if let Type::List(inner) = list_ty {
if name == "par_filter" {
return Type::List(inner);
}
return Type::List(Box::new(Type::Any));
}
}
Type::List(Box::new(Type::Any))
}
"par_any" | "par_all" => Type::Bool,
"par_find" => {
if !args.is_empty() {
let list_ty = self.infer_expr(&args[0], span);
if let Type::List(inner) = list_ty {
return Type::Optional(inner);
}
}
Type::Optional(Box::new(Type::Any))
}
"par_partition" => {
if !args.is_empty() {
let list_ty = self.infer_expr(&args[0], span);
if let Type::List(_) = list_ty {
return Type::List(Box::new(list_ty));
}
}
Type::List(Box::new(Type::List(Box::new(Type::Any))))
}
"par_reduce" => Type::Any,
"par_min_by" | "par_max_by" => {
if !args.is_empty() {
let list_ty = self.infer_expr(&args[0], span);
if let Type::List(inner) = list_ty {
return Type::Optional(inner);
}
}
Type::Optional(Box::new(Type::Any))
}
"par_for_each" => Type::None,
// Debug/print functions return None
"print" | "println" => Type::None,
"dbg" => {
@ -775,6 +865,14 @@ impl TypeChecker {
"first" | "last" => Type::Optional(Box::new(Type::Any)),
"contains" => Type::Bool,
"map" | "filter" | "sort" | "reverse" | "unique" => Type::List(Box::new(Type::Any)),
"par_map" | "par_filter" | "par_flat_map" | "par_sort_by" | "par_batch" => {
Type::List(Box::new(Type::Any))
}
"par_any" | "par_all" => Type::Bool,
"par_find" | "par_min_by" | "par_max_by" => Type::Optional(Box::new(Type::Any)),
"par_partition" => Type::List(Box::new(Type::List(Box::new(Type::Any)))),
"par_reduce" => Type::Any,
"par_for_each" => Type::None,
"fold" => Type::Any,
"join" => Type::Str,
_ => Type::Any,
@ -797,6 +895,14 @@ impl TypeChecker {
}
}
/// Checks if an expression is a string literal containing glob wildcards.
///
/// Only literal strings are inspected; any other expression form yields false.
fn expr_has_glob_wildcards(expr: &Expr) -> bool {
    if let Expr::Literal(Literal::Str(s)) = expr {
        ['*', '?', '['].iter().any(|&wc| s.contains(wc))
    } else {
        false
    }
}
#[tracing::instrument(level = "trace", skip_all)]
fn resolve_type(&self, ty: &TypeAnnotation) -> Type {
match ty {