2009-08-30 22 views

Respuesta

2

Existe un error que impide la verdadera FIFO.
Lea here.

¡Una forma alternativa de crear una configuración de multiprocesamiento de cola de prioridad sería sin duda genial!

+0

Sí, pero ¿hay una buena manera de hacer cola de prioridad? ¿Es posible? (A menos que tenga que escribir específicamente un proceso para administrar, que creo que será aún más difícil y más propenso a errores) –

+0

Aún no he encontrado una forma de hacerlo, pero lo mantendré informado. –

5

Por desgracia, es en ninguna parte tan simple como cambiar disciplina de cola de un buen viejo Queue.Queue: este último es, de hecho, diseñado para ser una subclase de acuerdo con un patrón de plantilla-método, y anulando sólo los métodos de enlace _put y/o _get puede fácilmente permite cambiar la disciplina de colas (en 2.6 LIFO explícito y se ofrecen implementaciones de prioridad, pero fueron fáciles de realizar incluso en versiones anteriores de Python).

Para el multiprocesamiento, en el caso general (lectores múltiples, escritores múltiples), no veo ninguna solución para cómo implementar colas de prioridad, excepto para renunciar a la naturaleza distribuida de la cola; designar un proceso auxiliar especial que no hace más que manejar colas, enviar (esencialmente) RPCs para crear una cola con una disciplina específica, hacer puts y llegar a ella, obtener información al respecto, & c. Por lo tanto, uno tendría los problemas habituales para asegurarse de que cada proceso conoce la ubicación del auxiliar (host y puerto, por ejemplo), etc. (más fácil si el proceso genera siempre al iniciar el proceso principal). Un problema bastante grande, especialmente si uno quiere hacerlo con buen rendimiento, salvaguarda contra los fallos de aux proc (requiriendo replicación de datos a procesos esclavos, "elección maestra" distribuida entre esclavos si falla el maestro, & c), y así sucesivamente. Hacerlo desde cero suena como el trabajo de un doctorado. Uno podría comenzar desde el trabajo Johnson's, o a cuestas en un enfoque muy general como ActiveMQ.

Algunos casos especiales (por ejemplo, un solo lector, un solo escritor) pueden ser más fáciles, y resultan ser más rápidos para su área limitada de aplicación; pero luego debe elaborarse una especificación específicamente restringida para esa área limitada, y los resultados no constituirían una "cola de multiprocesamiento" (de propósito general), sino que se aplicaría únicamente al conjunto restringido de requisitos.

1

Si bien esta no es una respuesta, tal vez pueda ayudarlo a desarrollar una cola de multiprocesamiento.

Aquí es un simple prioridad de la cola clase que escribí usando matriz de Python:

class PriorityQueue(): 
    """A basic priority queue that dequeues items with the smallest priority number.""" 
    def __init__(self): 
     """Initializes the queue with no items in it.""" 
     self.array = [] 
     self.count = 0 

    def enqueue(self, item, priority): 
     """Adds an item to the queue.""" 
     self.array.append([item, priority]) 
     self.count += 1 

    def dequeue(self): 
     """Removes the highest priority item (smallest priority number) from the queue.""" 
     max = -1 
     dq = 0 
     if(self.count > 0): 
      self.count -= 1 

      for i in range(len(self.array)): 
       if self.array[i][1] != None and self.array[i][1] > max: 
        max = self.array[i][1] 

      if max == -1: 
       return self.array.pop(0) 
      else: 
       for i in range(len(self.array)): 
        if self.array[i][1] != None and self.array[i][1] <= max: 
         max = self.array[i][1] 
         dq = i 
       return self.array.pop(dq) 

    def requeue(self, item, newPrio): 
     """Changes specified item's priority.""" 
     for i in range(len(self.array)): 
      if self.array[i][0] == item: 
       self.array[i][1] = newPrio 
       break 

    def returnArray(self): 
     """Returns array representation of the queue.""" 
     return self.array 

    def __len__(self): 
     """Returnes the length of the queue.""" 
     return self.count 
0

Dependiendo de sus necesidades usted podría utilizar el sistema operativo y el sistema de archivos en un número de maneras. ¿Qué tan grande crecerá la cola y qué tan rápido debe ser? Si la cola puede ser grande pero está dispuesto a abrir un par de archivos para cada acceso de cola, puede usar una implementación de BTree para almacenar la cola y el bloqueo de archivos para hacer cumplir el acceso exclusivo. Lento pero robusto

Si la cola se mantendrá relativamente pequeña y se necesita que sea rápido que podría ser capaz de utilizar la memoria compartida en algunos sistemas operativos ...

Si la cola será pequeño (1000 de entradas) y se no necesita que sea realmente rápido, podría usar algo tan simple como un directorio con archivos que contengan los datos con bloqueo de archivos. Esta sería mi preferencia si pequeña y lenta está bien.

Si la cola puede ser grande y quiere que sea rápida en promedio, entonces probablemente debería utilizar un proceso de servidor dedicado como sugiere Alex. Sin embargo, esto es un dolor en el cuello.

¿Cuáles son sus requisitos de rendimiento y tamaño?

1

Tenía el mismo estuche de uso. Pero con un número finito de prioridades.

Lo que estoy terminando haciendo es crear una cola por prioridad, y mis trabajadores de proceso intentarán obtener los elementos de esas colas, comenzando con la cola más importante a la menos importante (pasando de una cola a la otra) se hace cuando la cola está vacía)

+0

Parece la respuesta más razonablemente factible en la página. – speedplane

+1

Hmm ... pensándolo bien, esto todavía no es fácil de hacer perfectamente bien. Por ejemplo, cuando la cola de mayor prioridad está vacía y pasa a la siguiente, ¿cómo se puede evitar una condición de carrera en la que se llene la cola de mayor prioridad mientras se comprueba la siguiente cola más alta? – speedplane

0

Inspirado por la sugerencia de @ user211505, armé algo rápido y sucio.

Tenga en cuenta que esta no es una solución completa para el difícil problema de las colas de prioridad en entornos de producción de multiprocesamiento. Sin embargo, si solo está metiendo la pata o necesita algo para un proyecto corto, es probable que se ajuste a la factura:

from time import sleep 
from datetime import datetime 
from Queue import Empty 
from multiprocessing import Queue as ProcessQueue 

class SimplePriorityQueue(object): 
    ''' 
    Simple priority queue that works with multiprocessing. Only a finite number 
    of priorities are allowed. Adding many priorities slow things down. 

    Also: no guarantee that this will pull the highest priority item 
    out of the queue if many items are being added and removed. Race conditions 
    exist where you may not get the highest priority queue item. However, if 
    you tend to keep your queues not empty, this will be relatively rare. 
    ''' 
    def __init__(self, num_priorities=1, default_sleep=.2): 
     self.queues = [] 
     self.default_sleep = default_sleep 
     for i in range(0, num_priorities): 
      self.queues.append(ProcessQueue()) 

    def __repr__(self): 
     return "<Queue with %d priorities, sizes: %s>"%(len(self.queues), 
        ", ".join(map(lambda (i, q): "%d:%d"%(i, q.qsize()), 
           enumerate(self.queues)))) 

    qsize = lambda(self): sum(map(lambda q: q.qsize(), self.queues)) 

    def get(self, block=True, timeout=None): 
     start = datetime.utcnow() 
     while True: 
      for q in self.queues: 
       try: 
        return q.get(block=False) 
       except Empty: 
        pass 
      if not block: 
       raise Empty 
      if timeout and (datetime.utcnow()-start).total_seconds > timeout: 
       raise Empty 

      if timeout: 
       time_left = (datetime.utcnow()-start).total_seconds - timeout 
       sleep(time_left/4) 
      else: 
       sleep(self.default_sleep) 

    get_nowait = lambda(self): self.get(block=False) 

    def put(self, priority, obj, block=False, timeout=None): 
     if priority < 0 or priority >= len(self.queues): 
      raise Exception("Priority %d out of range."%priority) 
     # Block and timeout don't mean much here because we never set maxsize 
     return self.queues[priority].put(obj, block=block, timeout=timeout) 
Cuestiones relacionadas