2012-02-19 15 views
6

El siguiente código añade tareas que realizan algún tipo de procesamiento de archivos del almacén de blob, se ejecuta en un B2 backend por lo que no tiene límite de tiempo de espera:appengine, python: ¿Hay una pérdida de memoria en taskqueue.add()?

for task in tasks: 
    tools.debug("add_tasks_to_process_files", "adding_task") 

    taskqueue.add(\ 
      name=("Process_%s_files---%s--%s--%s--%s" % \ 
         (len(tasks[task]), task[1], task[0], task[2], int(time.time()))),\ 
      queue_name="files-processor",\ 
      url="/analytics/process_files/",\ 
      params={"processing_task": json.dumps({"profile": task, "blobs_to_process": tasks[task]})}) 

tareas es un diccionario de la siguiente forma:

{ 
    (x1,y1,z1): ["blob_key", "blob_key"... (limited to 35 keys)], 
    (x2,y2,z2): ["blob_key", "blob_key"...], 
    . 
    . 
    . 
} 

x1, y1, z1 son todas las cadenas

tools.debug es una función que escribí que envía mensajes a mi Sever local utilizando urlfetch (por lo que no tiene que esperar a 20m en ser capaz de leer los registros):

def debug(location, message, params=None, force=False): 
    if not (settings.REMOTE_DEBUG or settings.LOCALE_DEBUG or force): 
     return 

    if params is None: 
     params = {} 

    params["memory"] = runtime.memory_usage().current() 
    params["instance_id"] = settings.INSTANCE_ID 

    debug_message = "%s/%s?%s" % (urllib2.quote(location), urllib2.quote(message), "&".join(["%s=%s" % (p, urllib2.quote(unicode(params[p]).encode("utf-8"))) for p in params])) 

    if settings.REMOTE_DEBUG or force: 
     fetch("%s/%s" % (settings.REMOTE_DEBUGGER, debug_message)) 

    if settings.LOCALE_DEBUG or force: 
     logging.debug(debug_message) 

desde tools.debug no estaba en el código cuando por primera vez no, que estoy seguro que no es la causa de los problemas de memoria.

me dio este mensaje:

/add_tasks_to_process_files/ 500 98812ms 0kb instance=0 AppEngine-Google; (+http://code.google.com/appengine): 
    A serious problem was encountered with the process that handled this request, causing it to exit. This is likely to cause a new process to be used for the next request to your application. If you see this message frequently, you may have a memory leak in your application. (Error code 201) 

Y justo después de que:

/_ah/stop 500 110ms 0kb 
Exceeded soft private memory limit with 283.406 MB after servicing 1 requests total 

de nuevo, lo he recibido por el código anterior sin la línea: tools.debug("add_tasks_to_process_files", "adding_task")

Ahora, déjame mostrarle lo que veo en mi depurador:

1 2012-1-19 14:41:38 [processors-backend] processors-backend-initiated instance_id: 1329662498, memory: 18.05078125, backend_instance_url: http://0.processors.razoss-dock-dev.appspot.com, backend_load_balancer_url: http://processors.razoss-dock-dev.appspot.com 
2 2012-1-19 14:41:39 [AddTasksToProcessFiles] start instance_id: 1329662498, files_sent_to_processing_already_in_previous_failed_attempts: 0, memory: 19.3828125 
3 2012-1-19 14:41:59 [AddTasksToProcessFiles] add_tasks_to_process_files-LOOP_END total_tasks_to_add: 9180, total_files_added_to_tasks: 9184, task_monitor.files_sent_to_processing: 0, total_files_on_tasks_dict: 9184, instance_id: 1329662498, memory: 56.52734375 
4 2012-1-19 14:42:0 [add_tasks_to_process_files] adding_task instance_id: 1329662498, memory: 57.81640625 
5 2012-1-19 14:42:0 [add_tasks_to_process_files] adding_task instance_id: 1329662498, memory: 57.81640625 
6 2012-1-19 14:42:1 [add_tasks_to_process_files] adding_task instance_id: 1329662498, memory: 57.9375 
7 2012-1-19 14:42:2 [add_tasks_to_process_files] adding_task instance_id: 1329662498, memory: 57.9375 
8 2012-1-19 14:42:2 [add_tasks_to_process_files] adding_task instance_id: 1329662498, memory: 58.03125 
. 
. 
. 
2183 2012-1-19 14:53:45 [add_tasks_to_process_files] adding_task instance_id: 1329662498, memory: 280.66015625 
2184 2012-1-19 14:53:45 [add_tasks_to_process_files] adding_task instance_id: 1329662498, memory: 280.66015625 
2185 2012-1-19 14:53:45 [add_tasks_to_process_files] adding_task instance_id: 1329662498, memory: 281.0 
2 186 2012-1-19 14:53:46 [add_tasks_to_process_files] adding_task instance_id: 1329662498, memory: 281.0 
2187 2012-1-19 14:53:46 [add_tasks_to_process_files] adding_task instance_id: 1329662498, memory: 281.0 
2188 2012-1-19 14:53:46 [add_tasks_to_process_files] adding_task instance_id: 1329662498, memory: 281.3828125 

rastro completo: http://pastebin.com/CcPDU6s7

¿Hay una pérdida de memoria en taskqueue.add()?

Gracias

+0

¿Cómo evalúa el uso de la memoria? – proppy

+0

¿Tiene un patrón de uso de memoria similar si elimina la llamada a blobstore api de sus tareas? – proppy

+0

Hola, proppy: He encontrado la causa: el appstats fue habilitado ... gracias de todos modos – theosp

Respuesta

3

Si bien esto no responde a tu pregunta en particular, ¿ha intentado Queue añadir tareas en lotes?

http://code.google.com/appengine/docs/python/taskqueue/queues.html#Queue_add

Puede añadir hasta 100 tareas a la vez.

http://code.google.com/appengine/docs/python/taskqueue/overview-push.html#Quotas_and_Limits_for_Push_Queues

código no probado.

queue = taskqueue.Queue(name="files-processor") 
while tasks: 
    queue.add(taskqueue.Task(...) for k,v in (tasks.popitem() for _ in range(min(len(tasks),100)))) 

Si aún desea utilizar tasks otra parte habría que cambiar esta construcción ligeramente (o hacer una copia).

+0

¡No lo hice, pero definitivamente lo haré! gracias – theosp

+0

Reemplazar el código original con esto nos solucionó el problema (ahora funciona para la carga máxima que queremos admitir). Puede ser debido a menos llamadas a queue.add. No intenté ver si el envío de x100 tareas más causaría el problema original: falta de tiempo, y no planificamos nuestras cargas para producir esta cantidad de tareas ... ¡muchas gracias! – theosp

+0

Y, una pregunta: ¿hay alguna razón por la que decidió usar range() en lugar de xrange()? – theosp