2011-09-12 12 views
6

Necesito un generador de bytes que genere valores desde Byte.MIN_VALUE a Byte.MAX_VALUE. Cuando alcanza MAX_VALUE, debería comenzar de nuevo desde MIN_VALUE.¿Qué hay de malo con este generador de secuencia de bytes seguro para subprocesos?

He escrito el código usando AtomicInteger (ver a continuación); sin embargo, el código no parece comportarse correctamente si se accede simultáneamente y si se hace artificialmente lento con Thread.sleep() (si no duerme, funciona bien, sin embargo, sospecho que es demasiado rápido para que aparezcan los problemas de concurrencia).

El código (con algo de código de depuración añadido):

public class ByteGenerator { 

    private static final int INITIAL_VALUE = Byte.MIN_VALUE-1; 

    private AtomicInteger counter = new AtomicInteger(INITIAL_VALUE); 
    private AtomicInteger resetCounter = new AtomicInteger(0); 

    private boolean isSlow = false; 
    private long startTime; 

    public byte nextValue() { 
     int next = counter.incrementAndGet(); 
     //if (isSlow) slowDown(5); 
     if (next > Byte.MAX_VALUE) { 
      synchronized(counter) { 
       int i = counter.get(); 
       //if value is still larger than max byte value, we reset it 
       if (i > Byte.MAX_VALUE) { 
        counter.set(INITIAL_VALUE); 
        resetCounter.incrementAndGet(); 
        if (isSlow) slowDownAndLog(10, "resetting"); 
       } else { 
        if (isSlow) slowDownAndLog(1, "missed"); 
       } 
       next = counter.incrementAndGet(); 
      } 
     } 
     return (byte) next; 
    } 

    private void slowDown(long millis) { 
     try { 
      Thread.sleep(millis); 
     } catch (InterruptedException e) { 
     } 
    } 
    private void slowDownAndLog(long millis, String msg) { 
     slowDown(millis); 
     System.out.println(resetCounter + " " 
          + (System.currentTimeMillis()-startTime) + " " 
          + Thread.currentThread().getName() + ": " + msg); 
    } 

    public void setSlow(boolean isSlow) { 
     this.isSlow = isSlow; 
    } 
    public void setStartTime(long startTime) { 
     this.startTime = startTime; 
    } 

} 

Y, la prueba:

public class ByteGeneratorTest { 

    @Test 
    public void testGenerate() throws Exception { 
     ByteGenerator g = new ByteGenerator(); 
     for (int n = 0; n < 10; n++) { 
      for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; i++) { 
       assertEquals(i, g.nextValue()); 
      } 
     } 
    } 

    @Test 
    public void testGenerateMultiThreaded() throws Exception { 
     final ByteGenerator g = new ByteGenerator(); 
     g.setSlow(true); 
     final AtomicInteger[] counters = new AtomicInteger[Byte.MAX_VALUE-Byte.MIN_VALUE+1]; 
     for (int i = 0; i < counters.length; i++) { 
      counters[i] = new AtomicInteger(0); 
     } 
     Thread[] threads = new Thread[100]; 
     final CountDownLatch latch = new CountDownLatch(threads.length); 
     for (int i = 0; i < threads.length; i++) { 
      threads[i] = new Thread(new Runnable() { 
       public void run() { 
        try { 
         for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; i++) { 
          byte value = g.nextValue(); 
          counters[value-Byte.MIN_VALUE].incrementAndGet(); 
         } 
        } finally { 
         latch.countDown(); 
        } 
       } 
      }, "generator-client-" + i); 
      threads[i].setDaemon(true); 
     } 
     g.setStartTime(System.currentTimeMillis()); 
     for (int i = 0; i < threads.length; i++) { 
      threads[i].start(); 
     } 
     latch.await(); 
     for (int i = 0; i < counters.length; i++) { 
      System.out.println("value #" + (i+Byte.MIN_VALUE) + ": " + counters[i].get()); 
     } 
     //print out the number of hits for each value 
     for (int i = 0; i < counters.length; i++) { 
      assertEquals("value #" + (i+Byte.MIN_VALUE), threads.length, counters[i].get()); 
     } 
    } 

} 

El resultado en mi máquina de 2 hilos es que el valor # -128 obtiene 146 golpes (todos ellos deberían obtener 100 hits por igual ya que tenemos 100 hilos).

Si alguien tiene alguna idea, ¿qué hay de malo en este código? Soy todo oídos/ojos.

ACTUALIZACIÓN: para los que tienen prisa y no quieren que desplazarse hacia abajo, la forma correcta (y la más corta y la más elegante) para resolver esto en Java sería así:

public byte nextValue() { 
    return (byte) counter.incrementAndGet(); 
} 

Gracias, Heinz!

Respuesta

5

Usted toma la decisión de incrementarAndGet() en función de un valor antiguo de counter.get(). El valor del contador puede alcanzar MAX_VALUE nuevamente antes de realizar la operación incrementAndGet() en el contador.

if (next > Byte.MAX_VALUE) { 
    synchronized(counter) { 
     int i = counter.get(); //here You make sure the the counter is not over the MAX_VALUE 
     if (i > Byte.MAX_VALUE) { 
      counter.set(INITIAL_VALUE); 
      resetCounter.incrementAndGet(); 
      if (isSlow) slowDownAndLog(10, "resetting"); 
     } else { 
      if (isSlow) slowDownAndLog(1, "missed"); //the counter can reach MAX_VALUE again if you wait here long enough 
     } 
     next = counter.incrementAndGet(); //here you increment on return the counter that can reach >MAX_VALUE in the meantime 
    } 
} 

Para que funcione hay que asegurarse de que no se tomen decisiones sobre la información obsoleta. Restablezca el contador o devuelva el valor anterior.

public byte nextValue() { 
    int next = counter.incrementAndGet(); 

    if (next > Byte.MAX_VALUE) { 
     synchronized(counter) { 
      next = counter.incrementAndGet(); 
      //if value is still larger than max byte value, we reset it 
      if (next > Byte.MAX_VALUE) { 
       counter.set(INITIAL_VALUE + 1); 
       next = INITIAL_VALUE + 1; 
       resetCounter.incrementAndGet(); 
       if (isSlow) slowDownAndLog(10, "resetting"); 
      } else { 
       if (isSlow) slowDownAndLog(1, "missed"); 
      } 
     } 
    } 

    return (byte) next; 
} 
+0

¿Por qué tiene counter.incrementAndGet() en el bloque de sincronización, que es como un incremento múltiple para todo el hilo que entra en esa sección y luego se restablece a "INITIAL_VALUE + 1" –

3

Su bloque sincronizado contiene solo el cuerpo if. Debería envolver todo el método, incluida la declaración if. O simplemente haga que su método nextValue esté sincronizado. Por cierto, en este caso no necesitas las variables atómicas en absoluto.

Espero que esto funcione para usted. Intente utilizar las variables atómicas solo si realmente necesita el código de mayor rendimiento, es decir, la declaración synchronized le molesta. En mi humilde opinión, en la mayoría de los casos, no es así.

+0

Su respuesta no explica por qué el enfoque actual no funciona. Según tengo entendido, debería funcionar: usa una variable atómica cuando no está sincronizada y se sincroniza cuando se realizan operaciones no atómicas. Entonces, ¿por qué no funciona? –

2

Si te entiendo correctamente, te importa que los resultados de nextValue estén en el rango de Byte.MIN_VALUE y Byte.MAX_VALUE y no te importa el valor almacenado en el contador. Entonces puede asignar números enteros en bytes de tal manera que se requiere un comportamiento de enumeración se expone:

private static final int VALUE_RANGE = Byte.MAX_VALUE - Byte.MIN_VALUE + 1; 
private final AtomicInteger counter = new AtomicInteger(0); 

public byte nextValue() { 
    return (byte) (counter.incrementAndGet() % VALUE_RANGE + Byte.MIN_VALUE - 1); 
} 

cuidado, este es el código no probado. Pero la idea debería funcionar.

+0

Si bien esta es una idea muy clara (¿cómo es que no pensé en ello :-P) y realmente funciona? Todavía no explica por qué mi solución original no funciona. –

+0

Bueno, en su solución original es la siguiente falla. Es posible que ocurra un cambio de contexto entre el primer bloque 'incrementAndGet()' y 'synchronized'. Es decir. dos hilos obtienen un valor 'next 'que está más allá de Byte.MAX_VALUE. Uno de esos hilos restablecerá el contador, el otro devolverá un valor demasiado grande. – jmg

+0

No, si ambos obtienen un valor demasiado grande, el primero restablecerá el contador y el siguiente obtendrá un nuevo valor del contador (ya reiniciado). –

1

He codificado la siguiente versión de nextValue usando compareAndSet que está diseñado para ser utilizado en un bloque no sincronizado. Pasó las pruebas unitarias:

Ah, e introduje nuevas constantes para MIN_VALUE y MAX_VALUE, pero puede ignorarlas si lo prefiere.

static final int LOWEST_VALUE = Byte.MIN_VALUE; 
static final int HIGHEST_VALUE = Byte.MAX_VALUE; 

private AtomicInteger counter = new AtomicInteger(LOWEST_VALUE - 1); 
private AtomicInteger resetCounter = new AtomicInteger(0); 

public byte nextValue() { 
    int oldValue; 
    int newValue; 

    do { 
     oldValue = counter.get(); 
     if (oldValue >= HIGHEST_VALUE) { 
      newValue = LOWEST_VALUE; 
      resetCounter.incrementAndGet(); 
      if (isSlow) slowDownAndLog(10, "resetting"); 
     } else { 
      newValue = oldValue + 1;  
      if (isSlow) slowDownAndLog(1, "missed"); 
     } 
    } while (!counter.compareAndSet(oldValue, newValue)); 
    return (byte) newValue; 
} 

compareAndSet() trabaja en conjunto con get() para gestionar la concurrencia.

Al comienzo de su sección crítica, realiza un get() para recuperar el valor anterior. A continuación, realiza alguna función que depende solo del valor anterior para calcular un nuevo valor. Luego usa compareAndSet() para establecer el nuevo valor.Si el AtomicInteger ya no es igual al valor anterior en el momento en que se ejecuta compareAndSet() (debido a la actividad concurrente), falla y debe comenzar de nuevo.

Si tiene una cantidad extrema de simultaneidad y el tiempo de cálculo es largo, es posible que el compareAndSet() falle varias veces antes de que tenga éxito y puede valer la pena reunir estadísticas sobre eso si le preocupa.

No estoy sugiriendo que este sea un enfoque mejor o peor que un simple bloque sincronizado como otros han sugerido, pero yo personalmente probablemente usaría un bloque sincronizado para simplificar.

EDIT: Voy a responder a su pregunta real "¿Por qué no funciona la mía?"

su código tiene:

int next = counter.incrementAndGet(); 
    if (next > Byte.MAX_VALUE) { 

A medida que estas dos líneas no están protegidos por un bloque sincronizado, múltiples hilos pueden ejecutarlas simultáneamente y todos obtener valores de next>Byte.MAX_VALUE. Todos ellos pasarán al bloque sincronizado y establecerán counter en INITIAL_VALUE (uno tras otro mientras esperan el uno al otro).

A lo largo de los años, se ha escrito mucho sobre las dificultades de tratar de obtener un ajuste de rendimiento al no sincronizar cuando no parece necesario. Por ejemplo, consulte Double Checked Locking

+0

Sí, está mal, pero también creo que su explicación no es correcta. El segundo subproceso no volverá a restablecerse en el contador porque hay un 'int i = counter.get();' en el bloque sincronizado. –

+0

Si bien esto parece un bloqueo doblemente comprobado, no debería sufrir los mismos problemas, ya que AtomicInteger debería garantizar que todos los subprocesos vean el mismo valor de contador (sin almacenamiento en caché). ¿O hay algo más pasando? –

8

Inicialmente, Java almacenó todos los campos como valores de 4 u 8 bytes, incluso en corto y en byte. Las operaciones en los campos simplemente harían un poco de enmascaramiento para reducir los bytes. Así podríamos muy fácilmente hacer esto:

public byte nextValue() { 
    return (byte) counter.incrementAndGet(); 
} 

pequeño rompecabezas de la diversión, gracias :-) Neeme

Cuestiones relacionadas