2012-08-28 14 views
6

Implementé un grupo de subprocesos con boost::asio y algunos números boost::thread que llaman a boost::asio::io_service::run(). Sin embargo, un requisito que me han dado es tener una forma de supervisar todos los hilos para "salud". Mi intención es hacer un simple objeto centinela que se pueda pasar a través del grupo de subprocesos; si se completa, podemos suponer que el subproceso aún está procesando el trabajo.boost :: asio, grupos de subprocesos y supervisión de subprocesos

Sin embargo, dada mi implementación, no estoy seguro de cómo (si) puedo controlar todos los subprocesos en el grupo de manera confiable. Simplemente he delegado la función de subproceso en boost::asio::io_service::run(), por lo que publicar un objeto centinela en la instancia io_service no garantiza qué subproceso realmente enviará centinela y hará el trabajo.

Una opción puede ser simplemente insertar periódicamente el centinela, y esperar que sea recogido por cada hilo al menos una vez en un período de tiempo razonable, pero obviamente no es ideal.

Tome el siguiente ejemplo. Debido a la forma en que el manejador está codificado, en este caso podemos ver que cada hilo hará la misma cantidad de trabajo, pero en realidad no tendré control de la implementación del manejador, algunos pueden durar mucho tiempo mientras que otros serán casi inmediato.

#include <iostream> 
#include <boost/asio.hpp> 
#include <vector> 
#include <boost/thread.hpp> 
#include <boost/bind.hpp> 

void handler() 
{ 
    std::cout << boost::this_thread::get_id() << "\n"; 
    boost::this_thread::sleep(boost::posix_time::milliseconds(100)); 
} 

int main(int argc, char **argv) 
{ 
    boost::asio::io_service svc(3); 

    std::unique_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(svc)); 

    boost::thread one(boost::bind(&boost::asio::io_service::run, &svc)); 
    boost::thread two(boost::bind(&boost::asio::io_service::run, &svc)); 
    boost::thread three(boost::bind(&boost::asio::io_service::run, &svc)); 

    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 

    work.reset(); 

    three.join(); 
    two.join(); 
    one.join(); 

    return 0; 
} 

Respuesta

2

La solución que he usado se basa en el hecho de que soy dueño de la implementación de los objetos de la piscina banda de rodadura. Creé un tipo de contenedor que actualizará las estadísticas y copiará los controladores definidos por el usuario que se publican en el grupo de subprocesos. Solo este tipo de envoltorio se publica en el subyacente io_service. Este método me permite hacer un seguimiento de los controladores que se publican/ejecutan, sin tener que ser intrusivo en el código de usuario.

Aquí hay una simplificada y simplificada ejemplo:

#include <iostream> 
#include <memory> 
#include <vector> 
#include <boost/thread.hpp> 
#include <boost/asio.hpp> 

// Supports scheduling anonymous jobs that are 
// executable as returning nothing and taking 
// no arguments 
typedef std::function<void(void)> functor_type; 

// some way to store per-thread statistics 
typedef std::map<boost::thread::id, int> thread_jobcount_map; 

// only this type is actually posted to 
// the asio proactor, this delegates to 
// the user functor in operator() 
struct handler_wrapper 
{ 
    handler_wrapper(const functor_type& user_functor, thread_jobcount_map& statistics) 
     : user_functor_(user_functor) 
     , statistics_(statistics) 
    { 
    } 

    void operator()() 
    { 
     user_functor_(); 

     // just for illustration purposes, assume a long running job 
     boost::this_thread::sleep(boost::posix_time::milliseconds(100)); 

     // increment executed jobs 
     ++statistics_[boost::this_thread::get_id()]; 
    } 

    functor_type   user_functor_; 
    thread_jobcount_map& statistics_; 
}; 

// anonymous thread function, just runs the proactor 
void thread_func(boost::asio::io_service& proactor) 
{ 
    proactor.run(); 
} 

class ThreadPool 
{ 
public: 
    ThreadPool(size_t thread_count) 
    { 
     threads_.reserve(thread_count); 

     work_.reset(new boost::asio::io_service::work(proactor_)); 

     for(size_t curr = 0; curr < thread_count; ++curr) 
     { 
     boost::thread th(thread_func, boost::ref(proactor_)); 

     // inserting into this map before any work can be scheduled 
     // on it, means that we don't have to look it for lookups 
     // since we don't dynamically add threads 
     thread_jobcount_.insert(std::make_pair(th.get_id(), 0)); 

     threads_.emplace_back(std::move(th)); 
     } 
    } 

    // the only way for a user to get work into 
    // the pool is to use this function, which ensures 
    // that the handler_wrapper type is used 
    void schedule(const functor_type& user_functor) 
    { 
     handler_wrapper to_execute(user_functor, thread_jobcount_); 
     proactor_.post(to_execute); 
    } 

    void join() 
    { 
     // join all threads in pool: 
     work_.reset(); 
     proactor_.stop(); 

     std::for_each(
     threads_.begin(), 
     threads_.end(), 
     [] (boost::thread& t) 
     { 
     t.join(); 
     }); 
    } 

    // just an example showing statistics 
    void log() 
    { 
     std::for_each(
     thread_jobcount_.begin(), 
     thread_jobcount_.end(), 
     [] (const thread_jobcount_map::value_type& it) 
     { 
     std::cout << "Thread: " << it.first << " executed " << it.second << " jobs\n"; 
     }); 
    } 

private: 
    std::vector<boost::thread> threads_; 
    std::unique_ptr<boost::asio::io_service::work> work_; 
    boost::asio::io_service proactor_; 
    thread_jobcount_map  thread_jobcount_; 
}; 

struct add 
{ 
    add(int lhs, int rhs, int* result) 
     : lhs_(lhs) 
     , rhs_(rhs) 
     , result_(result) 
    { 
    } 

    void operator()() 
    { 
     *result_ = lhs_ + rhs_; 
    } 

    int lhs_,rhs_; 
    int* result_; 
}; 

int main(int argc, char **argv) 
{ 
    // some "state objects" that are 
    // manipulated by the user functors 
    int x = 0, y = 0, z = 0; 

    // pool of three threads 
    ThreadPool pool(3); 

    // schedule some handlers to do some work 
    pool.schedule(add(5, 4, &x)); 
    pool.schedule(add(2, 2, &y)); 
    pool.schedule(add(7, 8, &z)); 

    // give all the handlers time to execute 
    boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); 

    std::cout 
     << "x = " << x << "\n" 
     << "y = " << y << "\n" 
     << "z = " << z << "\n"; 

    pool.join(); 

    pool.log(); 
} 

Salida:

x = 9 
y = 4 
z = 15 
Thread: 0000000000B25430 executed 1 jobs 
Thread: 0000000000B274F0 executed 1 jobs 
Thread: 0000000000B27990 executed 1 jobs 
+0

puede agregar el código a su respuesta @Chad? –

+0

Hecho. Feliz por cualquier comentario sobre él. – Chad

6

Usted puede utilizar una instancia io_service común entre todos los hilos y una instancia io_service privada para cada hilo. Cada hilo ejecutará un método como este:

void Mythread::threadLoop() 
{ 
    while(/* termination condition */) 
    { 
     commonIoService.run_one(); 
     privateIoService.run_one(); 

     commonConditionVariable.timed_wait(time); 
    } 
} 

De esta manera, si usted quiere asegurarse de que alguna tarea se ejecuta en un hilo, es suficiente para publicar esta tarea en su io_service propiedad.

para publicar un trabajo en su grupo de subprocesos que puede hacer:

void MyThreadPool::post(Hander handler) 
{ 
    commonIoService.post(handler); 
    commonConditionVariable.notify_all(); 
} 
+0

Un enfoque interesante, pero estoy buscando algo un poco más sencillo. Si no aparece nada más en los próximos días, puedo aceptar esta respuesta. – Chad

+0

Creo que no hay una solución más simple usando boot asio. Desarrollé una solución como esta con pocos códigos y funciona. –

Cuestiones relacionadas