2012-02-29 17 views
29

Estoy tratando de explorar Protocol Buffer (PB) en la plataforma Linux y mi lenguaje de codificación es C++. Encontré ejemplos en los documentos en línea del búfer de protocolo pero nada específico para el envío y recepción de zócalos (O lo he omitido por completo :)). Así que decidí agregar el mensaje Longitud antes del mensaje real y enviarlo a través del socket. Apreciaría si alguien puede sugerir una solución mejor de la que planeo hacer y también hay algo preparado en PB para crear tales paquetes.Protocolo Buffer over socket en C++

Pero todavía termino con un problema en el lado del servidor donde tengo que descodificar el paquete. Decir si el cliente envía un paquete de 10 bytes en el que el primer byte 4 es la longitud del paquete; Pero es imposible saber la longitud antes de decodificar el paquete. Entonces, incluso si leo los primeros 4 bytes, ¿cómo deduzco el valor con medio paquete de lectura usando Protocolo Buffer?

Respuesta

25

Por fin pude hacerlo funcionar. Estoy publicando el código aquí para que uno pueda revisarlo y comentarlo, y si alguien quiere implementarlo en C++, este fragmento de código puede ayudar. Es un código en mal estado, mi intención era hacer que Protobuf funcionara de manera prefijada. Tomé el código del servidor del cliente de un sitio que no recuerdo y lo he modificado para adaptarlo a protobuf. Aquí el servidor primero se asoma al zócalo y obtiene la longitud del paquete total y luego lee el zócalo real para leer todo el paquete. Puede haber un trillón de maneras de hacerlo, pero para una solución rápida lo hice de esta manera. Pero necesito encontrar una mejor manera de evitar 2 recv por paquete, pero en mi condición todos los mensajes son de diferente tamaño, así que esta es la única forma que supongo.

archivo Proto

message log_packet { 
    required fixed64 log_time =1; 
    required fixed32 log_micro_sec =2; 
    required fixed32 sequence_no =3; 
    required fixed32 shm_app_id =4; 
    required string packet_id =5; 
    required string log_level=6; 
    required string log_msg=7; 
    } 

Protocolo Código búfer Cliente Servidor de códigos

#include <unistd.h> 
#include "message.pb.h" 
#include <iostream> 
#include <google/protobuf/message.h> 
#include <google/protobuf/descriptor.h> 
#include <google/protobuf/io/zero_copy_stream_impl.h> 
#include <google/protobuf/io/coded_stream.h> 
#include <google/protobuf/io/zero_copy_stream_impl_lite.h> 


using namespace google::protobuf::io; 

using namespace std; 
int main(int argv, char** argc){ 

/* Coded output stram */ 

log_packet payload ; 

payload.set_log_time(10); 
payload.set_log_micro_sec(10); 
payload.set_sequence_no(1); 
payload.set_shm_app_id(101); 
payload.set_packet_id("TST"); 
payload.set_log_level("DEBUG"); 
payload.set_log_msg("What shall we say then"); 

cout<<"size after serilizing is "<<payload.ByteSize()<<endl; 
int siz = payload.ByteSize()+4; 
char *pkt = new char [siz]; 
google::protobuf::io::ArrayOutputStream aos(pkt,siz); 
CodedOutputStream *coded_output = new CodedOutputStream(&aos); 
coded_output->WriteVarint32(payload.ByteSize()); 
payload.SerializeToCodedStream(coded_output); 

     int host_port= 1101; 
     char* host_name="127.0.0.1"; 

     struct sockaddr_in my_addr; 

     char buffer[1024]; 
     int bytecount; 
     int buffer_len=0; 

     int hsock; 
     int * p_int; 
     int err; 

     hsock = socket(AF_INET, SOCK_STREAM, 0); 
     if(hsock == -1){ 
       printf("Error initializing socket %d\n",errno); 
       goto FINISH; 
     } 

     p_int = (int*)malloc(sizeof(int)); 
     *p_int = 1; 

     if((setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)|| 
       (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)){ 
       printf("Error setting options %d\n",errno); 
       free(p_int); 
       goto FINISH; 
     } 
     free(p_int); 

     my_addr.sin_family = AF_INET ; 
     my_addr.sin_port = htons(host_port); 

     memset(&(my_addr.sin_zero), 0, 8); 
     my_addr.sin_addr.s_addr = inet_addr(host_name); 
     if(connect(hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1){ 
       if((err = errno) != EINPROGRESS){ 
         fprintf(stderr, "Error connecting socket %d\n", errno); 
         goto FINISH; 
       } 
     } 




     for (int i =0;i<10000;i++){ 
      for (int j = 0 ;j<10;j++) { 

       if((bytecount=send(hsock, (void *) pkt,siz,0))== -1) { 
         fprintf(stderr, "Error sending data %d\n", errno); 
         goto FINISH; 
       } 
       printf("Sent bytes %d\n", bytecount); 
       usleep(1); 
     } 
     } 
     delete pkt; 

FINISH: 
     close(hsock); 

} 

búfer Protocolo

#include <fcntl.h> 
#include <string.h> 
#include <stdlib.h> 
#include <errno.h> 
#include <stdio.h> 
#include <netinet/in.h> 
#include <resolv.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <unistd.h> 
#include <pthread.h> 
#include "message.pb.h" 
#include <iostream> 
#include <google/protobuf/io/coded_stream.h> 
#include <google/protobuf/io/zero_copy_stream_impl.h> 

using namespace std; 
using namespace google::protobuf::io; 



void* SocketHandler(void*); 

int main(int argv, char** argc){ 

     int host_port= 1101; 

     struct sockaddr_in my_addr; 

     int hsock; 
     int * p_int ; 
     int err; 

     socklen_t addr_size = 0; 
     int* csock; 
     sockaddr_in sadr; 
     pthread_t thread_id=0; 

     hsock = socket(AF_INET, SOCK_STREAM, 0); 
     if(hsock == -1){ 
       printf("Error initializing socket %d\n", errno); 
       goto FINISH; 
     } 

     p_int = (int*)malloc(sizeof(int)); 
     *p_int = 1; 

     if((setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)|| 
       (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)){ 
       printf("Error setting options %d\n", errno); 
       free(p_int); 
       goto FINISH; 
     } 
     free(p_int); 

     my_addr.sin_family = AF_INET ; 
     my_addr.sin_port = htons(host_port); 

     memset(&(my_addr.sin_zero), 0, 8); 
     my_addr.sin_addr.s_addr = INADDR_ANY ; 

     if(bind(hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1){ 
       fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno); 
       goto FINISH; 
     } 
     if(listen(hsock, 10) == -1){ 
       fprintf(stderr, "Error listening %d\n",errno); 
       goto FINISH; 
     } 

     //Now lets do the server stuff 

     addr_size = sizeof(sockaddr_in); 

     while(true){ 
       printf("waiting for a connection\n"); 
       csock = (int*)malloc(sizeof(int)); 
       if((*csock = accept(hsock, (sockaddr*)&sadr, &addr_size))!= -1){ 
         printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr)); 
         pthread_create(&thread_id,0,&SocketHandler, (void*)csock); 
         pthread_detach(thread_id); 
       } 
       else{ 
         fprintf(stderr, "Error accepting %d\n", errno); 
       } 
     } 

FINISH: 
;//oops 
} 

google::protobuf::uint32 readHdr(char *buf) 
{ 
    google::protobuf::uint32 size; 
    google::protobuf::io::ArrayInputStream ais(buf,4); 
    CodedInputStream coded_input(&ais); 
    coded_input.ReadVarint32(&size);//Decode the HDR and get the size 
    cout<<"size of payload is "<<size<<endl; 
    return size; 
} 

void readBody(int csock,google::protobuf::uint32 siz) 
{ 
    int bytecount; 
    log_packet payload; 
    char buffer [siz+4];//size of the payload and hdr 
    //Read the entire buffer including the hdr 
    if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1){ 
       fprintf(stderr, "Error receiving data %d\n", errno); 
     } 
    cout<<"Second read byte count is "<<bytecount<<endl; 
    //Assign ArrayInputStream with enough memory 
    google::protobuf::io::ArrayInputStream ais(buffer,siz+4); 
    CodedInputStream coded_input(&ais); 
    //Read an unsigned integer with Varint encoding, truncating to 32 bits. 
    coded_input.ReadVarint32(&siz); 
    //After the message's length is read, PushLimit() is used to prevent the CodedInputStream 
    //from reading beyond that length.Limits are used when parsing length-delimited 
    //embedded messages 
    google::protobuf::io::CodedInputStream::Limit msgLimit = coded_input.PushLimit(siz); 
    //De-Serialize 
    payload.ParseFromCodedStream(&coded_input); 
    //Once the embedded message has been parsed, PopLimit() is called to undo the limit 
    coded_input.PopLimit(msgLimit); 
    //Print the message 
    cout<<"Message is "<<payload.DebugString(); 

} 

void* SocketHandler(void* lp){ 
    int *csock = (int*)lp; 

     char buffer[4]; 
     int bytecount=0; 
     string output,pl; 
     log_packet logp; 

     memset(buffer, '\0', 4); 

     while (1) { 
     //Peek into the socket and get the packet size 
     if((bytecount = recv(*csock, 
         buffer, 
           4, MSG_PEEK))== -1){ 
       fprintf(stderr, "Error receiving data %d\n", errno); 
     }else if (bytecount == 0) 
       break; 
     cout<<"First read byte count is "<<bytecount<<endl; 
     readBody(*csock,readHdr(buffer)); 
     } 

FINISH: 
     free(csock); 
    return 0; 
} 
+0

Gracias a un compañero mucho. –

+0

+1 para la pregunta y +1 para la respuesta, esto fue útil. –

+1

Gracias! Esto fue muy útil con un problema que estaba teniendo –

10

Desafortunadamente, protobuf no proporciona una manera para "envase" (delimitación) sus mensajes:

Si desea escribir varios mensajes en un único archivo o un arroyo, que depende de usted para mantener seguimiento de donde termina un mensaje y comienza el siguiente . El formato de cable del protocolo de búfer no es de delimitación propia, por lo que los analizadores de búfer de protocolo no pueden determinar dónde termina un mensaje en su propio . La forma más fácil de resolver este problema es escribir el tamaño de en cada mensaje antes de escribir el mensaje.

(de su documentation)

Así que, básicamente recomiendan la misma solución que llegó a.

+0

Gracias Tamas así que iré con mi enfoque – punith