5

Estoy diseñando un sistema que debe analizar un gran número de transacciones de usuarios y producir medidas agregadas (como tendencias, etc.). El sistema debería funcionar rápido, ser robusto y escalable. El sistema está basado en Java (en Linux).Diseño del sistema de procesamiento analítico en tiempo real

Los datos provienen de un sistema que genera archivos de registro (basados ​​en CSV) de transacciones de usuario. El sistema genera un archivo cada minuto y cada archivo contiene las transacciones de diferentes usuarios (ordenados por tiempo), cada archivo puede contener miles de usuarios.

Una estructura de datos de muestra para un archivo CSV:

10: 30: 01, el usuario 1, ...
10: 30: 01, el usuario 1, ...
10:30:02 , 78 de usuario, ...
10: 30: 02, el usuario 2, ...
10: 30: 03, el usuario 1, ...
10: 30: 04, el usuario 2, ...
. . .

El sistema que estoy planeando debe procesar los archivos y realizar algunos análisis en tiempo real. Tiene que recopilar la entrada, enviarla a varios algoritmos y otros sistemas y almacenar los resultados calculados en una base de datos. La base de datos no contiene los registros de entrada reales, sino solo el análisis agregado de alto nivel sobre las transacciones. Por ejemplo tendencias y etc.

El primer algoritmo que planeo usar requiere para un mejor funcionamiento de al menos 10 registros de usuario, si no puede encontrar 10 registros después de 5 minutos, debe usar los datos disponibles.

Me gustaría utilizar Storm para la implementación, pero preferiría dejar esta discusión en el nivel de diseño tanto como sea posible.

Una lista de los componentes del sistema:

  1. Una tarea que supervisa los archivos entrantes cada minuto.

  2. Una tarea que lee el archivo, lo analiza y lo pone a disposición para otros componentes y algoritmos del sistema.

  3. Un componente para almacenar 10 registros para un usuario (no más de 5 minutos), cuando se recopilan 10 registros o pasan 5 minutos, es hora de enviar los datos al algoritmo para su posterior procesamiento. Dado que el requisito es proporcionar al menos 10 registros para el algoritmo, pensé en usar Storm Field Grouping (lo que significa que se llama a la misma tarea para el mismo usuario) y rastrear la colección de 10 registros de usuario dentro de la tarea, por supuesto planea tener varias de estas tareas, cada una maneja una porción de los usuarios.

  4. Existen otros componentes que funcionan en una sola transacción, para ellos planeo crear otras tareas que reciban cada transacción a medida que se analizan (en paralelo a otras tareas).

Necesito tu ayuda con el n. ° 3.

¿Cuáles son las mejores prácticas para diseñar un componente de este tipo? Es obvio que necesita mantener los datos de 10 registros por usuario. Un mapa de valores clave puede ayudar. ¿Es mejor tener el mapa administrado en la tarea en sí o usar un caché distribuido? Por ejemplo, Redis es un almacén de valores clave (nunca lo había usado antes).

Gracias por su ayuda

Respuesta

5

He trabajado bastante con redis. Por lo tanto, voy a comentar en su idea de utilizar Redis

# 3 tiene 3 requisitos

  1. Buffer por usuario

  2. Buffer durante 10 Tareas

  3. debe expirar cada 5 minutos

1. Buffer P Usuario er: Redis es solo una tienda de valores clave. Aunque es compatible con una amplia variedad de datatypes, siempre son valores asignados a una clave STRING. Por lo tanto, debe decidir cómo identificar a un usuario de forma única en caso de que necesite un búfer por usuario. Porque en redis nunca obtendrá un error cuando anule un nuevo valor clave. Una solución podría ser verificar la existencia antes de escribir.

2. Tampón para 10 tareas: Obviamente, puede implementar un queue en redis. Pero la restricción de su tamaño le queda a usted. Ejemplo: usando LPUSH y LTRIM o usando LLEN para verificar la longitud y decidir si activa su proceso. La clave asociada con esta cola debe ser la que usted decidió en la parte 1.

3. El tampón caduca en 5 min: Esta es una tarea más difícil. En redis cada clave independientemente del tipo de datos subyacente que tiene el valor, puede tener un expiry. Pero el proceso de caducidad es silencioso. No se le notificará al vencimiento de ninguna clave. Por lo tanto, perderá silenciosamente su memoria intermedia si usa esta propiedad. Una solución para esto es tener un índice. Significa que el índice correlacionará una marca de tiempo con las claves que deben expirar en ese valor de marca de tiempo. Luego, en segundo plano, puede leer el índice cada minuto y eliminar manualmente la clave [después de la lectura] de redis y llamar al proceso deseado con los datos del búfer. Para tener dicho índice, puede mirar Sorted Sets. Donde la marca de tiempo será su score y establecer member serán las claves [clave única por usuario decidido en la parte 1 que se asigna a una cola] que desea eliminar en esa marca de tiempo. Usted puede hacer zrangebyscore para leer todos los miembros del conjunto con la marca de tiempo especificada

general:

Usar lista Redis para implementar una cola.

Use LLEN para asegurarse de no exceder su límite de 10.

Cada vez que cree una nueva lista, realice una entrada en el índice [Conjunto ordenado] con Calificación como Current Timestamp + 5 min y valor como la clave de la lista.

Cuando LLEN llega a 10, recuerde leer y luego quitar la clave del índice [conjunto ordenado] y de la base de datos [eliminar la lista de teclas->]. Luego activa tu proceso con datos.

Por cada minuto mínimo, genere la marca de tiempo actual, lea el índice y para cada clave, lea los datos, luego elimine la clave de db y active su proceso.

Esta podría ser mi manera de implementarlo. Puede haber alguna otra forma mejor para modelar sus datos en Redis

0

Para modificar las preferencias 1 & 2: [Apache Flume o Kafka]

Para su requisito # 3: [Esper perno en el interior de la tormenta. En Redis para lograr esto, tendrá que volver a escribir Esper Logic.]

Cuestiones relacionadas