2011-07-26 30 views
62

Un programa que crea varios procesos que funcionan en una cola de unión, Q, y puede eventualmente manipular un diccionario global D para almacenar resultados. (para que cada proceso secundario pueda usar D para almacenar su resultado y también ver qué resultados están produciendo los otros procesos secundarios)Multiproceso de Python: ¿cómo comparto un dict entre múltiples procesos?

Si imprimo el diccionario D en un proceso hijo, veo las modificaciones que se han realizado en él (es decir, en D). Pero después de que el proceso principal se une a Q, si imprimo D, ¡es un dict vacío!

Entiendo que es un problema de sincronización/bloqueo. ¿Puede alguien decirme qué está pasando aquí y cómo puedo sincronizar el acceso a D?

Respuesta

15

multiprocesamiento no es como el enhebrado. Cada proceso secundario obtendrá una copia de la memoria del proceso principal. En general, el estado se comparte a través de la comunicación (tuberías/enchufes), señales o memoria compartida.

multiprocesamiento hace algunas abstracciones disponibles para su caso de uso - estado compartido que está considerados locales mediante el uso de servidores proxy o compartido de memoria: http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes

secciones pertinentes:

+0

Muchas gracias. Me condujo a la solución/a: multiprocesamiento.Manager(). Dict(). – dop

92

Una respuesta general implica el uso de un objeto Manager. Adaptado de la documentación:

from multiprocessing import Process, Manager 

def f(d): 
    d[1] += '1' 
    d['2'] += 2 

if __name__ == '__main__': 
    manager = Manager() 

    d = manager.dict() 
    d[1] = '1' 
    d['2'] = 2 

    p1 = Process(target=f, args=(d,)) 
    p2 = Process(target=f, args=(d,)) 
    p1.start() 
    p2.start() 
    p1.join() 
    p2.join() 

    print d 

Salida:

$ python mul.py 
{1: '111', '2': 6} 
+2

Gracias senderle. De hecho, D = multiprocesamiento.Manager(). Dict() resuelve mi problema. Estaba usando D = dict(). – dop

+0

funciona con Manager(). Dict() pero no Manager(). List() –

+0

@Coc, funciona perfectamente para mí, una vez que cambié las cadenas por las iniciales y preasigné la lista. Debes estar cometiendo un error en alguna parte. – senderle

1

Tal vez usted puede intentar pyshmht, compartiendo la extensión tabla hash basado memoria para Python.

Aviso

  1. No está completamente probado, sólo para su referencia.

  2. Actualmente carece de mecanismos de bloqueo/sem para multiprocesamiento.

8

Me gustaría compartir mi propio trabajo que es más rápido que dict del gerente y es más simple y más estable que la biblioteca pyshmht que utiliza toneladas de memoria y no funciona para Mac OS. Aunque mi dict solo funciona para cadenas simples y es inmutable actualmente. Utilizo la implementación de sondeo lineal y almacene las claves y pares de valores en un bloque de memoria separado después de la tabla.

from mmap import mmap 
import struct 
from timeit import default_timer 
from multiprocessing import Manager 
from pyshmht import HashTable 


class shared_immutable_dict: 
    def __init__(self, a): 
     self.hs = 1 << (len(a) * 3).bit_length() 
     kvp = self.hs * 4 
     ht = [0xffffffff] * self.hs 
     kvl = [] 
     for k, v in a.iteritems(): 
      h = self.hash(k) 
      while ht[h] != 0xffffffff: 
       h = (h + 1) & (self.hs - 1) 
      ht[h] = kvp 
      kvp += self.kvlen(k) + self.kvlen(v) 
      kvl.append(k) 
      kvl.append(v) 

     self.m = mmap(-1, kvp) 
     for p in ht: 
      self.m.write(uint_format.pack(p)) 
     for x in kvl: 
      if len(x) <= 0x7f: 
       self.m.write_byte(chr(len(x))) 
      else: 
       self.m.write(uint_format.pack(0x80000000 + len(x))) 
      self.m.write(x) 

    def hash(self, k): 
     h = hash(k) 
     h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1) 
     return h 

    def get(self, k, d=None): 
     h = self.hash(k) 
     while True: 
      x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0] 
      if x == 0xffffffff: 
       return d 
      self.m.seek(x) 
      if k == self.read_kv(): 
       return self.read_kv() 
      h = (h + 1) & (self.hs - 1) 

    def read_kv(self): 
     sz = ord(self.m.read_byte()) 
     if sz & 0x80: 
      sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000 
     return self.m.read(sz) 

    def kvlen(self, k): 
     return len(k) + (1 if len(k) <= 0x7f else 4) 

    def __contains__(self, k): 
     return self.get(k, None) is not None 

    def close(self): 
     self.m.close() 

uint_format = struct.Struct('>I') 


def uget(a, k, d=None): 
    return to_unicode(a.get(to_str(k), d)) 


def uin(a, k): 
    return to_str(k) in a 


def to_unicode(s): 
    return s.decode('utf-8') if isinstance(s, str) else s 


def to_str(s): 
    return s.encode('utf-8') if isinstance(s, unicode) else s 


def mmap_test(): 
    n = 1000000 
    d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)}) 
    start_time = default_timer() 
    for i in xrange(n): 
     if bool(d.get(str(i))) != (i % 2 == 0): 
      raise Exception(i) 
    print 'mmap speed: %d gets per sec' % (n/(default_timer() - start_time)) 


def manager_test(): 
    n = 100000 
    d = Manager().dict({str(i * 2): '1' for i in xrange(n)}) 
    start_time = default_timer() 
    for i in xrange(n): 
     if bool(d.get(str(i))) != (i % 2 == 0): 
      raise Exception(i) 
    print 'manager speed: %d gets per sec' % (n/(default_timer() - start_time)) 


def shm_test(): 
    n = 1000000 
    d = HashTable('tmp', n) 
    d.update({str(i * 2): '1' for i in xrange(n)}) 
    start_time = default_timer() 
    for i in xrange(n): 
     if bool(d.get(str(i))) != (i % 2 == 0): 
      raise Exception(i) 
    print 'shm speed: %d gets per sec' % (n/(default_timer() - start_time)) 


if __name__ == '__main__': 
    mmap_test() 
    manager_test() 
    shm_test() 

En mis resultados de rendimiento portátil son:

mmap speed: 247288 gets per sec 
manager speed: 33792 gets per sec 
shm speed: 691332 gets per sec 

ejemplo de uso simple:

ht = shared_immutable_dict({'a': '1', 'b': '2'}) 
print ht.get('a') 
+5

Github? ¿Documentación? ¿cómo podemos usar esta herramienta? – Pavlos

Cuestiones relacionadas