yutopp's blog

サンドバッグになりたい

Boost.Asioまとめ(1)::io_service

Boost Advent Calndar 2011に恐縮ですが参加させて頂きました。15日目です。
最初はBoost.Asioについてまとめるぞーと意気込んでいたものの変に長くなってしまったのでBoost.Asioの中のio_serviceに絞ったためこんなタイトルに成り申した。

Boost.Asioとは

主にネットワークのI/Oのような時間のかかってしまう処理を非同期的かつ簡潔に扱えるようにした便利なライブラリです。
ネットワークを中心に、シリアルポート、タイマー、シグナルのハンドリングなども扱えます。

と、いうわけでio_serviceです。

Windows環境にてVC++10、Boost1.48.0を用いています。

io_service

全てはこのクラスに始まり、このクラスに終わります。
各OSの提供するI/O制御への橋渡しをしてくれるもので、Asioの提供するIOサービス(deadline_timerやip::tcp::socketなど)はこのクラスを必要とします。
ドキュメントによるとProactorという役割を担うものだそうで。
そしてこのクラスへの操作はスレッドセーフです。素敵ですね。
というわけで単体での一番単純なコードです。

#include <iostream>
#include <boost/asio.hpp>

int main() {
	boost::asio::io_service io;	//(1)

	io.post( [](){ std::cout << "post" << std::endl; } );		//(2)
	io.dispatch( []{ std::cout << "dispatch" << std::endl; } );	//(3)

	io.run();	//(4)
}

まず(1)でio_serviceを作成し、(2)と(3)でキューにハンドラを追加します。そして(4)で呼び出しを行います。
io_serviceのrun関数はこの時点で実行可能なハンドラを呼び出してくれるので、上のコードでは"post"、"dispatch"と順に出力されます。
postとdispatchの違いは後述します。
ちなみにハンドラを追加する際にコピーが行われますが、その動作を変更したい場合は各関数をオーバーロードすることで解決できます。もちろん呼び出しの動作も変更できます。

#include <iostream>
#include <boost/asio.hpp>

class hoge {
public:
	void operator()() const
	{
		std::cout << "hoge :: operator()!" << std::endl;
	}
};

inline void* asio_handler_allocate( std::size_t size, hoge* )
{
	std::cout << "custom allocator!" << std::endl;
	return ::operator new( size );
}

void asio_handler_deallocate( void* pointer, std::size_t size, hoge* )
{
	::operator delete( pointer );
	std::cout << "custom deallocator!" << std::endl;
}

template<class F>
void asio_handler_invoke( F f, hoge* )
{
	std::cout << "custom invocation!" << std::endl;
	f();
}

int main() {
	boost::asio::io_service io;
	const hoge h;

	io.post( h );
	io.run();
}

・・・シングルスレッドなのでいまいちありがたみを感じられませんね。
という訳でboost::threadを用いて非同期に動かしてみましょう。本領発揮です。奇蹟のカーニバルの開幕ですね。

マルチスレッドで

少し長くなってしまいましたが書き足しました。ついでに意図的に不具合を2つ含めてみました(それ以外のバグは知りません)。

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

//数えるだけ
class counter
{
public:
	counter()
		: i_( 0 )
	{}

	void operator()()
	{
		std::cout << "count: " << i_ << std::endl;
		++i_;
	}

private:
	int i_;
};

int main() {
	std::cout << "main thread : " << boost::this_thread::get_id() << std::endl;

	//
	boost::asio::io_service io;

	//postで文字を出力させる(1)
	for( int i=0; i<5; ++i )
		io.post( []{ std::cout << boost::this_thread::get_id() << " : post." << std::endl; } );

	//スレッドを4つ作り、runを走らせる(2)
	boost::thread_group tg;
	for( int i=0; i<4; ++i )
		tg.create_thread( boost::bind( &boost::asio::io_service::run, &io ) );

	//1秒待つ(3)
	boost::this_thread::sleep( boost::posix_time::seconds( 1 ) );

	//数えてみたり(4)
	counter c;
	for( int i=0; i<5; ++i )
		io.post( std::ref( c ) );

	//文字を出力してみたり(5)
	for( int i=0; i<5; ++i )
		io.post(
			[&]{ io.dispatch(
				[]{ std::cout << boost::this_thread::get_id() << " : dispatch." << std::endl; }
				);
			} );

	//終わるのを待つ(6)
	tg.join_all();

	std::cout << "finished!" << std::endl;
}

(1)は先ほどと同様、(2)でスレッドを作成して、io_serviceのrun()メンバ関数を動かしています。
さて、(3)でメインスレッドを1秒スリープさせていますが、コレのせいでおそらく(4)(5)辺りが残念なことになります。というか動かないはずです。
原因は、メインスレッドが待機している間に他スレッドでrun()がキューにある実行可能なハンドラを呼び出し尽くして終了してしまっているためです。
そのため、この時点でio_serviceのstopped()メンバ関数はtrueを返します。
この問題はio_service::workというクラスを使うことで解決できます。

io_service::work

io_service::workはio_serviceに「お前にはまだ仕事があるぞ」と吹き込む感じの役割を果たしますので、workが1つでも存在する場合はrun()メンバ関数は働き続けることになります。
(ちなみにrun()はブロッキングを伴いキューに溜まっている物全てを、run_one()はキューに溜まっているものを1つだけ実行することを試み、完了できたハンドラの個数を返します。poll()、poll_one()はブロッキングを伴わないという違いだけでそれ以外は同様の挙動をします。
また、これらの関数はハンドラを実行する前に仕事が溜まっているかチェックし、無い場合はstop()を呼び出しています。
io_service::workは仕事が溜まっているように見せかけるため、各関数の呼び出し時にこのチェックを通りぬけ、ブロッキングを行うrun()とrun_one()は内部でstopフラグが立つまで無限ループ、ブロッキングを行わないpoll()とpoll_one()はガン無視で動作を終了します。なので、poll()またはpoll_one()を用いている場合、workは意味が無いです。)
ということでmain関数を書き足しました。

int main() {
	std::cout << "main thread : " << boost::this_thread::get_id() << std::endl;

	//
	boost::asio::io_service io;

	/// これ! ///
	boost::asio::io_service::work w( io );
	//////////////

	//postで文字を出力させる(1)
	for( int i=0; i<5; ++i )
		io.post( []{ std::cout << boost::this_thread::get_id() << " : post." << std::endl; } );

	//スレッドを4つ作り、runを走らせる(2)
	boost::thread_group tg;
	for( int i=0; i<4; ++i )
		tg.create_thread( boost::bind( &boost::asio::io_service::run, &io ) );

	//1秒待つ(3)
	boost::this_thread::sleep( boost::posix_time::seconds( 1 ) );

	//数えてみたり(4)
	counter c;
	for( int i=0; i<5; ++i )
		io.post( std::ref( c ) );

	//postとdispatch(5)
	for( int i=0; i<5; ++i )
		io.post(
			[&]{ io.dispatch(
				[]{ std::cout << boost::this_thread::get_id() << " : dispatch." << std::endl; }
				);
			} );

	//終わるのを待つ(6)
	tg.join_all();

	std::cout << "finished!" << std::endl;
}

これで(4)(5)もうまいこと実行されるようになりました。
また、run()が終了しなくなったために(6)で止まるようになります。終了させたい場合はio_serviceのstop()メンバ関数を呼び出すか、io_service::workのオブジェクトを破棄します。
io_service::workはデストラクト時に他に仕事がない場合はio_serviceのstop()メンバ関数を呼び出してくれるので、スマートポインタで保持しておくのも手です。

// #include <boost/make_shared.hpp> が必要!
auto w( boost::make_shared<boost::asio::io_service::work>( io ) );
/*...*/
w.reset();	//じゃあの

run()の実行が1度でも終了しているのならば、次にrun()を呼び出す際はその呼び出しより先にreset()メンバ関数を呼んでおく必要があるということにも注意して下さい。

さて、不具合は残り1つとなりました。(4)だけですね。
複数のスレッドで実行されるので勿論のこと競合してめちゃくちゃになってしまっているはずです。
そんなときこんなとき、役に立つのがio_service::strandというクラスです。ロック操作なんてわざわざ書く必要はありません。

io_service::strand

io_service::strandのpost()またはdispatch()メンバ関数でハンドラを追加するか、wrap()メンバ関数でハンドラをラップして先ほどのようにio_serviceにハンドラを追加する事によって、そのio_service::strandのオブジェクトごとにハンドラをシングルスレッド時と同様に動作させることができます。
ちなみにwrap()はdispatch()を包んだファンクタを返すだけのものです。
・・・またmain関数を書き足し。

int main() {
	std::cout << "main thread : " << boost::this_thread::get_id() << std::endl;

	//
	boost::asio::io_service io;
	boost::asio::io_service::work w( io );

	//postで文字を出力させる(1)
	for( int i=0; i<5; ++i )
		io.post( []{ std::cout << boost::this_thread::get_id() << " : post." << std::endl; } );

	//スレッドを4つ作り、runを走らせる(2)
	boost::thread_group tg;
	for( int i=0; i<4; ++i )
		tg.create_thread( boost::bind( &boost::asio::io_service::run, &io ) );

	//1秒待つ(3)
	boost::this_thread::sleep( boost::posix_time::seconds( 1 ) );

	/// これ! ///
	boost::asio::io_service::strand st( io );
	//////////////

	//数えてみたり(4)
	counter c;
	for( int i=0; i<5; ++i )
		io.post( st.wrap( std::ref( c ) ) );	////ここ!

	//postとdispatch(5)
	for( int i=0; i<5; ++i )
		io.post(
			[&]{ io.dispatch(
				[]{ std::cout << boost::this_thread::get_id() << " : dispatch." << std::endl;
				} );
			} );

	//終わるのを待つ(6)
	tg.join_all();

	std::cout << "finished!" << std::endl;
}

これで満足に動くようになりました。このドツボにハマってた3年前の自分に読ませたい。

post()とdispatch()

さて、後回しにしていたpost()とdispatch()の違いです。
post()は呼び出された際に単純にキューにハンドラを追加しますが、dispatch()は呼び出された際に呼び出したスレッドでrun()やrun_one()、poll()、poll_one()が動いているならばキューにハンドラを追加すること無くロックを掛け即座にハンドラの呼び出しを行います。
ですので、post()の中でdispatch()を呼び出すようなハンドラを追加するとき(上のコードの(5)の所)は、同期・非同期関係なくdispatch()内のハンドラが即時呼び出されます。
以下のコードで確かめる事ができます。

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

int main() {
	std::cout << "main thread : " << boost::this_thread::get_id() << std::endl;

	//
	boost::asio::io_service io;

	//
	boost::thread_group tg;

	//postとpost
	std::cout << "- post -> post ------- " << std::endl;
	for( int i=0; i<5; ++i )
		io.post(
			[&,i]{
				const int id = i;
				std::cout << boost::this_thread::get_id() << " : outer(" << id << ")" << std::endl;
				io.post( [id]{ std::cout << boost::this_thread::get_id() << " : inner(" << id << ")" << std::endl;
				} );
			} );
	//
	for( int i=0; i<4; ++i )
		tg.create_thread( boost::bind( &boost::asio::io_service::run, &io ) );
	tg.join_all();

	//postとdispatch
	std::cout << "- post -> dispatch ------- " << std::endl;
	for( int i=0; i<5; ++i )
		io.post(
			[&,i]{
				const int id = i;
				std::cout << boost::this_thread::get_id() << " : outer(" << id << ")" << std::endl;
				io.dispatch( [id]{ std::cout << boost::this_thread::get_id() << " : inner(" << id << ")" << std::endl;
				} );
			} );
	//
	for( int i=0; i<4; ++i )
		tg.create_thread( boost::bind( &boost::asio::io_service::run, &io ) );
	tg.join_all();
}

出力

main thread : 00254EF0
- post -> post -------
00254F98 : outer(0)
00255470 : outer(2)
002551E0 : outer(3)
00255008 : outer(1)
002551E0 : inner(0)
00254F98 : outer(4)
00254F98 : inner(3)
00255008 : inner(4)
00254F98 : inner(1)
00255470 : inner(2)
- post -> dispatch -------
00255170 : outer(0)
00255008 : outer(1)
002550C0 : outer(2)
00255170 : inner(0)
00255008 : inner(1)
002550C0 : inner(2)
00255170 : outer(3)
00255008 : outer(4)
00255170 : inner(3)
00255008 : inner(4)

上手い事いけばこのように整って表示され、どのスレッドでどの番号順にハンドラが呼ばれたか確認することができます。
また、

#define BOOST_ASIO_ENABLE_HANDLER_TRACKING

とAsioを関連のファイルをインクルードする前に定義することによって、動作を追跡出来るようになるので便利です。

最後に

なんだか誰もが知ってるような事ばかりになってしまいました。すみません;
これだけでは物足りないのでタイマーやソケットに関しては別にまとめます!
AsioのネットワークとBoost.Spirit、Serializationなどの組み合わせは最強だと思います。
今回参加させていただいてAsioのコードを読む機会ができ、ためになりましたっ。OSごとの実装の分け方も詳しく調べたいところです。しかしWindowsのAPIはパワフルですね。


16日目は @izmktr さんです。よろしくお願いします。
io.post( []{ read( @izmktr ); } );