【C言語】メッセージキューについて解説(mq_open・mq_send・mq_receiveなど)

メッセージキューの解説ページアイキャッチ

このページにはプロモーションが含まれています

このページではメッセージキューについて解説します。

具体的には、POSIX のメッセージキューに基づいて解説を行っていきます。なので、POSIX 準拠の Linux OS を利用している方であれば、このページで紹介するソースコードはそのまま利用可能です。Windows や Mac の場合でも同様の機能を持つメッセージキュー(Windows は MSMQ)が利用可能だと思いますので、考え方等は参考になると思います。

メッセージキューとは

メッセージキューとは、複数のスレッド間や複数のプロセス間で非同期的にメッセージをやり取りするための仕組みの1つとなります。

名前に “キュー” が付いている通り、メッセージキューの実体はキューになります。そして、メッセージキューでは、このキューを介してメッセージのやり取りを行うことになります。キューに関しては下記ページで詳細を解説していますので、詳しく知りたい方は下記ページを参照していただければと思います。

スタックとキューの配列での実装方法解説ページアイキャッチ 【C言語/データ構造】スタックとキューの配列での実装方法

キューとは

キューは FIFO (Firt In, First Out) を実現するデータ構造であり、イメージとしては待ち行列に近いです。基本的には、行列に並ぶときには行列の末尾から並び、先頭の人から順にサービスを受けることになります。これにより、並んだ人から順にサービスを受けることができることになります。

キューを待ち行列で表現した図

同様に、キューへのデータの格納もキューの末尾から順にデータが格納されていくことになります。そして、データの取り出しに関しては、キューの先頭から順にデータが取り出されるようになっています。これらのキューへのデータの格納をエンキューキューからのデータの取り出しをデキューと呼びます。

キューの説明図

基本的には、この考えで良いのですが、POSIX のメッセージキューの場合はデータに優先度をつけることができ、優先度の高いデータを先頭から順に並べるようなことも可能です。この場合もデータを取り出すときは先頭から順にデータが取り出されるため、優先度の高い順にデータを取り出すことができることになります。

優先度付きのキューの説明図

スポンサーリンク

キューを介したメッセージのやり取り

前述のとおり、メッセージキューでは先ほど説明したキューを介してスレッドやプロセスの間でメッセージのやり取りが行われます。

具体的には、メッセージ送信側のスレッドやプロセスはメッセージキューにメッセージをエンキューし、そして、メッセージ受信側のスレッドやプロセスはメッセージキューからメッセージをデキューすることで、送信側から受信側へのメッセージの送信が実現されることになります。

メッセージキューの説明図

キューには複数のメッセージを格納することが可能で、送信側がキューに複数のメッセージをエンキューした場合、受信側はデキューによって送信側がエンキューしたから順に従ってメッセージを受信することができます。

メッセージキューがFIFOになっていることを示す図

このように、単にメッセージのやり取りが行うことが出来るだけでなく、送信側がキューに格納した順序にしたがって受信側がメッセージを取り出せる、つまりメッセージの送信順序とメッセージの受信順序が同じになることが保証されているという点がメッセージキューの特徴になります。

さらに、POISX のメッセージキューにおいては、各メッセージに対して優先度を設定することも可能です。メッセージに優先度を設定することで、キューの先頭側から順に優先度の高いメッセージが並べられるようキュー内のメッセージがソートされることになるため、受信側は優先度の高い順にメッセージをデキューすることができるようになります。

メッセージキューが優先度付きFIFOにも対応していることを示す図

生産者・消費者モデルの実現が容易

また、メッセージキューを利用することで生産者・消費者モデルのプログラム・システムの実現が容易となります。

ここでいう “生産者” とは仕事を生み出すスレッドやプロセスで、“消費者” とは仕事を実行するスレッドやプロセスのことになります。こういった関係性をスレッドやプロセスに割り当てる設計モデルを生産者・消費者モデルと呼びます。

例えば、飲食店での料理の注文で考えれば、ウェイターさんが生産者でコックさんが消費者となり、仕事は料理の注文票と考えることが出来ます。ウェイターさんはお客様から注文を受け付け、その注文を書いた注文票を掲示板に貼ります。注文票が貼られれば、その注文票を取ってコックさんが注文票にしたがって料理を作ります。こんな感じで、仕事を生み出す生産者仕事を実行する消費者の位置づけで設計する設計モデルを生産者・消費者モデルと呼びます。

生産者・消費者モデルの例

で、この生産者・消費者モデルにおいては、生産者が生み出した仕事を消費者に渡すための手段が必要で、そこにメッセージキューを採用することで、生産者・消費者モデルが実現しやすくなります。生産者は、消費者に実行して欲しい仕事を示すデータをメッセージとしてメッセージキューにエンキューする、消費者はメッセージキューからメッセージをデキューし、そのメッセージにしたがって仕事を実行するようにプログラミングしてやれば良いだけです。

生産者・消費者モデルをメッセージキューを利用して実現する様子

キューの特性上、エンキューされた順序でメッセージがデキューされることになるので、自然と仕事を生み出した順序で仕事が実行されていくことになります。さらに、優先度を設定してやれば、自然と優先度順に仕事を実行させるようなことも可能になります。

さらに、メッセージキューは複数のメッセージを溜めておくことが可能です。そのため、生産者が仕事を生み出すペースが消費者が仕事を完了させるペースより速くても、生産者の仕事を生み出す処理を待たせるような制御をわざわざ導入する必要はありません。メッセージキューにはメッセージを複数溜めておけるので、生産者は消費者のペースを気にせず、生産者のペースで仕事を生み出してエンキューを行えば良いのです。つまり、生産者は生産者のペースで仕事を生み出し、消費者は消費者のペースで仕事を実行すれば良いだけになります。

メッセージキューがバッファーとなるので相手のペースを気にせずに生産者と仕事が動作して問題ないことを示す図

ただし、生産者の仕事の生み出すペースが極端に速いとメッセージキューがすぐに満杯になることになります。メッセージキューを用いた生産者・消費者モデルでは、この問題の解決も容易に行うことができます。具体的には、単に消費者側のプロセス数やスレッド数を増やして並列度を上げてやることで消費者の仕事を実行するペースを上げることができます。

消費者側の並列度を上げて消費者の仕事を実行するペースを向上させる様子

このように、メッセージキューは生産者・消費者モデルと相性が良いです。これらはセットで覚えておくとよいと思います。

コーディング面について補足しておくと、生産者側は基本的には無限ループを組んで “仕事を生み出してメッセージキューにエンキューする” という一連の処理を延々と続け、消費者側も無限ループを組んで “メッセージキューからデキューして仕事を実行する” という一連の理を延々と続ける形で実装を行うことが多いです。

生産者と消費者の処理の流れをコードで示した図

メッセージキューの使い方

続いてメッセージキューを使用する流れについて解説していきます。関数名を使って説明していきますが、各関数の詳細に関しては次の章の メッセージキューの関数 で説明します。まずはメッセージキューを使用する際の処理の流れを理解しましょう!

スポンサーリンク

メッセージキューを作成する

メッセージキューを利用するためには、まずメッセージキューの作成が必要になります。このメッセージキューの作成は mq_open 関数で実現できます。

メッセージキューの作成の説明図

メッセージキューを作成するときには、キューのサイズ(キューに格納できるメッセージの数)メッセージ自体の最大サイズを指定することが可能です。

メッセージキューを mq_open 関数で作成したら同時にメッセージキューが開かれることになるため、次の メッセージキューを開く を別途実行する必要はありません。

メッセージキューを開く

メッセージキューをプログラムから利用するためには、メッセージキューを事前に開いておく必要があります。このメッセージキューを開く処理も mq_open 関数で行うことができ、mq_open 関数で “メッセージキューを作成する” or “単に開く” のどちらを行うのかについて引数の指定によって制御可能です。

前述のとおり、このメッセージキューは複数のスレッド間や複数のプロセス間(スレッドやプロセスが混合しても OK)でメッセージのやり取りを行うためのものになりますので、特にプロセス間でメッセージのやり取りを行うのであれば、送信側と受信側の両方のプロセスで mq_open 関数を実行して “同じメッセージキュー” を開く必要があります。

同じメッセージキューを開く際に必要になるのがメッセージキューを見分けるための 名前 になります。mq_open 関数の引数には 名前 が指定できるようになっていますので、送信側と受信側とで同じ 名前 を引数にして mq_open を実行すれば、同じメッセージキューを開くことが出来ます。あとは、このメッセージキューを介してメッセージのやり取りを行えば良いだけです。

メッセージキューを開く・作成する際に同じ名前を指定する必要があることを示す図

送信側と受信側とで指定する 名前 が異なれば、送信側と受信側とで異なるメッセージキューを開くことになってしまい、メッセージのやり取りが行えなくなってしまうので注意が必要です。

また、mq_open 関数が成功した場合の返却値は開いたメッセージキューを識別するための識別子(mqd・ディスクリプタ)となります。メッセージキューを開いた後は、この mq_open 関数の返却値を関数に指定して操作するメッセージキューを指定することになります。

つまり、メッセージキューを開く際には 名前 で開くメッセージキューを指定する必要があり、開いたメッセージキューを操作する際には mq_open 関数の返却値で操作対象のメッセージキューを指定する必要があって、それぞれでメッセージキューを見分けるための識別子が異なる点に注意してください。

名前とディスクリプタの役割の違いを示す図

スポンサーリンク

メッセージをエンキューする(メッセージを送信する)

メッセージキューを開いてしまえば、メッセージのやり取りを行う準備が整ったことになります。つぎは、このメッセージのやり取りを行う “メッセージの送信” 及び “メッセージの受信” について説明していきます。

メッセージの送信に関しては mq_send 関数により実現できます。この mq_send 関数を実行することで、メッセージキューにメッセージをエンキューすることができます。

メッセージキューにおけるメッセージの送信の説明図

デフォルト設定では、メッセージキューが満杯の状態で mq_send 関数を実行した場合、メッセージキューのエンキューに成功するまで mq_send 関数内部で待ち続けることになります。つまり、メッセージキューからメッセージがデキューされるまで mq_send 関数は待機し続けます。

メッセージキューが満杯の時のエンキューの動作の説明図

スポンサーリンク

メッセージをエンキューする(メッセージを送信する)

メッセージの受信に関しては mq_receive 関数により実現できます。この mq_receive 関数を実行することで、メッセージキューからメッセージをデキューすることができます。デキューしたメッセージはメッセージキューから消滅します。

メッセージキューにおけるメッセージの受信の説明図

また、デフォルト設定では、メッセージキューが空の状態で mq_receive 関数を実行した場合、メッセージのデキューに成功するまで mq_receive 関数内部で待ち続けることになります。つまり、メッセージキューにメッセージがエンキューされるまで mq_receive 関数は待機し続けます。

メッセージキューが空の時のデキューの動作の説明図

メッセージキューを閉じる・削除する

メッセージの送信や受信を繰り返し行い、全てのメッセージのやり取りが完了したらメッセージキュー閉じてやりましょう。これは mq_close 関数によって実現可能です。ただし、これは単にプログラムからメッセージキューを閉じるだけであり、メッセージキュー自体が削除されるというわけではありません。なので、メッセージキューをクローズした後も、再度メッセージキューを開けばメッセージのやり取りを再開することが出来ます。

メッセージキューを削除したいのであれば mq_close 実行後に mq_unlink 関数を実行してやれば良いです。

以上がメッセージキューを利用する際の処理の基本的な流れとなります。

メッセージキューの関数

続いてメッセージキューを利用するときに使用する関数を紹介していきます。

特によく利用する関数は下記の5つです。まずはこの5つの関数について覚えておきましょう。

  • mq_open:メッセージキューを開く or 作成する
  • mq_send:メッセージをエンキューする
  • mq_receive:メッセージをデキューする
  • mq_close:メッセージキューを閉じる
  • mq_unlink:メッセージキューを削除する

これらの関数を利用するためにはヘッダーファイルのインクルードとライブラリへのリンクが必要となりますので、まずはこれらについて説明を行い、その後上記の関数を個別に紹介していきたいと思います。

スポンサーリンク

関数利用時のコンパイル・リンク

では、メッセージキュー利用時に必要となるヘッダーファイルのインクルードとライブラリへのリンクについて説明します。

インクルードするヘッダーファイル

メッセージキューに関する関数は mqueue.h でプロトタイプ宣言されています。なので、メッセージキューを利用する際には mqueue.h のインクルードが必須となります。また、メッセージキュー作成時には作成するメッセージキューのモード(権限)を指定する必要があり、モードのパラメーターが sys/stat.h で定義されているため、必要に応じて sys/stat.h のインクルードも必要となります。

メッセージキュー利用時のインクルード
#include <mqueue.h>
#include <sys/stat.h> // 必要に応じて

リンクするライブラリーファイル

さらに、メッセージキューに関する関数の実体は rt ライブラリに存在するため、リンク時には -lrt オプションを指定する必要があります。

例えば、メッセージキューを利用するプログラムのソースコードが producer.c である場合、gcc を使ってコンパイル・リンクを行うのであれば下記のようなコマンドを実行する必要があります。

gcc producer.c -lrt -o producer.exe

mq_open

続いて、メッセージキュー関連の関数の個別の説明に移ります。

まず、mq_open は、第1引数で指定した名前の “メッセージキューを新規に作成” する、もしくは、第1引数で指定した名前の “既に存在するメッセージキューを開く” 関数になります。

mq_open 関数の第2引数で指定するフラグに O_CREAT が含まれている場合、第1引数で指定した名前のメッセージキューがシステム内に “存在しなければ”、新規にメッセージキューが生成されることになります。既に存在していれば、新規に作成されることなく単にメッセージキューが開かれることになります。

また、第2引数で指定するフラグに O_CREAT が含まれている場合は、第3引数にモード(パーミッション)を、さらに第4引数に新規作成するメッセージキューの設定 or NULL を指定する必要があります。O_CREAT が含まれていないのであれば、第3引数と第4引数は不要になります。

つまり、第2引数で指定するフラグによって mq_open 関数に指定すべき引数が異なることになります。

具体的には、第2引数で指定するフラグに O_CREAT が含まれている場合は下記のように4つの引数を指定する必要があります。

mq_open(キュー作成時)
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

それ以外は、下記のように2つの引数のみを指定してやれば良いです。

mq_open(キュー作成時以外)
mqd_t mq_open(const char *name, int oflag);

続いて、各引数について説明していきます。

mq_open の第1引数 name

まず第1引数 name にはメッセージキューの名前を文字列で指定します。この名前はメッセージキューを識別するためのデータとなり、mq_open の実行によって、この名前が付けられたメッセージキューの作成が行われる or メッセージキューが開かれることになります。この name の先頭の文字は / である必要があります。

mq_open の第2引数 oflag

第2引数 oflag ではメッセージキューの開き方をフラグとして指定します。下記の3つのフラグのうちの1つは指定が必須です。

  • O_RDONLY:受信専用キューとして開く
  • O_WRONLY:送信専用キューとして開く
  • O_RDWR:受信・送信両方可能なキューとして開く

メッセージキューへのデータの送信のみを行うプログラムであれば oflagO_WRONLY を、メッセージキューからのデータの受信のみを行うプログラムであれば oflagO_RDONLY を、送受信両方を行プログラムであれば oflagO_RDWR を指定して mq_open を実行する感じですね。

oflag には上記の3つのうちのいずれかのフラグを指定することが必須で、他にも下記のようなフラグを指定することが可能です。複数のフラグを指定したい場合は、各フラグを | の OR 演算子で区切る形で指定する必要があります。

  • O_CREAT:キューが存在しないのであれば新規作成して開く
  • O_NONBLOCK:ノンブロッキングモードのキューとして開く

例えば、メッセージキューを “ノンブロッキングモード” で “送信専用” で開く、かつ、開こうとしているキューが存在しないのであれば “新規作成” したいのであれば下記のように oflag を指定して mq_open を実行する必要があります。

O_WRONLY | O_NONBLOCK | O_CREAT

ノンブロッキングモードについては、後述の mq_send および mq_receive で説明を行います。

mq_open の第3引数 mode

第3引数 mode では作成するメッセージキューのパーミッション(権限)を指定します。このパーミッションのパラメーターとしては、sys/stat.h で定義されている S_IRUSRS_IRWXU 等の定義値を指定することが可能です。ただ、Linux や Mac の chmod コマンドと同様に数値で 0666 等のパーミッションを指定することも可能です。chmod コマンドに慣れている方は数値で指定する方が楽かなぁと思います。

mq_open の第4引数 attr

第4引数 attr では作成するメッセージキューの設定を指定することができます。第4引数 attr の型は struct mq_attr * なので、この struct mq_attr 型の変数を用意し、その変数のメンバーに各種設定値をセットしてから、その変数のアドレスを attr として指定してやれば良いことになります。

struct mq_attr は下記のように定義された構造体であり、この定義を見ていただければ分かる通り、mq_open 実行時には mq_maxmsgmq_msgsize しか参照されないため、これらの2つのメンバーのみに値をセットして引数に指定してやれば良いです。mq_maxmsg にはキューのサイズ(キューに格納可能なメッセージの個数)、mq_msgssize には各メッセージのサイズの最大値を指定します。

struct mq_attr
struct mq_attr {
    long mq_flags;       /* Flags (ignored for mq_open()) */
    long mq_maxmsg;      /* Max. # of messages on queue */
    long mq_msgsize;     /* Max. message size (bytes) */
    long mq_curmsgs;     /* # of messages currently in queue (ignored for mq_open()) */
};

また、第4引数 attr には NULL を指定することも可能で、この場合はシステムのデフォルト値に基づいてメッセージキューが作成されることになります。

mq_open で作成される管理ファイル

mq_open でメッセージキューを作成すると、もしかしたらプログラムを実行する環境によって異なるかもしれませんが、下記のフォルダに name 引数に指定した文字列と同じ名前のファイルが生成されることになります。

/dev/mqueue/

mq_open の返却値

メッセージキューを開くことに成功した場合、mq_open 関数は正の整数を返却します。この返却値は開いたメッセージキューのディスクリプタであり、mq_sendmq_receivemq_close でメッセージキューの操作を行う際に、操作対象のメッセージキューを指定することを目的に、この mq_open の返却値を引数で指定する必要があります。

また、メッセージキューを開くことに失敗した場合は mq_open 関数は -1 を返却します。この場合、エラーの原因を示す値が errno にセットされます。

errno に関しては下記ページで詳細を解説していますので、詳しくは下記ページを参照いただければと思います。関数のエラーの原因を知りたい場合も多いと思いますので、errno を使いこなせるようにしておきましょう!

【C言語】errnoを利用してエラーの原因を特定する

mq_open 利用時の注意点

既に name 引数で指定した名前のメッセージキューが存在する場合、mq_openoflag 引数に O_CREAT を指定していてもメッセージキューが再度作られるというわけではなく、単にメッセージキューが開かれるだけになります。

なので、既に name 引数で指定した名前のメッセージキューが存在すると、メッセージキューの設定を変更しようと思って第3引数や第4引数に指定するデータを変化させて再度 mq_open を実行したとしても、その設定の変更はメッセージキューには反映されないことになるので注意してください。設定を変更する場合は、後に説明する mq_unlink を利用する or /dev/mqueue 以下に存在するメッセージキューの管理ファイルを削除する必要があります(もしくは、引数 name に指定するメッセージキューの名前を変更するのでも良いです)。

mq_send

mq_sendメッセージキューにメッセージをエンキューすることで受信側に対してメッセージを送信する関数になります。

mq_send 関数のプロトタイプ宣言は下記のとおりです。

mq_send
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);

mq_send の第1引数 mqdes

第1引数 mqdes にはエンキュー先のメッセージキューのディスクリプタを指定します。メッセージキューのディスクリプタは mq_open 関数の返却値として得られますので、つまりは mq_open 関数の返却値を指定してやれば良いことになります。

mq_send の第2引数 msg_ptr

第2引数 msg_ptr にはメッセージキューにエンキューしたい、つまりは相手に送信したいデータの先頭アドレスを指定します。引数の型は const char * 型ではありますが、構造体等のデータの先頭アドレスも指定可能です。

mq_send の第3引数 msg_len

第3引数 msg_len にはメッセージキューにエンキューしたいデータのサイズを指定します。単位はバイトです。このサイズは、mq_open 関数でメッセージキューを作成したときに第4引数 attr のメンバー msg_sizeで指定したメッセージのサイズ以下である必要があります。

mq_send を実行した際には、第2引数 msg_ptr のアドレスを先頭とする msg_len バイトのデータがメッセージキューにエンキューされることになります。

mq_sendで送信されるデータのアドレスとサイズを説明する図

mq_send の第4引数 msg_prio

第4引数 msg_prio には、エンキューするメッセージの優先度を設定します。POSIX のメッセージキューのキューは優先度設定可能になっており、単なる先入先出方式でデータの格納・取り出しが行われるわけではなく優先度に基づいてキュー内のデータの並びが変化するようになっています。具体的には、POSIX のメッセージキューの場合、第4引数 msg_prio に指定した値が大きいメッセージがキューの先頭側から降順に並ぶようになっています。デキューはキューの先頭から順に行われていきますので、第4引数 msg_prio に大きな値を指定してエンキューしたメッセージほど先にデキューされることになります。

mq_send関数におけるmsg_prio引数の意味合いを説明する図

mq_prio の値が同じメッセージが複数エンキューされた場合は、基本的なキュー同様に、先にエンキューされたメッセージから先にデキューされることになります。つまり、エンキューした順にメッセージをデキューされるようにしたいのであれば、全てのメッセージのエンキュー時、すなわち mq_send には mq_prio に毎回同じ値を指定する必要があることになります。逆にいうと、mq_prio の値がバラバラの状態で mq_send を実行してしまうとエンキューした順序でのデキューが実現できなくなることになります。

mq_send関数でエンキューした順にメッセージをソートするための方法を説明する図

mq_send の返却値

メッセージの送信、すなわちメッセージキューへのメッセージのエンキューに成功した場合、mq_send 関数は 0 を返却し、失敗した場合は -1 を返却します。mq_send から -1 が返却された場合は、メッセージの送信に失敗した理由を示す値が errno にセットされます。

mq_send のノンブロッキングモード時の動作

mq_open で第2引数 oflagO_NONBLOCK が指定された状態で開かれたメッセージキューを利用する場合、mq_send 関数はノンブロッキングモードで動作することになります。O_NONBLOCK が指定されていなければブロッキングモードで動作します。

ブロッキングモードの場合は、メッセージキューが満杯でエンキューできなければ mq_send 関数はエンキューできるまで、すなわちメッセージキューからのメッセージのデキューが受信側で実行されるまで待ち続けることになります。

それに対し、ノンブロッキングモードの場合は、メッセージキューが満杯でエンキューできなければ mq_send 関数は直ちに終了することになります。この時の mq_send 関数の返却値は -1 となり、errno には EAGAIN がセットされることになります。

メッセージキューが満杯のときに単にエンキューできるようになるまで待ち続けるのでよいのであればブロッキングモードを利用すればよいですし、メッセージキューが満杯の時に何かしらの処理をすぐに行いたい、例えばユーザーに満杯になったことを通知したいような場合はノンブロッキングモードを利用すれば良いと思います。

スポンサーリンク

mq_receive

mq_receiveメッセージキューからメッセージをデキューすることで送信側から送信されてきたメッセージを受信する関数になります。

mq_receive 関数のプロトタイプ宣言は下記のとおりです。

mq_receive
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);

mq_receive の第1引数 mqdes

第1引数 mqdes にはデキュー先のメッセージキューのディスクリプタを指定します。mq_send 関数と同様に、メッセージキューのディスクリプタは mq_open 関数の返却値として得られますので、つまりは mq_open 関数の返却値を指定してやれば良いことになります。

mq_receive の第2引数 msg_ptr

第2引数 msg_ptr にはメッセージキューからデキューしたデータを、つまりは相手から受信したデータを格納するバッファーや変数の先頭アドレスを指定します。引数の型は char * 型ではありますが、構造体等のデータの先頭アドレスも指定可能です。

mq_receiveでデキューしたデータがmsg_ptrの指すバッファーに格納されることを示す図

メッセージキューにエンキューされているメッセージのサイズの最大値は、メッセージキュー作成時に mq_open 関数の第4引数 attr で指定した msg_size となります。したがって、第2引数 msg_ptr を先頭アドレスとするバッファーや変数のサイズは、この msg_size 以上である必要があります。

mq_receive の第3引数 msg_len

第3引数 msg_len には第2引数 msg_ptr を先頭アドレスとするバッファーや変数のサイズを指定します。前述のとおり、このサイズは メッセージキュー作成時に mq_open 関数の第4引数 attr で指定した msg_size 以上である必要があります。

mq_receive の第4引数 msg_prio

第4引数 msg_prio には、デキューしたメッセージの優先度の格納先のバッファーや変数の先頭アドレスを指定します。もし、優先度の取得が不要であれば第4引数 msg_prio には NULL を指定してやれば良いです。

mq_send の第4引数 msg_prio で説明したように、デキューされたメッセージは、そのデキューした時点でメッセージキューに存在するメッセージの中で最大優先度のものになりますので、msg_prio の値をわざわざ取得しなくても優先度に応じたデキューの順序制御は勝手に行われることになります。

なので、基本的には第4引数 msg_prio には NULL を指定するので問題ないと思います。送信側で設定されたメッセージの優先度の具体的な値を知りたいような場合は、msg_priounsigned int 型の変数のアドレスを指定してやりましょう。

mq_receive の返却値

メッセージの受信、すなわちメッセージキューへのメッセージのデキューに成功した場合、mq_receive 関数はデキューしたメッセージのサイズを返却し、失敗した場合は -1 を返却します。mq_receive から -1 が返却された場合は、メッセージの送信に失敗した理由を示す値が errno にセットされます。

mq_receive のノンブロッキングモード時の動作

mq_send 同様に、mq_receive もノンブロッキングモードで動作させることが可能です。ノンブロッキングモードについては基本的には mq_send と同じなので詳細に関しては mq_send のノンブロッキングモード時の動作 を参照してください。

mq_receive の場合は、ノンブロッキングモードとブロッキングモードとで、メッセージキューが空の時mq_receive 実行時の動作が変化することになります。

mq_close

mq_close は開いているメッセージキューを閉じる関数になります。

mq_close 関数のプロトタイプ宣言は下記になります。

mq_close
int mq_close(mqd_t mqdes);

mq_close の第1引数 mqdes

mq_close の第1引数には、閉じたいメッセージキューのディスクリプタを指定します。要は、mq_open の返却値を指定してやれば良いです。

mq_close の返却値

メッセージキューを閉じることに成功した場合、mq_close 関数は 0 を返却し、失敗した場合は -1 を返却します。

mq_close から -1 が返却された場合は、メッセージキューを閉じることに失敗した原因を errno から調べることができます。

mq_unlink

mq_unlink はメッセージキューを削除する関数になります。

先ほど紹介した mq_close ではメッセージキュー自体やメッセージは削除されません。なので、次回同じ名前を引数に指定して mq_openでメッセージキューを開いた場合は、前回 mq_close を実行したときと同じ状態になっています。なので、送信側が mq_send を実行していないのに受信側が mq_receiveでいきなりメッセージをデキューできたりします。

こういった動作を望んでいるのであれば、わざわざメッセージキューを削除する必要はないのですが、プログラム開始時はメッセージキューをまっさらな状態にしておきたいような場合や、メッセージキューの設定を変更するためにメッセージキューを作り直したいような場合は mq_unlink によるメッセージキューの削除が必要となります。

プログラムを終了した後もメッセージを残したいような場合を除いては、基本的にはプログラムの最初と最後で mq_unlink を実行してメッセージキューを削除してやるのが良いと思います。

mq_unlink 関数のプロトタイプ宣言は下記になります。

mq_unlink
int mq_unlink(const char *name);

mq_unlink の第1引数 name

mq_unlink の第1引数には、削除したいメッセージキューの名前を指定します。要は、メッセージキュー作成時に mq_open の第1引数に指定した名前を指定してやれば良いです。

mq_unlink の返却値

メッセージキューの削除に成功した場合、mq_unlink 関数は 0 を返却し、失敗した場合は -1 を返却します。

mq_unlink から -1 が返却された場合は、メッセージキューの削除に失敗した原因を errno から調べることができます。

スポンサーリンク

メッセージキューを利用したサンプルプログラム

最後に、メッセージキューを利用したサンプルプログラムのソースコードを紹介していきます。ここでは生産者・消費者モデルに基づいて設計を行ったプログラムのソースコードを紹介していきます。基本的にはマルチプロセス(複数のプログラム)で動作させるプログラムを紹介しますが、最後にマルチスレッドで動作させるプログラムのソースコードも紹介します。

また、生産者側ではユーザーから下記の3つの入力を受け付け、これらの情報をセットしたメッセージをメッセージキューにエンキューするようにしています。

  • 1つ目の整数
  • 2つ目の整数
  • 演算子

メッセージは、下記の message.h で定義される MESSAGE 構造体のデータとしており、各メンバーに上記で入力された整数や演算子の enum 値をセットし、この構造体のデータをメッセージキューにエンキューするようにしていきます。そして、消費者側はメッセージキューからメッセージをデキューし、そのメッセージに基づいて演算を行ってその結果を printf で出力するようにしていきます。

この message.h は、以降で紹介するソースコードからインクルードされるようになっているため、下記をコピペして他のソースコードと同じフォルダに message.h というファイル名で保存しておいてください。

message.h
#ifndef _MESSAGE_H_
#define _MESSAGE_H_

typedef enum {
    E_SUM = 0,
    E_SUB,
    E_MUL,
    E_DIV,
} OPERATION;

typedef struct {
    int first_num;
    int second_num;
    OPERATION operation;
} MESSAGE;

#endif // _MESSAGE_H_

ちなみに、構造体や enum (列挙型) に関しては下記ページで解説していますので、詳細を知りたい方は別途下記ページを参照していただければと思います。

構造体解説ページのアイキャッチ 【C言語】構造体について初心者向けに分かりやすく解説 列挙型・enum の解説ページアイキャッチ 【C言語】列挙型(enum)について解説

基本的なメッセージキューの利用例

まずは、基本的なメッセージキューの利用例となるサンプルプログラムのソースコードを紹介していきます。前述のとおり、これらは生産者・消費者モデルに基づいて設計を行ったプログラムとなっており、プログラム自体が2つに分かれています。

生産者のソースコード(producer.c

まずは、生産者側のソースコード producer.c を示していきます。

producer.c
#include <mqueue.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include "message.h"

// キューの名前
#define QUEUE_NAME ("/sample_queue")

// メッセージの最大サイズ
#define MSG_SIZE (sizeof(MESSAGE))

int main(void) {
    // キューを事前に削除しておく
    mq_unlink(QUEUE_NAME);

    // キューの設定
    struct mq_attr attr;
    attr.mq_maxmsg = 3;
    attr.mq_msgsize = MSG_SIZE;

    // キューを生成して書き込みモードで開く
    mqd_t queue = mq_open(QUEUE_NAME, O_WRONLY | O_CREAT, 0666, &attr);
    if (queue == -1) {
        // キューの作成に失敗
        printf("%s\n", strerror(errno));
        return -1;
    }

    while (1) {
        // 2つの整数の入力
        int first_num, second_num;

        printf("first number : ");
        scanf("%d", &first_num);
        printf("sencond number : ");
        scanf("%d", &second_num);

        // 演算子の入力
        int operation;

        printf("operation(0:+,1:-,2:*,3:/) : ");
        scanf("%d", &operation);

        if (operation != E_SUM && operation != E_SUB && operation != E_MUL && operation != E_DIV) {
            // 演算子が不正ならプログラム終了
            printf("QUIT!\n");
            break;
        }

        // メッセージの生成
        MESSAGE message;
        message.operation = operation;
        message.first_num = first_num;
        message.second_num = second_num;

        // メッセージの送信
        int ret = mq_send(queue, (const char*)&message, sizeof(message), 0);
        if (ret == -1) {
            printf("%s\n", strerror(errno));
        }
    }

    // メッセージキューを閉じて削除
    mq_close(queue);
    mq_unlink(QUEUE_NAME);
}

消費者のソースコード(consumer.c

消費者側のソースコードとなる consumer.c は下記となります。

consumer.c
#include <mqueue.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include "message.h"

#define QUEUE_NAME ("/sample_queue")

int calc(MESSAGE *message) {
    int result;

    // operationに従った計算を実行
    switch(message->operation) {
    case E_SUM:
        result = message->first_num + message->second_num;
        break;
    case E_SUB:
        result = message->first_num - message->second_num;
        break;
    case E_MUL:
        result = message->first_num * message->second_num;
        break;
    case E_DIV:
        if (message->second_num == 0) {
            result = 0;
        } else {
            result = message->first_num / message->second_num;
        }
        break;
    default:
        result = 0;
        break;
    }

    return result;
}

int main(void) {

    // キューを読み取り専用モードで開く
    mqd_t queue = mq_open(QUEUE_NAME, O_RDONLY);
    if (queue == -1) {
        printf("%s\n", strerror(errno));
        return -1;
    }

    while (1) {
        // メッセージ格納先の変数
        MESSAGE message;
        
        // メッセージをデキューして取得
        ssize_t ret = mq_receive(queue, (char*)&message, sizeof(message), NULL);
        if (ret == -1) {
            printf("%s\n", strerror(errno));
        }

        // メッセージに従った処理を実行
        int result = calc(&message);
        printf("result = %d\n", result);
    }

    mq_close(queue);

}

動作確認

ここで示した producer.cconsumer.c の動作確認を行っていきたいと思います。

まずは、下記の2つのコマンドを実行して2つのソースコードをコンパイルし、実行可能ファイルとなる producer.execonsumer.exe を生成してください。

gcc producer.c -lrt -o producer.exe
gcc consumer.c -lrt -o consumer.exe

続いて、ターミナルアプリを2つ起動し、それぞれのターミナルで下記コマンドを実行して producer.execonsumer.exe をそれぞれ別のターミナルで起動します。

./producer.exe
./consumer.exe

続いて、producer.exe を起動した方のターミナルに整数を2つと演算子の指定を行います。

./producer.exe
first number : 200
sencond number : 57
operation(0:+,1:-,2:*,3:/) : 1

すると、consumer.exe を起動した方のターミナルに、producer.exe 側に指定した整数や演算子を用いた計算の結果が出力されていることが確認できると思います。

./consumer.exe
result = 143

producer.exe 側で入力した整数や演算子に基づいた計算を consumer.exe が実行できているのは、producer.exe がユーザーから入力された整数2つや演算子をメッセージとしてメッセージキューにエンキューし、さらに、そのメッセージを consumer.exe がデキューすることで 、それらの整数や演算子を取得することができているからになります。

このように、メッセージキューを利用することで、複数のプログラム間(プロセス間)でメッセージのやり取りを簡単に実現することができます。

MEMO

プログラムを終了したいときは、ターミナルで control + c をキー入力してください

ブロッキングモードの動作確認

現在の producer.cconsumer.c では mq_open の第2引数 oflagO_NONBLOCK を指定していないため、producer.exe でも consumer.exe でもメッセージキューがブロッキングモードで開かれることになっています。

ここで、このブロッキングモードの時にメッセージキューが満杯になった時の動作を確認しておきたいと思います。

ここでは先ほど gcc コマンドで生成した producer.execonsumer.exe をそのまま使います。

ということで、まず producer.exe を下記コマンドで実行してください。

./producer.exe

次は、”consumer.exe を実行していない状態” で、2つの整数と演算子の入力をそれぞれ4回分繰り返してください。

./producer.exe
first number : 1
sencond number : 2
operation(0:+,1:-,2:*,3:/) : 0
first number : 1
sencond number : 2
operation(0:+,1:-,2:*,3:/) : 1
first number : 1
sencond number : 2
operation(0:+,1:-,2:*,3:/) : 2
first number : 1
sencond number : 2
operation(0:+,1:-,2:*,3:/) : 3

3回目の入力までは、演算子を示す整数を入力すれば、再度1つ目の整数の入力受付が行われるようになっていました。ですが、4回目の演算子を示す整数の入力を行っても次の整数の入力受付が行われません。

producer.c では、演算子の入力を受け付けた後はメッセージを生成し、その後 mq_send でメッセージキューへのエンキューを行うようになっています。そして、mq_send 終了後に、次のループに移行して1つ目の整数の入力受付を行うようになっています。したがって、次の1つ目の整数の入力受付が行われないということは、mq_send が終了していないことを意味します。

この mq_send が終了しない理由はメッセージキューが既に満杯になっているからです。メッセージキューを作成する mq_open の第4引数 attr のメンバー mq_maxmsg3 に設定しているため、一度にメッセージキューに溜められるメッセージは 3 つまでとなります。さらに、 3 回分の整数や演算子の入力を行って mq_send でメッセージキューに 3 回メッセージのエンキューが行われているため、現状メッセージキューは満杯ということになります。そして、メッセージキューはブロッキングモードで開かれているので、満杯の時に mq_send を実行するとエンキューできるようになるまで mq_send は終わらないことになります。

ここで、consumer.exe を下記コマンドで実行してみましょう!

./consumer.exe

consumer.exe が実行されれば、メッセージキューからのデキューが行われることになり、メッセージキューに空きができて満杯でなくなります。 そのため、producer.exe を実行しているターミナルには、4回目の整数の入力受け受けが行われるようになったことが確認できるはずです。

first number :

こんな感じで、メッセージキューが満杯の時は mq_send 内部で待機してくれるため、プログラム開発者はメッセージキューが満杯の時の制御のコードを書かなくてもメッセージキューに空きが出来るまで待ち続けるという動作を実現できることになります。同様に、メッセージキューが空の時は、メッセージがエンキューされるまで mq_receive 内部で待機してくれます。

逆に言えば、メッセージキューが満杯の時は mq_send 内部で待機するため、mq_send を実行しているスレッドやプロセスは他の処理を行うことができません。例えば、メッセージキューが満杯であることをユーザーに通知したいとしても mq_send で待機し続けるのでそういった処理は行えません。こういった、メッセージキューが満杯であるときに他の処理を実行させたいのであればメッセージキューはノンブロッキングモードで開く必要があります。

ノンブロッキングモードのメッセージキューの利用例

ということで、次はメッセージキューをノンブロッキングモードで開いた時の動作を確認していきたいと思います。ここでは、生産者側(送信側)となる producer.c のソースコードのみを変更してメッセージキューをノンブロッキングモードで開くようにしていきます。

生産者のソースコード(producer.c

メッセージキューをノンブロッキングモードで開くように変更した生産者側のソースコード producer.c は下記となります。

producer.c
#include <mqueue.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include "message.h"

// キューの名前
#define QUEUE_NAME ("/sample_queue")

// メッセージの最大サイズ
#define MSG_SIZE (sizeof(MESSAGE))

int main(void) {
    // キューを事前に削除しておく
    mq_unlink(QUEUE_NAME);

    // キューの設定
    struct mq_attr attr;
    attr.mq_maxmsg = 3;
    attr.mq_msgsize = MSG_SIZE;

    // キューを生成して書き込みモードで開く
    mqd_t queue = mq_open(QUEUE_NAME, O_WRONLY | O_CREAT | O_NONBLOCK, 0666, &attr);
    if (queue == -1) {
        // キューの作成に失敗
        printf("%s\n", strerror(errno));
        return -1;
    }

    while (1) {
        // 2つの整数の入力
        int first_num, second_num;

        printf("first number : ");
        scanf("%d", &first_num);
        printf("sencond number : ");
        scanf("%d", &second_num);

        // 演算子の入力
        int operation;

        printf("operation(0:+,1:-,2:*,3:/) : ");
        scanf("%d", &operation);

        if (operation != E_SUM && operation != E_SUB && operation != E_MUL && operation != E_DIV) {
            // 演算子が不正ならプログラム終了
            printf("QUIT!\n");
            break;
        }

        // メッセージの生成
        MESSAGE message;
        message.operation = operation;
        message.first_num = first_num;
        message.second_num = second_num;

        // メッセージの送信
        int ret = -1;
        do {
            ret = mq_send(queue, (const char*)&message, sizeof(message), 0);
            if (ret == -1) {
                if (errno == EAGAIN) {
                    printf("Message Queue is Full\n");
                    sleep(1);
                } else {
                    printf("%s\n", strerror(errno));
                    break;
                }
            }
        } while (ret == -1);
    }

    // メッセージキューを閉じて削除
    mq_close(queue);
    mq_unlink(QUEUE_NAME);
}

上記の producer.c では mq_open 関数の第2引数に O_NONBLOCK を指定することでメッセージキューをノンブロッキングモードで開くようにしています。mq_send 関数はメッセージキューが満杯でエンキューに失敗した場合、すなわち mq_send 関数が -1 を返却し、かつ errnoEAGAIN の場合は、メッセージキューが満杯であることを printf で出力してユーザーに通知し、さらに 1 秒間スリープしたのちに再度 mq_send を実行するようにしています。そして、これらをメッセージキューへのエンキューに成功するまで繰り返すようにしています。

ノンブロッキングモードでメッセージキューを開けば、メッセージキューが満杯の場合でも mq_send が即座に終了するため、ユーザーへの通知などの他の処理をすぐに行うことが可能となります。ただし、メッセージキューに空きができるまで mq_send を繰り返し実行するような制御をプログラマー自身がコーディングする必要が出てくるため、コーディングの難易度は上がります。

動作確認

実際に、ノンブロッキングモードのメッセージキューの動作について確認してみましょう!

ブロッキングモードの動作確認 で行った手順と同じこと、すなわち producer.exe のみを実行して整数や演算子の入力を繰り返し行ってみてください。

今回は、単に入力受け受けが行われないのではなく、下記のようにメッセージキューが満杯であることを示す文字列が繰り返し出力されることが確認できるはずです。このように文字列が出力できているのは、mq_send がメッセージキュー満杯の状態でも即座に終了するようになったためです。

Message Queue is Full

consumer.exe を実行してやれば、メッセージキューの満杯が解消されて上記の文字列の出力が停止し、次の整数や演算子の入力受付が行われるようになることも確認できると思います。

スポンサーリンク

優先度を設定したメッセージキューの利用例

次は、優先度を設定したメッセージキューの利用例となるサンプルを示していきます。今回も変更するソースコードは producer.c のみで、動作確認には今まで利用してきた consumer.exe をそのまま利用します。

生産者のソースコード(producer.c

エンキューするメッセージに優先度を設定するようにした生産者側のソースコード producer.c は下記となります。

producer.c
#include <mqueue.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include "message.h"

// キューの名前
#define QUEUE_NAME ("/sample_queue")

// メッセージの最大サイズ
#define MSG_SIZE (sizeof(MESSAGE))

int main(void) {
    // キューを事前に削除しておく
    mq_unlink(QUEUE_NAME);

    // キューの設定
    struct mq_attr attr;
    attr.mq_maxmsg = 3;
    attr.mq_msgsize = MSG_SIZE;

    // キューを生成して書き込みモードで開く
    mqd_t queue = mq_open(QUEUE_NAME, O_WRONLY | O_CREAT, 0666, &attr);
    if (queue == -1) {
        // キューの作成に失敗
        printf("%s\n", strerror(errno));
        return -1;
    }

    while (1) {
        // 2つの整数の入力
        int first_num, second_num;

        printf("first number : ");
        scanf("%d", &first_num);
        printf("sencond number : ");
        scanf("%d", &second_num);

        // 演算子の入力
        int operation;

        printf("operation(0:+,1:-,2:*,3:/) : ");
        scanf("%d", &operation);

        if (operation != E_SUM && operation != E_SUB && operation != E_MUL && operation != E_DIV) {
            // 演算子が不正ならプログラム終了
            printf("QUIT!\n");
            break;
        }

        // メッセージの生成
        MESSAGE message;
        message.operation = operation;
        message.first_num = first_num;
        message.second_num = second_num;

        // メッセージの送信
        int ret = mq_send(queue, (const char*)&message, sizeof(message), operation);
        if (ret == -1) {
            printf("%s\n", strerror(errno));
        }
    }

    // メッセージキューを閉じて削除
    mq_close(queue);
    mq_unlink(QUEUE_NAME);
}

基本的なメッセージキューの利用例 で示した producer.c からの変更点は、mq_send の第4引数に operation を指定するようにした点のみとなります。これにより、ユーザーから入力された演算子を示す値に応じた優先度が設定されたメッセージがメッセージキューにエンキューされるようになります。具体的には、演算子に応じて下記のようにメッセージに優先度が設定されるようになります。

  • 演算子が + : 0
  • 演算子が - : 1
  • 演算子が * : 2
  • 演算子が / : 3

つまり、メッセージキューに複数のメッセージが存在する場合、上記の優先度の高い演算子が指定されたメッセージから順に先頭側に並べられることになります。同じ優先度のメッセージは先にエンキューされたものから順に先頭側から並べられます。そして、デキュー時には先頭のメッセージから順にデキューされることになります。

動作確認

実際に、優先度を設定した場合のメッセージキューの動作について確認してみましょう!

まず、下記コマンドを実行して producer.exe を生成します。consumer.exe に関しては 基本的なメッセージキューの利用例 の動作確認時に使用したものをそのまま利用します。

gcc producer.c -lrt -o producer.exe

続いて、下記コマンドで producer.exe を実行し、続いて2つの整数の入力と演算子を示す整数の入力を3回分繰り返し行ってください。ここでは、演算子をバラバラに指定していただいた方が優先度の設定の効果が分かりやすいと思います。また、各演算の結果に違いが出るように入力を行ってください。

./producer.exe
first number : 100
sencond number : 3
operation(0:+,1:-,2:*,3:/) : 0
first number : 100
sencond number : 3
operation(0:+,1:-,2:*,3:/) : 3
first number : 100
sencond number : 3
operation(0:+,1:-,2:*,3:/) : 2
first number :

ここまでの手順により、メッセージキューに3つのメッセージがエンキューされたことになります。続いて consumer.exe を実行してみてください。

./consumer.exe
result = 33
result = 300
result = 103

consumer.exe を実行すると、上記のように producer.exe に入力した整数に基づいた計算結果が3つ出力されるはずです。ただし、その計算結果の出力順序は producer.exe に入力した順序とは異なるはずです。具体的には、割り算・掛け算・引き算・足し算の順序で計算結果が出力されているはずです。つまり、前述で紹介したメッセージの優先度に基づき、その優先度の高い塩山に対する計算結果から順に出力されているはずです。

この出力順序は mq_receive によってメッセージがデキューされた順序と一致しており、つまりは mq_receive では、メッセージキューに複数のメッセージが存在する場合、優先度の高いメッセージからデキューすることができるということを示しています。

こんな感じで、メッセージに優先度を設けて優先度の高いメッセージに対する処理から順にデキューすることが可能であるという点もメッセージキュー(特に POSIX のメッセージキュー)の特徴となりますので、この点も覚えておきましょう!

マルチスレッドでのメッセージキューの利用例

さて、ここまで生産者と消費者となるプログラムをそれぞれ別に用意し、それらを別々のプロセスとして動作させる例を示してきました。これは、いわゆるマルチプロセスによる処理の並列化の例となります。そして、これらのプロセス間での通信にメッセージキューを利用してきたことになります。

ここでは、マルチプロセスではなくマルチスレッドでのメッセージキューの利用例を示していきたいと思います。この場合、実行するプログラム(プロセス)は1つのみになりますが、内部でスレッドを複数(今回は2つ)動作させることで処理の並列化を実現することになります。1つのスレッドを生産者の役割として動作させ、もう1つのスレッドを消費者の役割として動作させていきます。そして、スレッド間の通信にメッセージキューを利用します。また、生産者と消費者の役割は今まで紹介してきたソースコードと同様になります。

マルチプロセスとマルチスレッドの違いを説明する図

マルチスレッドについては下記ページで解説していますので、詳しくしたい方は下記ページをご参照ください。

入門者向け!C言語でのマルチスレッドをわかりやすく解説

ソースコード(producer_consumer.c

マルチスレッドでのメッセージキューの利用例となるソースコード producer_consumer.c は下記となります。

producer_consumer.c
#include <mqueue.h>
#include <pthread.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include "message.h"

// キューの名前
#define QUEUE_NAME ("/sample_queue")

// メッセージの最大サイズ
#define MSG_SIZE (sizeof(MESSAGE))

// 各スレッドから参照できるようにグローバル変数で宣言
static mqd_t queue = -1;

int calc(MESSAGE *message) {
    int result;

    // operationに従った計算を実行
    switch(message->operation) {
    case E_SUM:
        result = message->first_num + message->second_num;
        break;
    case E_SUB:
        result = message->first_num - message->second_num;
        break;
    case E_MUL:
        result = message->first_num * message->second_num;
        break;
    case E_DIV:
        if (message->second_num == 0) {
            result = 0;
        } else {
            result = message->first_num / message->second_num;
        }
        break;
    default:
        result = 0;
        break;
    }

    return result;
}

void *consumer(void *arg) {

    while (1) {
        // メッセージ格納先の変数
        MESSAGE message;
        
        // メッセージをデキューして取得
        ssize_t ret = mq_receive(queue, (char*)&message, sizeof(message), NULL);
        if (ret == -1) {
            printf("%s\n", strerror(errno));
        }

        // メッセージに基づいて処理を実行
        int result = calc(&message);
        printf("result = %d\n", result);
    }

    return NULL;
}

int main(void) {
    // キューを事前に削除しておく
    mq_unlink(QUEUE_NAME);

    // キューの設定
    struct mq_attr attr;
    attr.mq_maxmsg = 3;
    attr.mq_msgsize = MSG_SIZE;

    // キューを生成して書き込みモードで開く
    queue = mq_open(QUEUE_NAME, O_RDWR | O_CREAT, 0666, &attr);
    if (queue == -1) {
        // キューの作成に失敗
        printf("%s\n", strerror(errno));
        return -1;
    }

    pthread_t consumer_thread;
    pthread_create(&consumer_thread, NULL, &consumer, NULL);

    while (1) {
        // 2つの整数の入力
        int first_num, second_num;

        printf("first number : ");
        scanf("%d", &first_num);
        printf("sencond number : ");
        scanf("%d", &second_num);

        // 演算子の入力
        int operation;

        printf("operation(0:+,1:-,2:*,3:/) : ");
        scanf("%d", &operation);

        if (operation != E_SUM && operation != E_SUB && operation != E_MUL && operation != E_DIV) {
            // 演算子が不正ならプログラム終了
            printf("QUIT!\n");
            break;
        }

        // メッセージの生成
        MESSAGE message;
        message.operation = operation;
        message.first_num = first_num;
        message.second_num = second_num;

        // メッセージの送信
        int ret = mq_send(queue, (const char*)&message, sizeof(message), 0);
        if (ret == -1) {
            printf("%s\n", strerror(errno));
        }
    }

    // 消費者スレッドをキャンセルして終了するまで待つ
    pthread_cancel(consumer_thread);
    pthread_join(consumer_thread, NULL);

    // メッセージキューを閉じて削除
    mq_close(queue);
    mq_unlink(QUEUE_NAME);
}

上記の producer_consumer.c においては、まずメッセージキューを main 関数の中で mq_open 関数によって生成しています。そして、その生成されたメッセージキューのディスクリプタをグローバル変数 queue に格納するようにしています。グローバル変数に格納することにより、各スレッドから queue、すなわちメッセージキューのディスクリプタを参照できるようになります。マルチプロセスの場合、ディスクリプタの共有がプロセス間で行えないため、生産者と消費者の両方のプロセスから mq_open を実行する必要がありましたが、マルチスレッドの場合はグローバル変数でディスクリプタが共有できるので、1度 mq_open でディスクリプタを取得してやれば、それを生産者と消費者の両方から参照することが出来ることになります。

メッセージキューの生成後には、pthread_create 関数を実行してスレッドの生成を行っています。このスレッドは consumer 関数を仕事とするスレッドであり、pthread_create 関数の実行によって consumer 関数が main 関数と並列して実行されることになります。つまり、main 関数での mq_send でのメッセージキューへのエンキューと、consuemr 関数での mq_receive でのメッセージキューからのデキューが並列して実行されることになります。

これらが並列動作するという点は、今まで紹介してきたマルチプロセスの場合と同様であり、つまりは以降の動作は基本的に今まで紹介してきたプログラムと同様で、それが単にマルチスレッドにより実現されているだけになります。なので、以降の動作に関しての説明は省略します。

また、これはメッセージキューには無関係な話なので補足になりますが、main 関数での演算子を示す整数の入力受付において 03 以外の値が入力された時には while ループを抜け出して main 関数を終了するようにしています。この時に、consumer 関数も終了させたいため、pthread_create で生成したスレッドを pthread_cancel で終了させるようにしています。そして、その後 consumer 関数が終了するまで pthread_join で待機し、その後はいつも通りメッセージキューのクローズと削除を行った後にプログラムが終了するようにしています。

プログラムが終了すれば pthread_create で生成したスレッドも勝手に終了しますし、consumer スレッドの返却値も参照していないため、上記のソースコードの例の場合は別に pthread_cancelptherad_join を行わなくても良いのですが、consumer スレッドを確実に終了させた後にプログラム内で他の処理を実行したいような場合や、consumer スレッドの返却値を参照する必要のある場合などは、上記のような pthread_cancelpthread_join 等の細かな制御も必須となります。

動作確認

最後に producer_consumer.c のプログラムの動作確認をしておきましょう!

まず、下記のコマンドで producer_consumer.c をコンパイルして producer_consumer.exe を生成してください。pthread 関連の関数をを利用するために -lpthread での pthread ライブラリへのリンクが必要となる点に注意してください。

gcc producer_consumer.c -lrt -lpthread -o producer_consumer.exe

次は、下記コマンドで producer_consumer.exe を実行します。

./producer_consumer.exe

producer_consumer.exe を実行したら、いつも通り整数2つと演算子の入力が促されるので、好きな整数や演算子を選んで入力してください。入力が完了すれば、計算結果が出力されることを確認できると思います。入力と出力が同じターミナルで行われるので少し見づらいですが、これは我慢してください…。

./producer_consumer.exe
first number : 100
sencond number : 3
operation(0:+,1:-,2:*,3:/) : 3
first number : result = 33

計算結果の出力を行っているのは main 関数から pthread_create により生成された consumer 関数を実行するスレッドになるため、上記のように計算結果が出力されたということは、入力された情報を main 関数を実行するスレッドからメッセージキューを介して consumer 関数を実行するスレッドが受け取ることができているからになります。

このようにマルチプロセスだけではなくマルチスレッド間での通信・メッセージのやり取りにもメッセージキューが利用できることは覚えておきましょう!

まとめ

このページではメッセージキューについて説明しました!

メッセージキューは、複数のスレッドやプロセスの間で通信・メッセージのやり取りを行うための仕組みであり、これにより、受信側は確実に送信側がエンキューした順 or 優先度の高い順でメッセージを受信することができるようになります。

これを利用することで生産者・消費者モデルを簡単に実現することができます。他にもプロセス間・スレッド間でメッセージのやりとりを行う方法はあるのですが、メッセージキューはかなり使いやすい部類の通信手段になると思いますので、並列プログラミングを利用する際はメッセージキューの採用も検討してみると良いと思います!

同じカテゴリのページ一覧を表示