2011-08-24 27 views
6

Tengo una serie de colecciones de MongoDB que toman varios documentos JSON de varias fuentes de transmisión. En otras palabras, hay una serie de procesos que continuamente insertan datos en un conjunto de colecciones de MongoDB.MongoDb Transmisión de datos insertados en tiempo real (o casi en tiempo real)

Necesito una forma de transmitir los datos de MongoDB a aplicaciones posteriores. Así que quiero un sistema que conceptualmente se parece a esto:

App Stream1 --> 
App Stream2 -->  MONGODB  ---> Aggregated Stream 
App Stream3 --> 

O esto:

App Stream1 -->     ---> MongoD Stream1 
App Stream2 -->  MONGODB  ---> MongoD Stream2 
App Stream3 -->     ---> MongoD Stream3 

La pregunta es ¿cómo puedo flujo de datos de Mongo sin tener que sondear continuamente/consultar la base de datos?

La pregunta respuesta obvia sería "¿Por qué no cambiar los procesos de aplicaciones de streaming para enviar mensajes a una cola como el conejo, Cero o ActiveMQ que a su vez tiene los envían a sus procesos de Mongo Streaming y Mongo a la vez como esto":

    MONGODB 
        /|\ 
        | 
App Stream1 -->  |   ---> MongoD Stream1 
App Stream2 --> SomeMQqueue ---> MongoD Stream2 
App Stream3 -->    ---> MongoD Stream3 

en un mundo ideal, sí que sería bueno, pero necesitamos Mongo para asegurar que los mensajes se guardan en primer lugar, para evitar duplicados y garantizar que los identificadores son todos generados etc. Mongo tiene que sentarse en el medio como la persistencia capa.

Entonces, ¿cómo puedo transmitir mensajes de una colección de Mongo (sin usar GridFS, etc.) a estas aplicaciones de descarga. La escuela básica de pensamiento ha sido simplemente sondear documentos nuevos y cada documento que se recopile lo actualice agregando otro campo a los documentos JSON almacenados en la base de datos, muy similar a un indicador de proceso en una tabla SQL que almacena una marca de tiempo procesada. Es decir. cada 1 segundo sondeo para documentos donde se procesó == nulo .... add processed = now() .... document. update.

¿Existe un método más eficiente/más computacionalmente eficiente?

FYI - Estos son todos los procesos de Java.

¡Salud!

Respuesta

3

Si está escribiendo en una colección con mayúscula (o colecciones), puede usar tailablecursor para insertar datos nuevos en la transmisión o en una cola de mensajes desde donde se puede transmitir. Sin embargo, esto no funcionará para una colección no limitada.

+0

Gracias por el enlace. Lamentablemente, no utilizar colecciones limitadas, aunque no es una mala característica para un servicio de mensajería. Suena como un índice en el indicador procesado y el sondeo es la única opción ... Si un elemento de índice es nulo, ¿todavía se hace referencia en el índice o la consulta de null aún significa análisis de recopilación? – NightWolf

+1

O I spoze podríamos hacer que una colección con tope en un tamaño fijo actuara como un caché, luego sacar los artículos en una compra 1 y ponerlos nuevamente en una colección común. La pregunta entonces es ¿cómo guardamos nuestro cursor de posición entre ejecuciones de aplicaciones? Supongo que solo usamos un campo _id generado automáticamente por Mongo y seleccionamos todo lo que sea mayor que ese campo de ID ... ¿Están todos los _ID generados por Mongo en orden creciente? – NightWolf

+1

Los índices almacenan entradas para 'null'. Si está rastreando una colección con tope, necesita almacenar la última entrada que vio (puede almacenar esto como lo desee, usando otra colección mongo funcionará bien), y luego comience su cursor de disponibilidad en ese elemento usando '$ min 'y' omitir (1) 'para reanudar. Consulte http://www.mongodb.org/display/DOCS/Advanced+Queries#AdvancedQueries-%24minand%24max – dcrosta

Cuestiones relacionadas