emout.distributed package
Submodules
emout.distributed.client module
Dask cluster lifecycle management (start / stop / connect).
- emout.distributed.client.cleanup_saved_server_state(state, client=None)
Best-effort cleanup for a stale saved server session.
- Return type:
- emout.distributed.client.connect(address=None, *, name=None, timeout='5s', security=None, require_workers=False, worker_timeout=5.0)
Connect to a running emout server.
If address is omitted, auto-detect from the saved server state.
- emout.distributed.client.ensure_client_has_workers(client, *, state=None, timeout=5.0, poll=0.5)
Wait briefly for at least one worker, then fail fast if none arrive.
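The wait-then-fail-fast behaviour described above can be illustrated with a small polling loop. This is only a sketch of the general pattern, not emout's implementation: `wait_for_workers` and its `count_workers` callable are hypothetical names introduced here, and only the `timeout` and `poll` defaults are taken from the signature above.

```python
import time


def wait_for_workers(count_workers, timeout=5.0, poll=0.5):
    """Poll count_workers() until it reports at least one worker.

    count_workers is a hypothetical zero-argument callable standing in
    for a scheduler query; it is not part of emout.
    """
    deadline = time.monotonic() + timeout
    while True:
        n = count_workers()
        if n > 0:
            return n
        if time.monotonic() >= deadline:
            # Fail fast instead of blocking forever on an empty cluster.
            raise TimeoutError(f"no workers joined within {timeout} s")
        # Sleep for the poll interval, but never past the deadline.
        time.sleep(max(0.0, min(poll, deadline - time.monotonic())))
```

Using a monotonic clock keeps the deadline stable even if the system clock is adjusted while waiting.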
- emout.distributed.client.get_cluster_info(state, timeout='3s')
Fetch scheduler information for a saved server state.
- emout.distributed.client.no_worker_reason(state, info=None)
Describe why a scheduler currently has no usable workers.
- Return type:
- emout.distributed.client.query_worker_job_states(state, timeout=3.0)
Query SLURM job states for tracked worker jobs.
Returns None when job IDs are unavailable or squeue cannot be used.
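A query of this kind could be sketched as follows. The helper names `parse_squeue_output` and `query_job_states` are hypothetical, and the sketch takes a plain list of job IDs rather than emout's saved state object, whose layout is not documented here. It relies only on the standard `squeue` options `-h` (no header), `-j` (job list), and `-o` (output format, with `%i` for the job ID and `%T` for the state).

```python
import shutil
import subprocess


def parse_squeue_output(text):
    """Turn lines like '12345 RUNNING' into {'12345': 'RUNNING'}."""
    states = {}
    for line in text.splitlines():
        parts = line.split()
        if len(parts) >= 2:
            states[parts[0]] = parts[1]
    return states


def query_job_states(job_ids, timeout=3.0):
    """Return {job_id: state}, or None if the query cannot be made."""
    # No IDs to ask about, or squeue is not on PATH: nothing to report.
    if not job_ids or shutil.which("squeue") is None:
        return None
    cmd = ["squeue", "-h", "-j", ",".join(map(str, job_ids)), "-o", "%i %T"]
    try:
        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout, check=True
        )
    except (subprocess.SubprocessError, OSError):
        # Timeouts and non-zero exits are treated as "squeue cannot be used".
        return None
    return parse_squeue_output(result.stdout)
```

Keeping the parsing separate from the subprocess call makes the text handling testable on machines without SLURM.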
- emout.distributed.client.start_cluster(scheduler_ip=None, scheduler_port=None, partition=None, processes=None, threads=None, cores=None, memory=None, walltime=None, env_mods=None, logdir=None, *, server_name='default', protocol=None, security_files=None)
Start a Dask cluster and return a client.
- emout.distributed.client.state_lost_workers(state, info=None)
Return True if the saved server likely lost all worker jobs.
- Return type:
emout.distributed.clusters module
simple_dask_cluster.py
Minimal wrapper library for automatically performing the following from Python code, targeting SLURM-based supercomputers:
- Start a Dask Scheduler
- Submit Dask Workers via sbatch
- Connect a Client
Usage example:
from emout.distributed.clusters import SimpleDaskCluster

# Create the cluster object
cluster = SimpleDaskCluster(
    scheduler_ip="10.10.64.1",
    scheduler_port=8786,
    partition="gr20001a",
    processes=1,
    threads=1,
    cores=1,
    memory="4G",
    walltime="01:00:00",
    env_mods=["module load Anaconda3", "conda activate dask_env"],
    logdir="/home/b/b36291/large0/exp_dipole/logs",
    sbatch_extra=None,  # Pass additional sbatch options as a list
)

# Start the scheduler in the background
cluster.start_scheduler()

# Submit multiple workers (here 2)
cluster.submit_worker(jobs=2)

# Get a client and run distributed computations
client = cluster.get_client()
# e.g. call client.compute() with dask.array operations

# Clean up
client.close()
cluster.stop_scheduler()
# (SLURM jobs expire at walltime or can be cancelled with scancel)
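To make the "submit Dask Workers via sbatch" step more concrete, the following sketch shows how a wrapper of this kind might turn its constructor arguments into a SLURM batch script. The actual script that SimpleDaskCluster generates is not shown on this page, so `build_worker_script` is a hypothetical helper and the choice of `#SBATCH` directives is an assumption. The directives themselves and the `dask worker` command with `--nthreads` are standard SLURM and Dask features.

```python
def build_worker_script(scheduler_ip, scheduler_port=8786, partition="gr20001a",
                        threads=1, cores=1, memory="4G", walltime="01:00:00",
                        env_mods=None, logdir=".", protocol="tcp"):
    """Compose an sbatch script that launches one Dask worker.

    Hypothetical helper: illustrates the idea, not emout's real output.
    """
    lines = [
        "#!/bin/bash",
        f"#SBATCH --partition={partition}",
        f"#SBATCH --cpus-per-task={cores}",
        f"#SBATCH --mem={memory}",
        f"#SBATCH --time={walltime}",
        f"#SBATCH --output={logdir}/worker-%j.out",  # %j expands to the job ID
    ]
    # Environment setup commands run before the worker starts.
    lines += list(env_mods or [])
    lines.append(
        f"dask worker {protocol}://{scheduler_ip}:{scheduler_port} --nthreads {threads}"
    )
    return "\n".join(lines) + "\n"
```

The resulting text could be written to a file and passed to `sbatch`, once per requested worker job.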
- class emout.distributed.clusters.SimpleDaskCluster(scheduler_ip, scheduler_port=8786, partition='gr20001a', processes=1, threads=1, cores=1, memory='4G', walltime='01:00:00', env_mods=None, logdir=None, sbatch_extra=None, protocol='tcp', security_files=None)
Bases: object
Manage a Dask Scheduler and Workers (sbatch) together from Python.
- get_client(timeout=30.0)
Return a dask.distributed.Client. On first call, attempt to connect. The Client will automatically retry even if no workers have connected to the scheduler yet.
emout.distributed.config module
Network and cluster configuration helpers.
Resolves local IP addresses and checks port availability for distributed execution.
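The config module's functions are not listed on this page, so the following is only a standard-library sketch of the two tasks it describes. The names `local_ip` and `port_is_free` are hypothetical, and the probe address 192.0.2.1 is a reserved documentation address chosen here so that no real host is contacted.

```python
import socket


def local_ip(probe_host="192.0.2.1", probe_port=80):
    """Best-effort guess at the local address used for outbound traffic."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        try:
            # connect() on a UDP socket sends no packets; it only asks the
            # OS which local address it would route from.
            s.connect((probe_host, probe_port))
            return s.getsockname()[0]
        except OSError:
            # No usable route (e.g. offline machine): fall back to loopback.
            return "127.0.0.1"


def port_is_free(port, host="127.0.0.1"):
    """Return True if a TCP socket can currently bind to (host, port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
        except OSError:
            return False
    return True
```

A bind check is inherently racy: another process may claim the port between the check and the scheduler's own bind, so callers should still handle a bind failure.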
emout.distributed.utils module
Utility for dispatching computation to a local or Dask backend.
- emout.distributed.utils.run_backend(func, *args, **kwargs)
Execute func locally or on the connected Dask cluster.
If a Dask client is active, submits func via client.submit; otherwise falls back to a direct local call.
- Parameters:
func (callable) – The function to execute.
*args – Positional arguments forwarded to func.
**kwargs – Keyword arguments forwarded to func.
- Returns:
The return value of func.
- Return type:
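The dispatch rule for run_backend can be sketched with a stand-in client. Here `run_on` is a hypothetical function that takes the client explicitly, whereas run_backend discovers the active client itself by a mechanism not described on this page. A `ThreadPoolExecutor` plays the role of the Dask client because it offers the same `submit(func, *args, **kwargs)` call returning a future with `.result()`.

```python
from concurrent.futures import ThreadPoolExecutor


def run_on(client, func, *args, **kwargs):
    """Run func through client.submit if a client is given, else locally."""
    if client is None:
        # No active client: plain local call.
        return func(*args, **kwargs)
    # Submit remotely and block until the result is available.
    return client.submit(func, *args, **kwargs).result()


if __name__ == "__main__":
    print(run_on(None, pow, 2, 5))      # local path
    with ThreadPoolExecutor(max_workers=1) as pool:
        print(run_on(pool, pow, 2, 5))  # "remote" path via the stand-in
```

Both paths return the same value, so calling code does not need to know which backend was used.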
Module contents
Optional Dask-based distributed computing subsystem.
Available only on Python >= 3.10 with dask and distributed
installed. Provides cluster management, remote figure recording,
and remote backtrace / field rendering.