IPC. Interprocess comunication - JulTob/Ada GitHub Wiki

16.21 Message Queues What are they?

Message queues are one of three IPC (Inter Process Communication) ways of System V. The others are shared memory and semaphores. Message queues are linked lists of messages maintained by the kernel. A process can set one up, disappear, and the queue still remains.

Are they usefull? Yes, they provide a fairly simple way of passing messages between processes. They are also very fast. A way of using them

We'll look at a simple case where two processes will pass a message between each other.

First we'll need a System V IPC key. Ftok generates a, almost always uniqe, key by gathering some info from a user provided file. This key is needed when we create a queue. type key_t is new integer;

function ftok( path : string; proj_id : integer ) return key_t; Convert a project id number and a pathname of an accessible file to a key that can be used by Linux's System V interprocess communication features (that is, message queues.) Example: my_queue := ftok( "./queue_file.que" & ASCII.NUL, my_proj );

Ftok proberbly means "File To Key".

Although Ada integers and C "int" types are identical, we'll use the Interfaces.C package for maximum portability.

-- C is key_t ftok(const char *pathname, int proj_id);

package C renames Interfaces.C; type Key_t is new C.Int; pragma Convention (C, Key_t); function C_Ftok(Pathname : in String; Proj : in C.Int) return Key_t; pragma Import(C,C_Ftok,"ftok");

By calling C_Ftok, with Proj greater than 0, we get a key, or -1 for error. We now wants to create a message queue, with this key. function msgget( key : key_t; flags : integer ) return integer; Return the message queue id number associated with the key. A new message queue will be created if the key has value IPC_PRIVATE or of no queue exists. The flags indicate indicate the permissions for the message queue. Example: qid := msgget( key, IPC_CREAT+8#660#;

Mesget can be called with a lot of options, but we'll go for getting an id for a queue, and if it does not exists, we create it.

int msgget(key_t key, int msgflg);

IPC_CREAT : constant C.Int := 512 ; IPC_PERMISSIONS : constant C.Int := 8#660#; function C_Msgget(Key : Key_t; Msgflg : C.Int) return C.Int; pragma Import (C, C_Msgget, "msgget");

By calling msgget with IPC_CREAT + IPC_PERMISSIONS, and the generated key, we get the identity of a message queue, that either exists, or is newly created with the corresponding permissons. The execute flag has no meaning for message queues. msgsnd msgsnd(2)

Now we want to do something, but first, have a look at 'ipcs -q'. This command lists message queues in the system.

We send a record that looks like this

  struct msgbuf {
    long    mtype;   /* message type, must be > 0 */
    char    mtext[1];        /* message data */
   };

This translates in Ada into a record with an C.long member + another member of arbitrary kind, ie a record.

type Message_Type is record
  M_Type         : C.Long := 100;
  An_Integer     : Integer;
  Anther_Integer : Integer;
end record;

function C_Send(Queue_Identity : in C.Int;
                Message        : in Message_Type) return C.Int is

type Message_Pointer_Type is access all Message_Type;

Tmp_Msg     : aliased Message_Type := Message;
Tmp_Msg_Ptr : Message_Pointer_Type := Tmp_Msg'Access;
-- All 'size are in bits. There are System.Storage_Unit bits in a byte
Local_Size : C.Int := C.Int((C.Long'Size + 2 * Integer'Size)/System.Storage_Unit);

function C_Msgsnd(Msqid : C.Int; Msgp   : Message_Pointer_Type;
                  Msgsz : C.Int; Msgflg : C.Int) return C.Int;
pragma Import (C, C_Msgsnd, "msgsnd");

begin return C_Msgsnd(Queue_Identity,Tmp_Msg_Ptr,Local_Size,0); end C_Send;

This will send a record containing 2 integers, with message type set to 100. The type can be used in receiving messages.

msgrec msgrcv(2)

When receiving from a queue, we can use fifo-order or just look at messages of a certain kind. This is determined by the Msg_Type parameter. 0 means fifo, greater than 0 means first message of that type, less than 0 means message with lowest type less than or equal to the absolute value of Msg_Type.

ssize_t msgrcv(int msqid, struct msgbuf *msgp, size_t msgsz, long msgtyp, int msgflg);

function C_Receive(Queue_Identity : in C.Int;
                   Msg_Type       : in C.Long;
                   Msg_Flag       : in C.Int := 0) return Message_Type is

  type Message_Pointer_Type is access all Message_Type;
  Receive_Failure : exception;
  function C_Msgrcv(Msqid   : C.Int; Msgpointer : Message_Pointer_Type;
                    Msgsize : C.Int; Msgtype    : C.Long;
                    Msgflag : C.Int)                 return C.Int;
  pragma Import (C, C_Msgrcv, "msgrcv");

  Tmp_Msg     : aliased Message_Type;
  Tmp_Msg_Ptr : Message_Pointer_Type := Tmp_Msg'Access;
  Result      : C.Int                := C.Int'First;
begin
  Result := C_Msgrcv(Queue_Identity, Tmp_Msg_Ptr,
                     C.Int((Message_type'Size + C.Long'Size)/System.Storage_Unit),
                     Msg_Type, Msg_Flag);
  if (Result >= 0) then
    return Tmp_Msg_Ptr.all;
  else
    raise Receive_Failure;
  end if;
end C_Receive;

msgctl msgctl(2)

With msgctl, you can examine and remove existing message queues. Putting it all together

We'll do 2 simple programs, the first sends a message to a message queue and creates the queue if it does not exists. Then the program exits. The second program retrieves a message from a queue. If the queue does not exists, it creates the queue, and blocks until a message arrives. It then prints out the sum of the fields in the record. The programs was tested with Mandrake 9 and Gnat 3.15p. 4 files are involved:

-- test_def.ads

-- a package to provide a simple message queue binding

with Interfaces.C; package Test_Def is package C renames Interfaces.C;

type Key_t is new C.Int; pragma Convention (C, Key_t);

IPC_CREAT : constant C.Int := 512 ; IPC_PERMISSIONS : constant C.Int := 8#660#;

type Message_Type is record M_Type : C.Long := 100; An_Integer : Integer; Another_Integer : Integer; end record;

Receive_Failure : exception;

function C_Ftok(Pathname : in String; Proj : in C.Int) return Key_t; pragma Import(C,C_Ftok,"ftok");

function C_Msgget(Key : Key_t; Msgflg : C.Int) return C.Int; pragma Import (C, C_Msgget, "msgget");

function C_Send(Queue_Identity : in C.Int; Message : in Message_Type) return C.Int ; function C_Receive(Queue_Identity : in C.Int; Msg_Type : in C.Long; Msg_Flag : in C.Int := 0) return Message_Type; end Test_Def;

-- test_def.adb package body

-- a package to provide a simple message queue binding

with Ada.Text_IO; with System; package body Test_Def is use C;

function C_Send(Queue_Identity : in C.Int; Message : in Message_Type) return C.Int is -- Send a message through the message queue. Wrapper function for msgsnd.

type Message_Pointer_Type is new System.Address;

Tmp_Msg     : aliased Message_Type := Message;
Tmp_Msg_Ptr : Message_Pointer_Type := Tmp_Msg'Address;
-- All 'size are in bits. There are System.Storage_Unit bits in a byte
Local_Size : C.Int := C.Int((C.Long'Size + 2 * Integer'Size)/System.Storage_Unit);

function C_Msgsnd(Msqid : C.Int; Msgp   : Message_Pointer_Type;
                  Msgsz : C.Int; Msgflg : C.Int) return C.Int;
pragma Import (C, C_Msgsnd, "msgsnd");

begin return C_Msgsnd(Queue_Identity,Tmp_Msg_Ptr,Local_Size,0); end C_Send;

function C_Receive(Queue_Identity : in C.Int; Msg_Type : in C.Long; Msg_Flag : in C.Int := 0) return Message_Type is -- Receive a message from a message queue. Wrapper function for msgrcv.

type Message_Pointer_Type is new System.Address;

function C_Msgrcv(Msqid   : C.Int; Msgpointer : Message_Pointer_Type;
                  Msgsize : C.Int; Msgtype    : C.Long;
                  Msgflag : C.Int)                return C.Int;
pragma Import (C, C_Msgrcv, "msgrcv");

Tmp_Msg     : aliased Message_Type;
Tmp_Msg_Ptr : Message_Pointer_Type := Tmp_Msg'Address;
Result      : C.Int                := C.Int'First;

begin Result := C_Msgrcv(Queue_Identity, Tmp_Msg_Ptr, C.Int((Message_type'Size + C.Long'Size)/System.Storage_Unit), Msg_Type, Msg_Flag); Ada.Text_Io.Put_Line("Lenght of message:" & C.Int'Image(Result)); if Result >= 0 then return Tmp_Msg_Ptr.all; else raise Receive_Failure; end if; end C_Receive;

end Test_Def;

-- test_send.adb

-- a program to test our message queue package

with Ada.Text_IO; with Test_Def; use Test_Def; procedure Test_Send is use C; Key : Key_T ; Q_Id : C.Int ; Message : Message_Type; Result : C.Int; begin Key := C_Ftok("/etc/profile" & Ascii.NUL,1); Q_Id := C_Msgget(Key, IPC_CREAT + IPC_PERMISSIONS);

Message.An_Integer := 40; Message.Another_Integer := 2; Result := C_Send(Q_Id,Message); Ada.Text_Io.Put_Line("Created/got hold of Key:" & Key_T'Image(Key) & " Q-id:" & C.Int'Image(Q_Id) & "Result sending:" & C.Int'Image(Result));

Ada.Text_Io.Put_Line("Check with 'ipcs -q'"); end Test_Send;

-- test_receive.adb

-- another program to test our message queue package

with Ada.Text_IO; with Test_Def; use Test_Def; procedure Test_Receive is use C; Key : Key_T ; Q_Id : C.Int ; Message : Message_Type; begin Key := C_Ftok("/etc/profile" & Ascii.NUL,1); Ada.Text_Io.Put_Line("Received with Key:" & Key_T'Image(Key) );

Q_Id := C_Msgget(Key, IPC_CREAT + IPC_PERMISSIONS); Ada.Text_Io.Put_Line("Q-id:" & C.Int'Image(Q_Id));

Message := C_Receive(Q_Id, Message.M_Type); Ada.Text_Io.Put_Line("Received with Key:" & Key_T'Image(Key) & " Q-id:" & C.Int'Image(Q_Id) & " The sum of the two fields are:" & Integer'Image(Message.An_Integer + Message.Another_Integer));

Ada.Text_Io.Put_Line("Check with 'ipcs -q'"); end Test_Receive;

Compile with gnatmake test_send.adb and gnatmake test_receive.adb. Run test_send first, check with 'ipcs -q' and then run test_receive. remove the queue with 'ipcrm'