Working with System V IPC queues
There are a variety of ways to work with queues: you can purchase IBM WebSphere MQ (formerly known as MQSeries), or similar products from Oracle or Microsoft. On most Unix systems, however, queues are already built in, as long as your messages do not need to cross machine boundaries, i.e., hop from one computer to the next. If all you need is the queuing functionality itself, i.e., FIFO (first-in-first-out), IPC queues are for you. A typical use is absorbing a burst of incoming tasks, orders, or messages and then working through them one by one, undisturbed by whatever else arrives in the meantime.
In C, setting up an IPC queue is easy. First include:
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/stat.h>
#include <sys/msg.h>
Then get a key with ftok(), and create the queue with msgget():
/* ftok() reports failure as (key_t)-1; the path must exist */
if ((key = ftok("/your/directory/goes/here",1)) == (key_t)-1) {
    perror("main(): ftok()");
    return 13;
}
if ((msqid = msgget(key,IPC_CREAT | S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP)) < 0) {
    perror("main(): msgget()");
    return 14;
}
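The two calls above can be wrapped in a small helper. This is only a sketch: open_queue() and its parameters are names I made up for illustration, and the octal mode 0660 is simply the numeric form of the S_I* flags used above.

```c
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Hypothetical helper: returns the queue id, or -1 on failure. */
int open_queue(const char *path, int proj_id)
{
    /* path must name an existing file or directory */
    key_t key = ftok(path, proj_id);
    if (key == (key_t)-1) {
        perror("open_queue(): ftok()");
        return -1;
    }
    /* 0660 == S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP */
    int msqid = msgget(key, IPC_CREAT | 0660);
    if (msqid < 0) {
        perror("open_queue(): msgget()");
        return -1;
    }
    return msqid;
}
```

Because IPC_CREAT is given without IPC_EXCL, a second call with the same path and project id returns the id of the already existing queue, which is what lets an independent producer and consumer find each other.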
Now assume the following data structure for your message:
size_t msgsz;
struct msgbuf {
    long mtype;
    char mtext[...];
} msg;
mtype must be positive, but is otherwise arbitrary. mtext[] is your actual message.
Sending messages goes like this (the message is in msg, and msgsz is the size of its mtext part, not counting mtype):
if ((ret = msgsnd(msqid,&msg,msgsz,IPC_NOWAIT)) < 0) {
    perror("main(): msgsnd()");
    return 15;
}
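To make the send step concrete, here is a minimal sketch with a fixed-size payload. The struct text_msg, the limit TEXT_MAX, and the helper send_text() are my own illustrative names and values, not part of any API.

```c
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define TEXT_MAX 256            /* assumed payload limit, chosen arbitrarily */

struct text_msg {               /* same layout as the msgbuf pattern above */
    long mtype;
    char mtext[TEXT_MAX];
};

/* Hypothetical helper: send a NUL-terminated string; returns 0 or -1. */
int send_text(int msqid, long mtype, const char *text)
{
    struct text_msg msg;
    size_t len = strlen(text) + 1;      /* include the terminating NUL */

    if (mtype <= 0 || len > sizeof msg.mtext)
        return -1;                      /* invalid type or text too long */
    msg.mtype = mtype;
    memcpy(msg.mtext, text, len);

    /* only the bytes actually used in mtext are put on the queue */
    if (msgsnd(msqid, &msg, len, IPC_NOWAIT) < 0) {
        perror("send_text(): msgsnd()");
        return -1;
    }
    return 0;
}
```

Sending the terminating NUL along with the text is a design choice: it lets the receiver print mtext with %s, as the receive example does, without terminating the string itself.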
Receiving goes like this:
if ((ret = msgrcv(msqid,&msg,msgsz,0,IPC_NOWAIT)) > 0) {
    if (debug) {
        printf("ret=%d, msg.mtype=%ld, msg.mtext=|%s|\n",
            ret, msg.mtype, msg.mtext);
        statmsg(msqid);
    }
    // do something with the message
}
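The fourth argument of msgrcv(), 0 in the call above, selects messages by mtype. The sketch below shows this; struct text_msg and receive_by_type() are hypothetical names, and the 64-byte payload is an arbitrary choice.

```c
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

struct text_msg { long mtype; char mtext[64]; };

/* Hypothetical helper: fetch the first message matching msgtyp without
 * blocking. Returns the payload length, or -1 if nothing matched or an
 * error occurred. */
long receive_by_type(int msqid, long msgtyp, struct text_msg *out)
{
    /* leave one byte spare so the payload can always be NUL-terminated */
    ssize_t n = msgrcv(msqid, out, sizeof out->mtext - 1, msgtyp, IPC_NOWAIT);
    if (n < 0)
        return -1;
    out->mtext[n] = '\0';
    return (long)n;
}
```

With msgtyp equal to 0 you get the oldest message regardless of type, i.e., plain FIFO. A positive msgtyp returns the oldest message of exactly that type, and a negative msgtyp returns the oldest message of the lowest type that is less than or equal to its absolute value.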
Instead of IPC_NOWAIT you can specify 0, in which case the call blocks: msgrcv() waits as long as there is no message, and msgsnd() waits as long as the queue is full.
On failure, msgsnd() and msgrcv() return -1.
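With IPC_NOWAIT, a return value of -1 does not always mean something went wrong: an empty queue sets errno to ENOMSG and a full queue sets it to EAGAIN. A minimal sketch that separates these cases from real errors follows; try_receive(), try_send(), and struct text_msg are illustrative names only.

```c
#include <errno.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

struct text_msg { long mtype; char mtext[64]; };

/* Hypothetical helper: 1 = message received, 0 = queue empty, -1 = error. */
int try_receive(int msqid, struct text_msg *out)
{
    ssize_t n = msgrcv(msqid, out, sizeof out->mtext - 1, 0, IPC_NOWAIT);
    if (n >= 0) {
        out->mtext[n] = '\0';   /* terminate so mtext is printable */
        return 1;
    }
    if (errno == ENOMSG)
        return 0;               /* nothing queued; not a failure */
    perror("try_receive(): msgrcv()");
    return -1;
}

/* Hypothetical helper: 1 = sent, 0 = queue full, -1 = error. */
int try_send(int msqid, const struct text_msg *msg, size_t len)
{
    if (msgsnd(msqid, msg, len, IPC_NOWAIT) == 0)
        return 1;
    if (errno == EAGAIN)
        return 0;               /* queue full; retry later or block */
    perror("try_send(): msgsnd()");
    return -1;
}
```

A consumer can call try_receive() in its main loop and do other work whenever it returns 0, whereas passing 0 instead of IPC_NOWAIT would park the process inside msgrcv() until a message arrives.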
statmsg() is a function that shows the metadata of a queue using msgctl():
int statmsg (int msqid) {
    int ret;
    struct msqid_ds buf;
    char stimeb[30], rtimeb[30], ctimeb[30];

    if ((ret = msgctl(msqid,IPC_STAT,&buf)) != 0) {
        perror("statmsg(): msgctl()");
        return 1;
    }
    printf(
        "# of messages on q = %d\n"
        /* "current # bytes on q = %ld\n" */
        "max # of bytes on q = %ld\n"
        "pid of last msgsnd = %d\n"
        "pid of last msgrcv = %d\n"
        "last msgsnd time = %s"
        "last msgrcv time = %s"
        "last change time = %s\n",
        (int)buf.msg_qnum,
        /* (long)buf.__msg_cbytes, */
        (long)buf.msg_qbytes,
        (int)buf.msg_lspid,
        (int)buf.msg_lrpid,
        ctime_r(&buf.msg_stime,stimeb),
        ctime_r(&buf.msg_rtime,rtimeb),
        ctime_r(&buf.msg_ctime,ctimeb)
    );
    return 0;
}
Compare this with all the hassle you would have with WebSphere MQ: starting a queue manager, creating queues, etc. By the way, IBM WebSphere MQ employs IPC queues and semaphores itself.
Inspecting your queues in Unix goes like this:
ipcs -q
If you want to clear a queue, you just read all messages with msgrcv(), or you can simply remove the queue with ipcrm.
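Both options can also be done from within a program. The following is a sketch under my own naming: drain_queue() and remove_queue() are hypothetical helpers, the latter being the programmatic counterpart of ipcrm via msgctl() with IPC_RMID.

```c
#include <errno.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Hypothetical helper: discard every pending message.
 * Returns the number of messages removed, or -1 on error. */
int drain_queue(int msqid)
{
    struct { long mtype; char mtext[1]; } scratch;
    int count = 0;

    /* MSG_NOERROR truncates oversized payloads instead of failing
     * with E2BIG, so a tiny scratch buffer is enough here */
    while (msgrcv(msqid, &scratch, sizeof scratch.mtext, 0,
                  IPC_NOWAIT | MSG_NOERROR) >= 0)
        count++;
    return errno == ENOMSG ? count : -1;
}

/* Hypothetical helper: remove the queue and everything still on it. */
int remove_queue(int msqid)
{
    return msgctl(msqid, IPC_RMID, NULL);
}
```

Draining keeps the queue id valid for the processes that still use it, while removal makes their next msgsnd() or msgrcv() on that id fail.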
Richard Stevens states some shortcomings of System V IPC in his book "Advanced Programming in the UNIX Environment", sections 14.6.4 and 14.7:
They [IPC queues] remain in the system until specifically read or deleted: by some process calling msgrcv or msgctl, by someone executing the ipcrm(1) command, or by the system being rebooted. . . . Another problem with system V IPC is that these IPC structures are not known by names in the filesystem. We can't access them and modify their properties with the functions we described in Chapters 3 and 4. Almost a dozen brand new system calls were added to the kernel to support them (msgget, semop, shmat, etc.). We can't see them with an ls command, we can't remove them with the rm command, and we can't change their permissions with the chmod command. Instead brand new commands, ipcs(1), and ipcrm(1), were added.
Since these forms of IPC don't use file descriptors, we can't use the multiplexing I/O functions with them: select and poll. This makes it harder to use more than one of these IPC structures at a time, or to use any of these IPC structures with file or device I/O. For example, we can't have a server wait for a message to be placed on one of two message queues without some form of busy-wait loop.
Notice how ungracefully the removal of a message queue is handled. Since a reference count is not maintained with each message queue (as there is for open files), the removal of a queue just generates errors on the next queue operation by processes still using the queue. Semaphores handle this removal in the same fashion. Removing a file doesn't delete the file's content until the last process using the file closes it.
When we consider the problems in using message queues (Section 14.6.4), we come to the conclusion that we shouldn't use them for new applications.
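To illustrate the busy-wait loop that Stevens refers to, here is a rough sketch of a server polling two queues in turn. Everything in it is an assumption of mine: the names wait_on_two() and struct text_msg, the 10 ms pause, and the max_tries limit.

```c
#include <errno.h>
#include <stdio.h>
#include <time.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

struct text_msg { long mtype; char mtext[64]; };

/* Hypothetical helper: poll two queues in turn.
 * Returns 0 or 1 (index of the queue that delivered a message),
 * -1 on error, or -2 if max_tries rounds passed without a message. */
int wait_on_two(const int msqid[2], struct text_msg *out, int max_tries)
{
    const struct timespec nap = { 0, 10 * 1000 * 1000 };   /* 10 ms, arbitrary */

    for (int attempt = 0; attempt < max_tries; attempt++) {
        for (int i = 0; i < 2; i++) {
            ssize_t n = msgrcv(msqid[i], out, sizeof out->mtext - 1,
                               0, IPC_NOWAIT);
            if (n >= 0) {
                out->mtext[n] = '\0';
                return i;
            }
            if (errno != ENOMSG)
                return -1;      /* e.g. the queue was removed meanwhile */
        }
        nanosleep(&nap, NULL);  /* sleep so the loop does not spin the CPU */
    }
    return -2;
}
```

The pause trades latency for CPU time, which is exactly the compromise that select or poll would avoid if queues were file descriptors.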
Despite the shortcomings stated above, System V IPC queues are easy to use and administer whenever these arguments do not matter. In my cases there were no multiple queues and therefore no select/poll issue, so separating two processes, one producing data and the other consuming it, just worked without great fuss. In particular, if the two processes are completely independent, i.e., either or both of them can be stopped while the queue content must stay intact, then System V queues are a good fit.
Specifics to Perl and PHP are discussed in my post Working with System V IPC queues in Perl and PHP.