このページではメッセージキューについて解説します。
具体的には、POSIX のメッセージキューに基づいて解説を行っていきます。なので、POSIX 準拠の Linux OS を利用している方であれば、このページで紹介するソースコードはそのまま利用可能です。Windows や Mac の場合でも同様の機能を持つメッセージキュー(Windows は MSMQ)が利用可能だと思いますので、考え方等は参考になると思います。
メッセージキューとは
メッセージキューとは、複数のスレッド間や複数のプロセス間で非同期的にメッセージをやり取りするための仕組みの1つとなります。
名前に “キュー” が付いている通り、メッセージキューの実体はキューになります。そして、メッセージキューでは、このキューを介してメッセージのやり取りを行うことになります。キューに関しては下記ページで詳細を解説していますので、詳しく知りたい方は下記ページを参照していただければと思います。
【C言語/データ構造】スタックとキューの配列での実装方法キューとは
キューは FIFO (Firt In, First Out) を実現するデータ構造であり、イメージとしては待ち行列に近いです。基本的には、行列に並ぶときには行列の末尾から並び、先頭の人から順にサービスを受けることになります。これにより、並んだ人から順にサービスを受けることができることになります。
同様に、キューへのデータの格納もキューの末尾から順にデータが格納されていくことになります。そして、データの取り出しに関しては、キューの先頭から順にデータが取り出されるようになっています。これらのキューへのデータの格納をエンキュー、キューからのデータの取り出しをデキューと呼びます。
基本的には、この考えで良いのですが、POSIX のメッセージキューの場合はデータに優先度をつけることができ、優先度の高いデータを先頭から順に並べるようなことも可能です。この場合もデータを取り出すときは先頭から順にデータが取り出されるため、優先度の高い順にデータを取り出すことができることになります。
スポンサーリンク
キューを介したメッセージのやり取り
前述のとおり、メッセージキューでは先ほど説明したキューを介してスレッドやプロセスの間でメッセージのやり取りが行われます。
具体的には、メッセージ送信側のスレッドやプロセスはメッセージキューにメッセージをエンキューし、そして、メッセージ受信側のスレッドやプロセスはメッセージキューからメッセージをデキューすることで、送信側から受信側へのメッセージの送信が実現されることになります。
キューには複数のメッセージを格納することが可能で、送信側がキューに複数のメッセージをエンキューした場合、受信側はデキューによって送信側がエンキューしたから順に従ってメッセージを受信することができます。
このように、単にメッセージのやり取りが行うことが出来るだけでなく、送信側がキューに格納した順序にしたがって受信側がメッセージを取り出せる、つまりメッセージの送信順序とメッセージの受信順序が同じになることが保証されているという点がメッセージキューの特徴になります。
さらに、POISX のメッセージキューにおいては、各メッセージに対して優先度を設定することも可能です。メッセージに優先度を設定することで、キューの先頭側から順に優先度の高いメッセージが並べられるようキュー内のメッセージがソートされることになるため、受信側は優先度の高い順にメッセージをデキューすることができるようになります。
生産者・消費者モデルの実現が容易
また、メッセージキューを利用することで生産者・消費者モデルのプログラム・システムの実現が容易となります。
ここでいう “生産者” とは仕事を生み出すスレッドやプロセスで、“消費者” とは仕事を実行するスレッドやプロセスのことになります。こういった関係性をスレッドやプロセスに割り当てる設計モデルを生産者・消費者モデルと呼びます。
例えば、飲食店での料理の注文で考えれば、ウェイターさんが生産者でコックさんが消費者となり、仕事は料理の注文票と考えることが出来ます。ウェイターさんはお客様から注文を受け付け、その注文を書いた注文票を掲示板に貼ります。注文票が貼られれば、その注文票を取ってコックさんが注文票にしたがって料理を作ります。こんな感じで、仕事を生み出す生産者と仕事を実行する消費者の位置づけで設計する設計モデルを生産者・消費者モデルと呼びます。
で、この生産者・消費者モデルにおいては、生産者が生み出した仕事を消費者に渡すための手段が必要で、そこにメッセージキューを採用することで、生産者・消費者モデルが実現しやすくなります。生産者は、消費者に実行して欲しい仕事を示すデータをメッセージとしてメッセージキューにエンキューする、消費者はメッセージキューからメッセージをデキューし、そのメッセージにしたがって仕事を実行するようにプログラミングしてやれば良いだけです。
キューの特性上、エンキューされた順序でメッセージがデキューされることになるので、自然と仕事を生み出した順序で仕事が実行されていくことになります。さらに、優先度を設定してやれば、自然と優先度順に仕事を実行させるようなことも可能になります。
さらに、メッセージキューは複数のメッセージを溜めておくことが可能です。そのため、生産者が仕事を生み出すペースが消費者が仕事を完了させるペースより速くても、生産者の仕事を生み出す処理を待たせるような制御をわざわざ導入する必要はありません。メッセージキューにはメッセージを複数溜めておけるので、生産者は消費者のペースを気にせず、生産者のペースで仕事を生み出してエンキューを行えば良いのです。つまり、生産者は生産者のペースで仕事を生み出し、消費者は消費者のペースで仕事を実行すれば良いだけになります。
ただし、生産者の仕事の生み出すペースが極端に速いとメッセージキューがすぐに満杯になることになります。メッセージキューを用いた生産者・消費者モデルでは、この問題の解決も容易に行うことができます。具体的には、単に消費者側のプロセス数やスレッド数を増やして並列度を上げてやることで消費者の仕事を実行するペースを上げることができます。
このように、メッセージキューは生産者・消費者モデルと相性が良いです。これらはセットで覚えておくとよいと思います。
コーディング面について補足しておくと、生産者側は基本的には無限ループを組んで “仕事を生み出してメッセージキューにエンキューする” という一連の処理を延々と続け、消費者側も無限ループを組んで “メッセージキューからデキューして仕事を実行する” という一連の理を延々と続ける形で実装を行うことが多いです。
メッセージキューの使い方
続いてメッセージキューを使用する流れについて解説していきます。関数名を使って説明していきますが、各関数の詳細に関しては次の章の メッセージキューの関数 で説明します。まずはメッセージキューを使用する際の処理の流れを理解しましょう!
スポンサーリンク
メッセージキューを作成する
メッセージキューを利用するためには、まずメッセージキューの作成が必要になります。このメッセージキューの作成は mq_open
関数で実現できます。
メッセージキューを作成するときには、キューのサイズ(キューに格納できるメッセージの数)やメッセージ自体の最大サイズを指定することが可能です。
メッセージキューを mq_open
関数で作成したら同時にメッセージキューが開かれることになるため、次の メッセージキューを開く を別途実行する必要はありません。
メッセージキューを開く
メッセージキューをプログラムから利用するためには、メッセージキューを事前に開いておく必要があります。このメッセージキューを開く処理も mq_open
関数で行うことができ、mq_open
関数で “メッセージキューを作成する” or “単に開く” のどちらを行うのかについて引数の指定によって制御可能です。
前述のとおり、このメッセージキューは複数のスレッド間や複数のプロセス間(スレッドやプロセスが混合しても OK)でメッセージのやり取りを行うためのものになりますので、特にプロセス間でメッセージのやり取りを行うのであれば、送信側と受信側の両方のプロセスで mq_open
関数を実行して “同じメッセージキュー” を開く必要があります。
同じメッセージキューを開く際に必要になるのがメッセージキューを見分けるための 名前
になります。mq_open
関数の引数には 名前
が指定できるようになっていますので、送信側と受信側とで同じ 名前
を引数にして mq_open
を実行すれば、同じメッセージキューを開くことが出来ます。あとは、このメッセージキューを介してメッセージのやり取りを行えば良いだけです。
送信側と受信側とで指定する 名前
が異なれば、送信側と受信側とで異なるメッセージキューを開くことになってしまい、メッセージのやり取りが行えなくなってしまうので注意が必要です。
また、mq_open
関数が成功した場合の返却値は開いたメッセージキューを識別するための識別子(mqd
・ディスクリプタ)となります。メッセージキューを開いた後は、この mq_open
関数の返却値を関数に指定して操作するメッセージキューを指定することになります。
つまり、メッセージキューを開く際には 名前
で開くメッセージキューを指定する必要があり、開いたメッセージキューを操作する際には mq_open
関数の返却値で操作対象のメッセージキューを指定する必要があって、それぞれでメッセージキューを見分けるための識別子が異なる点に注意してください。
スポンサーリンク
メッセージをエンキューする(メッセージを送信する)
メッセージキューを開いてしまえば、メッセージのやり取りを行う準備が整ったことになります。つぎは、このメッセージのやり取りを行う “メッセージの送信” 及び “メッセージの受信” について説明していきます。
メッセージの送信に関しては mq_send
関数により実現できます。この mq_send
関数を実行することで、メッセージキューにメッセージをエンキューすることができます。
デフォルト設定では、メッセージキューが満杯の状態で mq_send
関数を実行した場合、メッセージキューのエンキューに成功するまで mq_send
関数内部で待ち続けることになります。つまり、メッセージキューからメッセージがデキューされるまで mq_send
関数は待機し続けます。
スポンサーリンク
メッセージをエンキューする(メッセージを送信する)
メッセージの受信に関しては mq_receive
関数により実現できます。この mq_receive
関数を実行することで、メッセージキューからメッセージをデキューすることができます。デキューしたメッセージはメッセージキューから消滅します。
また、デフォルト設定では、メッセージキューが空の状態で mq_receive
関数を実行した場合、メッセージのデキューに成功するまで mq_receive
関数内部で待ち続けることになります。つまり、メッセージキューにメッセージがエンキューされるまで mq_receive
関数は待機し続けます。
メッセージキューを閉じる・削除する
メッセージの送信や受信を繰り返し行い、全てのメッセージのやり取りが完了したらメッセージキュー閉じてやりましょう。これは mq_close
関数によって実現可能です。ただし、これは単にプログラムからメッセージキューを閉じるだけであり、メッセージキュー自体が削除されるというわけではありません。なので、メッセージキューをクローズした後も、再度メッセージキューを開けばメッセージのやり取りを再開することが出来ます。
メッセージキューを削除したいのであれば mq_close
実行後に mq_unlink
関数を実行してやれば良いです。
以上がメッセージキューを利用する際の処理の基本的な流れとなります。
メッセージキューの関数
続いてメッセージキューを利用するときに使用する関数を紹介していきます。
特によく利用する関数は下記の5つです。まずはこの5つの関数について覚えておきましょう。
mq_open
:メッセージキューを開く or 作成するmq_send
:メッセージをエンキューするmq_receive
:メッセージをデキューするmq_close
:メッセージキューを閉じるmq_unlink
:メッセージキューを削除する
これらの関数を利用するためにはヘッダーファイルのインクルードとライブラリへのリンクが必要となりますので、まずはこれらについて説明を行い、その後上記の関数を個別に紹介していきたいと思います。
スポンサーリンク
関数利用時のコンパイル・リンク
では、メッセージキュー利用時に必要となるヘッダーファイルのインクルードとライブラリへのリンクについて説明します。
インクルードするヘッダーファイル
メッセージキューに関する関数は mqueue.h
でプロトタイプ宣言されています。なので、メッセージキューを利用する際には mqueue.h
のインクルードが必須となります。また、メッセージキュー作成時には作成するメッセージキューのモード(権限)を指定する必要があり、モードのパラメーターが sys/stat.h
で定義されているため、必要に応じて sys/stat.h
のインクルードも必要となります。
#include <mqueue.h>
#include <sys/stat.h> // 必要に応じて
リンクするライブラリーファイル
さらに、メッセージキューに関する関数の実体は rt
ライブラリに存在するため、リンク時には -lrt
オプションを指定する必要があります。
例えば、メッセージキューを利用するプログラムのソースコードが producer.c
である場合、gcc
を使ってコンパイル・リンクを行うのであれば下記のようなコマンドを実行する必要があります。
gcc producer.c -lrt -o producer.exe
mq_open
続いて、メッセージキュー関連の関数の個別の説明に移ります。
まず、mq_open
は、第1引数で指定した名前の “メッセージキューを新規に作成” する、もしくは、第1引数で指定した名前の “既に存在するメッセージキューを開く” 関数になります。
mq_open
関数の第2引数で指定するフラグに O_CREAT
が含まれている場合、第1引数で指定した名前のメッセージキューがシステム内に “存在しなければ”、新規にメッセージキューが生成されることになります。既に存在していれば、新規に作成されることなく単にメッセージキューが開かれることになります。
また、第2引数で指定するフラグに O_CREAT
が含まれている場合は、第3引数にモード(パーミッション)を、さらに第4引数に新規作成するメッセージキューの設定 or NULL
を指定する必要があります。O_CREAT
が含まれていないのであれば、第3引数と第4引数は不要になります。
つまり、第2引数で指定するフラグによって mq_open
関数に指定すべき引数が異なることになります。
具体的には、第2引数で指定するフラグに O_CREAT
が含まれている場合は下記のように4つの引数を指定する必要があります。
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
それ以外は、下記のように2つの引数のみを指定してやれば良いです。
mqd_t mq_open(const char *name, int oflag);
続いて、各引数について説明していきます。
mq_open
の第1引数 name
まず第1引数 name
にはメッセージキューの名前を文字列で指定します。この名前はメッセージキューを識別するためのデータとなり、mq_open
の実行によって、この名前が付けられたメッセージキューの作成が行われる or メッセージキューが開かれることになります。この name
の先頭の文字は /
である必要があります。
mq_open
の第2引数 oflag
第2引数 oflag
ではメッセージキューの開き方をフラグとして指定します。下記の3つのフラグのうちの1つは指定が必須です。
O_RDONLY
:受信専用キューとして開くO_WRONLY
:送信専用キューとして開くO_RDWR
:受信・送信両方可能なキューとして開く
メッセージキューへのデータの送信のみを行うプログラムであれば oflag
に O_WRONLY
を、メッセージキューからのデータの受信のみを行うプログラムであれば oflag
に O_RDONLY
を、送受信両方を行プログラムであれば oflag
に O_RDWR
を指定して mq_open
を実行する感じですね。
oflag
には上記の3つのうちのいずれかのフラグを指定することが必須で、他にも下記のようなフラグを指定することが可能です。複数のフラグを指定したい場合は、各フラグを |
の OR 演算子で区切る形で指定する必要があります。
O_CREAT
:キューが存在しないのであれば新規作成して開くO_NONBLOCK
:ノンブロッキングモードのキューとして開く
例えば、メッセージキューを “ノンブロッキングモード” で “送信専用” で開く、かつ、開こうとしているキューが存在しないのであれば “新規作成” したいのであれば下記のように oflag
を指定して mq_open
を実行する必要があります。
O_WRONLY | O_NONBLOCK | O_CREAT
ノンブロッキングモードについては、後述の mq_send および mq_receive で説明を行います。
mq_open
の第3引数 mode
第3引数 mode
では作成するメッセージキューのパーミッション(権限)を指定します。このパーミッションのパラメーターとしては、sys/stat.h
で定義されている S_IRUSR
や S_IRWXU
等の定義値を指定することが可能です。ただ、Linux や Mac の chmod
コマンドと同様に数値で 0666
等のパーミッションを指定することも可能です。chmod
コマンドに慣れている方は数値で指定する方が楽かなぁと思います。
mq_open
の第4引数 attr
第4引数 attr
では作成するメッセージキューの設定を指定することができます。第4引数 attr
の型は struct mq_attr *
なので、この struct mq_attr
型の変数を用意し、その変数のメンバーに各種設定値をセットしてから、その変数のアドレスを attr
として指定してやれば良いことになります。
struct mq_attr
は下記のように定義された構造体であり、この定義を見ていただければ分かる通り、mq_open
実行時には mq_maxmsg
と mq_msgsize
しか参照されないため、これらの2つのメンバーのみに値をセットして引数に指定してやれば良いです。mq_maxmsg
にはキューのサイズ(キューに格納可能なメッセージの個数)、mq_msgssize
には各メッセージのサイズの最大値を指定します。
struct mq_attr {
long mq_flags; /* Flags (ignored for mq_open()) */
long mq_maxmsg; /* Max. # of messages on queue */
long mq_msgsize; /* Max. message size (bytes) */
long mq_curmsgs; /* # of messages currently in queue (ignored for mq_open()) */
};
また、第4引数 attr
には NULL
を指定することも可能で、この場合はシステムのデフォルト値に基づいてメッセージキューが作成されることになります。
mq_open
で作成される管理ファイル
mq_open
でメッセージキューを作成すると、もしかしたらプログラムを実行する環境によって異なるかもしれませんが、下記のフォルダに name
引数に指定した文字列と同じ名前のファイルが生成されることになります。
/dev/mqueue/
mq_open
の返却値
メッセージキューを開くことに成功した場合、mq_open
関数は正の整数を返却します。この返却値は開いたメッセージキューのディスクリプタであり、mq_send
、mq_receive
、mq_close
でメッセージキューの操作を行う際に、操作対象のメッセージキューを指定することを目的に、この mq_open
の返却値を引数で指定する必要があります。
また、メッセージキューを開くことに失敗した場合は mq_open
関数は -1
を返却します。この場合、エラーの原因を示す値が errno
にセットされます。
errno
に関しては下記ページで詳細を解説していますので、詳しくは下記ページを参照いただければと思います。関数のエラーの原因を知りたい場合も多いと思いますので、errno
を使いこなせるようにしておきましょう!
mq_open
利用時の注意点
既に name
引数で指定した名前のメッセージキューが存在する場合、mq_open
の oflag
引数に O_CREAT
を指定していてもメッセージキューが再度作られるというわけではなく、単にメッセージキューが開かれるだけになります。
なので、既に name
引数で指定した名前のメッセージキューが存在すると、メッセージキューの設定を変更しようと思って第3引数や第4引数に指定するデータを変化させて再度 mq_open
を実行したとしても、その設定の変更はメッセージキューには反映されないことになるので注意してください。設定を変更する場合は、後に説明する mq_unlink
を利用する or /dev/mqueue
以下に存在するメッセージキューの管理ファイルを削除する必要があります(もしくは、引数 name
に指定するメッセージキューの名前を変更するのでも良いです)。
mq_send
mq_send
はメッセージキューにメッセージをエンキューすることで受信側に対してメッセージを送信する関数になります。
mq_send
関数のプロトタイプ宣言は下記のとおりです。
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
mq_send
の第1引数 mqdes
第1引数 mqdes
にはエンキュー先のメッセージキューのディスクリプタを指定します。メッセージキューのディスクリプタは mq_open
関数の返却値として得られますので、つまりは mq_open
関数の返却値を指定してやれば良いことになります。
mq_send
の第2引数 msg_ptr
第2引数 msg_ptr
にはメッセージキューにエンキューしたい、つまりは相手に送信したいデータの先頭アドレスを指定します。引数の型は const char *
型ではありますが、構造体等のデータの先頭アドレスも指定可能です。
mq_send
の第3引数 msg_len
第3引数 msg_len
にはメッセージキューにエンキューしたいデータのサイズを指定します。単位はバイトです。このサイズは、mq_open
関数でメッセージキューを作成したときに第4引数 attr
のメンバー msg_size
で指定したメッセージのサイズ以下である必要があります。
mq_send
を実行した際には、第2引数 msg_ptr
のアドレスを先頭とする msg_len
バイトのデータがメッセージキューにエンキューされることになります。
mq_send
の第4引数 msg_prio
第4引数 msg_prio
には、エンキューするメッセージの優先度を設定します。POSIX のメッセージキューのキューは優先度設定可能になっており、単なる先入先出方式でデータの格納・取り出しが行われるわけではなく優先度に基づいてキュー内のデータの並びが変化するようになっています。具体的には、POSIX のメッセージキューの場合、第4引数 msg_prio
に指定した値が大きいメッセージがキューの先頭側から降順に並ぶようになっています。デキューはキューの先頭から順に行われていきますので、第4引数 msg_prio
に大きな値を指定してエンキューしたメッセージほど先にデキューされることになります。
mq_prio
の値が同じメッセージが複数エンキューされた場合は、基本的なキュー同様に、先にエンキューされたメッセージから先にデキューされることになります。つまり、エンキューした順にメッセージをデキューされるようにしたいのであれば、全てのメッセージのエンキュー時、すなわち mq_send
には mq_prio
に毎回同じ値を指定する必要があることになります。逆にいうと、mq_prio
の値がバラバラの状態で mq_send
を実行してしまうとエンキューした順序でのデキューが実現できなくなることになります。
mq_send
の返却値
メッセージの送信、すなわちメッセージキューへのメッセージのエンキューに成功した場合、mq_send
関数は 0
を返却し、失敗した場合は -1
を返却します。mq_send
から -1
が返却された場合は、メッセージの送信に失敗した理由を示す値が errno
にセットされます。
mq_send
のノンブロッキングモード時の動作
mq_open
で第2引数 oflag
に O_NONBLOCK
が指定された状態で開かれたメッセージキューを利用する場合、mq_send
関数はノンブロッキングモードで動作することになります。O_NONBLOCK
が指定されていなければブロッキングモードで動作します。
ブロッキングモードの場合は、メッセージキューが満杯でエンキューできなければ mq_send
関数はエンキューできるまで、すなわちメッセージキューからのメッセージのデキューが受信側で実行されるまで待ち続けることになります。
それに対し、ノンブロッキングモードの場合は、メッセージキューが満杯でエンキューできなければ mq_send
関数は直ちに終了することになります。この時の mq_send
関数の返却値は -1
となり、errno
には EAGAIN
がセットされることになります。
メッセージキューが満杯のときに単にエンキューできるようになるまで待ち続けるのでよいのであればブロッキングモードを利用すればよいですし、メッセージキューが満杯の時に何かしらの処理をすぐに行いたい、例えばユーザーに満杯になったことを通知したいような場合はノンブロッキングモードを利用すれば良いと思います。
スポンサーリンク
mq_receive
mq_receive
はメッセージキューからメッセージをデキューすることで送信側から送信されてきたメッセージを受信する関数になります。
mq_receive
関数のプロトタイプ宣言は下記のとおりです。
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
mq_receive
の第1引数 mqdes
第1引数 mqdes
にはデキュー先のメッセージキューのディスクリプタを指定します。mq_send
関数と同様に、メッセージキューのディスクリプタは mq_open
関数の返却値として得られますので、つまりは mq_open
関数の返却値を指定してやれば良いことになります。
mq_receive
の第2引数 msg_ptr
第2引数 msg_ptr
にはメッセージキューからデキューしたデータを、つまりは相手から受信したデータを格納するバッファーや変数の先頭アドレスを指定します。引数の型は char *
型ではありますが、構造体等のデータの先頭アドレスも指定可能です。
メッセージキューにエンキューされているメッセージのサイズの最大値は、メッセージキュー作成時に mq_open
関数の第4引数 attr
で指定した msg_size
となります。したがって、第2引数 msg_ptr
を先頭アドレスとするバッファーや変数のサイズは、この msg_size
以上である必要があります。
mq_receive
の第3引数 msg_len
第3引数 msg_len
には第2引数 msg_ptr
を先頭アドレスとするバッファーや変数のサイズを指定します。前述のとおり、このサイズは メッセージキュー作成時に mq_open
関数の第4引数 attr
で指定した msg_size
以上である必要があります。
mq_receive
の第4引数 msg_prio
第4引数 msg_prio
には、デキューしたメッセージの優先度の格納先のバッファーや変数の先頭アドレスを指定します。もし、優先度の取得が不要であれば第4引数 msg_prio
には NULL
を指定してやれば良いです。
mq_send の第4引数 msg_prio で説明したように、デキューされたメッセージは、そのデキューした時点でメッセージキューに存在するメッセージの中で最大優先度のものになりますので、msg_prio
の値をわざわざ取得しなくても優先度に応じたデキューの順序制御は勝手に行われることになります。
なので、基本的には第4引数 msg_prio
には NULL
を指定するので問題ないと思います。送信側で設定されたメッセージの優先度の具体的な値を知りたいような場合は、msg_prio
に unsigned int
型の変数のアドレスを指定してやりましょう。
mq_receive
の返却値
メッセージの受信、すなわちメッセージキューへのメッセージのデキューに成功した場合、mq_receive
関数はデキューしたメッセージのサイズを返却し、失敗した場合は -1
を返却します。mq_receive
から -1
が返却された場合は、メッセージの送信に失敗した理由を示す値が errno
にセットされます。
mq_receive
のノンブロッキングモード時の動作
mq_send
同様に、mq_receive
もノンブロッキングモードで動作させることが可能です。ノンブロッキングモードについては基本的には mq_send
と同じなので詳細に関しては mq_send のノンブロッキングモード時の動作 を参照してください。
mq_receive
の場合は、ノンブロッキングモードとブロッキングモードとで、メッセージキューが空の時に mq_receive
実行時の動作が変化することになります。
mq_close
mq_close
は開いているメッセージキューを閉じる関数になります。
mq_close
関数のプロトタイプ宣言は下記になります。
int mq_close(mqd_t mqdes);
mq_close
の第1引数 mqdes
mq_close
の第1引数には、閉じたいメッセージキューのディスクリプタを指定します。要は、mq_open
の返却値を指定してやれば良いです。
mq_close
の返却値
メッセージキューを閉じることに成功した場合、mq_close
関数は 0
を返却し、失敗した場合は -1
を返却します。
mq_close
から -1
が返却された場合は、メッセージキューを閉じることに失敗した原因を errno
から調べることができます。
mq_unlink
mq_unlink
はメッセージキューを削除する関数になります。
先ほど紹介した mq_close
ではメッセージキュー自体やメッセージは削除されません。なので、次回同じ名前を引数に指定して mq_open
でメッセージキューを開いた場合は、前回 mq_close
を実行したときと同じ状態になっています。なので、送信側が mq_send
を実行していないのに受信側が mq_receive
でいきなりメッセージをデキューできたりします。
こういった動作を望んでいるのであれば、わざわざメッセージキューを削除する必要はないのですが、プログラム開始時はメッセージキューをまっさらな状態にしておきたいような場合や、メッセージキューの設定を変更するためにメッセージキューを作り直したいような場合は mq_unlink
によるメッセージキューの削除が必要となります。
プログラムを終了した後もメッセージを残したいような場合を除いては、基本的にはプログラムの最初と最後で mq_unlink
を実行してメッセージキューを削除してやるのが良いと思います。
mq_unlink
関数のプロトタイプ宣言は下記になります。
int mq_unlink(const char *name);
mq_unlink
の第1引数 name
mq_unlink
の第1引数には、削除したいメッセージキューの名前を指定します。要は、メッセージキュー作成時に mq_open
の第1引数に指定した名前を指定してやれば良いです。
mq_unlink
の返却値
メッセージキューの削除に成功した場合、mq_unlink
関数は 0
を返却し、失敗した場合は -1
を返却します。
mq_unlink
から -1
が返却された場合は、メッセージキューの削除に失敗した原因を errno
から調べることができます。
スポンサーリンク
メッセージキューを利用したサンプルプログラム
最後に、メッセージキューを利用したサンプルプログラムのソースコードを紹介していきます。ここでは生産者・消費者モデルに基づいて設計を行ったプログラムのソースコードを紹介していきます。基本的にはマルチプロセス(複数のプログラム)で動作させるプログラムを紹介しますが、最後にマルチスレッドで動作させるプログラムのソースコードも紹介します。
また、生産者側ではユーザーから下記の3つの入力を受け付け、これらの情報をセットしたメッセージをメッセージキューにエンキューするようにしています。
- 1つ目の整数
- 2つ目の整数
- 演算子
メッセージは、下記の message.h
で定義される MESSAGE
構造体のデータとしており、各メンバーに上記で入力された整数や演算子の enum
値をセットし、この構造体のデータをメッセージキューにエンキューするようにしていきます。そして、消費者側はメッセージキューからメッセージをデキューし、そのメッセージに基づいて演算を行ってその結果を printf
で出力するようにしていきます。
この message.h
は、以降で紹介するソースコードからインクルードされるようになっているため、下記をコピペして他のソースコードと同じフォルダに message.h
というファイル名で保存しておいてください。
#ifndef _MESSAGE_H_
#define _MESSAGE_H_
typedef enum {
E_SUM = 0,
E_SUB,
E_MUL,
E_DIV,
} OPERATION;
typedef struct {
int first_num;
int second_num;
OPERATION operation;
} MESSAGE;
#endif // _MESSAGE_H_
ちなみに、構造体や enum
(列挙型) に関しては下記ページで解説していますので、詳細を知りたい方は別途下記ページを参照していただければと思います。
基本的なメッセージキューの利用例
まずは、基本的なメッセージキューの利用例となるサンプルプログラムのソースコードを紹介していきます。前述のとおり、これらは生産者・消費者モデルに基づいて設計を行ったプログラムとなっており、プログラム自体が2つに分かれています。
生産者のソースコード(producer.c
)
まずは、生産者側のソースコード producer.c
を示していきます。
#include <mqueue.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include "message.h"
// キューの名前
#define QUEUE_NAME ("/sample_queue")
// メッセージの最大サイズ
#define MSG_SIZE (sizeof(MESSAGE))
int main(void) {
// キューを事前に削除しておく
mq_unlink(QUEUE_NAME);
// キューの設定
struct mq_attr attr;
attr.mq_maxmsg = 3;
attr.mq_msgsize = MSG_SIZE;
// キューを生成して書き込みモードで開く
mqd_t queue = mq_open(QUEUE_NAME, O_WRONLY | O_CREAT, 0666, &attr);
if (queue == -1) {
// キューの作成に失敗
printf("%s\n", strerror(errno));
return -1;
}
while (1) {
// 2つの整数の入力
int first_num, second_num;
printf("first number : ");
scanf("%d", &first_num);
printf("sencond number : ");
scanf("%d", &second_num);
// 演算子の入力
int operation;
printf("operation(0:+,1:-,2:*,3:/) : ");
scanf("%d", &operation);
if (operation != E_SUM && operation != E_SUB && operation != E_MUL && operation != E_DIV) {
// 演算子が不正ならプログラム終了
printf("QUIT!\n");
break;
}
// メッセージの生成
MESSAGE message;
message.operation = operation;
message.first_num = first_num;
message.second_num = second_num;
// メッセージの送信
int ret = mq_send(queue, (const char*)&message, sizeof(message), 0);
if (ret == -1) {
printf("%s\n", strerror(errno));
}
}
// メッセージキューを閉じて削除
mq_close(queue);
mq_unlink(QUEUE_NAME);
}
消費者のソースコード(consumer.c
)
消費者側のソースコードとなる consumer.c
は下記となります。
#include <mqueue.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include "message.h"
#define QUEUE_NAME ("/sample_queue")
int calc(MESSAGE *message) {
int result;
// operationに従った計算を実行
switch(message->operation) {
case E_SUM:
result = message->first_num + message->second_num;
break;
case E_SUB:
result = message->first_num - message->second_num;
break;
case E_MUL:
result = message->first_num * message->second_num;
break;
case E_DIV:
if (message->second_num == 0) {
result = 0;
} else {
result = message->first_num / message->second_num;
}
break;
default:
result = 0;
break;
}
return result;
}
int main(void) {
// キューを読み取り専用モードで開く
mqd_t queue = mq_open(QUEUE_NAME, O_RDONLY);
if (queue == -1) {
printf("%s\n", strerror(errno));
return -1;
}
while (1) {
// メッセージ格納先の変数
MESSAGE message;
// メッセージをデキューして取得
ssize_t ret = mq_receive(queue, (char*)&message, sizeof(message), NULL);
if (ret == -1) {
printf("%s\n", strerror(errno));
}
// メッセージに従った処理を実行
int result = calc(&message);
printf("result = %d\n", result);
}
mq_close(queue);
}
動作確認
ここで示した producer.c
と consumer.c
の動作確認を行っていきたいと思います。
まずは、下記の2つのコマンドを実行して2つのソースコードをコンパイルし、実行可能ファイルとなる producer.exe
と consumer.exe
を生成してください。
gcc producer.c -lrt -o producer.exe gcc consumer.c -lrt -o consumer.exe
続いて、ターミナルアプリを2つ起動し、それぞれのターミナルで下記コマンドを実行して producer.exe
と consumer.exe
をそれぞれ別のターミナルで起動します。
./producer.exe
./consumer.exe
続いて、producer.exe
を起動した方のターミナルに整数を2つと演算子の指定を行います。
./producer.exe first number : 200 sencond number : 57 operation(0:+,1:-,2:*,3:/) : 1
すると、consumer.exe
を起動した方のターミナルに、producer.exe
側に指定した整数や演算子を用いた計算の結果が出力されていることが確認できると思います。
./consumer.exe result = 143
producer.exe
側で入力した整数や演算子に基づいた計算を consumer.exe
が実行できているのは、producer.exe
がユーザーから入力された整数2つや演算子をメッセージとしてメッセージキューにエンキューし、さらに、そのメッセージを consumer.exe
がデキューすることで 、それらの整数や演算子を取得することができているからになります。
このように、メッセージキューを利用することで、複数のプログラム間(プロセス間)でメッセージのやり取りを簡単に実現することができます。
プログラムを終了したいときは、ターミナルで control
+ c
をキー入力してください
ブロッキングモードの動作確認
現在の producer.c
や consumer.c
では mq_open
の第2引数 oflag
に O_NONBLOCK
を指定していないため、producer.exe
でも consumer.exe
でもメッセージキューがブロッキングモードで開かれることになっています。
ここで、このブロッキングモードの時にメッセージキューが満杯になった時の動作を確認しておきたいと思います。
ここでは先ほど gcc
コマンドで生成した producer.exe
と consumer.exe
をそのまま使います。
ということで、まず producer.exe
を下記コマンドで実行してください。
./producer.exe
次は、”consumer.exe
を実行していない状態” で、2つの整数と演算子の入力をそれぞれ4回分繰り返してください。
./producer.exe first number : 1 sencond number : 2 operation(0:+,1:-,2:*,3:/) : 0 first number : 1 sencond number : 2 operation(0:+,1:-,2:*,3:/) : 1 first number : 1 sencond number : 2 operation(0:+,1:-,2:*,3:/) : 2 first number : 1 sencond number : 2 operation(0:+,1:-,2:*,3:/) : 3
3回目の入力までは、演算子を示す整数を入力すれば、再度1つ目の整数の入力受付が行われるようになっていました。ですが、4回目の演算子を示す整数の入力を行っても次の整数の入力受付が行われません。
producer.c
では、演算子の入力を受け付けた後はメッセージを生成し、その後 mq_send
でメッセージキューへのエンキューを行うようになっています。そして、mq_send
終了後に、次のループに移行して1つ目の整数の入力受付を行うようになっています。したがって、次の1つ目の整数の入力受付が行われないということは、mq_send
が終了していないことを意味します。
この mq_send
が終了しない理由はメッセージキューが既に満杯になっているからです。メッセージキューを作成する mq_open
の第4引数 attr
のメンバー mq_maxmsg
を 3
に設定しているため、一度にメッセージキューに溜められるメッセージは 3
つまでとなります。さらに、 3
回分の整数や演算子の入力を行って mq_send
でメッセージキューに 3
回メッセージのエンキューが行われているため、現状メッセージキューは満杯ということになります。そして、メッセージキューはブロッキングモードで開かれているので、満杯の時に mq_send
を実行するとエンキューできるようになるまで mq_send
は終わらないことになります。
ここで、consumer.exe
を下記コマンドで実行してみましょう!
./consumer.exe
consumer.exe
が実行されれば、メッセージキューからのデキューが行われることになり、メッセージキューに空きができて満杯でなくなります。 そのため、producer.exe
を実行しているターミナルには、4回目の整数の入力受け受けが行われるようになったことが確認できるはずです。
first number :
こんな感じで、メッセージキューが満杯の時は mq_send
内部で待機してくれるため、プログラム開発者はメッセージキューが満杯の時の制御のコードを書かなくてもメッセージキューに空きが出来るまで待ち続けるという動作を実現できることになります。同様に、メッセージキューが空の時は、メッセージがエンキューされるまで mq_receive
内部で待機してくれます。
逆に言えば、メッセージキューが満杯の時は mq_send
内部で待機するため、mq_send
を実行しているスレッドやプロセスは他の処理を行うことができません。例えば、メッセージキューが満杯であることをユーザーに通知したいとしても mq_send
で待機し続けるのでそういった処理は行えません。こういった、メッセージキューが満杯であるときに他の処理を実行させたいのであればメッセージキューはノンブロッキングモードで開く必要があります。
ノンブロッキングモードのメッセージキューの利用例
ということで、次はメッセージキューをノンブロッキングモードで開いた時の動作を確認していきたいと思います。ここでは、生産者側(送信側)となる producer.c
のソースコードのみを変更してメッセージキューをノンブロッキングモードで開くようにしていきます。
生産者のソースコード(producer.c
)
メッセージキューをノンブロッキングモードで開くように変更した生産者側のソースコード producer.c
は下記となります。
#include <mqueue.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include "message.h"
// キューの名前
#define QUEUE_NAME ("/sample_queue")
// メッセージの最大サイズ
#define MSG_SIZE (sizeof(MESSAGE))
int main(void) {
// キューを事前に削除しておく
mq_unlink(QUEUE_NAME);
// キューの設定
struct mq_attr attr;
attr.mq_maxmsg = 3;
attr.mq_msgsize = MSG_SIZE;
// キューを生成して書き込みモードで開く
mqd_t queue = mq_open(QUEUE_NAME, O_WRONLY | O_CREAT | O_NONBLOCK, 0666, &attr);
if (queue == -1) {
// キューの作成に失敗
printf("%s\n", strerror(errno));
return -1;
}
while (1) {
// 2つの整数の入力
int first_num, second_num;
printf("first number : ");
scanf("%d", &first_num);
printf("sencond number : ");
scanf("%d", &second_num);
// 演算子の入力
int operation;
printf("operation(0:+,1:-,2:*,3:/) : ");
scanf("%d", &operation);
if (operation != E_SUM && operation != E_SUB && operation != E_MUL && operation != E_DIV) {
// 演算子が不正ならプログラム終了
printf("QUIT!\n");
break;
}
// メッセージの生成
MESSAGE message;
message.operation = operation;
message.first_num = first_num;
message.second_num = second_num;
// メッセージの送信
int ret = -1;
do {
ret = mq_send(queue, (const char*)&message, sizeof(message), 0);
if (ret == -1) {
if (errno == EAGAIN) {
printf("Message Queue is Full\n");
sleep(1);
} else {
printf("%s\n", strerror(errno));
break;
}
}
} while (ret == -1);
}
// メッセージキューを閉じて削除
mq_close(queue);
mq_unlink(QUEUE_NAME);
}
上記の producer.c
では mq_open
関数の第2引数に O_NONBLOCK
を指定することでメッセージキューをノンブロッキングモードで開くようにしています。mq_send
関数はメッセージキューが満杯でエンキューに失敗した場合、すなわち mq_send
関数が -1
を返却し、かつ errno
が EAGAIN
の場合は、メッセージキューが満杯であることを printf
で出力してユーザーに通知し、さらに 1
秒間スリープしたのちに再度 mq_send
を実行するようにしています。そして、これらをメッセージキューへのエンキューに成功するまで繰り返すようにしています。
ノンブロッキングモードでメッセージキューを開けば、メッセージキューが満杯の場合でも mq_send
が即座に終了するため、ユーザーへの通知などの他の処理をすぐに行うことが可能となります。ただし、メッセージキューに空きができるまで mq_send
を繰り返し実行するような制御をプログラマー自身がコーディングする必要が出てくるため、コーディングの難易度は上がります。
動作確認
実際に、ノンブロッキングモードのメッセージキューの動作について確認してみましょう!
ブロッキングモードの動作確認 で行った手順と同じこと、すなわち producer.exe
のみを実行して整数や演算子の入力を繰り返し行ってみてください。
今回は、単に入力受け受けが行われないのではなく、下記のようにメッセージキューが満杯であることを示す文字列が繰り返し出力されることが確認できるはずです。このように文字列が出力できているのは、mq_send
がメッセージキュー満杯の状態でも即座に終了するようになったためです。
Message Queue is Full
consumer.exe
を実行してやれば、メッセージキューの満杯が解消されて上記の文字列の出力が停止し、次の整数や演算子の入力受付が行われるようになることも確認できると思います。
スポンサーリンク
優先度を設定したメッセージキューの利用例
次は、優先度を設定したメッセージキューの利用例となるサンプルを示していきます。今回も変更するソースコードは producer.c
のみで、動作確認には今まで利用してきた consumer.exe
をそのまま利用します。
生産者のソースコード(producer.c
)
エンキューするメッセージに優先度を設定するようにした生産者側のソースコード producer.c
は下記となります。
#include <mqueue.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include "message.h"
// キューの名前
#define QUEUE_NAME ("/sample_queue")
// メッセージの最大サイズ
#define MSG_SIZE (sizeof(MESSAGE))
int main(void) {
// キューを事前に削除しておく
mq_unlink(QUEUE_NAME);
// キューの設定
struct mq_attr attr;
attr.mq_maxmsg = 3;
attr.mq_msgsize = MSG_SIZE;
// キューを生成して書き込みモードで開く
mqd_t queue = mq_open(QUEUE_NAME, O_WRONLY | O_CREAT, 0666, &attr);
if (queue == -1) {
// キューの作成に失敗
printf("%s\n", strerror(errno));
return -1;
}
while (1) {
// 2つの整数の入力
int first_num, second_num;
printf("first number : ");
scanf("%d", &first_num);
printf("sencond number : ");
scanf("%d", &second_num);
// 演算子の入力
int operation;
printf("operation(0:+,1:-,2:*,3:/) : ");
scanf("%d", &operation);
if (operation != E_SUM && operation != E_SUB && operation != E_MUL && operation != E_DIV) {
// 演算子が不正ならプログラム終了
printf("QUIT!\n");
break;
}
// メッセージの生成
MESSAGE message;
message.operation = operation;
message.first_num = first_num;
message.second_num = second_num;
// メッセージの送信
int ret = mq_send(queue, (const char*)&message, sizeof(message), operation);
if (ret == -1) {
printf("%s\n", strerror(errno));
}
}
// メッセージキューを閉じて削除
mq_close(queue);
mq_unlink(QUEUE_NAME);
}
基本的なメッセージキューの利用例 で示した producer.c
からの変更点は、mq_send
の第4引数に operation
を指定するようにした点のみとなります。これにより、ユーザーから入力された演算子を示す値に応じた優先度が設定されたメッセージがメッセージキューにエンキューされるようになります。具体的には、演算子に応じて下記のようにメッセージに優先度が設定されるようになります。
- 演算子が
+
:0
- 演算子が
-
:1
- 演算子が
*
:2
- 演算子が
/
:3
つまり、メッセージキューに複数のメッセージが存在する場合、上記の優先度の高い演算子が指定されたメッセージから順に先頭側に並べられることになります。同じ優先度のメッセージは先にエンキューされたものから順に先頭側から並べられます。そして、デキュー時には先頭のメッセージから順にデキューされることになります。
動作確認
実際に、優先度を設定した場合のメッセージキューの動作について確認してみましょう!
まず、下記コマンドを実行して producer.exe
を生成します。consumer.exe
に関しては 基本的なメッセージキューの利用例 の動作確認時に使用したものをそのまま利用します。
gcc producer.c -lrt -o producer.exe
続いて、下記コマンドで producer.exe
を実行し、続いて2つの整数の入力と演算子を示す整数の入力を3回分繰り返し行ってください。ここでは、演算子をバラバラに指定していただいた方が優先度の設定の効果が分かりやすいと思います。また、各演算の結果に違いが出るように入力を行ってください。
./producer.exe first number : 100 sencond number : 3 operation(0:+,1:-,2:*,3:/) : 0 first number : 100 sencond number : 3 operation(0:+,1:-,2:*,3:/) : 3 first number : 100 sencond number : 3 operation(0:+,1:-,2:*,3:/) : 2 first number :
ここまでの手順により、メッセージキューに3つのメッセージがエンキューされたことになります。続いて consumer.exe
を実行してみてください。
./consumer.exe result = 33 result = 300 result = 103
consumer.exe
を実行すると、上記のように producer.exe
に入力した整数に基づいた計算結果が3つ出力されるはずです。ただし、その計算結果の出力順序は producer.exe
に入力した順序とは異なるはずです。具体的には、割り算・掛け算・引き算・足し算の順序で計算結果が出力されているはずです。つまり、前述で紹介したメッセージの優先度に基づき、その優先度の高い塩山に対する計算結果から順に出力されているはずです。
この出力順序は mq_receive
によってメッセージがデキューされた順序と一致しており、つまりは mq_receive
では、メッセージキューに複数のメッセージが存在する場合、優先度の高いメッセージからデキューすることができるということを示しています。
こんな感じで、メッセージに優先度を設けて優先度の高いメッセージに対する処理から順にデキューすることが可能であるという点もメッセージキュー(特に POSIX のメッセージキュー)の特徴となりますので、この点も覚えておきましょう!
マルチスレッドでのメッセージキューの利用例
さて、ここまで生産者と消費者となるプログラムをそれぞれ別に用意し、それらを別々のプロセスとして動作させる例を示してきました。これは、いわゆるマルチプロセスによる処理の並列化の例となります。そして、これらのプロセス間での通信にメッセージキューを利用してきたことになります。
ここでは、マルチプロセスではなくマルチスレッドでのメッセージキューの利用例を示していきたいと思います。この場合、実行するプログラム(プロセス)は1つのみになりますが、内部でスレッドを複数(今回は2つ)動作させることで処理の並列化を実現することになります。1つのスレッドを生産者の役割として動作させ、もう1つのスレッドを消費者の役割として動作させていきます。そして、スレッド間の通信にメッセージキューを利用します。また、生産者と消費者の役割は今まで紹介してきたソースコードと同様になります。
マルチスレッドについては下記ページで解説していますので、詳しくしたい方は下記ページをご参照ください。
入門者向け!C言語でのマルチスレッドをわかりやすく解説ソースコード(producer_consumer.c
)
マルチスレッドでのメッセージキューの利用例となるソースコード producer_consumer.c
は下記となります。
#include <mqueue.h>
#include <pthread.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include "message.h"
// キューの名前
#define QUEUE_NAME ("/sample_queue")
// メッセージの最大サイズ
#define MSG_SIZE (sizeof(MESSAGE))
// 各スレッドから参照できるようにグローバル変数で宣言
static mqd_t queue = -1;
int calc(MESSAGE *message) {
int result;
// operationに従った計算を実行
switch(message->operation) {
case E_SUM:
result = message->first_num + message->second_num;
break;
case E_SUB:
result = message->first_num - message->second_num;
break;
case E_MUL:
result = message->first_num * message->second_num;
break;
case E_DIV:
if (message->second_num == 0) {
result = 0;
} else {
result = message->first_num / message->second_num;
}
break;
default:
result = 0;
break;
}
return result;
}
void *consumer(void *arg) {
while (1) {
// メッセージ格納先の変数
MESSAGE message;
// メッセージをデキューして取得
ssize_t ret = mq_receive(queue, (char*)&message, sizeof(message), NULL);
if (ret == -1) {
printf("%s\n", strerror(errno));
}
// メッセージに基づいて処理を実行
int result = calc(&message);
printf("result = %d\n", result);
}
return NULL;
}
int main(void) {
// キューを事前に削除しておく
mq_unlink(QUEUE_NAME);
// キューの設定
struct mq_attr attr;
attr.mq_maxmsg = 3;
attr.mq_msgsize = MSG_SIZE;
// キューを生成して書き込みモードで開く
queue = mq_open(QUEUE_NAME, O_RDWR | O_CREAT, 0666, &attr);
if (queue == -1) {
// キューの作成に失敗
printf("%s\n", strerror(errno));
return -1;
}
pthread_t consumer_thread;
pthread_create(&consumer_thread, NULL, &consumer, NULL);
while (1) {
// 2つの整数の入力
int first_num, second_num;
printf("first number : ");
scanf("%d", &first_num);
printf("sencond number : ");
scanf("%d", &second_num);
// 演算子の入力
int operation;
printf("operation(0:+,1:-,2:*,3:/) : ");
scanf("%d", &operation);
if (operation != E_SUM && operation != E_SUB && operation != E_MUL && operation != E_DIV) {
// 演算子が不正ならプログラム終了
printf("QUIT!\n");
break;
}
// メッセージの生成
MESSAGE message;
message.operation = operation;
message.first_num = first_num;
message.second_num = second_num;
// メッセージの送信
int ret = mq_send(queue, (const char*)&message, sizeof(message), 0);
if (ret == -1) {
printf("%s\n", strerror(errno));
}
}
// 消費者スレッドをキャンセルして終了するまで待つ
pthread_cancel(consumer_thread);
pthread_join(consumer_thread, NULL);
// メッセージキューを閉じて削除
mq_close(queue);
mq_unlink(QUEUE_NAME);
}
上記の producer_consumer.c
においては、まずメッセージキューを main
関数の中で mq_open
関数によって生成しています。そして、その生成されたメッセージキューのディスクリプタをグローバル変数 queue
に格納するようにしています。グローバル変数に格納することにより、各スレッドから queue
、すなわちメッセージキューのディスクリプタを参照できるようになります。マルチプロセスの場合、ディスクリプタの共有がプロセス間で行えないため、生産者と消費者の両方のプロセスから mq_open
を実行する必要がありましたが、マルチスレッドの場合はグローバル変数でディスクリプタが共有できるので、1度 mq_open
でディスクリプタを取得してやれば、それを生産者と消費者の両方から参照することが出来ることになります。
メッセージキューの生成後には、pthread_create
関数を実行してスレッドの生成を行っています。このスレッドは consumer
関数を仕事とするスレッドであり、pthread_create
関数の実行によって consumer
関数が main
関数と並列して実行されることになります。つまり、main
関数での mq_send
でのメッセージキューへのエンキューと、consuemr
関数での mq_receive
でのメッセージキューからのデキューが並列して実行されることになります。
これらが並列動作するという点は、今まで紹介してきたマルチプロセスの場合と同様であり、つまりは以降の動作は基本的に今まで紹介してきたプログラムと同様で、それが単にマルチスレッドにより実現されているだけになります。なので、以降の動作に関しての説明は省略します。
また、これはメッセージキューには無関係な話なので補足になりますが、main
関数での演算子を示す整数の入力受付において 0
~ 3
以外の値が入力された時には while
ループを抜け出して main
関数を終了するようにしています。この時に、consumer
関数も終了させたいため、pthread_create
で生成したスレッドを pthread_cancel
で終了させるようにしています。そして、その後 consumer
関数が終了するまで pthread_join
で待機し、その後はいつも通りメッセージキューのクローズと削除を行った後にプログラムが終了するようにしています。
プログラムが終了すれば pthread_create
で生成したスレッドも勝手に終了しますし、consumer
スレッドの返却値も参照していないため、上記のソースコードの例の場合は別に pthread_cancel
や ptherad_join
を行わなくても良いのですが、consumer
スレッドを確実に終了させた後にプログラム内で他の処理を実行したいような場合や、consumer
スレッドの返却値を参照する必要のある場合などは、上記のような pthread_cancel
や pthread_join
等の細かな制御も必須となります。
動作確認
最後に producer_consumer.c
のプログラムの動作確認をしておきましょう!
まず、下記のコマンドで producer_consumer.c
をコンパイルして producer_consumer.exe
を生成してください。pthread
関連の関数をを利用するために -lpthread
での pthread
ライブラリへのリンクが必要となる点に注意してください。
gcc producer_consumer.c -lrt -lpthread -o producer_consumer.exe
次は、下記コマンドで producer_consumer.exe
を実行します。
./producer_consumer.exe
producer_consumer.exe
を実行したら、いつも通り整数2つと演算子の入力が促されるので、好きな整数や演算子を選んで入力してください。入力が完了すれば、計算結果が出力されることを確認できると思います。入力と出力が同じターミナルで行われるので少し見づらいですが、これは我慢してください…。
./producer_consumer.exe first number : 100 sencond number : 3 operation(0:+,1:-,2:*,3:/) : 3 first number : result = 33
計算結果の出力を行っているのは main
関数から pthread_create
により生成された consumer
関数を実行するスレッドになるため、上記のように計算結果が出力されたということは、入力された情報を main
関数を実行するスレッドからメッセージキューを介して consumer
関数を実行するスレッドが受け取ることができているからになります。
このようにマルチプロセスだけではなくマルチスレッド間での通信・メッセージのやり取りにもメッセージキューが利用できることは覚えておきましょう!
まとめ
このページではメッセージキューについて説明しました!
メッセージキューは、複数のスレッドやプロセスの間で通信・メッセージのやり取りを行うための仕組みであり、これにより、受信側は確実に送信側がエンキューした順 or 優先度の高い順でメッセージを受信することができるようになります。
これを利用することで生産者・消費者モデルを簡単に実現することができます。他にもプロセス間・スレッド間でメッセージのやりとりを行う方法はあるのですが、メッセージキューはかなり使いやすい部類の通信手段になると思いますので、並列プログラミングを利用する際はメッセージキューの採用も検討してみると良いと思います!