Streamについて、超絶シンプルなTCP Chat を基に説明してみる。

これは良い感じのStream/pipeをおさらいする機会だなぁと思ったのでシェア。

A Simple TCP Chat Server | Blog

Streamを使ったシンプルなTCP Chatです。コメントに合わせて翻訳も一応しておきます。

var net = require('net');

// チャットクライアントのリスト
var clients = [];

var server = net.createServer(function(sock){

  // 1. end リスナーをセットアップ。endイベントが来たら、クライアントをリストから除去する。
  sock.on('end', function(){
    clients.splice(clients.indexOf(sock), 1);
  });

  // 2. チャットルーム内の一人ひとりのクライアントにstreamをpipeでつなぐ。
  // 3. 新しいメンバーが来たら、その度にpipeでつなぎ、全員にStreamをつなぐ。
  clients.forEach(function(i){
    sock.pipe(i).pipe(sock);
  });

  // 4. 新しいメンバーをリストに追加する。
  clients.push(sock);
});

// 1137ポートでリッスンする。
server.listen(1337);

このファイルをchat.jsとかの適当な名前をつけて

$ node chat.js

で起動させた後、いくつかターミナルを起動して、

$ nc localhost 1337

という感じで接続すれば、簡易チャットができます。書いている行は少ないんですが、ある程度Streamのpipeに関する知識がないと何をしているのかひと目でわかりにくいかと思います。

pipeというのは


結構面白い概念で、簡単にいえば、読み込みデータを書き込みデータに流し込む役割をするメソッドです。
読み込みが早すぎて書き込みが追いつかない、そういう時には自動的に読み込みをストップさせてくれたり、ストップしたあとに書き込みの準備ができたら再流し込みをしてくれたりとpipeの中で良い感じにIOを調整してデータを取りこぼさないようにしてくれます。

んで、

clients.forEach(function(i){
  sock.pipe(i).pipe(sock);
});

ここが、なにやってるかというと、sockというのが接続開始した新しいクライアント、clientsには既にチャットに参加している既存クライアント達が居るという前提で

1. 新しいクライアント sock の読み込みStreamを既存のクライアント i の書き込みStreamに渡す。
2. 逆に既存クライアント i の読み込みStreamを新しいクライアント sock の書き込みStreamに渡す。

という方法でpipeだけで全クライアントを連結させているわけです。

つまり、

新しいクライアント.pipe(既存クライアント);
既存クライアント.pipe(新しいクライアント);

とやっているわけですね。pipe自身は第一引数であるdestinationをそのまま返すので、こういうpipeをつないだ書き方ができる訳です。

sock.pipe(i).pipe(sock);

これだけでターミナルから受け取った文字列を読み込みStreamから書き込みStreamに受け渡し、全クライアントへのブロードキャストを実現しているわけです。

endを伝搬させない。

ただし、残念ながら、このやり方だと、一つのクライアントがそのターミナルをctrl-cとかでクローズさせると他も同様にクローズされてしまいます。理由は簡単で、pipeの説明に書いてあるのですが、

Stream Node.js v0.10.4 Manual & Documentation

end()がpipeの接続先で呼ばれた場合、接続元に対してもendが呼ばれる、というのがデフォルトの動作になっているためです。

なので、既存クライアントのどれかがクローズした場合、それと数珠つなぎ状態になっているクライアントが全てendを受け取り、endイベントが伝搬されて全てのクライアントがクローズされてしまいます。

この動きを回避させるためには、endが呼ばれてもクローズされないようにオプションを渡す必要があります。

sock.pipe(i, {end:false}).pipe(sock, {end:false});

これで、ターミナルをクローズされても接続元にendは返りません。
endイベントの伝播を防ぐことができます。

Stream2の互換性を保つ

このままでもちょっとした欠陥を抱えています。主にNode v0.10以降、つまり Stream2でエラーに陥る事があります。

以下の手順を実行するとエラーが発生します。

1. 試しに1つだけクライアントを起動した状態でメッセージを送ってみて下さい。
2. そのクライアントをCtrl-Cで閉じてください。
3. 再度1つだけクライアントを起動させると先程記述されたメッセージが受信されると思います。
4. その状態で再度メッセージを送るとサーバー側でエラーが発生し、Nodeが例外を出力して落ちると思います。

ええ、まさに突然の死。です。

なんでこんなことが起きるかというと、Node v0.10のStream2では、dataを受け取れるStreamが存在しない場合に、受取可能な状態になるまで待ち続けます。これにより、dataの取りこぼしを防いでいるのですが、そうするとこの状態で、2のクライアントを閉じてもendイベントが受信されません。

つまり、2. でCtrl-cを押しても

//  end リスナーをセットアップ。endイベントが来たら、クライアントをリストから除去する。
  sock.on('end', function(){
    clients.splice(clients.indexOf(sock), 1);
  });

このendイベントの受信が実行されないので、clientsに残ったままになります。

その状態で3. で再度クライアントを起動させると、クローズしたクライアントと再起動させたクライアントがpipeで繋がれてdataが流れ始めます。さらにendが発生し、この段階で初めてendイベントで最初のクライアントが削除されます。

この段階でもう一回メッセージを送ろうとしてもpipeで繋がれたはずのクライアントはクローズされており、出力することが出来ないため、Nodeが例外を発生させます。

これを防ぐには、クライアントが1つの場合でも、resumeで明示的にデータを流して上げる必要があります。
この対処策は Nodeのapiの互換性の所に記述されてます。

Stream Node.js v0.10.4 Manual & Documentation

  sock.on('end', function(){
    clients.splice(clients.indexOf(sock), 1);
  });
  sock.resume();

resumeにより、データを明示的に流すことが可能です。これでStream2互換の動作が可能になりました。

というわけで完成形

var net = require('net');

// チャットクライアントのリスト
var clients = [];
var server = net.createServer(function(sock){

  // 1. end リスナーをセットアップ。endイベントが来たら、クライアントをリストから除去する。
  sock.on('end', function(){
    // sock.emit('data', "Goodbye!! ¥n"); //こんなふうにしておけば、クローズした時に全員にクローズしたことが通知される。
    clients.splice(clients.indexOf(sock), 1);
  });
  // resumeにより、書き込み可能なStreamがなくても明示的にdataを流すようにしておく。
  sock.resume();

  // 2. チャットルーム内の一人ひとりのクライアントにstreamをpipeでつなぐ。
  // 3. 新しいメンバーが来たら、その度にpipeでつなぎ、全員にStreamをつなぐ。
  // つなぐ際はendが伝搬しないようにする。
  clients.forEach(function(i){
    sock.pipe(i, {end:false}).pipe(sock, {end:false});
  });

  // 4. 新しいメンバーをリストに追加する。
  clients.push(sock);
});

// 1137ポートでリッスンする。
server.listen(1337);

まとめ


・Streamのpipeは読み込みと書き込みを良い具合に調整してくれるすぐれもの
・pipeでendを伝搬させないようにする方法、{end:false}
・Stream2の互換性をもたせるための注意点