2012-08-29 11 views
9

Estoy usando una aplicación Clojure para acceder a los datos de una API web. Voy a hacer muchas solicitudes, y muchas de las solicitudes darán lugar a que se realicen más solicitudes, por lo que quiero mantener las URL de solicitud en una cola que dejará 60 segundos entre descargas posteriores.Colas de trabajo en Clojure

Siguiendo this blog post puse esto juntos:

(def queue-delay (* 1000 60)) ; one minute 

(defn offer! 
    [q x] 
    (.offerLast q x) 
    q) 

(defn take! 
    [q] 
    (.takeFirst q)) 

(def my-queue (java.util.concurrent.LinkedBlockingDeque.)) 

(defn- process-queue-item 
    [item] 
    (println ">> " item) ; this would be replaced by downloading `item` 
    (Thread/sleep queue-delay)) 

Si incluyo un (future (process-queue-item (take! my-queue))) en mi código en algún lugar entonces en el REPL que pueda (offer! my-queue "something") y veo el ">> algo" impreso inmediatamente. ¡Hasta aquí todo bien! Pero necesito que la cola dure todo el tiempo que mi programa esté activo. La llamada (future ...) que acabo de mencionar funciona para sacar un elemento de la cola, una vez que esté disponible, pero quiero algo que mire continuamente la cola y llame al process-queue-item siempre que haya algo disponible.

Además, contrariamente al amor habitual de Clojure por la concurrencia, quiero asegurarme de que solo se realice una solicitud a la vez y que mi programa espere 60 segundos para realizar cada solicitud posterior.

Creo que this Stack Overflow question es relevante, pero no estoy seguro de cómo adaptarlo para hacer lo que quiero. ¿Cómo consulto continuamente mi cola y me aseguro de que solo se ejecute una solicitud a la vez?

+0

¿Por qué quiere sondear continuamente pero solo envía cada 60 segundos? ¿Las encuestas solo una vez cada 60 segundos lograrían lo mismo? – mamboking

+0

@maboking Casi, sí. El único inconveniente de ese enfoque sería agregar el primer elemento a la cola: si al programa le toma cinco segundos descubrir cuál será la primera URL de solicitud, se quedará allí durante 55 segundos hasta que se compruebe la cola. El programa será bastante largo de todos modos, así que supongo que no es un problema. – bdesham

+0

¿está evitando un programador de tareas? Por ejemplo, este, https://github.com/zcaudate/cronj (también hay una lista de otras bibliotecas en el archivo Léame de ese repositorio) – georgek

Respuesta

0

Terminé de rodar mi propia pequeña biblioteca, que llamé simple-queue. Puede leer la documentación completa en GitHub, pero aquí está la fuente en su totalidad. No voy a mantener esta respuesta actualizada, así que si desea utilizar esta biblioteca, obtenga la fuente de GitHub.

(ns com.github.bdesham.simple-queue) 

(defn new-queue 
    "Creates a new queue. Each trigger from the timer will cause the function f 
    to be invoked with the next item from the queue. The queue begins processing 
    immediately, which in practice means that the first item to be added to the 
    queue is processed immediately." 
    [f & opts] 
    (let [options (into {:delaytime 1} 
         (select-keys (apply hash-map opts) [:delaytime])), 
     delaytime (:delaytime options), 
     queue {:queue (java.util.concurrent.LinkedBlockingDeque.)}, 
     task (proxy [java.util.TimerTask] [] 
       (run [] 
       (let [item (.takeFirst (:queue queue)), 
         value (:value item), 
         prom (:promise item)] 
        (if prom 
        (deliver prom (f value)) 
        (f value))))), 
     timer (java.util.Timer.)] 
    (.schedule timer task 0 (int (* 1000 delaytime))) 
    (assoc queue :timer timer))) 

(defn cancel 
    "Permanently stops execution of the queue. If a task is already executing 
    then it proceeds unharmed." 
    [queue] 
    (.cancel (:timer queue))) 

(defn process 
    "Adds an item to the queue, blocking until it has been processed. Returns 
    (f item)." 
    [queue item] 
    (let [prom (promise)] 
    (.offerLast (:queue queue) 
       {:value item, 
       :promise prom}) 
    @prom)) 

(defn add 
    "Adds an item to the queue and returns immediately. The value of (f item) is 
    discarded, so presumably f has side effects if you're using this." 
    [queue item] 
    (.offerLast (:queue queue) 
       {:value item, 
       :promise nil})) 

Un ejemplo del uso de esta cola para devolver valores:

(def url-queue (q/new-queue slurp :delaytime 30)) 
(def github (q/process url-queue "https://github.com")) 
(def google (q/process url-queue "http://www.google.com")) 

Las llamadas a q/process se bloquean de modo que habrá un retraso de 30 segundos entre las dos def declaraciones.

Un ejemplo del uso de esta cola exclusivamente para efectos secundarios:

(defn cache-url 
    [{url :url, filename :filename}] 
    (spit (java.io.File. filename) 
     (slurp url))) 

(def url-queue (q/new-queue cache-url :delaytime 30)) 
(q/add url-queue {:url "https://github.com", 
        :filename "github.html"}) ; returns immediately 
(q/add url-queue {:url "https://google.com", 
        :filename "google.html"}) ; returns immediately 

Ahora las llamadas a q/add retorno inmediato.

2

Aquí hay un fragmento de código del a project I did for fun. No es perfecto, pero puedo darte una idea de cómo resolví el problema de "espera 55 segundos para el primer elemento". Básicamente pasa por promesas, usando futuros para procesar las cosas de inmediato o hasta que una promesa "esté disponible".

(defn ^:private process 
    [queues] 
    (loop [[q & qs :as q+qs] queues p (atom true)] 
    (when-not (Thread/interrupted) 
     (if (or 
      (< (count (:promises @work-manager)) (:max-workers @work-manager)) 
      @p) ; blocks until a worker is available 
     (if-let [job (dequeue q)] 
      (let [f (future-call #(process-job job))] 
      (recur queues (request-promise-from-work-manager))) 
      (do 
      (Thread/sleep 5000) 
      (recur (if (nil? qs) queues qs) p))) 
     (recur q+qs (request-promise-from-work-manager)))))) 

Tal vez podría hacer algo similar? El código no es genial, y probablemente podría volver a escribirse para usar lazy-seq, ¡pero eso es solo un ejercicio que todavía no he hecho!

0

Este es posiblemente loco, pero siempre se puede utilizar una función como esta para crear una secuencia perezosa ralentizado:

(defn slow-seq [delay-ms coll] 
    "Creates a lazy sequence with delays between each element" 
    (lazy-seq 
    (if-let [s (seq coll)] 
     (do 
      (Thread/sleep delay-ms) 
      (cons (first s) 
       (slow-seq delay-ms (rest s))))))) 

Básicamente, esto asegurará un retraso entre cada una de las invocaciones de funciones.

Se puede utilizar con algo como lo siguiente, proporcionar un retardo en milisegundos:

(doseq [i (slow-seq 500 (range 10))] 
    (println (rand-int 10)) 

O, alternativamente, usted puede poner su llamada a la función dentro de la secuencia con algo como:

(take 10 (slow-seq 500 (repeatedly #(rand-int 10)))) 

Obviamente , en ambos casos, puede reemplazar (rand-int 10) con el código que esté utilizando para realizar/activar una descarga.

+0

Si estoy leyendo este derecho, todos los elementos de 'coll' tendrían que conocerse antes de ejecutar' slow-seq', ¿no? Me gustaría algo que le permita agregar elementos dinámicamente sin problemas. Específicamente, si el resultado de una llamada API es que necesito hacer otra llamada API, ¿permitirá esta función que esa segunda llamada se coloque en la cola? – bdesham