2012-09-13 22 views
12

Lo que me gustaría hacer es crear una especie de "tubería" (como una tubería entre procesos), pero entre las corrientes ios dentro del mismo programa. Tengo una función que requiere un flujo de entrada como argumento, pero mis datos provienen de un flujo de salida. Entonces, ¿hay una forma estándar de canalizar la salida de un std::ostream en la entrada de un std::istream?C++ conecta la secuencia de salida a la corriente de entrada

+2

¿El std :: stringstream se adapta a sus necesidades? Si no, explica por qué. – AProgrammer

+1

Hay un iostream (observe que tiene un 'i' y un' o' al principio). Usted bombea datos en un extremo y sale por ahí. Es eso lo que quieres. –

+0

-1 pregunta no especificada. –

Respuesta

13

Puede crear un std::streambuf donde la salida va a un búfer y std::overflow() bloques cuando el búfer está lleno. En el otro extremo, tendría un búfer de entrada que bloquea en underflow() cuando el búfer se vacía. Obviamente, leer y escribir estaría en dos hilos diferentes.

El asunto complicado es cómo sincronizar los dos almacenamientos intermedios: las secuencias no usan ninguna operación de sincronización mientras acceden a los almacenamientos intermedios. Solo cuando se llama a cualquiera de las funciones virtuales, puede interceptar la operación y manejar la sincronización. Por otro lado, no usar un buffer es bastante ineficiente. La forma en que abordaría este problema es mediante el uso de un buffer de salida relativamente pequeño (por ejemplo 256 char s) y también anula sync() para usar esta función para la transferencia de caracteres al buffer de entrada. El streambuf usaría un mutex para la sincronización y una variable de condición para bloquear en un búfer de entrada completo en la salida y un búfer de entrada vacío en la entrada. Para admitir un apagado limpio, también debe haber una función que establezca un indicador de que no ingresará más entrada y que todas las demás operaciones de salida fallarán.

La creación de la implementación real revela que dos búferes no son suficientes: los subprocesos que acceden a la entrada y el búfer de salida pueden estar activos cuando los otros bloques de búfer respectivos. Por lo tanto, se necesita un tercer buffer intermedio. Con este pequeño cambio al plan anterior, a continuación se muestra un código (utiliza pequeños buffers para asegurarse de que haya desbordamientos y subdesbordamientos reales, para un uso real, al menos el búfer de entrada probablemente sea más grande).

// threadbuf.cpp              -*-C++-*- 
// ---------------------------------------------------------------------------- 
// Copyright (C) 2013 Dietmar Kuehl http://www.dietmar-kuehl.de   
//                  
// Permission is hereby granted, free of charge, to any person   
// obtaining a copy of this software and associated documentation  
// files (the "Software"), to deal in the Software without restriction, 
// including without limitation the rights to use, copy, modify,   
// merge, publish, distribute, sublicense, and/or sell copies of   
// the Software, and to permit persons to whom the Software is   
// furnished to do so, subject to the following conditions:    
//                  
// The above copyright notice and this permission notice shall be  
// included in all copies or substantial portions of the Software.  
//                  
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,  
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES  
// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND    
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT   
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,   
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR   
// OTHER DEALINGS IN THE SOFTWARE. 
// ---------------------------------------------------------------------------- 


#include <algorithm> 
#include <condition_variable> 
#include <iostream> 
#include <mutex> 
#include <stdexcept> 
#include <streambuf> 
#include <string> 
#include <thread> 

// ---------------------------------------------------------------------------- 

class threadbuf 
    : public std::streambuf 
{ 
private: 
    typedef std::streambuf::traits_type traits_type; 
    typedef std::string::size_type  string_size_t; 

    std::mutex    d_mutex; 
    std::condition_variable d_condition; 
    std::string    d_out; 
    std::string    d_in; 
    std::string    d_tmp; 
    char*     d_current; 
    bool     d_closed; 

public: 
    threadbuf(string_size_t out_size = 16, string_size_t in_size = 64) 
     : d_out(std::max(string_size_t(1), out_size), ' ') 
     , d_in(std::max(string_size_t(1), in_size), ' ') 
     , d_tmp(std::max(string_size_t(1), in_size), ' ') 
     , d_current(&this->d_tmp[0]) 
     , d_closed(false) 
    { 
     this->setp(&this->d_out[0], &this->d_out[0] + this->d_out.size() - 1); 
     this->setg(&this->d_in[0], &this->d_in[0], &this->d_in[0]); 
    } 
    void close() 
    { 
     { 
      std::unique_lock<std::mutex> lock(this->d_mutex); 
      this->d_closed = true; 
      while (this->pbase() != this->pptr()) { 
       this->internal_sync(lock); 
      } 
     } 
     this->d_condition.notify_all(); 
    } 

private: 
    int_type underflow() 
    { 
     if (this->gptr() == this->egptr()) 
     { 
      std::unique_lock<std::mutex> lock(this->d_mutex); 
      while (&this->d_tmp[0] == this->d_current && !this->d_closed) { 
       this->d_condition.wait(lock); 
      } 
      if (&this->d_tmp[0] != this->d_current) { 
       std::streamsize size(this->d_current - &this->d_tmp[0]); 
       traits_type::copy(this->eback(), &this->d_tmp[0], 
            this->d_current - &this->d_tmp[0]); 
       this->setg(this->eback(), this->eback(), this->eback() + size); 
       this->d_current = &this->d_tmp[0]; 
       this->d_condition.notify_one(); 
      } 
     } 
     return this->gptr() == this->egptr() 
      ? traits_type::eof() 
      : traits_type::to_int_type(*this->gptr()); 
    } 
    int_type overflow(int_type c) 
    { 
     std::unique_lock<std::mutex> lock(this->d_mutex); 
     if (!traits_type::eq_int_type(c, traits_type::eof())) { 
      *this->pptr() = traits_type::to_char_type(c); 
      this->pbump(1); 
     } 
     return this->internal_sync(lock) 
      ? traits_type::eof() 
      : traits_type::not_eof(c); 
    } 
    int sync() 
    { 
     std::unique_lock<std::mutex> lock(this->d_mutex); 
     return this->internal_sync(lock); 
    } 
    int internal_sync(std::unique_lock<std::mutex>& lock) 
    { 
     char* end(&this->d_tmp[0] + this->d_tmp.size()); 
     while (this->d_current == end && !this->d_closed) { 
      this->d_condition.wait(lock); 
     } 
     if (this->d_current != end) 
     { 
      std::streamsize size(std::min(end - d_current, 
              this->pptr() - this->pbase())); 
      traits_type::copy(d_current, this->pbase(), size); 
      this->d_current += size; 
      std::streamsize remain((this->pptr() - this->pbase()) - size); 
      traits_type::move(this->pbase(), this->pptr(), remain); 
      this->setp(this->pbase(), this->epptr()); 
      this->pbump(remain); 
      this->d_condition.notify_one(); 
      return 0; 
     } 
     return traits_type::eof(); 
    } 
}; 

// ---------------------------------------------------------------------------- 

static void writer(std::ostream& out) 
{ 
    for (std::string line; std::getline(std::cin, line);) 
    { 
     out << "writer: '" << line << "'\n"; 
    } 
} 

// ---------------------------------------------------------------------------- 

static void reader(std::istream& in) 
{ 
    for (std::string line; std::getline(in, line);) 
    { 
     std::cout << "reader: '" << line << "'\n"; 
    } 
} 

// ---------------------------------------------------------------------------- 

int main() 
{ 
    try 
    { 
     threadbuf sbuf; 
     std::ostream out(&sbuf); 
     std::istream in(&sbuf); 

     std::thread write(&::writer, std::ref(out)); 
     std::thread read(&::reader, std::ref(in)); 

     write.join(); 
     sbuf.close(); 
     read.join(); 
    } 
    catch (std::exception const& ex) 
    { 
     std::cerr << "ERROR: " << ex.what() << "\n"; 
    } 
} 
+1

+1 por el esfuerzo; el OP ciertamente buscaba una solución mucho más rápida y fácil. – Walter

+0

Bueno, usar el código de arriba ** es ** rápido para snybody pero yo :) He visto solicitudes similares antes, es decir, puede ser útil para otros también. ... y fue un ejercicio interesante implementar realmente lo que describí antes. Finalmente: ¡no estoy al tanto de una solución más fácil! –

+0

Usted es una leyenda de flirteo Dietmar. Estoy usando esto en una prueba unitaria y funciona de maravilla. Gracias. – MattSmith

Cuestiones relacionadas