2011-04-05 10 views
36

El trozo de código que tengo se ve un poco de lo que de esta manera:¿Se copian datos de solo lectura en diferentes procesos para multiprocesamiento?

glbl_array = # a 3 Gb array 

def my_func(args, def_param = glbl_array): 
    #do stuff on args and def_param 

if __name__ == '__main__': 
    pool = Pool(processes=4) 
    pool.map(my_func, range(1000)) 

¿Hay una manera de asegurarse de que (o fomentar) que los diferentes procesos no recibe una copia de glbl_array pero comparte. Si no hay forma de detener la copia, iré con una matriz memmapped, pero mis patrones de acceso no son muy regulares, por lo que espero que las matrices memmapped sean más lentas. Lo anterior parecía ser lo primero que se debe intentar. Esto está en Linux. Solo quería algunos consejos de Stackoverflow y no quiero molestar a los administradores de sistemas. ¿Cree que ayudará si el segundo parámetro es un objeto inmutable genuino como glbl_array.tostring().

+0

Pensé que los diferentes procesos no pueden compartir las variables de memoria – Andrey

+8

@Andrey: Entonces aprendiste algo hoy :) –

Respuesta

83

Usted puede utilizar el material de memoria compartida de multiprocessing junto con Numpy con bastante facilidad:

import multiprocessing 
import ctypes 
import numpy as np 

shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10) 
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj()) 
shared_array = shared_array.reshape(10, 10) 

#-- edited 2015-05-01: the assert check below checks the wrong thing 
# with recent versions of Numpy/multiprocessing. That no copy is made 
# is indicated by the fact that the program prints the output shown below. 
## No copy was made 
##assert shared_array.base.base is shared_array_base.get_obj() 

# Parallel processing 
def my_func(i, def_param=shared_array): 
    shared_array[i,:] = i 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(processes=4) 
    pool.map(my_func, range(10)) 

    print shared_array

que imprime

[[ 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.] 
[ 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] 
[ 2. 2. 2. 2. 2. 2. 2. 2. 2. 2.] 
[ 3. 3. 3. 3. 3. 3. 3. 3. 3. 3.] 
[ 4. 4. 4. 4. 4. 4. 4. 4. 4. 4.] 
[ 5. 5. 5. 5. 5. 5. 5. 5. 5. 5.] 
[ 6. 6. 6. 6. 6. 6. 6. 6. 6. 6.] 
[ 7. 7. 7. 7. 7. 7. 7. 7. 7. 7.] 
[ 8. 8. 8. 8. 8. 8. 8. 8. 8. 8.] 
[ 9. 9. 9. 9. 9. 9. 9. 9. 9. 9.]]

However, Linux has copy-on-write semantics on fork(), por lo que incluso sin utilizar multiprocessing.Array, los datos no se copiarán a menos que se escribe.

+0

¡Excelente! esa fue una gran respuesta. Una duda, la definición de shared_array, shared_array_base necesita estar protegida por 'if __name__ == '__main __':'. Mi preocupación es que cada vez que se cargue el módulo, se redefinirán y costará espacio adicional. Pero puedo estar equivocado. – san

+0

La única restricción wrt. multiproceso es que shared_array_base está definida antes de llamar a 'pool.map'. 'fork()' y 'multiprocessing.Pool' no volverán a importar módulos, por lo que lo único que debe tener cuidado es con la asignación de memoria dentro de' my_func() '. –

+6

Solo para observar, en Python fork() en realidad significa copiar en el acceso (porque solo acceder al objeto cambiará su ref-count). –

2

Para aquellos atascados con Windows, que no es compatible con fork() (a menos que use CygWin), la respuesta de pv no funciona. Globales no están disponibles para procesos secundarios.

En su lugar, debe pasar la memoria compartida durante la inicialización del Pool/Process como tal:

#! /usr/bin/python 

import time 

from multiprocessing import Process, Queue, Array 

def f(q,a): 
    m = q.get() 
    print m 
    print a[0], a[1], a[2] 
    m = q.get() 
    print m 
    print a[0], a[1], a[2] 

if __name__ == '__main__': 
    a = Array('B', (1, 2, 3), lock=False) 
    q = Queue() 
    p = Process(target=f, args=(q,a)) 
    p.start() 
    q.put([1, 2, 3]) 
    time.sleep(1) 
    a[0:3] = (4, 5, 6) 
    q.put([4, 5, 6]) 
    p.join() 

(que no es numpy y no es un buen código pero ilustra el punto ;-)

3

El siguiente código funciona en Win7 y Mac (quizás en Linux, pero no probado).

import multiprocessing 
import ctypes 
import numpy as np 

#-- edited 2015-05-01: the assert check below checks the wrong thing 
# with recent versions of Numpy/multiprocessing. That no copy is made 
# is indicated by the fact that the program prints the output shown below. 
## No copy was made 
##assert shared_array.base.base is shared_array_base.get_obj() 

shared_array = None 

def init(shared_array_base): 
    global shared_array 
    shared_array = np.ctypeslib.as_array(shared_array_base.get_obj()) 
    shared_array = shared_array.reshape(10, 10) 

# Parallel processing 
def my_func(i): 
    shared_array[i, :] = i 

if __name__ == '__main__': 
    shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10) 

    pool = multiprocessing.Pool(processes=4, initializer=init, initargs=(shared_array_base,)) 
    pool.map(my_func, range(10)) 

    shared_array = np.ctypeslib.as_array(shared_array_base.get_obj()) 
    shared_array = shared_array.reshape(10, 10) 
    print shared_array 
0

Si usted está buscando una opción que funciona de manera eficiente en Windows, y funciona bien para los patrones de acceso irregulares, ramificación, y otros escenarios en los que puede que tenga que analizar las diferentes matrices basadas en una combinación de una memoria compartida la matriz y los datos locales del proceso, el conjunto de herramientas de mathDict en el paquete ParallelRegression fue diseñado para manejar esta situación exacta.

Cuestiones relacionadas