emout.distributed package

Submodules

emout.distributed.client module

Dask cluster lifecycle management (start / stop / connect).

emout.distributed.client.cleanup_saved_server_state(state, client=None)[source]

Best-effort cleanup for a stale saved server session.

Return type:

None

emout.distributed.client.connect(address=None, *, name=None, timeout='5s', security=None, require_workers=False, worker_timeout=5.0)[source]

Connect to a running emout server.

If address is omitted, auto-detect from the saved server state.

emout.distributed.client.ensure_client_has_workers(client, *, state=None, timeout=5.0, poll=0.5)[source]

Wait briefly for at least one worker, then fail fast if none arrive.

Return type:

dict[str, Any]
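The wait-then-fail-fast behaviour described for ensure_client_has_workers can be illustrated with a small polling loop. This is a minimal sketch, not emout's implementation: wait_for_workers and its get_info callable are hypothetical names, and the only assumption about the data is that the returned dict has a "workers" entry that is empty while no worker has joined (as in the dict returned by dask's Client.scheduler_info()).

```python
import time
from typing import Any, Callable


def wait_for_workers(
    get_info: Callable[[], dict[str, Any]],
    *,
    timeout: float = 5.0,
    poll: float = 0.5,
) -> dict[str, Any]:
    """Poll get_info() until it reports a worker; raise once timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        info = get_info()
        if info.get("workers"):
            return info  # at least one worker is registered
        if time.monotonic() >= deadline:
            # Fail fast instead of letting later computations hang.
            raise RuntimeError(f"no workers joined within {timeout:.1f}s")
        time.sleep(poll)
```

With a real client, one could presumably pass client.scheduler_info as get_info; whether emout does so internally is not stated here.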

emout.distributed.client.get_cluster_info(state, timeout='3s')[source]

Fetch scheduler information for a saved server state.

Return type:

dict[str, Any]

emout.distributed.client.no_worker_reason(state, info=None)[source]

Describe why a scheduler currently has no usable workers.

Return type:

str

emout.distributed.client.query_worker_job_states(state, timeout=3.0)[source]

Query SLURM job states for tracked worker jobs.

Returns None when job IDs are unavailable or squeue cannot be used.

Return type:

dict[int, str] | None
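As an illustration of the dict[int, str] | None contract above, the following sketch queries squeue for a list of job IDs and returns None when there is nothing to ask or squeue cannot be run. The function names are hypothetical and the exact squeue invocation used by emout is not documented here; the sketch assumes the standard --noheader, --format and --jobs options with the %i (job ID) and %T (state) fields.

```python
import shutil
import subprocess
from typing import Optional


def parse_squeue_states(text: str) -> dict[int, str]:
    """Parse lines of the form '<jobid> <STATE>' into {jobid: state}."""
    states: dict[int, str] = {}
    for line in text.splitlines():
        parts = line.split()
        if len(parts) != 2 or not parts[0].isdigit():
            continue  # skip blank lines and non-numeric IDs such as '123_4'
        states[int(parts[0])] = parts[1]
    return states


def query_job_states(job_ids: list[int], timeout: float = 3.0) -> Optional[dict[int, str]]:
    """Return {jobid: state}, or None if there are no IDs or squeue is unusable."""
    if not job_ids or shutil.which("squeue") is None:
        return None
    cmd = [
        "squeue",
        "--noheader",
        "--format=%i %T",
        "--jobs",
        ",".join(str(j) for j in job_ids),
    ]
    try:
        proc = subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout, check=True
        )
    except (OSError, subprocess.SubprocessError):
        return None  # squeue failed or timed out
    return parse_squeue_states(proc.stdout)
```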

emout.distributed.client.start_cluster(scheduler_ip=None, scheduler_port=None, partition=None, processes=None, threads=None, cores=None, memory=None, walltime=None, env_mods=None, logdir=None, *, server_name='default', protocol=None, security_files=None)[source]

Start a Dask cluster and return a client.

emout.distributed.client.state_lost_workers(state, info=None)[source]

Return True if the saved server likely lost all worker jobs.

Return type:

bool

emout.distributed.client.stop_cluster(address=None, *, name=None, state=None, timeout='5s')[source]

Stop a running Dask cluster.

emout.distributed.client.worker_jobs_active(state)[source]

Return whether tracked worker jobs still exist in SLURM.

Return type:

bool | None
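The bool | None return type is a three-valued answer: True, False, or "unknown". The sketch below shows one way such a value could be derived from a job-state mapping and why callers should test for None explicitly. The function name and the set of states counted as active are assumptions made for illustration, not emout's actual rule.

```python
from typing import Optional

# SLURM states treated here as "the job still exists" (an assumed selection).
ACTIVE_STATES = {"PENDING", "CONFIGURING", "RUNNING", "COMPLETING", "SUSPENDED"}


def jobs_active(job_states: Optional[dict[int, str]]) -> Optional[bool]:
    """Return True/False when states are known, None when they are not."""
    if job_states is None:
        return None  # SLURM could not be queried, so the answer is unknown
    return any(state in ACTIVE_STATES for state in job_states.values())


def describe(active: Optional[bool]) -> str:
    """Callers should distinguish None from False rather than using truthiness."""
    if active is None:
        return "unknown"
    return "active" if active else "gone"
```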

emout.distributed.clusters module

simple_dask_cluster.py

Minimal wrapper library for automatically performing the following from Python code, targeting SLURM-based supercomputers:

- Start a Dask Scheduler
- Submit Dask Workers via sbatch
- Connect a Client

Usage example:

from emout.distributed.clusters import SimpleDaskCluster

# Create the cluster object
cluster = SimpleDaskCluster(
    scheduler_ip="10.10.64.1",
    scheduler_port=8786,
    partition="gr20001a",
    processes=1,
    threads=1,
    cores=1,
    memory="4G",
    walltime="01:00:00",
    env_mods=["module load Anaconda3", "conda activate dask_env"],
    logdir="/home/b/b36291/large0/exp_dipole/logs",
    sbatch_extra=None,  # Pass additional sbatch options as a list
)

# Start the scheduler in the background
cluster.start_scheduler()
# Submit multiple workers (here 2)
cluster.submit_worker(jobs=2)
# Get a client and run distributed computations
client = cluster.get_client()
# e.g. call client.compute() with dask.array operations

# Clean up
client.close()
cluster.stop_scheduler()
# (SLURM jobs expire at walltime or can be cancelled with scancel)

class emout.distributed.clusters.SimpleDaskCluster(scheduler_ip, scheduler_port=8786, partition='gr20001a', processes=1, threads=1, cores=1, memory='4G', walltime='01:00:00', env_mods=None, logdir=None, sbatch_extra=None, protocol='tcp', security_files=None)[source]

Bases: object

Manage a Dask Scheduler and Workers (sbatch) together from Python.

close_client()[source]

Close the Client if it is alive.

get_client(timeout=30.0)[source]

Return a dask.distributed.Client, connecting to the scheduler on the first call. The Client retries automatically even if no workers have connected to the scheduler yet.

start_scheduler(no_dashboard=True)[source]

Start dask-scheduler in the background. Store the Popen object in self._sched_proc.

stop_scheduler()[source]

Stop the running Scheduler by killing its process.

submit_worker(jobs=1)[source]

Submit Workers (dask-worker) via sbatch. Submits as many SLURM jobs as given by the jobs argument and returns their job IDs.
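To make the sbatch step concrete, here is a rough sketch of how a worker batch script could be assembled from the constructor arguments and how a job ID could be read back from sbatch's output. Both helper names are hypothetical, the script layout is an assumption rather than what SimpleDaskCluster actually writes, and the mapping of cores, memory and walltime onto --cpus-per-task, --mem and --time is a guess.

```python
import re


def build_worker_script(
    scheduler_address: str,
    *,
    partition: str,
    cores: int,
    memory: str,
    walltime: str,
    threads: int,
    env_mods=(),
) -> str:
    """Return the text of a batch script that starts one dask-worker."""
    lines = [
        "#!/bin/bash",
        f"#SBATCH --partition={partition}",
        f"#SBATCH --cpus-per-task={cores}",
        f"#SBATCH --mem={memory}",
        f"#SBATCH --time={walltime}",
        *env_mods,  # e.g. "module load Anaconda3"
        f"dask-worker {scheduler_address} --nthreads {threads}",
    ]
    return "\n".join(lines) + "\n"


def parse_job_id(sbatch_stdout: str) -> int:
    """Extract the job ID from sbatch's 'Submitted batch job <id>' message."""
    match = re.search(r"Submitted batch job (\d+)", sbatch_stdout)
    if match is None:
        raise ValueError(f"unexpected sbatch output: {sbatch_stdout!r}")
    return int(match.group(1))
```

A caller would write the script to a file, run sbatch on it once per requested job, and collect the parsed IDs into a list.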

emout.distributed.config module

Network and cluster configuration helpers.

Resolves local IP addresses and checks port availability for distributed execution.
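The two tasks named above, resolving a local IP address and checking port availability, can be sketched with the standard socket module. The helper names are hypothetical and this is not necessarily how emout.distributed.config does it; in particular, the UDP-connect trick for finding the outbound address is a common idiom assumed here for illustration.

```python
import socket


def port_is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if a TCP socket can be bound to (host, port) right now."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False  # already in use, or not permitted
    return True


def local_ip(fallback: str = "127.0.0.1") -> str:
    """Best-effort guess at the address used for outbound traffic."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        try:
            # Connecting a UDP socket sends no packets; it only selects a route.
            sock.connect(("10.255.255.255", 1))
            return sock.getsockname()[0]
        except OSError:
            return fallback
```

A free-port check of this kind is inherently racy: another process can claim the port between the check and the scheduler start, so the scheduler's own startup error still needs handling.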

class emout.distributed.config.DaskConfig[source]

Bases: object

Environment-variable-based Dask cluster configuration.

property cores: int
property env_mods: list[str]
property logdir: Path
property memory: str
property partition: str
property processes: int
property protocol: str
property scheduler_ip: str
property scheduler_port: int
property threads: int
property walltime: str
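The property list above suggests a class whose attributes are read from the environment on access. A minimal sketch of that pattern follows. The class name, the EXAMPLE_DASK_ variable prefix, the ";" separator for env_mods and all default values are assumptions made for illustration; the variable names and defaults that DaskConfig really uses are not given here.

```python
import os
from pathlib import Path


class EnvConfig:
    """Read settings from environment variables each time a property is accessed."""

    @staticmethod
    def _get(name: str, default: str) -> str:
        # Hypothetical naming scheme: EXAMPLE_DASK_<NAME>
        return os.environ.get(f"EXAMPLE_DASK_{name}", default)

    @property
    def scheduler_port(self) -> int:
        return int(self._get("SCHEDULER_PORT", "8786"))

    @property
    def memory(self) -> str:
        return self._get("MEMORY", "4G")

    @property
    def env_mods(self) -> list[str]:
        # Shell lines separated by ';', with empty entries dropped.
        raw = self._get("ENV_MODS", "")
        return [part.strip() for part in raw.split(";") if part.strip()]

    @property
    def logdir(self) -> Path:
        return Path(self._get("LOGDIR", "logs")).expanduser()
```

Because each property reads the environment on access, changing a variable after the object is created takes effect immediately, which is convenient in notebooks and tests.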

emout.distributed.utils module

Utility for dispatching computation to a local or Dask backend.

emout.distributed.utils.run_backend(func, *args, **kwargs)[source]

Execute func locally or on the connected Dask cluster.

If a Dask client is active, submits func via client.submit; otherwise falls back to a direct local call.

Parameters:
  • func (callable) – The function to execute.

  • *args – Positional arguments forwarded to func.

  • **kwargs – Keyword arguments forwarded to func.

Returns:

The return value of func.

Return type:

object
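The local-or-remote dispatch described for run_backend can be sketched as follows. This is an illustration under stated assumptions, not the library's code: dispatch is a hypothetical name, and it takes the client as an explicit keyword argument, whereas run_backend detects the active client itself. The demo uses concurrent.futures.ThreadPoolExecutor as a stand-in because, like a Dask client, its submit method returns a future with a result() method.

```python
from concurrent.futures import ThreadPoolExecutor  # stand-in for a Dask client


def dispatch(func, *args, client=None, **kwargs):
    """Run func via client.submit if a client is given, else call it directly."""
    if client is None:
        return func(*args, **kwargs)  # local fallback
    future = client.submit(func, *args, **kwargs)
    return future.result()  # block until the remote call finishes


# Demo: both paths produce the same value.
local = dispatch(pow, 2, 5)
with ThreadPoolExecutor(max_workers=1) as pool:
    remote = dispatch(pow, 2, 5, client=pool)
```

Blocking on result() keeps the caller's code identical in both modes, at the cost of not overlapping several remote calls.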

Module contents

Optional Dask-based distributed computing subsystem.

Available only on Python >= 3.10 with dask and distributed installed. Provides cluster management, remote figure recording, and remote backtrace / field rendering.