2011-05-11 19 views
6

Estoy implementando una capa orientada a eventos sobre los Sockets de Java, y me preguntaba si había una manera de determinar si hay datos pendientes de leer.Redes de Java: evented Socket/InputStream

Mi enfoque normal sería leer desde el socket en un búfer, y llamar a las devoluciones proporcionadas cuando el búfer se llena en una cantidad determinada de bytes (que podría ser 0, si la devolución de llamada debe activarse cada vez que llega), pero sospecho que Java ya está haciendo el buffer para mí.

¿Es confiable el método available() de InputStream para esto? ¿Debo simplemente read() y hacer mi propio búfer en la parte superior del zócalo? ¿O hay otra manera?

Respuesta

8

En pocas palabras, no. available() no es confiable (al menos no fue para mí). Recomiendo usar java.nio.channels.SocketChannel conectado con Selector y SelectionKey. Esta solución se basa algo en los eventos, pero es más complicada que los simples.

Para los clientes:

  1. canal de toma Construct (socket), abrir un selector (selector = Selector.open();).
  2. Uso de no bloqueo socket.configureBlocking(false);
  3. selector de Registro para las conexiones socket.register(selector, SelectionKey.OP_CONNECT);
  4. Conectar socket.connect(new InetSocketAddress(host, port));
  5. A ver si hay algo nuevo selector.select();
  6. Si el "nuevo" se refiere a una conexión con éxito, registrar el selector para OP_READ; si el "nuevo" se refiere a los datos disponibles, simplemente lea desde el socket.

Sin embargo, para que sea asíncrono, debe configurar un hilo separado (a pesar de que el socket se crea como no bloqueado, el hilo se bloqueará de todos modos) que verifica si algo ha llegado o no. Para los servidores, hay ServerSocketChannel y usa OP_ACCEPT para ello.

Como referencia, esto es mi código (cliente), que debería dar una pista:

private Thread readingThread = new ListeningThread(); 

/** 
    * Listening thread - reads messages in a separate thread so the application does not get blocked. 
    */ 
private class ListeningThread extends Thread { 
    public void run() { 
    running = true; 
    try { 
    while(!close) listen(); 
    messenger.close(); 
    } 
    catch(ConnectException ce) { 
    doNotifyConnectionFailed(ce); 
    } 
    catch(Exception e) { 
// e.printStackTrace(); 
    messenger.close(); 
    } 
    running = false; 
    } 
} 

/** 
    * Connects to host and port. 
    * @param host Host to connect to. 
    * @param port Port of the host machine to connect to. 
    */ 
public void connect(String host, int port) { 
    try { 
    SocketChannel socket = SocketChannel.open(); 
    socket.configureBlocking(false); 
    socket.register(this.selector, SelectionKey.OP_CONNECT); 
    socket.connect(new InetSocketAddress(host, port)); 
    } 
    catch(IOException e) { 
    this.doNotifyConnectionFailed(e); 
    } 
} 

/** 
    * Waits for an event to happen, processes it and then returns. 
    * @throws IOException when something goes wrong. 
    */ 
protected void listen() throws IOException { 
    // see if there are any new things going on 
    this.selector.select(); 
    // process events 
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); 
    while(iter.hasNext()) { 
    SelectionKey key = iter.next(); 
    iter.remove(); 
    // check validity 
    if(key.isValid()) { 
    // if connectable... 
    if(key.isConnectable()) { 
    // ...establish connection, make messenger, and notify everyone 
    SocketChannel client = (SocketChannel)key.channel(); 
    // now this is tricky, registering for OP_READ earlier causes the selector not to wait for incoming bytes, which results in 100% cpu usage very, very fast 
    if(client!=null && client.finishConnect()) { 
     client.register(this.selector, SelectionKey.OP_READ); 
    } 
    } 
    // if readable, tell messenger to read bytes 
    else if(key.isReadable() && (SocketChannel)key.channel()==this.messenger.getSocket()) { 
    // read message here 
    } 
    } 
    } 
} 

/** 
    * Starts the client. 
    */ 
public void start() { 
    // start a reading thread 
    if(!this.running) { 
    this.readingThread = new ListeningThread(); 
    this.readingThread.start(); 
    } 
} 

/** 
    * Tells the client to close at nearest possible moment. 
    */ 
public void close() { 
    this.close = true; 
} 

Y para el servidor:

/** 
    * Constructs a server. 
    * @param port Port to listen to. 
    * @param protocol Protocol of messages. 
    * @throws IOException when something goes wrong. 
    */ 
public ChannelMessageServer(int port) throws IOException { 
    this.server = ServerSocketChannel.open(); 
    this.server.configureBlocking(false); 
    this.server.socket().bind(new InetSocketAddress(port)); 
    this.server.register(this.selector, SelectionKey.OP_ACCEPT); 
} 

/** 
    * Waits for event, then exits. 
    * @throws IOException when something goes wrong. 
    */ 
protected void listen() throws IOException { 
    // see if there are any new things going on 
    this.selector.select(); 
    // process events 
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); 
    while(iter.hasNext()) { 
    SelectionKey key = iter.next(); 
    // do something with the connected socket 
    iter.remove(); 
    if(key.isValid()) this.process(key); 
    } 
} 

/** 
    * Processes a selection key. 
    * @param key SelectionKey. 
    * @throws IOException when something is wrong. 
    */ 
protected void process(SelectionKey key) throws IOException { 
    // if incoming connection 
    if(key.isAcceptable()) { 
    // get client 
    SocketChannel client = (((ServerSocketChannel)key.channel()).accept()); 
    try { 
    client.configureBlocking(false); 
    client.register(this.selector, SelectionKey.OP_READ); 
    } 
    catch(Exception e) { 
    // catch 
    } 
    } 
    // if readable, tell messenger to read 
    else if(key.isReadable()) { 
    // read 
    } 
} 

Espero que esto ayude.

+0

No entiendo. No necesitas un hilo por separadoLos sockets que no bloquean no bloquean por definición. Simplemente use OP_READ correctamente, y un bucle de lectura correcto que se detiene cuando read arroja cero. – EJP

+0

@EJP: No estoy en desacuerdo; sin embargo, me pareció que, independientemente del bloqueo, la lectura del zócalo seguía bloqueada, incluso si no había nada que leer. Podría ser que hice algo mal, sin embargo. Sugiero al asker que intente lo que dices, y en caso de que no funcione, prueba los hilos. – Sorrow

+0

Lo que casi con seguridad hizo fue un ciclo while read() devolvió cero. Es por eso que lo mencioné. Eso no es bloqueo, eso es bucle. – EJP

0

disponible() solo le dirá si puede leer datos sin tener que ir al SO. No es muy útil aquí.

Puede realizar una lectura de bloqueo o no bloqueante como prefiera. Una lectura sin bloqueo solo regresa cuando no hay datos para leer, así puede ser lo que desee.

+2

Incorrecto. available() le dirá la suma de los datos en el BufferedInputStream/BufferedReader, si está utilizando uno, y el búfer de recepción del zócalo, que es una estructura de datos del kernel. Si los datos están solo en el búfer de recepción del zócalo, usted tendría que 'ir al SO' para obtenerlo, pero no * bloqueará * en el proceso. Como dice el Javadoc. Sin embargo, si es, por ejemplo, un SSLSocket, disponible() siempre devuelve cero. – EJP