mpi4pyの使い方

December 21, 2017 (Updated on: August 15, 2023)
by Keichi Takahashi

これは MPI Advent Calendar 2017 の21日目の記事です。 この記事では、MPIのPythonバインディングである MPI for Python (mpi4py) を紹介したいと思います。

mpi4pyは多くのスパコンにプリインストールされており、PythonからMPIを呼ぶ際は、 ほぼこれ一択のようです。 mpi4pyを利用しているアプリケーションの一例として、 PFI/PFNさんの分散深層学習フレームワーク ChainerMN があげられます。

基本

pipでインストールできます。

$ pip install mpi4py

基本的には、MPIの提供する関数を素直にバインディングしています。 下記はHello, worldです:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

print(f"Hello, world! from rank {rank} out of {size}")

実行は、通常のMPIアプリケーションと同様にmpiexecでokです。

$ mpiexec -np 4 python3 test.py
Hello, world! from rank 0 out of 4
Hello, world! from rank 2 out of 4
Hello, world! from rank 3 out of 4
Hello, world! from rank 1 out of 4

ちなみに、MPI_Init()MPI_Finalize()は、mpi4pyモジュールの初回import時と、プロセス終了時に、 それぞれ自動的に呼ばれています。

通信関数

1対1や集団通信などの各種通信関数は、 MPI.Comm クラスのメソッド になっています。 本家MPIとの違いとして、Pythonのオブジェクトをpickle (シリアライズ) して送る小文字のメソッド (send(), recv(), bcast() scatter(), gather(), etc.) と、バッファを直接送る頭文字が大文字のメソッド (Send(), Recv(), Bcast() Scatter(), Gather(), etc.) の2系統に分かれています。

1対1通信でPythonのオブジェクトを送受信する例:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    req = comm.isend("foo", dest=1, tag=0)
elif rank == 1:
    req = comm.irecv(source=0, tag=0)

req.wait()

Pythonのオブジェクトをブロードキャストする例:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    msg = {"a": 123, "b": [456, 789]}
elif rank == 1:
    msg = None

comm.bcast(msg, root=0)

1対1通信でnumpyのndarrayを送受信する例:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()


if rank == 0:
    buf = np.arange(100, dtype="float64")
    req = comm.Isend(buf, dest=1, tag=0)
elif rank == 1:
    buf = np.empty(100, dtype="float64")
    req = comm.Irecv(buf, source=0, tag=0)

req.wait()

なお、 numpy.ndarray 以外にも、組み込み型の bytes や標準ライブラリの array.array など、バッファプロトコルを実装している型なら使えます。

numpyのndarrayをバッファをAllreduceする例:

from mpi4py import MPI
import numpy as np


comm = MPI.COMM_WORLD

sendbuf = np.arange(100, dtype="float64")
recvbuf = np.empty(sendbuf.shape, dtype="float64")

comm.Allreduce(sendbuf, recvbuf, MPI.SUM)

マスタ・ワーカ型の並列化

mpi4pyは、 concurrent.futures.Executor を継承した MPIPoolExecutor というクラスを提供しています。このクラスを使用すると、 ThreadPoolExecutorProcessPoolExecutor と同様のインターフェースで、 embarassingly parallelな計算を簡単に並列分散化することができます。

from mpi4py.futures import MPIPoolExecutor

def compute(x):
    # Some heavy computation
    return x * 2

if __name__ == "__main__":

    with MPIPoolExecutor() as executor:
        image = executor.map(compute, range(100))

内部では、MPI-2の動的プロセス作成機能 (MPI_Comm_spwan()) を使って ワーカプロセスを立ち上げているため、MPICHでは下記のコマンドで起動します:

$ mpiexec -usize 5 -np 1 python3 pool.py

Open MPIでは -usze の代わりに OMPI_UNIVERSE_SIZE という環境変数を設定すれば良いらしいのですが、 なぜかエラーを吐いて起動できません…本家に issue を立てたので、何か分かり次第追記します。

追記 どうもOpenMPIのバグのようです。とりあえずは、 下記のワークアラウンドで動きました。

$ MPI4PY_MAX_WORKERS=4 mpiexec --oversubscribe -np 1 python3 pool.py

まとめ

MPIのPythonバインディング mpi4py を紹介しました。 ここで紹介した以外にも、片側通信やMPI-IOなど、MPI-1/2/3のほとんどの機能が ラップされています。 Pythonアプリケーションから手軽にMPIの機能を呼び出したいときに、 ぜひ使ってみてください。