2010-06-25 24 views
32

Quizás esta es una pregunta tonta, pero parece que no puedo encontrar una respuesta obvia.Concurrent Set Queue

Necesito una cola FIFO concurrente que contenga solo valores únicos. Intentar agregar un valor que ya existe en la cola simplemente ignora ese valor. Lo cual, si no fuera por la seguridad del hilo, sería trivial. ¿Hay una estructura de datos en Java o tal vez un snipit de código en las interwebs que exhibe este comportamiento?

+0

Desafortunadamente el término "cola" es ambigua, ya que a algunos lectores que implícitamente significa "cola FIFO", mientras que a los demás tiene el significado más general de 'java.util.Queue', que básicamente significa cualquier colección que tenga * algún * concepto de un" elemento principal ", ya sea que ese elemento sea el primero en entrar o no. ¡Asi que! ¿Cuál es? –

+0

FIFO, lo siento por la omisión =) –

Respuesta

5

Si desea una mejor concurrencia que la sincronización completa, hay una manera que conozco de hacerlo, utilizando un ConcurrentHashMap como mapa de respaldo. El siguiente es un boceto solamente.

public final class ConcurrentHashSet<E> extends ForwardingSet<E> 
    implements Set<E>, Queue<E> { 
    private enum Dummy { VALUE } 

    private final ConcurrentMap<E, Dummy> map; 

    ConcurrentHashSet(ConcurrentMap<E, Dummy> map) { 
    super(map.keySet()); 
    this.map = Preconditions.checkNotNull(map); 
    } 

    @Override public boolean add(E element) { 
    return map.put(element, Dummy.VALUE) == null; 
    } 

    @Override public boolean addAll(Collection<? extends E> newElements) { 
    // just the standard implementation 
    boolean modified = false; 
    for (E element : newElements) { 
     modified |= add(element); 
    } 
    return modified; 
    } 

    @Override public boolean offer(E element) { 
    return add(element); 
    } 

    @Override public E remove() { 
    E polled = poll(); 
    if (polled == null) { 
     throw new NoSuchElementException(); 
    } 
    return polled; 
    } 

    @Override public E poll() { 
    for (E element : this) { 
     // Not convinced that removing via iterator is viable (check this?) 
     if (map.remove(element) != null) { 
     return element; 
     } 
    } 
    return null; 
    } 

    @Override public E element() { 
    return iterator().next(); 
    } 

    @Override public E peek() { 
    Iterator<E> iterator = iterator(); 
    return iterator.hasNext() ? iterator.next() : null; 
    } 
} 

Todo no es sol con este enfoque. No tenemos una forma decente de seleccionar un elemento principal que no sea el entrySet().iterator().next() del mapa de respaldo, y el resultado es que el mapa se desequilibra cada vez más a medida que pasa el tiempo. Este desequilibrio es un problema tanto debido a mayores colisiones de cubeta como a una mayor contención de segmentos.

Nota: este código usa Guava en algunos lugares.

+3

¿Cómo preserva esto el orden de 'Queue'? El orden de iteración depende de la implementación del mapa de respaldo. – erickson

+1

¿Conservas qué orden? No veo ningún requisito sobre el pedido, a menos que se suponga que quiere una cola * FIFO * ... He agregado un comentario para preguntar. –

+0

@NitsanWakart La pregunta que estaba respondiendo era cómo obtener una 'Cola '. –

5

No hay una colección incorporada que hace esto. Existen algunas implementaciones concurrentes de Set que se pueden usar junto con un Queue concurrente.

Por ejemplo, un elemento se agrega a la cola solo después de que se haya agregado correctamente al conjunto, y cada elemento eliminado de la cola se elimina del conjunto. En este caso, el contenido de la cola, lógicamente, es realmente lo que haya en el conjunto, y la cola solo se utiliza para rastrear el pedido y proporcionar operaciones eficientes take() y poll() que se encuentran únicamente en un BlockingQueue.

+1

Una de mis implementaciones utilizado un LinkedHashSet por lo que sólo tenía una estructura de datos y podría depender del orden. Sin embargo, el algoritmo detrás de ConcurrentQueue es bastante más sofisticado que el uso de bloqueos de sincronización y me preguntaba si había una colección de rendimiento que tuviera la restricción de exclusividad adicional. –

+0

@Ambience - No hay tal colección. Mi técnica permite el uso de un concurrente 'Queue' (ya sea' LinkedBlockingQueue' si necesita tomar '()' 'ConcurrentLinkedQueue' o si sólo necesita' poll() '), preservando pedido FIFO, al tiempo que añade' Set' parecido unicidad. – erickson

4

A java.util.concurrent.ConcurrentLinkedQueue te lleva la mayor parte del camino hasta allí.

Envuelva el ConcurrentLinkedQueue con su propia clase que comprueba la exclusividad de un complemento. Tu código debe ser seguro para subprocesos.

+1

Una vez envuelto, que podría no ser seguro para subprocesos más, incluso si se basa en una 'ConcurrentLinkedQueue'. – finnw

+0

@finnw: debidamente anotado. Actualicé mi respuesta. –

+1

La implementación de mi primer pase hace exactamente esto, pero me preocupa el costo de invocar .contains() en una cola respaldada por una lista vinculada y también la sincronización de los métodos de cola niega completamente los beneficios del algoritmo subyacente de ConcurrentLinkdQueue. –

4

Usaría un LinkedHashSet sincronizado hasta que haya suficiente justificación para considerar alternativas. El principal beneficio que una solución más concurrente podría ofrecer es la división de bloqueo.

El enfoque simultáneo más simple sería a ConcurrentHashMap (que actúa como un conjunto) y ConcurrentLinkedQueue. El orden de las operaciones proporcionaría la restricción deseada. Una oferta() primero realizaría un CHM # putIfAbsent() y si se inserta satisfactoriamente en el CLQ. Una encuesta() tomaría de la CLQ y luego la eliminaría del CHM. Esto significa que consideramos una entrada en nuestra cola si está en el mapa y la CLQ proporciona el orden. El rendimiento podría ajustarse aumentando el nivel de concurrencia del mapa. Si eres tolerante con la habilidad adicional, entonces un CHM # get() barato podría actuar como una precondición razonable (pero puede sufrir por ser una vista un poco obsoleta).

2

¿Qué quiere decir con una cola simultánea con Set semántica? Si te refieres a una estructura verdaderamente concurrente (en oposición a una estructura segura para subprocesos) entonces yo diría que estás pidiendo un poni.

¿Qué sucede, por ejemplo, si llama al put(element) y detecta que algo ya está allí y se elimina inmediatamente? Por ejemplo, ¿qué significa en su caso si offer(element) || queue.contains(element) devuelve false?

Este tipo de cosas a menudo necesitan pensado de forma ligeramente diferente en un mundo concurrente con la frecuencia nada es lo que parece a menos que deje el mundo (bloqueo hacia abajo). De lo contrario, generalmente estás mirando algo en el pasado. Entonces, ¿qué estás tratando de hacer?

+1

Observación interesante, si no es una respuesta =) ¿Está por debajo del umbral del punto de comentario? –

+0

Yo estaba :-) Perdón por eso. –

0

Quizás extender ArrayBlockingQueue. Para poder acceder al bloqueo (de acceso del paquete), tuve que poner mi subclase dentro del mismo paquete. Advertencia: no he probado esto.

package java.util.concurrent; 

import java.util.Collection; 
import java.util.concurrent.locks.ReentrantLock; 

public class DeDupingBlockingQueue<E> extends ArrayBlockingQueue<E> { 

    public DeDupingBlockingQueue(int capacity) { 
     super(capacity); 
    } 

    public DeDupingBlockingQueue(int capacity, boolean fair) { 
     super(capacity, fair); 
    } 

    public DeDupingBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { 
     super(capacity, fair, c); 
    } 

    @Override 
    public boolean add(E e) { 
     final ReentrantLock lock = this.lock; 
     lock.lock(); 
     try { 
      if (contains(e)) return false; 
      return super.add(e); 
     } finally { 
      lock.unlock(); 
     } 
    } 

    @Override 
    public boolean offer(E e) { 
     final ReentrantLock lock = this.lock; 
     lock.lock(); 
     try { 
      if (contains(e)) return true; 
      return super.offer(e); 
     } finally { 
      lock.unlock(); 
     } 
    } 

    @Override 
    public void put(E e) throws InterruptedException { 
     final ReentrantLock lock = this.lock; 
     lock.lockInterruptibly(); //Should this be lock.lock() instead? 
     try { 
      if (contains(e)) return; 
      super.put(e); //if it blocks, it does so without holding the lock. 
     } finally { 
      lock.unlock(); 
     } 
    } 

    @Override 
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { 
     final ReentrantLock lock = this.lock; 
     lock.lock(); 
     try { 
      if (contains(e)) return true; 
      return super.offer(e, timeout, unit); //if it blocks, it does so without holding the lock. 
     } finally { 
      lock.unlock(); 
     } 
    } 
} 
0

Una respuesta simple para una cola de objetos únicos pueden ser los siguientes:

import java.util.concurrent.ConcurrentLinkedQueue; 

public class FinalQueue { 

    class Bin { 
     private int a; 
     private int b; 

     public Bin(int a, int b) { 
      this.a = a; 
      this.b = b; 
     } 

     @Override 
     public int hashCode() { 
      return a * b; 
     } 

     public String toString() { 
      return a + ":" + b; 
     } 

     @Override 
     public boolean equals(Object obj) { 
      if (this == obj) 
       return true; 
      if (obj == null) 
       return false; 
      if (getClass() != obj.getClass()) 
       return false; 
      Bin other = (Bin) obj; 
      if ((a != other.a) || (b != other.b)) 
       return false; 
      return true; 
     } 
    } 

    private ConcurrentLinkedQueue<Bin> queue; 

    public FinalQueue() { 
     queue = new ConcurrentLinkedQueue<Bin>(); 
    } 

    public synchronized void enqueue(Bin ipAddress) { 
     if (!queue.contains(ipAddress)) 
      queue.add(ipAddress); 
    } 

    public Bin dequeue() { 
     return queue.poll(); 
    } 

    public String toString() { 
     return "" + queue; 
    } 

    /** 
    * @param args 
    */ 
    public static void main(String[] args) { 
     FinalQueue queue = new FinalQueue(); 
     Bin a = queue.new Bin(2,6); 

     queue.enqueue(a); 
     queue.enqueue(queue.new Bin(13, 3)); 
     queue.enqueue(queue.new Bin(13, 3)); 
     queue.enqueue(queue.new Bin(14, 3)); 
     queue.enqueue(queue.new Bin(13, 9)); 
     queue.enqueue(queue.new Bin(18, 3)); 
     queue.enqueue(queue.new Bin(14, 7)); 
     Bin x= queue.dequeue(); 
     System.out.println(x.a); 
     System.out.println(queue.toString()); 
     System.out.println("Dequeue..." + queue.dequeue()); 
     System.out.println("Dequeue..." + queue.dequeue()); 
     System.out.println(queue.toString()); 
    } 
}