emout.distributed package

Submodules

emout.distributed.client module

emout.distributed.client.start_cluster(scheduler_ip=None, scheduler_port=None, partition=None, processes=None, threads=None, cores=None, memory=None, walltime=None, env_mods=None, logdir=None)[source]
emout.distributed.client.stop_cluster()[source]

emout.distributed.clusters module

simple_dask_cluster.py

京大スパコン(SLURM)向けに、Python コードから - Dask Scheduler の起動 - Dask Worker の sbatch 投入 - Client 接続 をすべて自動的に行えるようにするミニマルなラッパーライブラリです。

使い方(例):

from simple_dask_cluster import SimpleDaskCluster

# クラスを生成
cluster = SimpleDaskCluster(
    scheduler_ip="10.10.64.1",
    scheduler_port=8786,
    partition="gr20001a",
    processes=1,
    threads=1,
    cores=1,
    memory="4G",
    walltime="01:00:00",
    env_mods=["module load Anaconda3", "conda activate dask_env"],
    logdir="/home/b/b36291/large0/exp_dipole/logs",
    sbatch_extra=None,  # 追加 sbatch オプションがあればリストで渡す
)

# Scheduler をバックグラウンドで起動
cluster.start_scheduler()
# Worker を複数投入(ここでは 2 台)
cluster.submit_worker(jobs=2)
# Client を取得して分散計算を実行
client = cluster.get_client()
# ⇒ たとえば dask.array を使った処理を client.compute() で呼べる

# 最後に後始末
client.close()
cluster.stop_scheduler()
# (SLURM ジョブ自体は SLURM の期限(walltime)が来るか、scancel で落とす)
class emout.distributed.clusters.SimpleDaskCluster(scheduler_ip, scheduler_port=8786, partition='gr20001a', processes=1, threads=1, cores=1, memory='4G', walltime='01:00:00', env_mods=None, logdir=None, sbatch_extra=None)[source]

Bases: object

Dask Scheduler と Worker(sbatch)を Python でまとめて管理するクラス。

close_client()[source]

もし Client が生きていれば close する。

get_client(timeout=30.0)[source]

dask.distributed.Client を返す。初回呼び出し時に実際に接続を試みる。 Scheduler にワーカーがまだつながっていなくても、 Client が自動的にリトライしてくれる設計です。

Return type:

Client

start_scheduler(no_dashboard=True)[source]

バックグラウンドで dask-scheduler を立ち上げる。 self._sched_proc に Popen オブジェクトを保持する。

stop_scheduler()[source]

起動中の Scheduler を停止する (kill)。

submit_worker(jobs=1)[source]

Worker (dask-worker) を sbatch で投げる。 jobs の数だけ SLURM ジョブを投入し、それぞれの JOBID を返す。

emout.distributed.config module

class emout.distributed.config.DaskConfig[source]

Bases: object

property cores: int
property env_mods: list[str]
property logdir: Path
property memory: str
property partition: str
property processes: int
property scheduler_ip: str
property scheduler_port: int
property threads: int
property walltime: str

emout.distributed.utils module

emout.distributed.utils.run_backend(func, *args, **kwargs)[source]
  • compute=True: 即 execute(func) して結果を返す

  • compute=False: Dask Future を返す (Client が無ければ即時実行して返す)

Module contents