2012-06-06 11 views
14

¿Cómo implemento un búfer FIFO al que puedo agregar de manera eficiente trozos de bytes de tamaño arbitrario al jefe y desde el cual puedo extraer fragmentos de bytes arbitrariamente dimensionados desde la cola ?Cola FIFO eficiente para trozos de bytes de tamaño arbitrario en Python

Antecedentes:

tengo una clase que lee bytes de objetos de archivo-como en trozos de tamaño arbitrario y es en sí mismo un objeto de fichero del cual los clientes pueden leer los bytes en trozos de tamaño arbitrario.

La forma en que tengo esto implementado es que cada vez que un cliente quiere leer un trozo de bytes, la clase lee repetidamente de los objetos subyacentes tipo archivo (con tamaños de fragmentos apropiados para esos objetos) y agrega los bytes al encabezado de una cola FIFO hasta que haya suficientes bytes en la cola para servir un trozo del tamaño solicitado al cliente. A continuación, coloca esos bytes fuera de la cola de la cola y los devuelve al cliente.

Tengo un problema de rendimiento que ocurre cuando el tamaño del fragmento para los objetos subyacentes tipo archivo es mucho mayor que el tamaño del fragmento que usan los clientes cuando leen de la clase.

Supongamos que el tamaño del fragmento para los objetos subyacentes tipo archivo es de 1 MiB y el tamaño del fragmento con el que el cliente lee es de 1 KiB. La primera vez que el cliente solicita 1 KiB, la clase debe leer 1 MiB y agregarlo a la cola FIFO. Luego, para esa solicitud y las 1023 solicitudes subsiguientes, la clase debe mostrar 1 KiB en la cola de la cola FIFO, que disminuye gradualmente en tamaño de 1 MiB a 0 bytes, en cuyo momento el ciclo comienza de nuevo.

Actualmente lo he implementado con un objeto StringIO. Escribir nuevos bytes al final del objeto StringIO es rápido, pero la eliminación de bytes desde el principio es muy lenta, porque se debe crear un nuevo objeto StringIO, que contenga una copia de todo el búfer anterior menos el primer fragmento de bytes.

ASÍ QUE las preguntas que tratan problemas similares tienden a apuntar al contenedor deque. Sin embargo, deque se implementa como una lista doblemente vinculada. Escribir un fragmento para el deque requeriría dividir el fragmento en objetos, cada uno con un solo byte. El deque luego agregaría dos punteros a cada objeto para almacenar, probablemente aumentando los requisitos de memoria al menos en un orden de magnitud en comparación con los bytes. Además, tomaría mucho tiempo recorrer la lista enlazada y tratar con cada objeto para dividir los fragmentos en objetos y unir los objetos en trozos.

Respuesta

13

he implementado actualmente esto con un objeto StringIO. Escribir nuevos bytes al final del objeto StringIO es rápido, pero eliminar los bytes desde el principio es muy lento, porque un nuevo objeto StringIO, que contiene una copia del búfer completo anterior menos el primer fragmento de bytes, debe ser creado.

En realidad la forma más típica de la aplicación de FIFO es de dos utilización envoltura alrededor de inercia con dos punteros como tales:

enter image description hereimage source

Ahora, se puede implementar que con StringIO() usando .seek() para leer/escribe desde la ubicación apropiada.

+1

Ooh, +1 para el envolvente. No había pensado en eso. Sin embargo, necesitaría saber el tamaño máximo por adelantado; de hecho, supongo que se podría cultivar según sea necesario ... – Cameron

+0

¡Gracias! Esto se ve perfecto. Hice un experimento con StringIO que indica que se expandirá automáticamente para acomodar esto. Por ejemplo, si el tamaño actual del objeto StringIO es de 10 bytes y PUTPT (la ubicación de búsqueda) está en el índice 5, escribir un fragmento de 20 bytes expande automáticamente el objeto StringIO a 25 bytes, conservando los primeros 5 bytes y sobrescribiendo el resto. Sin embargo, si GETPT está actualmente después de PUTPT, se requiere algo más de lógica. –

+0

Implementé esta idea en mi respuesta a continuación. ¡Aclamaciones! – Cameron

3

¿Puede suponer cualquier cosa sobre las cantidades esperadas de lectura/escritura?

La fragmentación de los datos en, por ejemplo, fragmentos de 1024 bytes y el uso de deque [1] podrían funcionar mejor; podrías leer N pedazos completos, luego un último trozo para dividir y poner el resto al principio de la cola.

1) collections.deque

class collections.deque([iterable[, maxlen]])

devuelve un nuevo objeto deque inicializado de izquierda a derecha (utilizando append()) con los datos de iterable. Si no se especifica iterable, el nuevo deque está vacío.

Deques son una generalización de las pilas y las colas (el nombre se pronuncia "cubierta" y es la abreviatura de "cola doble"). Deques es compatible con la seguridad de subprocesos, la memoria eficiente se agrega y emerge de cualquier lado del deque con aproximadamente el mismo rendimiento de O (1) en cualquier dirección. ...

9

actualización: Aquí está una implementación de la técnica de búfer circular de vartec's answer (sobre la base de mi respuesta original, conservado por debajo de los curiosos):

from cStringIO import StringIO 

class FifoFileBuffer(object): 
    def __init__(self): 
     self.buf = StringIO() 
     self.available = 0 # Bytes available for reading 
     self.size = 0 
     self.write_fp = 0 

    def read(self, size = None): 
     """Reads size bytes from buffer""" 
     if size is None or size > self.available: 
      size = self.available 
     size = max(size, 0) 

     result = self.buf.read(size) 
     self.available -= size 

     if len(result) < size: 
      self.buf.seek(0) 
      result += self.buf.read(size - len(result)) 

     return result 


    def write(self, data): 
     """Appends data to buffer""" 
     if self.size < self.available + len(data): 
      # Expand buffer 
      new_buf = StringIO() 
      new_buf.write(self.read()) 
      self.write_fp = self.available = new_buf.tell() 
      read_fp = 0 
      while self.size <= self.available + len(data): 
       self.size = max(self.size, 1024) * 2 
      new_buf.write('0' * (self.size - self.write_fp)) 
      self.buf = new_buf 
     else: 
      read_fp = self.buf.tell() 

     self.buf.seek(self.write_fp) 
     written = self.size - self.write_fp 
     self.buf.write(data[:written]) 
     self.write_fp += len(data) 
     self.available += len(data) 
     if written < len(data): 
      self.write_fp -= self.size 
      self.buf.seek(0) 
      self.buf.write(data[written:]) 
     self.buf.seek(read_fp) 

Respuesta original (sustituida por la uno arriba):

Puede usar un buffer y rastrear el índice de inicio (leer el puntero del archivo), compactándolo de vez en cuando cuando se pone demasiado l arge (esto debería producir un rendimiento amortizado bastante bueno).

Por ejemplo, envolver un objeto StringIO así:

from cStringIO import StringIO 
class FifoBuffer(object): 
    def __init__(self): 
     self.buf = StringIO() 

    def read(self, *args, **kwargs): 
     """Reads data from buffer""" 
     self.buf.read(*args, **kwargs) 

    def write(self, *args, **kwargs): 
     """Appends data to buffer""" 
     current_read_fp = self.buf.tell() 
     if current_read_fp > 10 * 1024 * 1024: 
      # Buffer is holding 10MB of used data, time to compact 
      new_buf = StringIO() 
      new_buf.write(self.buf.read()) 
      self.buf = new_buf 
      current_read_fp = 0 

     self.buf.seek(0, 2) # Seek to end 
     self.buf.write(*args, **kwargs) 

     self.buf.seek(current_read_fp) 
+3

+1 Esto es increíble. Gracias por la implementación completa. –

+0

@Roger: no hay problema. Pensé que podría ser útil algún día ;-) – Cameron

+0

Solo por curiosidad, ¿es más rápido? –

Cuestiones relacionadas