読者です 読者をやめる 読者になる 読者になる

Stream今昔物語

node.js

この記事は Node.js Advent Calendar 2014 の 1日目の記事です。

こんにちは、代表です。

Stream大好きなみんなのためにStreamの過去と現在、そして未来についてお話するよ!!
Streamを何故使うのかっていう話と歴史的な話をします!!

Streamとは

データの流れを扱うための抽象化されたモジュールです。もうみんな耳にタコかもしれませんが、Streamを使うとデータの流れを綺麗に書くことができます。ちなみに今に始まった概念ではなくて、C++にもC#にもStreamがありますし、Java8の新機能にもStreamがあります。「データを扱うときの抽象化された流れ」を指す広義のStreamという意味では今日の言語ではだいたい実装されています。

Node.jsでは、以下のリンクが参考になるでしょう。


Node.js の Stream API で「データの流れ」を扱う方法 - Block Rockin’ Codes

Streamを何故使うべきなのか

まずそもそもStreamをなんで使うべきなのかっていう話です。
とりあえずこれを見てください。

ファイルからデータを読み込んでhttpレスポンスを作る

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    // fsのreadFileを行ってdata.txtを読み込む
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        // res.endでdataの中身を送る
        res.end(data);
    });
});
server.listen(8000);

これは一般的なhttpサーバーの例ですが、fs.readFileメソッドdata.txtを読み込み、コールバックでレスポンスとして返しています。一応普通に動きますがちょっとした問題があります。
それは、一旦callbackの第二引数にあるdataの値を生成するためにdata.txtの値をメモリ中に持ってそれからres.endに渡しているっていう事ですね。もしもdata.txtがめっちゃでっかいサイズのデータだったらメモリを食う結果になります。

    // fsのreadFileを行ってdata.txtを読み込む
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        // res.endでdataの中身を送る
        // このdataがすごく大きいとメモリを食う
        res.end(data);
    });

そこでこういう事が無いようにStreamを使いましょうというのがよく言われる話です。
Streamを使って書くとこうなります。


Streamを使ってレスポンスする

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    // ここで Streamを作る
    var stream = fs.createReadStream(__dirname + '/data.txt');
    // resにpipeする
    stream.pipe(res);
});
server.listen(8000);

こうすることで解決されます。こうやってStreamに書き換えることで2ついいことがありますね。一つはメモリ効率。もう一つはcallbackのネストが一つ減りましたね。

逆に失われたのはdataを加工しにくくなりました。前だったらdataを加工する場合はこう書くことができました。


レスポンスデータを加工する

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    // fsのreadFileを行ってdata.txtを読み込む
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        // dataを加工する、最後にhogeって付け足す。
        var newData = data + "hoge";
        res.end(newData);
    });
});
server.listen(8000);


じゃあ逆にStream使って加工したい場合はどうするのがいいんでしょう?
そう、データを加工するためのStreamを作るんです。このStreamをTransformStreamって言います。

coreのモジュールだけでこのTransformStreamを作ってもいいのですが、ここではgulpなどでお馴染みのthrough2を使います。


Streamを使ってレスポンスデータを加工する

var http = require('http');
var fs = require('fs');
var through2 = require('through2');

var server = http.createServer(function (req, res) {
    // 末尾に hoge って出す streamを作る
    var hogeStream = through2(
      function transform(chunk, enc, next){
        // ここでは何もしない、ただcallbackを呼ぶだけ
        // 流れているデータに対して値を加工したい場合はここで処理する
        next(null, chunk);
      },
      function flush(cb){
        // 最後だけ"hoge"っていう文字列を入れる
        this.push("hoge");
        cb();
      }
    );
    var stream = fs.createReadStream(__dirname + '/data.txt');
    // resにpipeする
    stream.pipe(hogeStream).pipe(res);
});
server.listen(8000);

ここまでのStreamの話を抑えておきます。

1. Streamはデータの流れを扱う奴
2. pipeを使うとReadableとWritableのStreamを繋げることができるようになる。
3. データを加工するにはTransformStreamを使う

でかいJSONを扱う時はJSONStreamを使おう

Streamの例で実践的な話としてはnpmのパッケージ一覧だったり、twitterのtimeline APIで取れるような大きなJSONを扱う場合は、絶対JSONStreamを使ったほうが良いです。

Node.jsのJSONをparseしたりstringifyする際に組み込みのJSONオブジェクトを使うとそのタイミングでJavaScriptオブジェクトにしてくれたり、文字列にしてくれたりしますが、これも先程の例と同じで一旦Streamからメモリに貯めてオブジェクトにする必要があります。

Streamをこよなく愛するものはJSONをParseする際にJSONStreamっていうDominic Tarrが作っているやつを使いましょう。これは組み込みのJSONを使わずに自前でParserを実装していて、それをStreamで流すようにしています。なので、Streamの流れを止めること無くJSONをparseしたりstringifyしたりすることができます。

var http = require('http');
var fs = require("fs");
var JSONStream = require("JSONStream");

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/package.json');
    // package.jsonからversionの値だけ引っこ抜いてresponseに返す
    stream.pipe(JSONStream.parse("version")).pipe(res);
});
server.listen(8000);

parseに渡す時にkeyを指定すれば、その値だけ取ることもできますし、keyだけじゃなくてもう全て必要なときは以下のようにする必要があります。

var http = require('http');
var fs = require("fs");
var JSONStream = require("JSONStream");

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/package.json');
    // package.jsonからversionの値だけ引っこ抜いてresponseに返す
    stream.pipe(JSONStream.parse()).on("root", function(root){
        res.end(root.version);
    });
});
server.listen(8000);

ただこの場合は、結局全てを一旦Objectにしているので、JSON.parseと非同期になる以外は変わりません。基本的には大きなファイルを扱う際は全てをオブジェクトにするんじゃなくて、スクレイピングする要領で一部のデータだけを扱って下さい。

Before Stream

こっからはNode.jsにおけるStreamの歴史です。

そもそもNode.jsはlibuv(Event駆動なIO処理) + V8 JavaScriptエンジンというだけでそこにStreamなんてものは最初からは無かった。http, fs, tcpそれぞれ違うイベントを持ってたし、そうなるとそれぞれのデータのやり取りの方法は分かっても統一的なインタフェースが無いから覚えにくかった。

たとえば、httpリクエストは今でこそPOSTの時のbodyを受け取るイベントは"data"イベント(Stream2では"readable"イベント)だけど、前までは"body"みたいなイベントだった。これだとインタフェースが統一されてないから覚えなきゃいけないことが多い。

Stream0

それからStreamが生まれた。これはv0.1の頃。

これはReadableStreamを以下のように書くことにしていた。

// readable stream
// データ読み込み
readable.emit('data', chunk);
// 読み込み終了
readable.emit('end');

// writable stream
// データ書き込み
writable.write(chunk);
// 書き込み終了
writable.end();

んで、Readable Streamと Writable Streamを結ぶメソッドを作られた。
これが、util.pumpっていうメソッドだ。これを使うと、以下のようにStreamを結ぶことができる。

util.pump(readable, writable);

ただこのメソッドv0.1の頃の遺産でもう今は使わない
実際のStreamはdataとend以外のerrorとかcloseみたいな色んなイベントを扱うようになってしまったし。Streamにカスタマイズ性があることを考慮していくとutilpumpっていうメソッド単体で拡張するのは難しいっていう判断があったんだと思う。

かくしてこのStreamに大改修が行われていく。

Stream1

さて、こっからは馴染みやすいStreamになっていく。

readable.on("data", function(data){
  // データ読み込み処理
});
readable.on("end", function(){
  // 読み込み終了処理
});
// 読み込み一時停止
readable.pause();
// 読み込み再開
readable.resume();

// データ書き込み処理、書き込めなかったらfalseが返る
writable.write(data);
// データ書き込み終了処理
writable.end();
writable.on("drain", function(){
  // 再びwriteできるようになった時の処理
});

またStream1になるとpipeメソッドが生まれてreadableとwritableのStreamを以下のように繋げることができるようになる。

readable.pipe(writable);

これによってStreamを改善しつつ、JavaScriptライクな書き方ができるようになった。

// こんな感じで書けるようになった。
a.pipe(b);
b.pipe(c);
c.pipe(d);
// さらにv0.6からはpipeの第一引数のstreamが戻り値になるようになったので、chainメソッドっぽく書けるようになった。
a.pipe(b).pipe(c).pipe(d);

これでめでたしめでたし、と思ったけどこっからが問題。
Stream1にはいくつか欠点があった。

Stream1の欠点

  • dataイベントの受信(readable.on("data", function(){}))の前にデータが送信された場合にデータを取り逃してしまうことがある。
  • これを防ぐために書き出しの前にReadableStreamが受信してるかチェックして、なかったらデータをメモリ中に貯めておくっていう、所謂Bufferingを作る必要がある。でもこれはとても難しい。ユーザーのライブラリで下手に作るとメモリを馬鹿みたいに食うようになっちゃったりするし、メモリリークの元になっちゃう。
  • 他にもIOが不安定な環境でStreamを使ったりするとpause(読み込み一時停止)とresume(読み込み再開)のメソッド呼び出しが多発してしまうので多少CPU負荷がかかる

というわけでシンプルでいいよね、と思ってたStream1だったけど、悪い側面も出てきてしまった。この問題を解決するためにStream2が導入されたんだ。

Stream2

内部にバッファ機構を持ったStream、これでStream1の欠点を解消している。バッファがあるからデータが受信されるようになるまでデータをロストすることなくメモリ中に値を持てるし、IOが不安定な環境でもバッファリングすることでCPU負荷も改善できるようになったんだ。

readable.on("readable", function(){
  // データ読み込み処理がreadableイベントになり、
  // データをもらうときはreadメソッドを明示的に呼び出すようになった。
  // readしない場合はbufferに溜まり続ける
  var data = readable.read();
});
readable.on("end", function(){
  // 読み込み終了処理
});
// 読み込み一時停止
readable.pause();
// 読み込み再開
readable.resume();

// データ書き込み処理、書き込めなかったらfalseが返る
writable.write(data);
// データ書き込み終了処理
writable.end();
writable.on("drain", function(){
  // 再びwriteできるようになった時の処理
}); 

さて、大きな違いは見てもらったとおり、dataイベントじゃなくて、readableイベントになって、bufferから明示的にデータを読み込むようになったことだ。残念だけど、古いNode.jsの本とか記事だとまだdataイベントのままになっていることが多くてどうしてもStream2の恩恵を受けていないことが多い。

ちなみにStream2のもう一つの特徴としては、Stream1のインタフェースは後方互換のために残しているってことだ。dataイベントを受け付けるようにしててもStream1モード(old mode)になって動いてくれる。

いやーめでたしめでたし、Stream1から改善された。と思ったけど、こっからがv0.12以降のFutureの話だ。
Stream2にも欠点があった。

Stream2の欠点

  • Stream1のモードに一回なっちゃうとStream2には戻れなくなる、Stream1とStream2を混ぜて使えない
  • readableイベントで能動的にdataを取りつつもdataイベントでデータを受動的に受けたい場合もある

Stream3

v0.12ではこのStream3が入ることで欠点を解消してくれる。
Stream3 = Stream1 + Stream2 であり、このStream1モードの概念が無くなり、どっちのAPIにも対応されるようになり、シンプルになった。
readableイベントもdataイベントも両方発火されるため、Stream1の受動的なデータ受信もStream2のpull型のデータ取得もできるようになった。使う側が好きな方を選ぶという仕組みに。

また、Writeする際にBufferingの調整をしてから出力したいっていうユースケースに対応するためにwritev(2)を使って必ずStream中のバッファに出力するcorkとそのバッファからflushするuncorkがWritableStreamに追加された。これによってデータを出力させずに強制的にバッファリングさせることも可能になった。

というわけでStreamに完全な互換性が追加され、若干性能面でも安定したStream3が完成。これがv0.12から提供されるStreamだ。もちろんv0.11(開発版)でも既に使える。

安定版(v0.10)でも使いたいあなたに。

実はStream3の参考実装は既に今のnpmに上がっている readable-stream@1.1.x を使えばStream3になります。

$ npm install readable-stream@1.1
var Transform = require('readable-stream/transform');
var fs = require('fs');
var util = require('util');
util.inherits(HogeStream, Transform);

function HogeStream(option) {
  Transform.call(this, option);
}
HogeStream.prototype._transform = function(chunk, enc, cb) {
  cb(null, chunk);
};
HogeStream.prototype._flush = function(cb) {
  this.push("hoge"); 
  cb(); 
};
var stream = fs.createReadStream(__dirname + '/data.txt');
var hogeStream = new HogeStream();
stream.pipe(hogeStream);

hogeStream.on("readable", function(){
  // Readableでpullで取得する
  console.log("read :"+ hogeStream.read());
});
hogeStream.on("data", function(data){ // flowing modeになる
  // dataを受動的に受信する
  console.log("data :"+ data);
});
// paused modeに移行、この行があるとreadでデータを読めるが、この行がないとreadにはデータは残らない
hogeStream.pause();

こうするとv0.10でもStream3があなたの手元に。また、readableとdataを受診するのも可能。もしもthrough2から使いたい場合は以下のようにしてしましょう。

$ npm install through2@1

through2もreadable-streamもバージョン指定しなければインストールされるのは安定版の古いバージョンですが、Stream3を触りたい人達のために既にnpmには新しいものができています。もしもStreamの新しいバージョンのものが今の安定版でも使いたい場合はバージョン指定して使ってみましょう。