2012-09-26 14 views
6

Comencé una pregunta similar en another thread, pero luego me enfocaba en cómo usar OpenCV. Al no haber logrado lo que originalmente quería, preguntaré aquí exactamente lo que quiero.La forma más rápida de calcular la distancia euclidiana mínima entre dos matrices que contienen vectores de alta dimensión

Tengo dos matrices. Matrix a es 2782x128 y Matrix b es 4000x128, ambos valores de char sin signo. Los valores se almacenan en una única matriz. Para cada vector en a, necesito el índice del vector en b con la distancia euclidiana más cercana.

Ok, ahora mi código para lograr esto:

#include <windows.h> 
#include <stdlib.h> 
#include <stdio.h> 
#include <cstdio> 
#include <math.h> 
#include <time.h> 
#include <sys/timeb.h> 
#include <iostream> 
#include <fstream> 
#include "main.h" 

using namespace std; 

void main(int argc, char* argv[]) 
{ 
    int a_size; 
    unsigned char* a = NULL; 
    read_matrix(&a, a_size,"matrixa"); 
    int b_size; 
    unsigned char* b = NULL; 
    read_matrix(&b, b_size,"matrixb"); 

    LARGE_INTEGER liStart; 
    LARGE_INTEGER liEnd; 
    LARGE_INTEGER liPerfFreq; 
    QueryPerformanceFrequency(&liPerfFreq); 
    QueryPerformanceCounter(&liStart); 

    int* indexes = NULL; 
    min_distance_loop(&indexes, b, b_size, a, a_size); 

    QueryPerformanceCounter(&liEnd); 

    cout << "loop time: " << (liEnd.QuadPart - liStart.QuadPart)/long double(liPerfFreq.QuadPart) << "s." << endl; 

    if (a) 
    delete[]a; 
if (b) 
    delete[]b; 
if (indexes) 
    delete[]indexes; 
    return; 
} 

void read_matrix(unsigned char** matrix, int& matrix_size, char* matrixPath) 
{ 
    ofstream myfile; 
    float f; 
    FILE * pFile; 
    pFile = fopen (matrixPath,"r"); 
    fscanf (pFile, "%d", &matrix_size); 
    *matrix = new unsigned char[matrix_size*128]; 

    for (int i=0; i<matrix_size*128; ++i) 
    { 
     unsigned int matPtr; 
     fscanf (pFile, "%u", &matPtr); 
     matrix[i]=(unsigned char)matPtr; 
    } 
    fclose (pFile); 
} 

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size) 
{ 
    const int descrSize = 128; 

    *indexes = (int*)malloc(a_size*sizeof(int)); 
    int dataIndex=0; 
    int vocIndex=0; 
    int min_distance; 
    int distance; 
    int multiply; 

    unsigned char* dataPtr; 
    unsigned char* vocPtr; 
    for (int i=0; i<a_size; ++i) 
    { 
     min_distance = LONG_MAX; 
     for (int j=0; j<b_size; ++j) 
     { 
      distance=0; 
      dataPtr = &a[dataIndex]; 
      vocPtr = &b[vocIndex]; 

      for (int k=0; k<descrSize; ++k) 
      { 
       multiply = *dataPtr++-*vocPtr++; 
       distance += multiply*multiply; 
       // If the distance is greater than the previously calculated, exit 
       if (distance>min_distance) 
        break; 
      } 

      // if distance smaller 
      if (distance<min_distance) 
      { 
       min_distance = distance; 
       (*indexes)[i] = j; 
      } 
      vocIndex+=descrSize; 
     } 
     dataIndex+=descrSize; 
     vocIndex=0; 
    } 
} 

Y adjuntan los archivos con matrices de muestras.

matrixa matrixb

estoy usando windows.h sólo para calcular el tiempo, por lo que si desea probar el código en otra plataforma que no sea Windows, basta con cambiar windows.h cabecera y cambiar la forma de calcular el tiempo de consumo.

Este código en mi computadora es de aproximadamente 0.5 segundos. El problema es que tengo otro código en Matlab que hace lo mismo en 0.05 segundos. En mis experimentos, recibo varias matrices como la matriz a por segundo, por lo que 0.5 segundos es demasiado.

Ahora el código MATLAB para calcular esto:

aa=sum(a.*a,2); bb=sum(b.*b,2); ab=a*b'; 
d = sqrt(abs(repmat(aa,[1 size(bb,1)]) + repmat(bb',[size(aa,1) 1]) - 2*ab)); 
[minz index]=min(d,[],2); 

Ok. El código de Matlab está usando eso (x-a)^2 = x^2 + a^2 - 2ab.

Así que mi próximo intento fue hacer lo mismo. Borré mi propio código para hacer los mismos cálculos, pero fue 1.2 segundos aprox.

Luego, traté de usar diferentes bibliotecas externas. El primer intento fue Eigen:

const int descrSize = 128; 
MatrixXi a(a_size, descrSize); 
MatrixXi b(b_size, descrSize); 
MatrixXi ab(a_size, b_size); 

unsigned char* dataPtr = matrixa; 
for (int i=0; i<nframes; ++i) 
{ 
    for (int j=0; j<descrSize; ++j) 
    { 
     a(i,j)=(int)*dataPtr++; 
    } 
} 
unsigned char* vocPtr = matrixb; 
for (int i=0; i<vocabulary_size; ++i) 
{ 
    for (int j=0; j<descrSize; ++j) 
    { 
     b(i,j)=(int)*vocPtr ++; 
    } 
} 
ab = a*b.transpose(); 
a.cwiseProduct(a); 
b.cwiseProduct(b); 
MatrixXi aa = a.rowwise().sum(); 
MatrixXi bb = b.rowwise().sum(); 
MatrixXi d = (aa.replicate(1,vocabulary_size) + bb.transpose().replicate(nframes,1) - 2*ab).cwiseAbs2(); 

int* index = NULL; 
index = (int*)malloc(nframes*sizeof(int)); 
for (int i=0; i<nframes; ++i) 
{ 
    d.row(i).minCoeff(&index[i]); 
} 

cuesta Este código Eigen 1,2 aprox sólo por la línea que dice: ab = a * b.transpose();

Se usó también un código similar usando opencv, y el costo de ab = a * b.transpose(); fue 0.65 segundos.

Por lo tanto, es realmente molesto que matlab pueda hacer lo mismo tan rápido y no puedo hacerlo en C++. Por supuesto, ser capaz de ejecutar mi experimento sería genial, pero creo que la falta de conocimiento es lo que realmente me molesta. ¿Cómo puedo lograr al menos el mismo rendimiento que en Matlab? Cualquier tipo de solución es bienvenida. Quiero decir, cualquier biblioteca externa (gratis si es posible), bucle de desenrollar cosas, elementos de plantilla, instrucciones SSE (sé que existen), cosas de caché. Como dije, mi principal objetivo es aumentar mi conocimiento para poder codificar este tipo de pensamientos con un rendimiento más rápido.

Gracias de antemano

EDIT: más código sugeridos por David Hammen. Fundí las matrices en int antes de hacer ningún cálculo. Aquí está el código:

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size) 
{ 
    const int descrSize = 128; 

    int* a_int; 
    int* b_int; 

    LARGE_INTEGER liStart; 
    LARGE_INTEGER liEnd; 
    LARGE_INTEGER liPerfFreq; 
    QueryPerformanceFrequency(&liPerfFreq); 
    QueryPerformanceCounter(&liStart); 

    a_int = (int*)malloc(a_size*descrSize*sizeof(int)); 
    b_int = (int*)malloc(b_size*descrSize*sizeof(int)); 

    for(int i=0; i<descrSize*a_size; ++i) 
     a_int[i]=(int)a[i]; 
    for(int i=0; i<descrSize*b_size; ++i) 
     b_int[i]=(int)b[i]; 

    QueryPerformanceCounter(&liEnd); 

    cout << "Casting time: " << (liEnd.QuadPart - liStart.QuadPart)/long double(liPerfFreq.QuadPart) << "s." << endl; 

    *indexes = (int*)malloc(a_size*sizeof(int)); 
    int dataIndex=0; 
    int vocIndex=0; 
    int min_distance; 
    int distance; 
    int multiply; 

    /*unsigned char* dataPtr; 
    unsigned char* vocPtr;*/ 
    int* dataPtr; 
    int* vocPtr; 
    for (int i=0; i<a_size; ++i) 
    { 
     min_distance = LONG_MAX; 
     for (int j=0; j<b_size; ++j) 
     { 
      distance=0; 
      dataPtr = &a_int[dataIndex]; 
      vocPtr = &b_int[vocIndex]; 

      for (int k=0; k<descrSize; ++k) 
      { 
       multiply = *dataPtr++-*vocPtr++; 
       distance += multiply*multiply; 
       // If the distance is greater than the previously calculated, exit 
       if (distance>min_distance) 
        break; 
      } 

      // if distance smaller 
      if (distance<min_distance) 
      { 
       min_distance = distance; 
       (*indexes)[i] = j; 
      } 
      vocIndex+=descrSize; 
     } 
     dataIndex+=descrSize; 
     vocIndex=0; 
    } 
} 

todo el proceso es ahora 0,6, y los bucles de colada al comienzo son 0.001 segundos. Tal vez hice algo mal?

EDIT2: ¿Algo sobre Eigen? Cuando busco libs externas, siempre hablan sobre Eigen y su velocidad. Hice algo mal? Aquí un código simple usando Eigen que muestra que no es tan rápido. Tal vez me falta alguna configuración o alguna bandera, o ...

MatrixXd A = MatrixXd::Random(1000, 1000); 
MatrixXd B = MatrixXd::Random(1000, 500); 
MatrixXd X; 

Este código es de aproximadamente 0.9 segundos.

+0

¿Ha compilado todas las pruebas en modo de lanzamiento? –

+0

Puede resultarle molesto que Matlab supere el rendimiento de su código, pero yo, que uso mucho Matlab, lo encuentro muy satisfactorio. No tengo muchos consejos concretos para usted, pero la clave para mejorar el rendimiento de este tipo de código suele ser hacer un uso óptimo (o al menos muy bueno) de la jerarquía de memoria en las CPU modernas. Otro factor a considerar es que gran parte de la funcionalidad principal de Matlab ahora tiene múltiples subprocesos para su ejecución en CPU de núcleos múltiples, no tengo claro que ninguno de sus propios códigos tenga múltiples hilos; eso probablemente tendría algún impacto en el rendimiento. –

+0

No sé cómo ayudarte a hacer que tu código C/C++ sea más rápido (tu código se parece más a C que a C++. Evidencia: 'malloc') aún, pero puedo ver cómo puedes hacer que tu código Matlab sea más rápido: elimina el 'sqrt'. Dado dos números no negativos a y b, sqrt (a)> sqrt (b) ⇔ a> b. –

Respuesta

2

Una cosa que definitivamente te está doliendo en tu código C++ es que tiene un bote de conversiones de char a int. Por carga de barco, me refiero a conversiones de hasta 2 * 2782 * 4000 * 128 caracteres a int. Esas conversiones char a int son lentas, muy lentas.

Puede reducir esto a (2782 + 4000) * 128 este tipo de conversiones mediante la asignación de un par de int matrices, uno 2782 * 128 y la otra 4000 * 128, para contener el contenido-fundido a número entero de su char* a y char* b matrices. Trabaje con estas matrices int* en lugar de sus matrices char*.

Otro problema podría ser su uso de int contra long. No trabajo en Windows, por lo que esto podría no ser aplicable. En las máquinas en las que trabajo, int es de 32 bits y long ahora tiene 64 bits. 32 bits es más que suficiente porque 255 * 255 * 128 < 256 * 256 * 128 = 2 .

Eso obviamente no es el problema.

Lo que llama la atención es que el código en cuestión no está calculando la enorme matriz de 2728 por 4000 que está creando el código de Matlab. Lo que es aún más sorprendente es que es muy probable que Matlab haga esto con dobles en lugar de con enteros, y todavía está superando a los pantalones con el código C/C++.

Un gran problema es la memoria caché. Esa matriz 4000 * 128 es demasiado grande para la memoria caché de nivel 1, y está iterando sobre esa gran matriz 2782 veces. Tu código está haciendo demasiada espera en la memoria. Para solucionar este problema, trabaje con fragmentos más pequeños de la matriz b para que su código funcione con caché de nivel 1 durante el mayor tiempo posible.

Otro problema es la optimización if (distance>min_distance) break;. Sospecho que esto es realmente una des-optimización. Tener las pruebas if dentro de su ciclo más interno es a menudo una mala idea. Atraviesa ese producto interno lo más rápido posible. Aparte de los cálculos desperdiciados, no hay daño en deshacerse de esta prueba. Algunas veces es mejor hacer cálculos aparentemente innecesarios si al hacerlo se puede eliminar una rama en un bucle más interno. Este es uno de esos casos. Quizás pueda resolver su problema simplemente eliminando esta prueba. Intenta hacer eso.

Volviendo al problema de caché, debe deshacerse de esta rama para poder dividir las operaciones sobre la matriz a y b en trozos más pequeños, trozos de no más de 256 filas a la vez. Así es como muchas filas de 128 caracteres sin firmar caben en uno de los dos cachés L1 del moderno chip Intel. Dado que 250 divide 4000, busque dividir lógicamente esa matriz b en 16 fragmentos. Es posible que desee formar ese gran conjunto de productos internos 2872 por 4000, pero hágalo en trozos pequeños. Puede agregar ese if (distance>min_distance) break; de nuevo, pero hágalo en un nivel de fragmento en lugar de a nivel de byte por byte.

Debería poder vencer a Matlab porque casi con certeza funciona con dobles, pero puede trabajar con caracteres y caracteres sin signo.

+0

Gracias David, pero el casting es de aproximadamente 0.001 segundos. Intenté también poner las matrices a y b en matrices int *, y el rendimiento fue peor. –

+0

Estás lanzando no importa qué, y esos lanzamientos son lentos. Esta línea, 'multiplicar = * dataPtr ++ - * vocPtr ++; 'implica dos conversiones de char sin signo a int. Los resultados de' * dataPtr' y '* vocPtr' se convierten en enteros, luego se restan. Hiciste algo mal si hiciste los moldes antes de tiempo y obtuve peor rendimiento. –

+0

Ok, agregué una nueva respuesta haciendo ca anterior pinchar en matrices. Editar: No puedo agregar una nueva respuesta, estoy editando la publicación original. Será demasiado grande, creo ... –

1

La matriz multiplicada generalmente utiliza el peor patrón de acceso de caché posible para una de las dos matrices, y la solución es transponer una de las matrices y utilizar un algoritmo de multiplicación especializado que funciona con datos almacenados de esa manera.

Su matriz ya está almacenada transpuesta. Al transponerlo al orden normal y luego usar una multiplicación de matriz normal, estás matando completamente el rendimiento.

Escribe tu propio bucle de multiplicación de matriz que invierte el orden de los índices en la segunda matriz (que tiene el efecto de transponerlo, sin mover realmente nada y romper el comportamiento de la memoria caché). Y pase a su compilador las opciones que tenga para habilitar la auto-vectorización.

+0

Creo que no entiendo tu punto. En mi propia implementación de la distancia mínima (primer código), no estoy transponiendo nada. En el caso del código Eigen, necesito transponerlo. –

+0

@ min.yong.yoon: Bueno, ¿has mirado el código generado por el compilador para ese ciclo de multiplicación? ¿Está usando las instrucciones de SSE2? –

+0

Intenté activar las instrucciones SSE2 en mi configuración del proyecto Visul Studio 10, pero el tiempo de consumo es el mismo, y el código generado para el ciclo es el mismo también –

3

Como observó, su código está dominado por el producto de la matriz que representa aproximadamente 2.8e9 operaciones aritméticas. Yopu dice que Matlab (o más bien el MKL altamente optimizado) lo calcula en aproximadamente 0.05s. Esto representa una tasa de 57 GFLOPS que muestra que no solo utiliza la vectorización sino también el multihilo. Con Eigen, puede habilitar el multi-threading compilando con OpenMP habilitado (-fopenmp con gcc). En mi computadora de 5 años (2,66 Ghz Core2), usando flotadores y 4 hilos, su producto tarda aproximadamente 0.053s, y 0.16s sin OpenMP, por lo que debe haber algo mal con sus banderas de compilación. Para resumen, para obtener lo mejor de Eigen:

  • compilar en modo 64bits
  • flotadores de uso (dobles son dos veces más lenta debido a la vectorización)
  • permitir OpenMP
  • si su CPU tiene Hyper-Threading , deshabilítelo o defina la variable de entorno OMP_NUM_THREADS en la cantidad de núcleos físicos (esto es muy importante, de lo contrario, el rendimiento será muy malo)
  • si tiene otra tarea ejecutándose, podría ser una buena idea reducirla OMP_NUM_THREADS a nb_cores-1
  • utilice el compilador más reciente que pueda, GCC, clang e ICC son los mejores, MSVC suele ser más lento.
Cuestiones relacionadas