10    Distributed Message Queues

Optional Component VxFusion



10.1    Introduction

VxFusion is a lightweight, media-independent mechanism, based on VxWorks message queues, for developing distributed applications.

There are several options for distributed multiprocessing in VxWorks. The Wind River optional product VxMP allows objects to be shared, but only across shared memory. TCP/IP can be used to communicate across networks, but it is low level and not intended for real-time use. Various high-level communication mechanisms that are standard for distributed computing can be used, but they have high overheads in terms of memory usage and computation time that are not always acceptable for real-time systems. Numerous proprietary methods also have been developed, but more and more often they encounter maintenance, porting, and enhancement issues. VxFusion, however, is a standard VxWorks component that:

  • Provides a lightweight distribution mechanism based upon VxWorks message queues.

  • Provides media independence, allowing distributed systems to effectively exchange data over any transport, eliminating custom requirements for communications hardware.

  • Provides a safeguard against a single point of failure resulting from one-to-many master-slave dependencies. This is done by replicating a database of known objects on every node in the multi-node system.

  • Supports unicast and multicast posting of messages to objects in the system.

  • Exhibits location transparency; that is, objects can be moved seamlessly within the system without rewriting application code. Specifically, posting messages to objects occurs without regard to their location in a multi-node system.

VxFusion is similar to the VxMP shared memory objects option. VxMP adds to the basic VxWorks message queue functionality support for sharing message queues, semaphores, and memory allocation over shared memory. VxFusion adds to VxWorks support for sharing message queues over any transport, as well as multicasting to message queue groups. Unlike the VxMP shared memory option, VxFusion does not support distributed semaphores or distributed memory allocation.


CAUTION: When a distributed queue is created on one host, this information is broadcast to the other hosts through the name database. If the host where the queue was created crashes, there is no easy way for the other hosts to find out. A task on another host might therefore pend on a receive forever. It is up to the user to provide ways to detect remote node crashes and update the database accordingly.



10.2    Configuring VxWorks with VxFusion

Configure VxWorks with the INCLUDE_VXFUSION component to provide basic VxFusion functionality. To configure VxFusion show routines, also include the following components in the kernel domain:

INCLUDE_VXFUSION_DIST_MSG_Q_SHOW

INCLUDE_VXFUSION_GRP_MSG_Q_SHOW

INCLUDE_VXFUSION_DIST_NAME_DB_SHOW

INCLUDE_VXFUSION_IF_SHOW


NOTE: VxFusion cannot be configured with targets that do not support Ethernet broadcasting, such as the VxSim target simulator.



10.3    Using VxFusion

VxFusion adds two types of distributed objects to the standard VxWorks message queue functionality:

  • distributed message queues

  • group message queues

Distributed message queues can be shared over any communications medium. Group message queues are virtual message queues that receive a message, then send it out to all message queue members of the group. VxFusion implements a set of routines that offer fine-tuned control over the timing of distributed message queue operations. However, many of the API calls used to manipulate standard message queues also work with distributed objects, making the porting of your message-queue-based application to VxFusion easy.

VxFusion also provides a distributed name database. The distributed name database is used to share data between applications. The API for the distributed name database is similar to the API for the shared name database of VxMP. See the entry for distNameLib in the VxWorks API Reference.

This section discusses system architecture and initialization, configuring VxFusion, working with the various components, and writing an adapter.

10.3.1   VxFusion System Architecture

A typical VxFusion system consists of two or more nodes connected by a communications path, as shown in Figure 10-1. The communications path is known as the transport, which can be communications hardware, like an Ethernet interface card or a bus, or it can be a software interface to communications hardware, like a driver or protocol stack. Most often the transport is a software interface; rarely is it useful or essential to communicate directly with the communications hardware.

There are many possible transports and, thus, many possible different APIs, addressing schemes, and so on; therefore, VxFusion requires a piece of software, called an adapter, to be placed between itself and the transport. The adapter provides a uniform interface to VxFusion, regardless of the underlying transport. In this way, an adapter is similar to a driver. And, as with drivers, a new adapter must be written for each type of transport VxFusion supports. The VxFusion component supplies a sample UDP adapter to be used as is or as a guide for writing new adapters (see 10.3.7 Working with Adapters).

Figure 10-1:   Example VxFusion System

Figure 10-1 illustrates an example two-node VxFusion system. VxFusion has been installed on each of the two nodes that are connected to the same subnet of an Ethernet LAN. Because the two nodes are connected by Ethernet, TCP, UDP, IP, and raw Ethernet are all possible transports for communications.

Figure 10-2:   VxFusion Components

In Figure 10-1, the UDP protocol serves as the transport and the supplied UDP adapter as the adapter.

Services and Databases

VxFusion is made up of a number of services and databases, as shown in Figure 10-2. The VxFusion databases and services are listed in Table 10-1 and Table 10-2, respectively (the term telegram is used in these tables; see 10.6 Telegrams and Messages). Each service runs as a separate task; you see these tasks identified whenever you list tasks on a node with VxFusion installed and running. Note that you need not be aware of these services and databases to use VxFusion.

Table 10-1:   VxFusion Databases   


Database
Description

Distributed Name Database
The distributed name database consists of name-value-type entries (see 10.3.4 Working with the Distributed Name Database). It has copies on every node in the system, making entries available to tasks on any node.
Distributed Group Database
The distributed group database maintains the list of distributed message queue groups and their locally added members.
Distributed Node Database
The distributed node database maintains the list of all other nodes in the system along with their status.

     

Table 10-2:   VxFusion Services   


Service
Description

Distributed Message Queue Service
Handles distributed message queue telegrams from remote nodes.
Group Message Queue Service
Handles group message queue telegrams from remote nodes.
Distributed Name Database Service
Handles distributed name database telegrams from remote nodes.
Distributed Group Database Service
Handles distributed message queue group database telegrams from remote nodes.
Incorporation Service
Handles incorporation messages from remote nodes. Incorporation messages are used to signal and acknowledge the start and end of database updates.
Group Agreement Protocol (GAP) Service
Handles GAP messages. GAP messages are sent between nodes to choose a unique ID for groups.

  

Libraries

VxFusion provides the following software libraries:

  • a distributed name database

  • distributed message queues

  • group message queues

  • telegram buffers

  • distributed objects statistics

  • VxFusion adapter interface

  • VxFusion network layer

10.3.2   VxFusion Initialization

When you boot a VxWorks image that has VxFusion enabled, the usrVxFusionInit( ) routine found in usrVxFusion.c calls the VxFusion initialization routine, distInit( ), to start VxFusion. The distInit( ) routine initializes VxFusion on the current node. VxFusion must be installed on each node in a system for applications on those nodes to be able to communicate. Parameters of distInit( ) are set to their default values. The distInit( ) routine performs the following basic operations:

  • Initializes the local services and databases.

  • Identifies the other VxFusion nodes in the system and determines their status.

  • If there are other nodes in the system, updates the local databases using data from one of the other nodes.

Because distInit( ) is called automatically when a target boots, if your VxWorks image has VxFusion included, you should not call this routine directly from the user application.

10.3.3   Configuring VxFusion

You can configure various aspects of VxFusion--initialization, run-time activities, and the adapter interface--using one of the following methods:

  • usrVxFusionInit( ) to modify features set at initialization.

  • distCtl( ) to control run-time behavior.

  • DIST_IF structures to tune adapter interfaces.

Each method for customizing is described further in this section.

Customizing with usrVxFusionInit( )

This routine invokes the VxFusion initialization routine distInit( ). It is customizable and can be used to modify distInit( ) configuration parameters that control initialization of your VxFusion environment. Table 10-3 describes the numerous configurable arguments. For more information on distInit( ), see the entry in the VxWorks API Reference.

Table 10-3:   Configuration Parameters Modified Within usrVxFusionInit( )   


Parameter: myNodeId (node ID)
Default: IP address of booting interface
Description: Specify a unique ID for a node. Each node in a VxFusion system must have a unique node ID. By default, the usrVxFusionInit( ) code uses the IP address of the booting interface as the node ID. The usrVxFusionInit( ) routine provides this as the first argument to the distInit( ) routine.

Parameter: ifInitRtn (adapter-specific initialization routine)
Default: distUdpInit( )
Description: Specify the initialization routine of the interface adapter to be used. By default, the usrVxFusionInit( ) routine specifies the initialization routine of the UDP adapter.

Parameter: pIfInitConf (adapter-specific configuration structure)
Default: booting interface
Description: Provide any additional adapter-specific configuration information to the adapter. For example, the usrVxFusionInit( ) routine provides the UDP adapter with the interface over which the node was booted.

Parameter: maxTBufsLog2 (maximum number of TBufs)
Default: 9 (log2 of 512)
Description: Specify the maximum number of telegram buffers to create. The usrVxFusionInit( ) routine must provide this parameter in log 2 form; that is, if the maximum is 512, the parameter must be 9.

Parameter: maxNodesLog2 (maximum number of nodes in the distributed node database)
Default: 5 (log2 of 32)
Description: Specify the maximum number of nodes in the distributed node database. The usrVxFusionInit( ) routine must provide this parameter in log 2 form.

Parameter: maxQueuesLog2 (maximum number of queues on node)
Default: 7 (log2 of 128)
Description: Specify the maximum number of distributed message queues that can be created on a single node. The usrVxFusionInit( ) routine must provide this parameter in log 2 form.

Parameter: maxGroupsLog2 (maximum number of groups in the distributed group database)
Default: 6 (log2 of 64)
Description: Specify the maximum number of group message queues that can be created in the distributed group database. The usrVxFusionInit( ) routine must provide this parameter in log 2 form.

Parameter: maxNamesLog2 (maximum number of entries in the distributed name database)
Default: 8 (log2 of 256)
Description: Specify the maximum number of entries that can be stored in the distributed name database. The usrVxFusionInit( ) routine must provide this parameter in log 2 form.

Parameter: waitNTicks (maximum number of clock ticks to wait)
Default: 240 (see note 1)
Description: Specify the number of clock ticks to wait for responses from other nodes at startup.

Note 1: 4*sysClkRateGet( ) is typically 240, but not always. The defaults are defined in vxfusion/distLib.h and can be changed by the user, if desired.
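Because the size parameters in Table 10-3 are passed in log 2 form, it can help to derive them from the counts an application actually needs. The following is a minimal host-side sketch, not VxFusion code; ceilLog2( ) and defaultWaitTicks( ) are hypothetical helper names, and the tick calculation simply restates the 4*sysClkRateGet( ) default with the clock rate passed in as a parameter.

```c
#include <assert.h>

/*
 * Smallest exponent e such that (1 << e) >= count, for counts up to 2^31.
 * Hypothetical helper for filling in the *Log2 parameters; it is not
 * part of VxFusion.
 */
int ceilLog2(unsigned int count)
{
    int e = 0;

    assert(count > 0);
    while (e < 31 && (1u << e) < count)
        e++;
    return e;
}

/*
 * Startup wait in clock ticks for a given system clock rate, following
 * the 4 * sysClkRateGet() default described in the note above.
 */
int defaultWaitTicks(int clkRateHz)
{
    assert(clkRateHz > 0);
    return 4 * clkRateHz;
}
```

For instance, an application that plans for 300 telegram buffers would pass 9, which reserves room for 512, and a 60 Hz system clock yields the 240-tick default wait.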


NOTE: If distInit( ) fails, it returns ERROR, and VxFusion is not started on the host.

Customizing with distCtl( )

This routine performs a distributed objects control function. Use distCtl( ) to configure run-time-related parameters and hooks listed in Table 10-4. For more information about available control functions using distCtl( ), see the entry in the VxWorks API Reference.

Table 10-4:   Configuration Parameters Modified Using distCtl( )   


Parameter or hook: DIST_CTL_LOG_HOOK
Default: NULL
Description: Set a routine to be called each time a log message is produced. If no log hook is set, the log output is printed to standard output.

Parameter or hook: DIST_CTL_PANIC_HOOK
Default: NULL
Description: Set a routine to be called when the system panics due to an unrecoverable error. If no panic hook is set, the output is printed to standard output.

Parameter or hook: DIST_CTL_RETRY_TIMEOUT
Default: 200ms
Description: Set the initial send retry timeout. Although the default timeout is shown in milliseconds, the retry timeout is actually set using clock ticks. Timeout values are rounded down to the nearest 200 ms.

Parameter or hook: DIST_CTL_MAX_RETRIES
Default: 5
Description: Set the limit for the number of retries when sending fails.

Parameter or hook: DIST_CTL_NACK_SUPPORT
Default: TRUE
Description: Enable or disable the sending of negative acknowledgments (NACK).

Parameter or hook: DIST_CTL_PGGYBAK_UNICST_SUPPORT
Default: FALSE
Description: Enable or disable unicast piggy-backing.

Parameter or hook: DIST_CTL_PGGYBAK_BRDCST_SUPPORT
Default: FALSE
Description: Enable or disable broadcast piggy-backing.

Parameter or hook: DIST_CTL_OPERATIONAL_HOOK
Default: NULL
Description: Add to a list of routines to be called each time a node shifts to the operational state. Up to 8 routines can be added.

Parameter or hook: DIST_CTL_CRASHED_HOOK
Default: NULL
Description: Add to a list of routines to be called each time a node shifts to the crashed state. The list can hold a maximum of 8 routines; however, one space is used by VxFusion, leaving space for only 7 user-specified routines to be added.

Parameter or hook: DIST_CTL_SERVICE_HOOK
Default: NULL
Description: Set a routine to be called each time a service fails on a node, for a service invoked by a remote node.

Parameter or hook: DIST_CTL_SERVICE_CONF
Default: service-specific
Description: Set the task priority and network priority of the service.


NOTE: DIST_CTL_CRASHED_HOOK should always be used with distCtl( ), as it is the only way that VxFusion can provide notification that a node has crashed.
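The slot limit on the crashed-hook list is easy to overlook. The following host-side model illustrates the behavior described in Table 10-4 under stated assumptions: the list has 8 slots, one is taken by an internal routine, and the eighth user registration is refused. HookList, hookListAdd( ), hookListRun( ), and the two sample hooks are hypothetical names for this sketch; they do not reflect how VxFusion stores hooks internally, and the real registration call is distCtl( ).

```c
#include <assert.h>
#include <stddef.h>

#define HOOK_SLOTS 8    /* list capacity stated in Table 10-4 */

typedef void (*NodeHook)(unsigned int nodeId);

typedef struct {
    NodeHook hooks[HOOK_SLOTS];
    int      count;
} HookList;

/* Append a hook. Returns 0 on success, -1 if the list is full or invalid. */
int hookListAdd(HookList *list, NodeHook hook)
{
    if (list == NULL || hook == NULL || list->count >= HOOK_SLOTS)
        return -1;
    list->hooks[list->count++] = hook;
    return 0;
}

/* Call every registered hook in registration order; returns how many ran. */
int hookListRun(const HookList *list, unsigned int nodeId)
{
    int i;

    assert(list != NULL);
    for (i = 0; i < list->count; i++)
        list->hooks[i](nodeId);
    return list->count;
}

/* State updated by the sample application hook. */
unsigned int lastCrashedNode = 0;
int          crashCount      = 0;

/* Stands in for the slot that VxFusion reserves for itself. */
void internalCrashHook(unsigned int nodeId)
{
    (void)nodeId;
}

/* Sample application hook: remember which node was reported as crashed. */
void appCrashHook(unsigned int nodeId)
{
    lastCrashedNode = nodeId;
    crashCount++;
}
```

In this model, an application that needs more than seven reactions to a node crash would register one dispatcher routine and fan out from there.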

Customizing with DIST_IF Structures

Table 10-5 lists configurable fields of the DIST_IF structure, which is used to pass data about the adapter to VxFusion. It is the DIST_IF structure that gives VxFusion transport independence. For more information on the DIST_IF structure, see 10.7.2 Writing an Initialization Routine.

Table 10-5:   Configuration Parameters Modified Within Interface Adapters   


Field: distIfName (interface adapter name)
Default: adapter-specific; UDP adapter: "UDP adapter"
Description: Specifies the name of the interface adapter.

Field: distIfMTU (MTU size)
Default: adapter-specific; UDP adapter: 1500 bytes
Description: Specifies the MTU size of the interface adapter protocol.

Field: distIfHdrSize (network header size)
Default: adapter-specific; UDP adapter: size of NET_HDR
Description: Specifies the network header size.

Field: distIfBroadcastAddr (broadcast address)
Default: adapter-specific; UDP adapter: IP broadcast address for subnet
Description: Specifies the broadcast address for the interface and transport that broadcasts are to be sent on.

Field: distIfRngBufSz (ring buffer size)
Default: adapter-specific; UDP adapter: 256
Description: Specifies the number of elements to use in the sliding window protocol.

Field: distIfMaxFrags (maximum number of fragments)
Default: adapter-specific; UDP adapter: 10
Description: Specifies the maximum number of fragments into which a message can be broken.

10.3.4   Working with the Distributed Name Database

The distributed name database allows the association of any value to any name, such as a distributed message queue's ID with a unique name. The distributed name database provides name-to-value-and-type and value-and-type-to-name translation, allowing entries in the database to be accessed either by name or value and type.

Each node in a system has a copy of the distributed name database. Any modifications made to a local copy of the database are immediately sent to all other copies on all other nodes in the system.

Typically, the task that wants to share a value adds a name-value-type entry into the distributed name database. When adding the entry to the database, the task associates the value with a unique, specified name. Tasks on different nodes use this name to get the associated value.

Consider the example in Figure 10-3, which shows how two tasks on different nodes share a common distributed message queue ID.  

Figure 10-3:   Using the Distributed Name Database

  

Task t1 on Node 1 creates a message queue. The distributed message queue ID is returned by the creation routine. Task t1 adds the ID and its associated name, myObj, to the distributed name database. This database entry is then broadcast to all nodes in the system. For task t2 on Node 2 to send a message to this distributed message queue, it first finds the ID by looking up the name myObj in Node 2's local copy of the distributed name database.

However, if Node 2 does not receive the broadcast--because, for example, the network is down--the information on the two nodes does not match, and Node 2 is not aware of the queue that t1 created. (This is an example of a case in which DIST_CTL_CRASHED_HOOK could be used with distCtl( ) for notification; see Table 10-4.)

Table 10-6 lists the distributed name database service routines. The distributed name database may contain floating-point values, and distNameShow( ) invokes printf( ) to print them. Any task calling distNameShow( ) should therefore have the VX_FP_TASK task option set.

Additional information about adding a name to the distributed names database and about related show routines is provided in this section. For detailed information about all of these routines, see the entries in the VxWorks API Reference.

Table 10-6:   Distributed Name Database Service Routines   


Routine
Functionality

distNameAdd( )
Adds a name to the distributed name database.
distNameFind( )
Finds a distributed object by name.
distNameFindByValueAndType( )
Finds a distributed object by value and type.
distNameRemove( )
Removes an object from the distributed name database.
distNameShow( )
Displays the entire contents of the distributed name database to the standard output device.
distNameFilterShow( )
Displays all entries in the database of a specified type.

Adding a Name to the Distributed Name Database

Use distNameAdd( ) to add a name-value-type entry into the distributed name database. The type can be user defined or pre-defined in distNameLib.h.


NOTE: The distributed name database service routines automatically convert to or from network-byte order for the pre-defined types only. Do not call htonl( ) or ntohl( ) explicitly for values of pre-defined types from the distributed name database.
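The note above implies that values stored under a user-defined type are not converted automatically, so nodes with different byte orders must agree on a representation themselves. The following is a minimal sketch of one way to do that for a 32-bit value, assuming the application stores it as four bytes in network byte order. The helper names packU32( ) and unpackU32( ) are hypothetical, and the header shown is the POSIX location of htonl( ) and ntohl( ); on a target, the header that declares them may differ.

```c
#include <arpa/inet.h>  /* htonl(), ntohl() on POSIX hosts */
#include <assert.h>
#include <stdint.h>
#include <string.h>

/*
 * Store a 32-bit host-order value as four bytes in network byte order,
 * ready to be used as the value of a user-defined database entry.
 */
void packU32(uint8_t out[4], uint32_t hostValue)
{
    uint32_t netValue = htonl(hostValue);

    assert(out != NULL);
    memcpy(out, &netValue, sizeof netValue);
}

/* Recover the host-order value from four network-order bytes. */
uint32_t unpackU32(const uint8_t in[4])
{
    uint32_t netValue;

    assert(in != NULL);
    memcpy(&netValue, in, sizeof netValue);
    return ntohl(netValue);
}
```

The writer would call packU32( ) before adding the entry and the reader would call unpackU32( ) after finding it. For the pre-defined types this step must be omitted, as the note states.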

Table 10-7 lists the pre-defined types.

Table 10-7:   Distributed Name Database Types    


Constant              Decimal Value     Purpose
--------------------  ----------------  ----------------------------------------------------
T_DIST_MSG_Q          0                 distributed message queue identifier (also group ID)
T_DIST_NODE           16                node identifier
T_DIST_UINT8          64                8-bit unsigned integer
T_DIST_UINT16         65                16-bit unsigned integer
T_DIST_UINT32         66                32-bit unsigned integer
T_DIST_UINT64         67                64-bit unsigned integer
T_DIST_FLOAT          68                single-precision floating-point number (32-bit)
T_DIST_DOUBLE         69                double-precision floating-point number (64-bit)
user-defined types    4096 and above    user-defined types

The value bound to a particular name can be updated by simply calling distNameAdd( ) another time with a new value.
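The add, update, and lookup behavior described in this section can be illustrated with a small host-side model. This is only a sketch under stated assumptions: NameDb, nameDbAdd( ), nameDbFind( ), and nameDbFindByValueAndType( ) are hypothetical names for a single local table, with none of the replication to other nodes that distNameLib performs, and their signatures are not those of the distNameLib routines. The constant 66 mirrors the T_DIST_UINT32 value in Table 10-7.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_ENTRIES      8
#define MAX_NAME         20
#define MAX_VALUE        16
#define DEMO_TYPE_UINT32 66   /* same number as T_DIST_UINT32 in Table 10-7 */

typedef struct {
    char          name[MAX_NAME];
    unsigned char value[MAX_VALUE];
    size_t        length;
    int           type;
} NameEntry;

typedef struct {
    NameEntry entries[MAX_ENTRIES];
    int       count;
} NameDb;

/* Name-to-value-and-type lookup; returns NULL if the name is absent. */
NameEntry *nameDbFind(NameDb *db, const char *name)
{
    int i;

    assert(db != NULL && name != NULL);
    for (i = 0; i < db->count; i++)
        if (strcmp(db->entries[i].name, name) == 0)
            return &db->entries[i];
    return NULL;
}

/* Value-and-type-to-name lookup; returns NULL if nothing matches. */
NameEntry *nameDbFindByValueAndType(NameDb *db, const void *value,
                                    size_t length, int type)
{
    int i;

    assert(db != NULL && value != NULL);
    for (i = 0; i < db->count; i++) {
        NameEntry *e = &db->entries[i];

        if (e->type == type && e->length == length &&
            memcmp(e->value, value, length) == 0)
            return e;
    }
    return NULL;
}

/* Add a new entry, or overwrite the value bound to an existing name. */
int nameDbAdd(NameDb *db, const char *name, const void *value,
              size_t length, int type)
{
    NameEntry *e;

    assert(db != NULL && name != NULL && value != NULL);
    if (strlen(name) >= MAX_NAME || length > MAX_VALUE)
        return -1;
    e = nameDbFind(db, name);
    if (e == NULL) {
        if (db->count >= MAX_ENTRIES)
            return -1;
        e = &db->entries[db->count++];
        strcpy(e->name, name);
    }
    memcpy(e->value, value, length);
    e->length = length;
    e->type = type;
    return 0;
}
```

Adding the same name a second time leaves the entry count unchanged and replaces the stored value, which matches the update behavior described above.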

Displaying Distributed Name Database Information

There are two routines for displaying data from the distributed name database: distNameShow( ) and distNameFilterShow( ).


CAUTION: The distributed name database provided by VxFusion may contain floating-point values. The distNameShow( ) routine invokes printf( ) to print them. Any task calling distNameShow( ) should set the VX_FP_TASK task option. The target shell has this option set.

The distNameShow( ) routine displays the entire contents of the distributed name database to the standard output device. The following demonstrates use of distNameShow( ); the output is sent to the standard output device:

[VxKernel]-> distNameShow()
        NAME              TYPE               VALUE 
-------------------- -------------- ------------------------- 
nile                   T_DIST_NODE 0x930b2617 (2466981399) 
columbia               T_DIST_NODE 0x930b2616 (2466981398) 
dmq-01                 T_DIST_MSG_Q 0x3ff9fb 
dmq-02                 T_DIST_MSG_Q 0x3ff98b 
dmq-03                 T_DIST_MSG_Q 0x3ff94b 
dmq-04                 T_DIST_MSG_Q 0x3ff8db 
dmq-05                 T_DIST_MSG_Q 0x3ff89b 
gData                  4096 0x48 0x65 0x6c 0x6c 0x6f 0x00  
gCount                 T_DIST_UINT32 0x2d (45) 
grp1                   T_DIST_MSG_Q 0x3ff9bb 
grp2                   T_DIST_MSG_Q 0x3ff90b 
value = 0 = 0x0

The distNameFilterShow( ) routine displays the contents of the distributed name database filtered by type. That is, it displays only the entries in the database that match the specified type. The following output illustrates use of distNameFilterShow( ) to display only the message queue IDs:

[VxKernel]-> distNameFilterShow(0)
        NAME              TYPE               VALUE 
-------------------- -------------- ------------------------- 
dmq-01                 T_DIST_MSG_Q 0x3ff9fb 
dmq-02                 T_DIST_MSG_Q 0x3ff98b 
dmq-03                 T_DIST_MSG_Q 0x3ff94b 
dmq-04                 T_DIST_MSG_Q 0x3ff8db 
dmq-05                 T_DIST_MSG_Q 0x3ff89b 
grp1                   T_DIST_MSG_Q 0x3ff9bb 
grp2                   T_DIST_MSG_Q 0x3ff90b 
value = 0 = 0x0 

10.3.5   Working with Distributed Message Queues

Distributed message queues are message queues that can be operated on transparently by both local and remote tasks. Table 10-8 lists the routines used to control distributed message queues.

Table 10-8:   Distributed Message Queue Routines   


Routines
Functionality

msgQDistCreate( )
Creates a distributed message queue.
msgQDistSend( )
Sends a message to a distributed message queue.
msgQDistReceive( )
Receives a message from a distributed message queue.
msgQDistNumMsgs( )
Gets the number of messages queued to a distributed message queue.

A distributed message queue must be created using the msgQDistCreate( ) routine. It physically resides on the node that issued the create call.

Once created, a distributed message queue can be operated on by the standard message queue routines provided by msgQLib, which are msgQSend( ), msgQReceive( ), msgQNumMsgs( ), msgQDelete( ), and msgQShow( ). This newly created distributed message queue is not available to remote nodes until the local node uses distNameAdd( ) to add the ID to the distributed name database.

When using the standard message queue routines on a distributed message queue, the timeout argument specifies the amount of time to wait at the remote message queue only--there is no mechanism for indicating transmission time between nodes. When using the send, receive, and number-of-messages routines designed specifically for distributed message queues (msgQDistSend( ), msgQDistReceive( ), and msgQDistNumMsgs( )), you can take advantage of an additional timeout parameter, overallTimeout, that accounts for the transmission time as well.


NOTE: For this release, you cannot delete a distributed message queue. There is no msgQDistDelete( ) routine, and a call to msgQDelete( ) with a distributed message queue ID always returns an error.

Figure 10-4 illustrates send and receive operations.

Figure 10-4:   Sending to and Receiving from a Remote Distributed Message Queue

However, before send and receive operations can occur, a task on Node 1 must have created a distributed message queue. The remote message queue Q1 has been previously created by task t1 on Node 1 with a call to msgQDistCreate( ). Q1 was then added to the distributed name database with a call to distNameAdd( ). Tasks t2 and t4 have also previously obtained the remote message queue ID for Q1 from the distributed name database using the distNameFind( ) routine. With this data, task t2 on Node 2 can send a message to Q1, using either the standard msgQSend( ) routine or the VxFusion-specific msgQDistSend( ) routine. Similarly, task t4 on Node 3 can receive a message from Q1 using the standard msgQReceive( ) routine or the VxFusion-specific msgQDistReceive( ) routine.

For detailed information about distributed message queue routines, see the entries in the VxWorks API Reference.

Sending Limitations

Local sending--that is, send actions on a single node--occurs in the same manner for distributed message queues as for standard message queues, and, therefore, is not discussed in greater detail in this manual. However, sending messages to remote message queues using the msgQDistSend( ) routine can have different outcomes, depending on the value specified for the timeout arguments. Figure 10-5 presents three examples of messages being sent to a remote node. There are two threads of execution: the local node waits for the status of the send action, and the remote node waits to place the data in the distributed message queue. Both threads are controlled with timeouts, if msgQDistSend( ) is used.  

Figure 10-5:   Sending Scenarios

 

The timeout on the remote side is the msgQTimeout argument, the number of ticks to wait at the remote message queue. The local timeout is the overallTimeout argument, the number of ticks to wait overall, including the transmission time.

In Example A, no timeout occurs before the send action completes and the status is returned. Thus, the local node receives the OK and knows that the message was received. In B and C, one of the timeouts expires, the send routine returns ERROR, and the errno variable is set to indicate a timeout.

In B, the local node is aware of the timeout; however, in C, the local node times out before the status is received. In this case, the local node does not know whether or not the send completed. In Example C, the message has been added to the remote queue, even though the local operation failed, and the two nodes have different views of the state of the system. To avoid this problem, set overallTimeout to a value great enough that the status is always received.

Using msgQSend( ) prevents a situation like Example C from occurring because msgQSend( ) waits forever for a response from the remote side.
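The interaction of the two timeouts can be reasoned about with a simplified timing model. The sketch below is an illustration only, assuming that transmission takes the same number of ticks in each direction, that all values are non-negative tick counts (the wait-forever case is not modeled), and that the remote side replies as soon as it either queues the message or gives up. The names classifySend( ), minOverallTimeout( ), and SendOutcome are hypothetical and are not part of the VxFusion API.

```c
#include <assert.h>

typedef enum {
    SEND_OK,              /* Example A: status returned, message queued  */
    SEND_REMOTE_TIMEOUT,  /* Example B: remote gave up, sender is told   */
    SEND_LOCAL_TIMEOUT    /* Example C: sender gave up before the status */
} SendOutcome;

/*
 * transitTicks    one-way transmission time between the nodes
 * queueWaitTicks  ticks until the remote queue has room for the message
 * msgQTimeout     ticks the remote side waits at the queue
 * overallTimeout  ticks the local side waits in total
 */
SendOutcome classifySend(int transitTicks, int queueWaitTicks,
                         int msgQTimeout, int overallTimeout)
{
    int remoteTicks;
    int statusArrival;

    assert(transitTicks >= 0 && queueWaitTicks >= 0);
    assert(msgQTimeout >= 0 && overallTimeout >= 0);

    /* The remote side waits until there is room or msgQTimeout expires. */
    remoteTicks = (queueWaitTicks < msgQTimeout) ? queueWaitTicks
                                                 : msgQTimeout;
    /* The status reaches the sender after a round trip plus that wait. */
    statusArrival = 2 * transitTicks + remoteTicks;

    if (statusArrival > overallTimeout)
        return SEND_LOCAL_TIMEOUT;   /* sender cannot tell what happened */
    if (queueWaitTicks > msgQTimeout)
        return SEND_REMOTE_TIMEOUT;
    return SEND_OK;
}

/* Smallest overallTimeout that can never produce the Example C outcome. */
int minOverallTimeout(int transitTicks, int msgQTimeout)
{
    assert(transitTicks >= 0 && msgQTimeout >= 0);
    return 2 * transitTicks + msgQTimeout;
}
```

Under these assumptions, choosing overallTimeout as at least the round-trip time plus msgQTimeout removes the ambiguous case. In practice the transmission time is not known exactly, so a margin on top of this bound is advisable.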

For limitations on sending messages to nodes that are unavailable, see Detecting Absent Receiving Nodes.

Receiving Limitations

As is the case for local sending of messages, local receiving occurs in the same manner for distributed message queues as for standard message queues.

Figure 10-6:   Receiving Scenarios

As with the scenarios in Sending Limitations, different outcomes can result when receiving messages from remote message queues using the msgQDistReceive( ) routine, depending on the value specified for the timeout arguments. Figure 10-6 presents three examples of messages being received by a node. There are two threads of execution: the local node waits for data to be received, and the remote node waits for data to arrive at the distributed message queue. Both threads are controlled with timeouts.

The timeout on the remote side is specified by the msgQTimeout argument, the number of ticks to wait at the remote message queue. The local timeout is specified by the overallTimeout argument, the number of ticks to wait overall, including the transmission time.

Example A illustrates a successful receive with no timeouts. A request to receive a message from a message queue is sent to the remote side and the result is returned before either thread experiences a timeout.

In B, the remote side experiences a timeout, but a status response is returned to the local node before the overall timeout expires. The receive routine returns an error and the errno variable is set to indicate a timeout. Both sides know that the receive failed and have the same view of the state of the remote message queue.

In C, the local node tries to receive a message from the remote node, but the overallTimeout expires before a response arrives. The local and remote sides end up with different views of the state of the remote message queue because, although the message was successfully removed from the queue on the remote side, the local side thinks the operation failed.

To avoid this problem, set the overallTimeout argument to a value great enough that the reply from the remote side is always received; or use msgQReceive( ), because it waits forever for a response from the remote side.

Displaying the Contents of Distributed Message Queues

To display the contents of a distributed message queue, use the standard message queue routine msgQShow( ).

The following example shows msgQShow( ) output for a local distributed message queue:

[VxKernel]-> msgQShow 0xffe47f
Message Queue Id    : 0xffe47f     
Global unique Id    : 0x930b267b:fe         
Type                : queue 
Home Node           : 0x930b267b 
Mapped to           : 0xea74d0     
 
Message Queue Id    : 0xea74d0     
Task Queueing       : FIFO       
Message Byte Len    : 1024       
Messages Max        : 100        
Messages Queued     : 0          
Receivers Blocked   : 0          
Send Timeouts       : 0          
Receive Timeouts    : 0          
value = 0 = 0x0

The following example shows output for the same queue but from a different machine:

[VxKernel]-> msgQShow 0x3ff9b7
Message Queue Id    : 0x3ff9b7     
Global unique Id    : 0x930b267b:fe         
Type                : remote queue 
Home Node           : 0x930b267b 
value = 0 = 0x0

10.3.6   Working with Group Message Queues

VxFusion uses group message queues to support the multicasting of messages to a number of distributed message queues. A group message queue is a virtual message queue that takes any message sent to it and sends it to all member queues.

Table 10-9 lists the routines available to handle distributed group message queues.

Table 10-9:   Distributed Group Message Queue Routines   


Routines
Functionality

msgQDistGrpAdd( )
Adds a distributed message queue to a group.
msgQDistGrpDelete( )
Removes a message queue from a group.
msgQDistGrpShow( )
Displays information about a group message queue.

Group message queues are created and added to by the routine msgQDistGrpAdd( ). If a group message queue does not already exist when msgQDistGrpAdd( ) is called, one is created, and the specified queue becomes the first member of the group. If a group message queue already exists, then the specified distributed message queue is simply added as a member. The msgQDistGrpAdd( ) routine always returns the ID of the group message queue.

If you want a distributed message queue to belong to more than one group, you must call msgQDistGrpAdd( ) to assert each additional membership.

Only two operations are supported on group message queues: sending to them and displaying them. It is an error to try to receive from a group message queue or query it for the number of messages.


NOTE: Although there is a msgQDistGrpDelete( ) routine, a distributed message queue cannot be deleted from a group message queue. The msgQDistGrpDelete( ) routine always returns ERROR.

Information about all the group message queues that have been created in the system and their locally added members is stored in the local copy of the distributed group database. The msgQDistGrpShow( ) routine displays either all of the groups in the distributed group database along with their locally added members or a specific group and its locally added members. For more information on using msgQDistGrpShow( ), see Displaying Information About Distributed Group Message Queues.

Figure 10-7:   Group Message Queue

Consider the example in Figure 10-7. The distributed message queues Q1, Q3, Q4, Q5, and Q6 have all been previously created by calls to msgQDistCreate( ). They have been added to the distributed name database by calls to distNameAdd( ). The same tasks that created the message queues also added Q3, Q4, and Q5 to the group message queue Q2 by calling msgQDistGrpAdd( ) for each new member. Message queues Q1 and Q6 have not been added to the Q2 group. The first call to msgQDistGrpAdd( ) creates Q2 as an entry in each node's distributed group database. (In Figure 10-7, the three databases--distributed node, name, and group--are symbolized by three cylinders, the front one of which represents the group database and shows the Q2 entry.)


NOTE: The distributed message queue members of Q2 did not have to be added to the group by the task that created them. In fact, any task can add any distributed message queue to a group, as long as the ID for that queue is known locally or is available from the distributed name database.

In Figure 10-7, with group message queue Q2 established, when task t1 sends a message to the group Q2, the message is sent to all nodes in the system. Each node uses the distributed group database to identify group members and forwards the message to them. In this example, the message is sent to members Q3, Q4, and Q5, but not to non-members Q1 and Q6.
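The add-or-create and fan-out behavior described above can be modeled in a few lines of plain C. This is only a toy model for illustration: it does not call VxFusion, and the names toyGrpAdd( ), toyGrpFanOut( ), TOY_GROUP, and QUEUE_ID are hypothetical stand-ins rather than part of any VxWorks API.

```c
#include <string.h>

#define MAX_GROUPS   4
#define MAX_MEMBERS  8

typedef int QUEUE_ID;                  /* hypothetical stand-in for a queue ID */

typedef struct
    {
    const char *name;                  /* group name; caller keeps it alive */
    QUEUE_ID    members[MAX_MEMBERS];  /* member queue IDs */
    int         nMembers;
    } TOY_GROUP;

static TOY_GROUP groupDb[MAX_GROUPS];  /* toy "group database" */
static int       nGroups;

/* Add a queue to a group, creating the group on first use.
 * Returns the group's index, or -1 if a table is full. */
int toyGrpAdd (const char *name, QUEUE_ID q)
    {
    int i;

    for (i = 0; i < nGroups; i++)
        if (strcmp (groupDb[i].name, name) == 0)
            break;

    if (i == nGroups)                  /* group does not exist yet */
        {
        if (nGroups == MAX_GROUPS)
            return -1;
        groupDb[i].name     = name;
        groupDb[i].nMembers = 0;
        nGroups++;
        }

    if (groupDb[i].nMembers == MAX_MEMBERS)
        return -1;

    groupDb[i].members[groupDb[i].nMembers++] = q;
    return i;
    }

/* Model a group send: copy the IDs of the queues that would receive
 * the message into out[]. Returns the count, or -1 for a bad group. */
int toyGrpFanOut (int grp, QUEUE_ID *out, int maxOut)
    {
    int i;
    int n;

    if (grp < 0 || grp >= nGroups)
        return -1;

    n = groupDb[grp].nMembers < maxOut ? groupDb[grp].nMembers : maxOut;
    for (i = 0; i < n; i++)
        out[i] = groupDb[grp].members[i];
    return n;
    }
```

In terms of Figure 10-7, adding queues 3, 4, and 5 to a group named "Q2" and then fanning out yields exactly those three IDs; queues that were never added (Q1 and Q6) are not reached.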

For detailed information about distributed group message queues, see the related entries in the VxWorks API Reference.

Displaying Information About Distributed Group Message Queues

The msgQDistGrpShow( ) routine displays either all of the groups in the group database along with their locally added members or a specific group and its locally added members.

The following output demonstrates the use of msgQDistGrpShow( ) with an argument of 0, which displays all groups:

[VxKernel]-> msgQDistGrpShow(0)
NAME OF GROUP         GROUP ID   STATE  MEMBER ID TYPE OF MEMBER 
------------------- ---------- ------- ---------- --------------------------- 
grp1                  0x3ff9e3  global   0x3ff98b distributed msg queue 
                                         0x3ff9fb distributed msg queue 
grp2                  0x3ff933  global   0x3ff89b distributed msg queue 
                                         0x3ff8db distributed msg queue 
                                         0x3ff94b distributed msg queue 
value = 0 = 0x0

The following call demonstrates the use of msgQDistGrpShow( ) with the string "grp1" as the argument:

[VxKernel]-> msgQDistGrpShow("grp1")
NAME OF GROUP         GROUP ID   STATE  MEMBER ID TYPE OF MEMBER 
------------------- ---------- ------- ---------- --------------------------- 
grp1                  0x3ff9e3  global   0x3ff98b distributed msg queue 
                                         0x3ff9fb distributed msg queue 
value = 0 = 0x0

10.3.7   Working with Adapters

Adapters provide a uniform interface to VxFusion, regardless of the particular transport used. Table 10-10 presents the only API call related to adapters, distIfShow( ).

Table 10-10:   Adapter Routines

Routine          Functionality

distIfShow( )    Displays information about the installed interface adapter.

For information on how to write your own VxFusion adapters, see 10.7 Designing Adapters. For detailed information about adapters, see the related entries in the VxWorks API Reference.

The following example demonstrates the use of distIfShow( ):

[VxKernel]-> distIfShow
Interface Name                 : "UDP adapter" 
MTU                            : 1500 
Network Header Size            : 14 
SWP Buffer                     : 32 
Maximum Number of Fragments    : 10 
Maximum Length of Packet       : 14860 
Broadcast Address              : 0x930b26ff 
Telegrams received             : 23 
Telegrams received for sending : 62 
Incoming Telegrams discarded   : 0 
Outgoing Telegrams discarded   : 0

To learn how to change the installed interface adapter or to modify its values, see 10.3.3 Configuring VxFusion.



10.4    System Limitations

Interrupt Service Routine Restrictions

Unlike standard message queues, distributed objects cannot be used at interrupt level. No routines that use distributed objects can be called from ISRs. An ISR is dedicated to handling time-critical processing associated with an external event; therefore, using distributed objects at interrupt time is not appropriate. On a multiprocessor system, run event-related time-critical processing on the CPU where the time-related interrupt occurred.

Detecting Absent Receiving Nodes

When a distributed message queue is created on one node, the other nodes are informed of its creation. However, if the node on which the message queue was created either crashes or is rebooted, there is no simple way for other nodes to detect the loss of the message queue. The entry in the name database is not modified, even if the system recreates the queue when it reboots. As a consequence, the other nodes might use an invalid message queue ID and pend on a receive notification that will never be sent. It is, therefore, the responsibility of the application on the receiving node to set timeouts when more data is expected.



10.5    Node Startup

The VxFusion system is designed to support the addition of new nodes at run-time, as well as to survive the failure of one or more nodes. The ability to add new nodes at run-time is made possible by the node startup (incorporation) process. The ability to survive the failure of one or more nodes is made possible by replicated databases that are populated during the node startup process. This section discusses node startup in detail.

Table 10-11:   Node Startup States

State          Activities

Booting        Identify other nodes in the system and determine the "godfather."
Network        Update databases from the "godfather."
Operational    Notify the other nodes that the node is up and running.

Table 10-11 lists the three node startup states, each of which is described in detail in this section. Figure 10-8 illustrates the node startup process. (For simplicity, Figure 10-8 does not show the sending of acknowledgments.)

Figure 10-8:   Starting Up the System

Booting State

When VxFusion is first initialized on a node, it broadcasts a bootstrap request message (BOOTSTRAP_REQ), which is used to locate other active VxFusion nodes on the network. Nodes that receive the bootstrap request message respond with a bootstrap acknowledgment message (BOOTSTRAP_ACK).

The node that sends the first bootstrap acknowledgment response received by another node becomes the godfather for the local node. In Figure 10-8, Node 1 is the godfather because its bootstrap acknowledgment is received first. The purpose of the godfather is to help the local node update its databases. If no response is received within a specified period of time, the node assumes it is the first node to come up on the network.

As soon as a godfather is located or as soon as the assumption is made that a node is first in the network, the node shifts from the booting state to the network state.

Network State

Once a godfather is located, the local node asks the godfather to update its databases by sending an incorporation request message (INCO_REQ). The godfather updates the local node's name and group databases. These updates are indicated by the GRP_DB_ADD and NAME_DB_ADD arrows in Figure 10-8. The godfather tells the receiving node that it is finished updating the databases by sending an incorporation done message (INCO_DONE).

Once the database updates have completed, the node moves into the operational state. If there is no godfather, the node moves directly from the booting state to the operational state.

Operational State

When a node moves into the operational state, VxFusion is fully initialized and running on it. The node broadcasts the "up now" incorporation message (INCO_UPNOW) to tell the other nodes in the system that it is now active.
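The three startup states can be summarized as a small transition function. This is a sketch for illustration only, not VxFusion source: the enum and function names are hypothetical, and it assumes that a node that finds no godfather moves directly from the booting state to the operational state.

```c
typedef enum
    {
    NODE_BOOTING,
    NODE_NETWORK,
    NODE_OPERATIONAL
    } NODE_STATE;

typedef enum
    {
    EV_BOOTSTRAP_ACK,      /* first BOOTSTRAP_ACK arrived: godfather found */
    EV_BOOTSTRAP_TIMEOUT,  /* no reply within the wait period */
    EV_INCO_DONE           /* godfather finished the database updates */
    } NODE_EVENT;

/* Return the state that follows 'state' when 'event' occurs. */
NODE_STATE nodeNextState (NODE_STATE state, NODE_EVENT event)
    {
    switch (state)
        {
        case NODE_BOOTING:
            if (event == EV_BOOTSTRAP_ACK)
                return NODE_NETWORK;       /* next step: send INCO_REQ */
            if (event == EV_BOOTSTRAP_TIMEOUT)
                return NODE_OPERATIONAL;   /* assumed: first node, nothing to copy */
            break;

        case NODE_NETWORK:
            if (event == EV_INCO_DONE)
                return NODE_OPERATIONAL;   /* next step: broadcast INCO_UPNOW */
            break;

        case NODE_OPERATIONAL:
            break;                         /* startup is complete */
        }

    return state;                          /* event not relevant here: ignore */
    }
```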



10.6    Telegrams and Messages

10.6.1   Telegrams Versus Messages

Because VxFusion is sending data over a network, the total byte size of a single message may be too great to transmit as a single unit. It may have to be broken into smaller segments. These VxFusion message segments are called telegrams. Telegrams are the largest packets that can be sent between nodes.

A telegram is an atomic transmission on the transport of a certain number of bytes. The MTU size of the transport, which is provided to VxFusion by the adapter at initialization time, defines the size of the telegram.

When you send a message, the system segments it into one or more telegrams. Figure 10-9 shows a telegram and its parts:

Figure 10-9:   A Telegram

Protocol Header

The transport defines the protocol header, which it builds from values provided by the adapter. The contents of this header vary from protocol to protocol, but may include fields such as source address, destination address, and priority, if the transport supports priority.

Network Header

The adapter defines and builds the network header. See 10.7.1 Designing the Network Header for a detailed description of the network header and its fields.

Service Header

VxFusion defines and builds the service header. The service header is a small header that identifies the internal service sending the message and the message type.

Service Data

The service data is the data being sent. When sending a message to a remote message queue or a message queue group, this data can be either the entire message or a fragment of the message to be sent. It is a fragment of the message if the message size exceeds the space allocated in the telegram for service data.

Because large numbers of telegrams can be necessary when working with transports having small MTU sizes, VxFusion does not acknowledge individual telegrams. Instead, only whole messages are acknowledged. In the event of a transmission error or if a telegram is lost, the whole message must be re-transmitted.
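To get a feel for how many telegrams a message needs, the arithmetic can be sketched as follows. The helper telegramCount( ) is hypothetical, and it assumes that every telegram carries a network header and a service header of fixed size, so the room left for service data is the MTU minus both headers.

```c
/* Number of telegrams needed to carry msgBytes of service data.
 * Returns -1 if the headers leave no room for data or msgBytes < 0. */
int telegramCount (int msgBytes, int mtu, int netHdrBytes, int svcHdrBytes)
    {
    int payload = mtu - netHdrBytes - svcHdrBytes;   /* data bytes per telegram */

    if (payload <= 0 || msgBytes < 0)
        return -1;

    if (msgBytes == 0)
        return 1;       /* assumption: an empty message still uses one telegram */

    return (msgBytes + payload - 1) / payload;       /* round up */
    }
```

For example, with an MTU of 1500 bytes, a 14-byte network header, and (for simplicity) no service header, each telegram holds 1486 bytes of data, so a 4000-byte message needs three telegrams. Because only whole messages are acknowledged, losing any one of the three means resending all three.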

10.6.2   Telegram Buffers

When VxFusion needs to send a message to a remote node, it fragments the message into telegram buffers allocated from a pool of pre-allocated telegram buffers. Figure 10-10 shows a telegram buffer and its parts.

VxFusion sends one telegram buffer at a time to the adapter, which constructs a corresponding telegram from the network header, service header, and service data in the buffer. Finally, the adapter sends out the telegram.

On the receiving node, when the adapter receives a telegram, it is responsible for reconstructing the telegram buffer from the telegram. After reconstructing the telegram buffer, the adapter sends it to VxFusion.  

Figure 10-10:   A Telegram Buffer

  

The DIST_TBUF structure member, pTBufData, contains the address of the start of the service header. To access the network header, subtract the size of the network header from this address.
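The pointer arithmetic can be sketched as shown here. TOY_TBUF is a reduced, hypothetical stand-in that keeps only the pTBufData member, and tbufNetHdr( ) is an illustrative helper, not a VxFusion routine.

```c
#include <stddef.h>

typedef struct                 /* reduced stand-in for DIST_TBUF */
    {
    void *pTBufData;           /* address of the start of the service header */
    } TOY_TBUF;

/* Return the address of the network header, which sits immediately
 * before the service header in the buffer. */
void *tbufNetHdr (const TOY_TBUF *pTBuf, size_t netHdrSize)
    {
    return (char *) pTBuf->pTBufData - netHdrSize;
    }
```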

In order to reconstruct a telegram buffer at the remote node, some telegram buffer fields must be copied into the network header by the adapter before sending the telegram. For a list of the fields that must be copied into the network header, see 10.7.1 Designing the Network Header.



10.7    Designing Adapters

This section describes how to write adapters for the VxFusion component.

An adapter is a software mechanism that facilitates communication between VxFusion nodes. An adapter sits between VxFusion and the single communications transport, as shown in Figure 10-1. A transport can be a high-level communications protocol, such as UDP, or a low-level communications or bus driver, such as an Ethernet driver. When a message is sent to a remote node, VxFusion passes a telegram buffer to the adapter, and the adapter sends data to the remote node by way of the supported transport. Similarly, when a message is received from a remote node, an adapter reconstructs a telegram buffer from the incoming data and sends the buffer to VxFusion.

An adapter must provide the following:

  • a network header to transmit telegram buffer fields
  • an initialization function
  • a startup routine
  • a send function
  • an input function
  • an I/O control function

Each of these items is described in more detail in this section.


NOTE: In this release, VxFusion supports only one adapter and, thus, only one transport.

10.7.1   Designing the Network Header

The telegram buffer itself is not passed to the remote node; it goes from the local node only as far as the adapter. To move data from the adapter to a remote node, VxFusion uses the adapter's network header, a structure that stores data from certain telegram buffer fields, data that is required to reconstruct the telegram buffer at the remote node. It is the network header and the data pointed to by the telegram buffer that pass from the adapter to the remote node.

Performance and message size are directly related to the size of the network header. Because you design the network header, you can influence performance by specifying the portion of the telegram dedicated to the network header. You trade off throughput for message size.

For example, for a transport with a small transmission unit size, it may be desirable to use a small network header to maximize throughput. However, reducing the number of bits for a field, like the fragment sequence number, also reduces the size of the message that can be sent. On the other hand, for transports having large transmission unit sizes, the network header constitutes a smaller percentage of the overall telegram; the adapter can use a larger fragment sequence number in the header without affecting throughput performance significantly.

The telegram buffer structure, DIST_TBUF, is as follows:

typedef struct  /* DIST_TBUF */ 
    { 
    DIST_TBUF_GEN          tBufGen;    /* TBufGen struct */ 
    void                   *pTBufData; /* pointer to the data */
/* Fields required to construct the telegram buffer on the remote side */ 
    uint16_t               tBufId;     /* ID of the packet */ 
    uint16_t               tBufAck;    /* ID of packet last received and */ 
                                       /* acknowledged without error */ 
    uint16_t               tBufSeq;    /* sequence number of the fragment */ 
    uint16_t               tBufNBytes; /* number of non-network header data */
                                       /* bytes */ 
    uint16_t               tBufType;   /* type of telegram */ 
    uint16_t               tBufFlags;  /* telegram flags */ 
    } DIST_TBUF;

You must create fields in the adapter-specific network header that correspond to the following DIST_TBUF fields. All DIST_TBUF fields except the first two, tBufGen and pTBufData, are required to reconstruct the telegram buffer on the remote side:

  • tBufId
  • tBufAck
  • tBufSeq
  • tBufNBytes
  • tBufType
  • tBufFlags

You may need to create and add fields to the network header depending on the transport. For example, the message priority is an argument to both the distIfXxxSend( ) and distNetInput( ) routines, but not all transports support priority. If the transport supports priority, the priority is transmitted in the protocol header and is available at the remote side. If the transport does not support priority and you want it preserved, then you should add a field to the network header to transmit the priority value to the remote node.
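One possible network header layout is sketched below, assuming a transport that does not carry priority itself. The layout, the XXX_HDR type, and the pack and unpack helpers are hypothetical; they simply carry the six 16-bit DIST_TBUF fields that follow pTBufData, plus a priority field, in network (big-endian) byte order.

```c
#include <stdint.h>

#define XXX_HDR_SIZE 14        /* six 16-bit fields plus a 16-bit priority */

typedef struct                 /* hypothetical header contents, host order */
    {
    uint16_t tBufId;
    uint16_t tBufAck;
    uint16_t tBufSeq;
    uint16_t tBufNBytes;
    uint16_t tBufType;
    uint16_t tBufFlags;
    uint16_t priority;         /* extra field: transport has no priority */
    } XXX_HDR;

/* Store a 16-bit value in network byte order. */
static void put16 (uint8_t *p, uint16_t v)
    {
    p[0] = (uint8_t) (v >> 8);
    p[1] = (uint8_t) (v & 0xff);
    }

/* Load a 16-bit value stored in network byte order. */
static uint16_t get16 (const uint8_t *p)
    {
    return (uint16_t) ((p[0] << 8) | p[1]);
    }

/* Sending side: write the header fields into XXX_HDR_SIZE bytes. */
void xxxHdrPack (uint8_t *wire, const XXX_HDR *h)
    {
    put16 (wire + 0,  h->tBufId);
    put16 (wire + 2,  h->tBufAck);
    put16 (wire + 4,  h->tBufSeq);
    put16 (wire + 6,  h->tBufNBytes);
    put16 (wire + 8,  h->tBufType);
    put16 (wire + 10, h->tBufFlags);
    put16 (wire + 12, h->priority);
    }

/* Receiving side: recover the fields in host byte order. */
void xxxHdrUnpack (const uint8_t *wire, XXX_HDR *h)
    {
    h->tBufId     = get16 (wire + 0);
    h->tBufAck    = get16 (wire + 2);
    h->tBufSeq    = get16 (wire + 4);
    h->tBufNBytes = get16 (wire + 6);
    h->tBufType   = get16 (wire + 8);
    h->tBufFlags  = get16 (wire + 10);
    h->priority   = get16 (wire + 12);
    }
```

Packing byte by byte, rather than copying the structure, avoids any dependence on host byte order or structure padding. A header designed for a small-MTU transport could use narrower fields at the cost of a smaller maximum message size, as discussed above.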

10.7.2   Writing an Initialization Routine

An adapter is initialized by a series of automatic calls to initialization routines. You write some of the code that accomplishes this.

The distInit( ) routine, which initializes the whole of VxFusion, is called automatically when a target is booted with a VxWorks image that has VxFusion installed. The prototype for distInit( ) follows:

STATUS distInit 
    ( 
    DIST_NODE_ID   myNodeId,       /* node ID of this node */ 
    FUNCPTR        ifInitRtn,      /* interface init routine */ 
    void           *pIfInitConf,   /* ptr to interface configuration */ 
    int            maxTBufsLog2,   /* max number of telegram buffers */ 
    int            maxNodesLog2,   /* max number of nodes in node db */ 
    int            maxQueuesLog2,  /* max number of queues on this node */ 
    int            maxGroupsLog2,  /* max number of groups in db */ 
    int            maxNamesLog2,   /* max bindings in name db */ 
    int            waitNTicks      /* wait in ticks when bootstrapping */ 
    ) 
    { 
    }

The argument ifInitRtn specifies the adapter initialization routine, distIfXxxInit( ), where Xxx is the name of the adapter specified by you.


NOTE: Never call distInit( ) or distIfXxxInit( ) directly.

You can base your version of distIfXxxInit( ) on the following prototypical code:

STATUS distIfXxxInit 
    ( 
    void     *pConf,     /* ptr to configuration data, if any */ 
    FUNCPTR  *pStartup   /* ptr to startup routine */ 
    );

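The distIfXxxInit( ) routine is invoked with the following parameters:

pConf
    A pointer to configuration data for the adapter, if any. This is the pIfInitConf argument of distInit( ).

pStartup
    A pointer to the location in which the routine stores the address of the adapter startup routine.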

The adapter initialization routine should perform the following operations:

  • set the startup routine pointer to point to the adapter startup routine

  • set the fields of the DIST_IF structure

The DIST_IF structure is one of the mechanisms that provides VxFusion with its transport independence: the structure maintains its form regardless of the adapter being used. The DIST_IF structure is composed of configurable fields that identify the adapter to be used and operating conditions for sending messages. For information about using DIST_IF, see Using the DIST_IF Structure.

Although you cannot call distInit( ) and distIfXxxInit( ) directly, you can modify the VxFusion startup code in usrVxFusion.c to change the VxFusion initialization process.

If you need to specify additional information to initialize the adapter, you can modify the pIfInitConf argument of distInit( ) to provide that information. The pIfInitConf argument is passed as the pConf argument of distIfXxxInit( ). To preserve information pointed to by pConf, you should copy its values into a more permanent structure within the adapter. If the adapter needs no additional configuration information, then pConf should be ignored.

The adapter initialization routine should return OK if initialization is successful, or ERROR if it fails.

Using the DIST_IF Structure

Use the DIST_IF structure to pass information about the adapter to VxFusion so VxFusion can fragment messages into telegrams of appropriate size to be sent out over the transport.

The DIST_IF structure has the following declaration:

typedef struct   /* DIST_IF */ 
    { 
    char         *distIfName;         /* name of the interface */ 
    int          distIfMTU;           /* MTU size of interface's transport */ 
    int          distIfHdrSize;       /* network header size */ 
    DIST_NODE_ID distIfBroadcastAddr; /* broadcast addr for the interface */ 
    short        distIfRngBufSz;      /* # bufs in sliding window protocol */ 
    short        distIfMaxFrags;      /* max frags msg can be broken into */ 
 
    int     (*distIfIoctl) (int fnc, ...); /* adapter IOCTL function */ 
    STATUS  (*distIfSend) (DIST_NODE_ID destId, DIST_TBUF *pTBuf, int prio); 
                                           /* send function of the adapter */ 
    } DIST_IF;

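Fields for DIST_IF are defined as follows:

distIfName
    The name of the interface.

distIfMTU
    The MTU size of the interface's transport.

distIfHdrSize
    The size of the network header.

distIfBroadcastAddr
    The broadcast address for the interface.

distIfRngBufSz
    The number of buffers in the sliding window protocol.

distIfMaxFrags
    The maximum number of fragments into which a message can be broken. This value limits the size of a message as follows:

maximum size = (number of fragments) x (MTU size - network header size - service header size)

The size of the service header depends on the type of VxFusion message being sent, for example, BOOTSTRAP_REQ or BOOTSTRAP_ACK.

distIfIoctl
    The I/O control function of the adapter.

distIfSend
    The send function of the adapter.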
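The following sketch fills in a DIST_IF structure and evaluates the maximum-size formula. It is a stand-alone illustration, not adapter source: the type definitions are local stand-ins for the VxWorks ones, the routines xxxIfFill( ), xxxMaxMsgSize( ), xxxSend( ), and xxxIoctl( ) are hypothetical, and the numeric values are borrowed from the distIfShow( ) output in 10.3.7. How VxFusion locates the adapter's DIST_IF structure is not covered here.

```c
#include <stdint.h>

#define OK     0
#define ERROR  (-1)

typedef int      STATUS;               /* local stand-ins for VxWorks types */
typedef uint32_t DIST_NODE_ID;
typedef struct distTBuf DIST_TBUF;     /* opaque in this sketch */

typedef struct   /* DIST_IF, as declared above */
    {
    char         *distIfName;
    int          distIfMTU;
    int          distIfHdrSize;
    DIST_NODE_ID distIfBroadcastAddr;
    short        distIfRngBufSz;
    short        distIfMaxFrags;
    int     (*distIfIoctl) (int fnc, ...);
    STATUS  (*distIfSend) (DIST_NODE_ID destId, DIST_TBUF *pTBuf, int prio);
    } DIST_IF;

/* Placeholder I/O control routine: no control functions supported. */
static int xxxIoctl (int fnc, ...)
    {
    (void) fnc;
    return ERROR;
    }

/* Placeholder send routine: pretends that every send succeeds. */
static STATUS xxxSend (DIST_NODE_ID destId, DIST_TBUF *pTBuf, int prio)
    {
    (void) destId;
    (void) pTBuf;
    (void) prio;
    return OK;
    }

/* Set every DIST_IF field, as an initialization routine would. */
void xxxIfFill (DIST_IF *pIf)
    {
    pIf->distIfName          = "Xxx adapter";
    pIf->distIfMTU           = 1500;
    pIf->distIfHdrSize       = 14;
    pIf->distIfBroadcastAddr = 0x930b26ff;
    pIf->distIfRngBufSz      = 32;
    pIf->distIfMaxFrags      = 10;
    pIf->distIfIoctl         = xxxIoctl;
    pIf->distIfSend          = xxxSend;
    }

/* maximum size = fragments x (MTU - network header - service header) */
long xxxMaxMsgSize (const DIST_IF *pIf, int svcHdrSize)
    {
    return (long) pIf->distIfMaxFrags *
           (pIf->distIfMTU - pIf->distIfHdrSize - svcHdrSize);
    }
```

With these values and a service header size of zero, the formula gives 10 x (1500 - 14) = 14860 bytes, the same figure that distIfShow( ) reports as the maximum length of a packet in 10.3.7. Any non-zero service header reduces the limit; a 16-byte service header, for instance, would give 14700 bytes.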

10.7.3   Writing a Startup Routine

The adapter startup routine should be returned to distInit( ) by the adapter initialization routine. You can use the following prototype to write a startup routine for adapter interface Xxx:

STATUS distXxxStart 
    ( 
    void *   pConf    /* ptr to configuration data */ 
    );

The startup routine is invoked by distInit( ) after the network layer of VxFusion is initialized. The startup routine should spawn the input task and perform any other initialization or startup needed to enable the transmission and reception of telegrams over the desired transport. For example, at this time, a UDP adapter should create the socket that it uses for communications.

The startup routine should return OK if the operation is successful, or ERROR if it fails.

10.7.4   Writing a Send Routine

You can write a send routine, distIfXxxSend( ), for adapter interface Xxx based on the following prototypical code:

STATUS distIfXxxSend 
    ( 
    DIST_NODE_ID   nodeIdDest,  /* destination node */ 
    DIST_TBUF      *pTBuf,      /* TBUF to send */ 
    int            priority     /* packet priority */ 
    );

Arguments for distIfXxxSend( ) should be defined as follows:

nodeIdDest
    The destination node.

pTBuf
    The telegram buffer to send.

priority
    The priority of the packet.

The purpose of the send routine is to take a telegram buffer passed from VxFusion and send the associated telegram out over the transport.

The send routine should perform the following tasks:

  • Increment the statistics (distStat.ifOutReceived++).

  • Locate and fill in the pre-allocated network header with values from the telegram buffer that is passed in as an argument. The network header fields should be filled in using network byte order, so they can be correctly decoded on the remote side, even if the remote node uses a different byte order. (For information about which telegram buffer fields must be copied, see 10.7.1 Designing the Network Header.)

  • Fill in any additional network header fields (such as priority) that may need to be filled in.

  • Send the telegram.

The send routine should return OK if the operation is successful, or ERROR if it fails.
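The steps above can be sketched in stand-alone C as follows. Everything here is a hypothetical model: TOY_TBUF is a reduced stand-in for DIST_TBUF, toyOutStat stands in for distStat, the 12-byte header layout is invented for the example, and the "transport" is just a byte array named wire so that the result can be inspected.

```c
#include <stdint.h>
#include <string.h>

#define OK            0
#define ERROR         (-1)
#define XXX_HDR_SIZE  12               /* six 16-bit fields; invented layout */
#define XXX_MTU       64               /* tiny MTU keeps the sketch small */

typedef struct                         /* reduced stand-in for DIST_TBUF */
    {
    uint8_t  *pTBufData;               /* service header + service data */
    uint16_t  tBufId;
    uint16_t  tBufAck;
    uint16_t  tBufSeq;
    uint16_t  tBufNBytes;
    uint16_t  tBufType;
    uint16_t  tBufFlags;
    } TOY_TBUF;

struct { unsigned ifOutReceived; } toyOutStat;   /* stand-in for distStat */

uint8_t  wire[XXX_MTU];                /* stand-in for the transport */
unsigned wireLen;

/* Store a 16-bit value in network byte order. */
static void put16 (uint8_t *p, uint16_t v)
    {
    p[0] = (uint8_t) (v >> 8);
    p[1] = (uint8_t) (v & 0xff);
    }

int toyIfSend (uint32_t nodeIdDest, const TOY_TBUF *pTBuf, int priority)
    {
    /* The pre-allocated header space sits just before the data. */
    uint8_t *hdr = pTBuf->pTBufData - XXX_HDR_SIZE;

    (void) nodeIdDest;                 /* a real adapter maps these onto */
    (void) priority;                   /* the transport's addressing     */

    toyOutStat.ifOutReceived++;        /* count the telegram handed to us */

    if (pTBuf->tBufNBytes > XXX_MTU - XXX_HDR_SIZE)
        return ERROR;                  /* would not fit in one telegram */

    put16 (hdr + 0,  pTBuf->tBufId);
    put16 (hdr + 2,  pTBuf->tBufAck);
    put16 (hdr + 4,  pTBuf->tBufSeq);
    put16 (hdr + 6,  pTBuf->tBufNBytes);
    put16 (hdr + 8,  pTBuf->tBufType);
    put16 (hdr + 10, pTBuf->tBufFlags);

    /* "Send": header and data go out as one contiguous telegram. */
    wireLen = XXX_HDR_SIZE + pTBuf->tBufNBytes;
    memcpy (wire, hdr, wireLen);
    return OK;
    }
```

A real adapter would replace the final copy with a call into its transport, for example a socket send for a UDP adapter, and would return ERROR if that call fails.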

10.7.5   Writing an Input Routine

You can write an input routine, distIfXxxInputTask( ), for adapter interface Xxx based on the following prototypical code:

void distIfXxxInputTask( );

The purpose of the input routine is to read a telegram and send it to VxFusion by calling distNetInput( ). For more information, see the entry for distNetInput( ) in the VxWorks API Reference.

The input routine should listen or wait for an incoming telegram from the transport. Upon receipt of the telegram, the statistics field distStat.ifInReceived should be incremented. Then, the telegram should be tested to make sure that it is longer than the network header. If not, the telegram is too small and should be ignored. The distStat.ifInLength and distStat.ifInDiscarded fields should also be incremented, in this case.

If your transport does not discard broadcast packets sent from itself, use the input routine to filter out transport-originated broadcast packets.

After the input routine has discarded any duplicate or faulty telegram that originates from the transport, the incoming telegram is assumed to be correct (although there is one more check made later). A telegram buffer is allocated and the contents of the telegram are copied into it. The following non-data fields of the telegram buffer are not transmitted as part of the telegram data:

  • tBufId
  • tBufAck
  • tBufSeq
  • tBufNBytes
  • tBufType
  • tBufFlags

These fields are reconstructed using the network header. During the reconstruction, these fields should be converted back to host order from network order.

After the telegram buffer is reconstructed and the number of bytes expected in the non-header portion of the telegram is known (from the tBufNBytes field), the telegram length is compared to tBufNBytes plus the size of the network header. If the lengths do not match, the telegram should be discarded and the statistics distStat.ifInLength and distStat.ifInDiscarded incremented.

If the lengths match, the telegram should be forwarded upward by calling distNetInput( ).
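The two length checks described in this section can be sketched as a single validation routine. This is a stand-alone model: toyInStat stands in for distStat, the 12-byte header layout with tBufNBytes at byte offset 6 is invented for the example, and toyIfInputCheck( ) is a hypothetical helper that an input task might call before forwarding a telegram with distNetInput( ).

```c
#include <stddef.h>
#include <stdint.h>

#define XXX_HDR_SIZE       12          /* invented header layout */
#define XXX_NBYTES_OFFSET  6           /* tBufNBytes position in the header */

struct
    {
    unsigned ifInReceived;
    unsigned ifInLength;
    unsigned ifInDiscarded;
    } toyInStat;                       /* stand-in for distStat */

/* Load a 16-bit value stored in network byte order. */
static uint16_t get16 (const uint8_t *p)
    {
    return (uint16_t) ((p[0] << 8) | p[1]);
    }

/* Returns 1 if the telegram should be forwarded, 0 if it is discarded. */
int toyIfInputCheck (const uint8_t *telegram, size_t len)
    {
    uint16_t nBytes;

    toyInStat.ifInReceived++;

    /* First check: the telegram must be longer than the network header. */
    if (len <= XXX_HDR_SIZE)
        {
        toyInStat.ifInLength++;
        toyInStat.ifInDiscarded++;
        return 0;
        }

    /* Recover tBufNBytes, converting from network to host order. */
    nBytes = get16 (telegram + XXX_NBYTES_OFFSET);

    /* Second check: actual length must equal tBufNBytes + header size. */
    if (len != (size_t) nBytes + XXX_HDR_SIZE)
        {
        toyInStat.ifInLength++;
        toyInStat.ifInDiscarded++;
        return 0;
        }

    return 1;
    }
```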

10.7.6   Writing an I/O Control Routine

You can write an I/O control routine, distIfXxxIoctl( ), for adapter interface Xxx based on the following prototypical code:

int distIfXxxIoctl(int func, ...);

You determine the control functions that must be supported by the adapter.

This routine should return the return value of the I/O control function being performed if successful, or ERROR if the operation fails. If no control functions are provided, the distIfXxxIoctl( ) routine should return ERROR.
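A minimal sketch of such a routine is shown below. The two function codes and the "verbose" setting are invented purely for illustration; an actual adapter defines whatever control functions it needs, or none at all.

```c
#include <stdarg.h>

#define ERROR                  (-1)
#define XXX_IOCTL_GET_VERBOSE  1       /* hypothetical function codes */
#define XXX_IOCTL_SET_VERBOSE  2

static int xxxVerbose;                 /* hypothetical adapter setting */

int toyIfIoctl (int func, ...)
    {
    va_list ap;
    int     result = ERROR;            /* unknown functions return ERROR */

    va_start (ap, func);

    switch (func)
        {
        case XXX_IOCTL_GET_VERBOSE:
            result = xxxVerbose;
            break;

        case XXX_IOCTL_SET_VERBOSE:
            {
            int level = va_arg (ap, int);   /* one int argument expected */

            if (level >= 0)
                {
                xxxVerbose = level;
                result = level;
                }
            break;
            }

        default:
            break;
        }

    va_end (ap);
    return result;
    }
```

Because the routine is variadic, each function code must document the number and types of the arguments it expects; the routine itself has no way to check them.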