numpy array in shared memory for multiprocessing in Python 3

By | 2017/12/27

Python 3으로 작업하다 보면 속도 향상을 위해 multiprocessing이 필요할 때가 있습니다. 그리고 그때 가끔은 shared memory를 써야 하는 경우도 있더군요.

저는 되도록이면 shared memory를 쓰지 않으려고 하고 실제로도 그렇게 해 왔지만, 문제는 가끔 질문을 받거나 필요해질 때마다 어떻게 하는지 잊어버려서 고생한다는 것입니다.

그래서 shared memory를 쓸 때와 쓰지 않을 때의 차이를 확연히 알 수 있는 간단한 예제 코드를 여기에 첨부하였습니다. 아마 나중에 이 코드를 보면 저도 바로 이해할 수 있을 것입니다.

 

Code

import numpy as np
from multiprocessing import Pool, Array
import ctypes


def func1(args):
    """Pool task without shared memory: bump one cell of the matrix in *args*.

    *args* is ``(x_idx, y_idx, size, mat)``. The cell ``mat[x_idx, y_idx]`` is
    increased in place by ``2 * size * x_idx + y_idx``. The return value is
    ``(x_idx, y_idx, new_cell_value)``.

    Because ``mat`` travels inside the task tuple, a worker process writes to
    its own copy. The demo's output shows the parent's matrix unchanged.
    """
    row, col, n, matrix = args
    matrix[row, col] += 2 * n * row + col
    updated = matrix[row, col]
    return row, col, updated


def init_shared(_shared, size):
    """Pool initializer: expose the shared buffer as a ``size`` x ``size`` array.

    Binds the module-level name ``shared_mat`` to a NumPy view over
    ``_shared.get_obj()`` without copying, so in-place writes made through
    ``shared_mat`` land in the shared buffer itself.

    ``np.frombuffer`` defaults to float64. That matches a buffer of
    ``ctypes.c_double``; other element types would need an explicit ``dtype``.
    """
    global shared_mat
    flat_view = np.frombuffer(_shared.get_obj())
    shared_mat = flat_view.reshape((size, size))


def func2(args):
    """Pool task using shared memory: bump one cell of ``shared_mat``.

    *args* is ``(x_idx, y_idx, size)``. The cell ``shared_mat[x_idx, y_idx]``
    of the module-level array (bound by ``init_shared``) is increased in place
    by ``2 * size * x_idx + y_idx``. The return value is
    ``(x_idx, y_idx, new_cell_value)``.

    Each task touches a distinct cell, so no lock is taken around the update.
    """
    row, col, n = args
    shared_mat[row, col] += 2 * n * row + col
    updated = shared_mat[row, col]
    return row, col, updated


class Iter1:
    """Iterator over every cell of a ``size`` x ``size`` matrix, for ``func1``.

    Yields ``(x_idx, y_idx, size, mat)`` tuples. ``x_idx`` varies fastest:
    (0, 0), (1, 0), ..., (size-1, 0), (0, 1), ... After the last cell it raises
    ``StopIteration``, and it keeps raising it on every later call, as the
    iterator protocol requires.
    """

    def __init__(self, mat, size):
        self.mat = mat
        self.x_idx = -1  # advanced to 0 by the first __next__ call
        self.y_idx = 0
        self.size = size

    def __iter__(self):
        return self

    def __next__(self):
        # Stay exhausted: without this check, calls after StopIteration
        # would resume advancing x_idx and yield out-of-range (x, size) tasks.
        if self.y_idx >= self.size:
            raise StopIteration
        self.x_idx += 1
        if self.x_idx >= self.size:
            # Wrap to the start of the next column index.
            self.y_idx += 1
            self.x_idx = 0
            if self.y_idx >= self.size:
                raise StopIteration
        return self.x_idx, self.y_idx, self.size, self.mat


class Iter2:
    """Iterator over every cell of a ``size`` x ``size`` matrix, for ``func2``.

    Yields ``(x_idx, y_idx, size)`` tuples. No matrix is included, because the
    workers read the shared one. ``x_idx`` varies fastest: (0, 0), (1, 0), ...,
    (size-1, 0), (0, 1), ... After the last cell it raises ``StopIteration``,
    and it keeps raising it on every later call, as the iterator protocol
    requires.
    """

    def __init__(self, size):
        self.x_idx = -1  # advanced to 0 by the first __next__ call
        self.y_idx = 0
        self.size = size

    def __iter__(self):
        return self

    def __next__(self):
        # Stay exhausted: without this check, calls after StopIteration
        # would resume advancing x_idx and yield out-of-range (x, size) tasks.
        if self.y_idx >= self.size:
            raise StopIteration
        self.x_idx += 1
        if self.x_idx >= self.size:
            # Wrap to the start of the next column index.
            self.y_idx += 1
            self.x_idx = 0
            if self.y_idx >= self.size:
                raise StopIteration
        return self.x_idx, self.y_idx, self.size


def main(size=3, processes=4):
    """Contrast a copied NumPy matrix with a shared-memory one across a Pool.

    Part 1 sends ``mat`` to the workers inside every task tuple (``Iter1``),
    so the workers' in-place updates do not show up in the parent's ``mat``.
    Part 2 backs the matrix with a ``multiprocessing.Array`` of C doubles
    that each worker wraps in ``init_shared``. The workers' updates are then
    visible in the parent's ``mat_shared``.

    Args:
        size: Side length of the square matrix (default 3, as before).
        processes: Number of worker processes in each Pool (default 4).
    """
    print("1. Make children processes without shared memory")
    mat = np.zeros((size, size))
    print("- Before the matrix in parent process:")
    print(mat)

    print("- Result from children processes")
    with Pool(processes=processes) as pool:
        # Consume every result inside the ``with``: Pool.__exit__ calls
        # terminate(), which would discard unfinished tasks.
        for x_idx, y_idx, val in pool.imap_unordered(func1, Iter1(mat, size)):
            print(x_idx, y_idx, val)

    print("- After the matrix in parent process:")
    print(mat)

    print()

    print("2. Make children processes with shared memory")
    shared_loc = Array(ctypes.c_double, size * size)
    # Flat float64 view over the shared buffer (float64 matches c_double).
    mat_shared = np.frombuffer(shared_loc.get_obj())
    # Slice assignment copies the values into the shared buffer itself,
    # so no temporary copy of ``mat`` is needed.
    mat_shared[:] = mat.reshape(size * size)

    print("- Before the matrix in parent process:")
    print(mat_shared.reshape((size, size)))

    print("- Result from children processes")
    with Pool(processes=processes, initializer=init_shared,
              initargs=(shared_loc, size)) as pool:
        for x_idx, y_idx, val in pool.imap_unordered(func2, Iter2(size)):
            print(x_idx, y_idx, val)

    print("- After the matrix in parent process:")
    print(mat_shared.reshape((size, size)))


# Required for multiprocessing: under the "spawn" start method, child
# processes import this module, and the guard stops them from re-running main().
if __name__ == "__main__":
    main()

 

Result

1. Make children processes without shared memory
- Before the matrix in parent process:
[[ 0.  0.  0.]
 [ 0.  0.  0.]
 [ 0.  0.  0.]]
- Result from children processes
0 0 0.0
1 0 6.0
2 0 12.0
0 1 1.0
1 1 7.0
2 1 13.0
0 2 2.0
1 2 8.0
2 2 14.0
- After the matrix in parent process:
[[ 0.  0.  0.]
 [ 0.  0.  0.]
 [ 0.  0.  0.]]

2. Make children processes with shared memory
- Before the matrix in parent process:
[[ 0.  0.  0.]
 [ 0.  0.  0.]
 [ 0.  0.  0.]]
- Result from children processes
0 0 0.0
1 0 6.0
2 0 12.0
0 1 1.0
1 1 7.0
2 1 13.0
0 2 2.0
1 2 8.0
2 2 14.0
- After the matrix in parent process:
[[  0.   1.   2.]
 [  6.   7.   8.]
 [ 12.  13.  14.]]

 

참고자료

Leave a Reply