2012-02-28 16 views
18

Estoy escribiendo un archivo de gran tamaño con Node.js utilizando un writable stream:archivos de gran tamaño por escrito con Node.js

var fs  = require('fs'); 
var stream = fs.createWriteStream('someFile.txt', { flags : 'w' }); 

var lines; 
while (lines = getLines()) { 
    for (var i = 0; i < lines.length; i++) { 
     stream.write(lines[i]); 
    } 
} 

Me pregunto si este esquema es seguro sin necesidad de utilizar drain evento? Si no es (lo que creo que es el caso), ¿cuál es el patrón para escribir datos grandes arbitrarios en un archivo?

Respuesta

13

Así es como finalmente lo hice. La idea detrás de esto es crear una secuencia de lectura que implemente la interfaz ReadStream y luego usar el método pipe() para canalizar datos a la secuencia de escritura.

var fs = require('fs'); 
var writeStream = fs.createWriteStream('someFile.txt', { flags : 'w' }); 
var readStream = new MyReadStream(); 

readStream.pipe(writeStream); 
writeStream.on('close', function() { 
    console.log('All done!'); 
}); 

El ejemplo de MyReadStream clase puede ser tomada de mangosta QueryStream.

+12

¿Por qué necesita un ReadStream() cuando solo estamos interesados ​​en escribir cosas en un archivo? – krjampani

+0

@nab gracias. Cuando se canaliza, parece que no está agregando '\ r \ n' para el avance de línea, así que concat cada línea a uno ... – loretoparisi

9

La idea detrás de drenaje es que se usaría para probar aquí:

var fs = require('fs'); 
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'}); 

var lines; 
while (lines = getLines()) { 
    for (var i = 0; i < lines.length; i++) { 
     stream.write(lines[i]); //<-- the place to test 
    } 
} 

la que no lo está. Por lo tanto, necesitaría realizar una nueva configuración para que sea "reentrante".

var fs = require('fs'); 
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'}); 

var lines; 
while (lines = getLines()) { 
    for (var i = 0; i < lines.length; i++) { 
     var written = stream.write(lines[i]); //<-- the place to test 
     if (!written){ 
      //do something here to wait till you can safely write again 
      //this means prepare a buffer and wait till you can come back to finish 
      // lines[i] -> remainder 
     } 
    } 
} 

Sin embargo, ¿significa esto que usted necesita para mantener getLines tampón, así mientras espera?

var fs = require('fs'); 
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'}); 

var lines, 
    buffer = { 
    remainingLines = [] 
    }; 
while (lines = getLines()) { 
    for (var i = 0; i < lines.length; i++) { 
     var written = stream.write(lines[i]); //<-- the place to test 
     if (!written){ 
      //do something here to wait till you can safely write again 
      //this means prepare a buffer and wait till you can come back to finish 
      // lines[i] -> remainder 
      buffer.remainingLines = lines.slice(i); 
      break; 
      //notice there's no way to re-run this once we leave here. 
     } 
    } 
} 

stream.on('drain',function(){ 
    if (buffer.remainingLines.length){ 
    for (var i = 0; i < buffer.remainingLines.length; i++) { 
     var written = stream.write(buffer.remainingLines[i]); //<-- the place to test 
     if (!written){ 
     //do something here to wait till you can safely write again 
     //this means prepare a buffer and wait till you can come back to finish 
     // lines[i] -> remainder 
     buffer.remainingLines = lines.slice(i); 
     } 
    } 
    } 
}); 
+3

No es necesario usar su propio búfer. Node.js ha hecho por ti. Lea el archivo de origen nodejs-source/lib/fs.js # WriteStream.prototype.write – ayanamist

2

[Editar] El Node.js actualizado writable.write(...) API docs dicen:

[El] valor de retorno es estrictamente consultivo. PUEDE continuar escribiendo, incluso si devuelve falso. Sin embargo, las escrituras se almacenarán en la memoria, por lo que es mejor no hacer esto excesivamente. En su lugar, espere el evento de drenaje antes de escribir más datos.

[Original] Desde el (el énfasis es mío) stream.write(...) documentation:

devoluciones true si la cadena se ha volcado a la memoria intermedia del núcleo. Devuelve false para indicar que el buffer del kernel está lleno, y los datos se enviarán en el futuro.

que interpretan que esto significa que la función de "escritura" devuelve true si la cadena dada fue escrito inmediatamente a la memoria intermedia del sistema operativo subyacente o false si aún no se ha escrito, pero será escrito por la función de escritura (por ejemplo, fue presumiblemente amortiguado para ti por WriteStream) para que no tengas que volver a llamar "escribir".

+1

pero "Al escribir un descriptor de archivo de esta manera, cerrar el descriptor antes de que la ruta se agote corre el riesgo de enviar un FD inválido (cerrado)". me hace pensar que el buffer está lleno significa que no puede aceptar más código de ti. Honestamente, no lo sé, y solo di mi mejor estimación como respuesta aquí. – jcolebrand

+0

@jcolebrand: ya, tampoco lo sé, pero supongo que el evento "drenar" solo indica que el sistema operativo está listo para escribir de inmediato, en caso de que realmente desee evitar el almacenamiento en búfer de cualquier tipo, ya sea suyo o del método WriteStream "write". Sin embargo, los documentos para "drain" mencionan "* safe to write again *", que es una mala elección de redacción o evidencia en contra de mi interpretación. – maerics

+0

dat Link 404's. – Alan

2

Encontré que las transmisiones son una forma deficiente para tratar con archivos de gran tamaño, esto es porque no puede establecer un tamaño de búfer de entrada adecuado (al menos no conozco una buena forma de hacerlo). Esto es lo que hago:

var fs = require('fs'); 

var i = fs.openSync('input.txt', 'r'); 
var o = fs.openSync('output.txt', 'w'); 

var buf = new Buffer(1024 * 1024), len, prev = ''; 

while(len = fs.readSync(i, buf, 0, buf.length)) { 

    var a = (prev + buf.toString('ascii', 0, len)).split('\n'); 
    prev = len === buf.length ? '\n' + a.splice(a.length - 1)[0] : ''; 

    var out = ''; 
    a.forEach(function(line) { 

     if(!line) 
      return; 

     // do something with your line here 

     out += line + '\n'; 
    }); 

    var bout = new Buffer(out, 'ascii'); 
    fs.writeSync(o, bout, 0, bout.length); 
} 

fs.closeSync(o); 
fs.closeSync(i); 
+0

¿Tiene algún punto de referencia entre las pruebas' readStream/writeStream' y 'readSync/writeSync' para confirmar esto ¿responder? Gracias. – loretoparisi

1

La forma más limpia de manejar esto es para que su generador de línea de un readable stream - llamémosla lineReader.A continuación, el siguiente manejaría automáticamente los tampones y drenan bien para usted:

lineReader.pipe(fs.createWriteStream('someFile.txt')); 

Si no desea hacer una corriente de fácil lectura, se puede escuchar a write 's de salida de búfer plenitud y responder así:

var i = 0, n = lines.length; 
function write() { 
    if (i === n) return; // A callback could go here to know when it's done. 
    while (stream.write(lines[i++]) && i < n); 
    stream.once('drain', write); 
} 
write(); // Initial call. 

Un ejemplo más de esta situación se puede encontrar here.

1

Varias respuestas sugeridas a esta pregunta se han perdido por completo.

Este módulo puede ayudar https://www.npmjs.org/package/JSONStream

Sin embargo, vamos a suponer que la situación descrita y escribir el código de nosotros mismos. Está leyendo desde un MongoDB como una secuencia, con ObjectMode = true de forma predeterminada.

Esto dará lugar a problemas si intenta transmitir directamente al archivo, algo así como el error "No válido de cadena/búfer".

La solución a este tipo de problema es muy simple.

Simplemente ponga otra Transformación entre legible y escribible para adaptar apropiadamente el Objeto legible a una Cadena escribible. Código

Muestra Solución:

var fs = require('fs'), 
    writeStream = fs.createWriteStream('./out' + process.pid, {flags: 'w', encoding: 'utf-8' }), 
    stream = require('stream'), 
    stringifier = new stream.Transform(); 
stringifier._writableState.objectMode = true; 
stringifier._transform = function (data, encoding, done) { 
    this.push(JSON.stringify(data)); 
    this.push('\n'); 
    done(); 
} 
rowFeedDao.getRowFeedsStream(merchantId, jobId) 
.pipe(stringifier) 
.pipe(writeStream).on('error', function (err) { 
    // handle error condition 
} 
0

Si no sucede que tiene un flujo de entrada no se puede utilizar fácilmente la tubería. Ninguno de los anteriores funcionó para mí, el evento de drenaje no se dispara. Resuelto de la siguiente manera (basado en la respuesta de Tylers):

var lines[]; // some very large array 
var i = 0; 

function write() { 
    if (i < lines.length) { 
     wstream.write(lines[i]), function(err){ 
      if (err) { 
       console.log(err); 
      } else { 
       i++; 
       write(); 
      } 
     }); 
    } else { 
     wstream.end(); 
     console.log("done"); 
    } 
}; 
write(); 
Cuestiones relacionadas