An Example: The Spread Client Wrapper

You now have all the tools you need to build a procedural interface PHP extension in C. To tie all these parts together, a full example is called for.

Chapter 15, "Building a Distributed Environment," shows an implementation of a distributed cache management system that uses Spread. Spread is a group communication toolkit that allows members to join a set of named groups and receive messages for those groups with certain delivery semantics (for example, that every member in the group receives all messages in the same order as every other member). These strong guarantees provide an excellent mechanism for tackling distributed tasks, such as building multireader distributed logging systems, master-master database replication, or, as in the case just shown, reliable messaging systems between multiple participants.

The Spread library presents a very simple C API, which makes it an ideal candidate for wrapping in a PHP extension. The following parts of the C API are covered here:

int    SP_connect( const char *spread_name, const char *private_name,
                   int priority, int group_membership, mailbox *mbox,
                   char *private_group );
int    SP_disconnect( mailbox mbox );
int    SP_join( mailbox mbox, const char *group );
int    SP_multicast( mailbox mbox, service service_type,
                     const char *group,
                     int16 mess_type, int mess_len, const char *mess );
int    SP_multigroup_multicast( mailbox mbox, service service_type,
                                int num_groups,
                                const char groups[][MAX_GROUP_NAME],
                                int16 mess_type, int mess_len,
                                const char *mess );
int    SP_receive( mailbox mbox, service *service_type,
                   char sender[MAX_GROUP_NAME], int max_groups,
                   int *num_groups, char groups[][MAX_GROUP_NAME],
                   int16 *mess_type, int *endian_mismatch,
                   int max_mess_len, char *mess );

These functions provide the following:

  1. Connecting to a spread daemon.

  2. Disconnecting from a spread daemon.

  3. Joining a group to listen on.

  4. Sending a message to a single group.

  5. Sending a message to multiple groups.

  6. Receiving messages sent to a group you belong to.

The strategy is to supply a PHP-level function for each of these C functions, except for SP_multicast() and SP_multigroup_multicast(): PHP's weak typing makes it natural to combine those two into a single function that accepts either one group name or an array of them. Connections to Spread will be handled via a resource.

To start the extension, generate a standard skeleton with this command:

ext_skel --extname=spread

The first step you need to take is to handle the resource management for the script. To do this, you need to create a static list identifier, le_pconn, and a destructor, _close_spread_pconn(), which, when handed a Spread connection resource, extracts the Spread mailbox inside it and disconnects from it. Here's how this looks:

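static int le_pconn;
/* persistent-resource destructor; PHP 5 passes TSRMLS_DC to it */
static void _close_spread_pconn(zend_rsrc_list_entry *rsrc TSRMLS_DC)
{
  mailbox *mbox = (mailbox *)rsrc->ptr;
  if(mbox) {
    SP_disconnect(*mbox);
    free(mbox);   /* allocated with malloc() in connect() */
  }
}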

mailbox is a type defined in the spread header files that is basically a connection identifier.

MINIT

During module initialization, you need to initialize the resource list le_pconn and declare constants. You are only interested in persistent connections, so you need to register only a persistent resource destructor, like this:

PHP_MINIT_FUNCTION(spread)
{
   le_pconn =
     zend_register_list_destructors_ex(NULL, _close_spread_pconn, "spread",
                                       module_number);
   REGISTER_LONG_CONSTANT("SP_LOW_PRIORITY", LOW_PRIORITY,
                      CONST_CS|CONST_PERSISTENT);
   REGISTER_LONG_CONSTANT("SP_MEDIUM_PRIORITY", MEDIUM_PRIORITY,
                      CONST_CS|CONST_PERSISTENT);
   REGISTER_LONG_CONSTANT("SP_HIGH_PRIORITY", HIGH_PRIORITY,
                      CONST_CS|CONST_PERSISTENT);

   REGISTER_LONG_CONSTANT("SP_UNRELIABLE_MESS", UNRELIABLE_MESS,
                      CONST_CS|CONST_PERSISTENT);
   REGISTER_LONG_CONSTANT("SP_RELIABLE_MESS", RELIABLE_MESS,
                      CONST_CS|CONST_PERSISTENT);
   /* ... more constants ... */
   return SUCCESS;
}

Note

The kind of resource you are connecting to dictates whether you want persistent connections. In the case of Spread, a client connection causes a group event that must be propagated across all the Spread nodes. This is moderately expensive, so it makes sense to prefer persistent connections.

MySQL, on the other hand, uses an extremely lightweight protocol in which connection establishment has a very low cost. In MySQL it makes sense to always use nonpersistent connections.

Of course, nothing stops you as the extension author from providing both persistent and nonpersistent resources side-by-side if you choose.


MSHUTDOWN

The only state this extension needs to maintain is the persistent resource list, which effectively manages itself. Thus, you don't need to define an MSHUTDOWN hook at all.

Module Functions

To facilitate connecting to Spread, you need to write a helper function, connect(), that takes a Spread daemon name (either a TCP address, such as 10.0.0.1:NNNN, or a Unix domain socket, such as /tmp/NNNN) and a private name for the connection (a name that must be globally unique). It should either return an existing connection from the persistent connection list (entries of type le_pconn) or, if none exists, create a new one.

connect(), shown here, is forced to handle all the messiness of interacting with resources:

int connect(char *spread_name, char *private_name)
{
  mailbox *mbox;
  char private_group[MAX_GROUP_NAME];
  char *hashed_details;
  int hashed_details_length;
  int rsrc_id;
  zend_rsrc_list_entry *le;
  TSRMLS_FETCH();  /* EG() and the resource macros need this in ZTS builds */

  /* key: "spread_<daemon>_<private name>"; the length includes the NUL */
  hashed_details_length = sizeof("spread__") + strlen(spread_name) +
    strlen(private_name);
  hashed_details = (char *) emalloc(hashed_details_length);
  sprintf(hashed_details, "spread_%s_%s", spread_name, private_name);

  /* look up spread connection in persistent_list */
  if (zend_hash_find(&EG(persistent_list), hashed_details,
                     hashed_details_length, (void **) &le) == FAILURE) {
    zend_rsrc_list_entry new_le;
    int retval;
    /* persistent data outlives the request: malloc(), not emalloc() */
    mbox = (mailbox *) malloc(sizeof(mailbox));
    if (!mbox) {
      efree(hashed_details);
      return 0;
    }
    if ((retval = SP_connect(spread_name, private_name,
                             0, 0, mbox, private_group)) != ACCEPT_SESSION)
    {
      zend_error(E_WARNING,
                 "Failed to connect to spread daemon %s, error returned was: %d",
                 spread_name, retval);
      free(mbox);
      efree(hashed_details);
      return 0;
    }
    new_le.type = le_pconn;
    new_le.ptr = mbox;
    if (zend_hash_update(&EG(persistent_list), hashed_details,
          hashed_details_length, (void *) &new_le,
          sizeof(zend_rsrc_list_entry), NULL) == FAILURE)
    {
      SP_disconnect(*mbox);
      free(mbox);
      efree(hashed_details);
      return 0;
    }
  }
  else { /* we have a pre-existing connection */
    if (le->type != le_pconn) {
      /* the key belongs to another resource type; mbox was never
         allocated on this path, so only the key needs freeing */
      efree(hashed_details);
      return 0;
    }
    mbox = (mailbox *)le->ptr;
  }
  rsrc_id = ZEND_REGISTER_RESOURCE(NULL, mbox, le_pconn);
  zend_list_addref(rsrc_id);
  efree(hashed_details);
  return rsrc_id;
}
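The hash key that connect() builds has one subtle detail: PHP 5's hash functions expect a string key's length to include the terminating NUL, which is why the length is computed with sizeof() on the literal rather than strlen(). The standalone sketch below isolates that computation so it can be checked without PHP or Spread. make_pconn_key() is a hypothetical helper, not a PHP or Spread API, and it uses malloc() where the extension uses emalloc().

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper mirroring connect()'s key construction.
 * Builds "spread_<daemon>_<private>" in a malloc()ed buffer and stores
 * in *key_len the length INCLUDING the trailing NUL, the key-length
 * convention used for string keys in PHP 5 hash tables. */
static char *make_pconn_key(const char *spread_name, const char *private_name,
                            int *key_len)
{
  char *key;
  size_t len;

  assert(spread_name != NULL && private_name != NULL && key_len != NULL);
  /* sizeof("spread__") counts the 8 literal characters plus the NUL */
  len = sizeof("spread__") + strlen(spread_name) + strlen(private_name);
  key = malloc(len);
  if (key == NULL) {
    return NULL;
  }
  snprintf(key, len, "spread_%s_%s", spread_name, private_name);
  *key_len = (int) len;
  return key;
}
```

For the daemon "127.0.0.1:4803" and private name "php-01234", this yields "spread_127.0.0.1:4803_php-01234" with a length one greater than its strlen(). Two different private names on the same daemon therefore map to two distinct persistent connections.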

Now you need to put these functions to work. The first function you need is the spread_connect() function to model SP_connect(). spread_connect() is a simple wrapper around connect(). It takes a spread daemon name and an optional private name. If a private name is not specified, a private name based on the process ID of the executing process is created and used. Here is the code for spread_connect():

PHP_FUNCTION(spread_connect)
{
  char *spread_name = NULL;
  char *private_name = NULL;
  char *tmp = NULL;
  int spread_name_len;
  int private_name_len;
  int rsrc_id;

  if(zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|s",
                &spread_name, &spread_name_len,
                &private_name, &private_name_len) == FAILURE) {
    return;
  }
  if(!private_name) {
    /* private names are limited to MAX_PRIVATE_NAME characters;
       snprintf() truncates a long PID instead of overflowing */
    tmp = (char *) emalloc(MAX_PRIVATE_NAME + 1);
    snprintf(tmp, MAX_PRIVATE_NAME + 1, "php-%05d", (int) getpid());
    private_name = tmp;
  }
  rsrc_id = connect(spread_name, private_name);
  if(tmp) {
    efree(tmp);
  }
  if(!rsrc_id) {
    RETURN_FALSE;   /* connect() returns 0 on failure */
  }
  RETURN_RESOURCE(rsrc_id);
}
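When no private name is given, the generated name still has to fit Spread's limit on private-name length. The sketch below isolates that step. default_private_name() is a hypothetical helper, and the value 10 for MAX_PRIVATE_NAME is an assumption taken from Spread's sp.h, so check it against the headers you build with.

```c
#include <assert.h>
#include <stdio.h>

/* Assumption: Spread's sp.h defines MAX_PRIVATE_NAME as 10. */
#ifndef MAX_PRIVATE_NAME
#define MAX_PRIVATE_NAME 10
#endif

/* Hypothetical helper: formats the "php-<pid>" default that
 * spread_connect() uses into buf, which must hold MAX_PRIVATE_NAME + 1
 * bytes. snprintf() truncates rather than overflows. Returns nonzero
 * if the full name did not fit and was truncated. */
static int default_private_name(char buf[MAX_PRIVATE_NAME + 1], long pid)
{
  int needed;

  assert(buf != NULL);
  needed = snprintf(buf, MAX_PRIVATE_NAME + 1, "php-%05ld", pid);
  return needed > MAX_PRIVATE_NAME;
}
```

With a PID of 42 this produces "php-00042". A seven-digit PID such as 1234567 does not fit and is truncated to "php-123456", so on systems with large PIDs two processes could end up with the same default name; passing an explicit private name avoids that.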

Now that you can make a connection, you also need to be able to disconnect. You can bootstrap the spread_disconnect() function off the resource destructor infrastructure to make its implementation extremely simple. Instead of having to actually fetch the Spread connection's mailbox from the resource and close it using SP_disconnect(), you can simply delete the resource from the resource list. This invokes the registered destructor for the resource, which itself calls SP_disconnect(). Here is the code for spread_disconnect():

PHP_FUNCTION(spread_disconnect) {
  zval *spread_conn;
  if(zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC,
                           "r", &spread_conn) == FAILURE) {
    return;
  }
  /* drop this reference; the resource list handles the cleanup */
  zend_list_delete(Z_RESVAL_P(spread_conn));
  RETURN_TRUE;
}
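spread_disconnect() leans on reference counting: zend_list_delete() decrements the resource's reference count, and the destructor for the resource's type runs only when that count reaches zero. The toy model below makes the rule concrete. All of its names are hypothetical and it does not reproduce Zend's actual data structures; toy_mark_closed() stands in for a destructor such as _close_spread_pconn().

```c
#include <assert.h>
#include <stddef.h>

/* Toy model of a resource list: each slot holds a pointer, a refcount,
 * and the destructor registered for its type (NULL if none). */
typedef void (*rsrc_dtor)(void *ptr);

typedef struct {
  void *ptr;
  int refcount;
  rsrc_dtor dtor;
} toy_rsrc;

#define TOY_MAX_RSRC 16
static toy_rsrc toy_list[TOY_MAX_RSRC];
static int toy_next_id = 1;   /* id 0 is reserved to mean "failure" */

/* Stand-in destructor: marks a fake mailbox closed instead of calling
 * SP_disconnect(). */
static void toy_mark_closed(void *ptr)
{
  *(int *) ptr = -1;
}

/* Register a resource with refcount 1; returns its id, or 0 if full. */
static int toy_register(void *ptr, rsrc_dtor dtor)
{
  int id;

  if (toy_next_id >= TOY_MAX_RSRC) {
    return 0;
  }
  id = toy_next_id++;
  toy_list[id].ptr = ptr;
  toy_list[id].refcount = 1;
  toy_list[id].dtor = dtor;
  return id;
}

static void toy_addref(int id)
{
  assert(id > 0 && id < toy_next_id);
  toy_list[id].refcount++;
}

/* Drop one reference; when the count reaches zero, run the destructor
 * (if any) and clear the slot. Returns the remaining refcount. */
static int toy_delete(int id)
{
  toy_rsrc *r;

  assert(id > 0 && id < toy_next_id);
  r = &toy_list[id];
  if (r->refcount == 0) {
    return 0;                 /* already destroyed */
  }
  if (--r->refcount == 0) {
    if (r->dtor != NULL) {
      r->dtor(r->ptr);
    }
    r->ptr = NULL;
  }
  return r->refcount;
}
```

In this model, a resource registered once and then given an extra toy_addref() (the role played by connect()'s zend_list_addref() call) survives the first toy_delete(); its destructor runs only when the last reference is dropped.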

As a Spread client, you need to belong to a group to receive messages sent to it. Creating a group is as simple as joining it with SP_join(); if the group does not exist, it is implicitly created. The spread_join() function will effect this, with one minor twist: you want to be able to join multiple groups by passing an array. To accomplish this, you can accept the second parameter as a raw zval and switch on its type. If you are passed an array, you iterate through it and join each group; otherwise, you convert the scalar to a string and attempt to join that. Because you are converting the zval in place, you need to separate it first by using SEPARATE_ZVAL(). Here is the code for the spread_join() function:

PHP_FUNCTION(spread_join) {
  zval *mbox_zval, **group;
  mailbox *mbox;
  int sperrno;
  /* "Z" hands us a zval ** so the argument can be separated below */
  if(zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "rZ",
                           &mbox_zval, &group) == FAILURE) {
    return;
  }
  ZEND_FETCH_RESOURCE(mbox, mailbox *, &mbox_zval, -1,
                      "Spread-FD", le_pconn);
  /* we convert the argument in place, so work on a private copy */
  SEPARATE_ZVAL(group);
  if(Z_TYPE_PP(group) == IS_ARRAY) {
    zval **tmp;
    int n = 0;
    int error = 0;
    zend_hash_internal_pointer_reset(Z_ARRVAL_PP(group));
    /* join each element, up to 100 groups */
    while(zend_hash_get_current_data(Z_ARRVAL_PP(group), (void **) &tmp)
          == SUCCESS && n < 100) {
      convert_to_string_ex(tmp);
      if((sperrno = SP_join(*mbox, Z_STRVAL_PP(tmp))) < 0) {
        zend_error(E_WARNING, "SP_join error(%d)", sperrno);
        error = sperrno;
      }
      n++;
      zend_hash_move_forward(Z_ARRVAL_PP(group));
    }
    if (error) {
      RETURN_LONG(error);
    }
  }
  else {
    convert_to_string_ex(group);
    if((sperrno = SP_join(*mbox, Z_STRVAL_PP(group))) < 0) {
      zend_error(E_WARNING, "SP_join error(%d)", sperrno);
      RETURN_LONG(sperrno);
    }
  }
  RETURN_LONG(0);
}
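The twist in spread_join() is that its second argument may be either a single group name or an array of them. Stripped of the zval machinery, the dispatch looks roughly like the sketch below. group_arg and collect_groups() are hypothetical stand-ins for the zval and the hash-table walk, and the max parameter plays the role of the 100-group cap.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of spread_join()'s second argument: either one
 * group name (scalar) or a list of names (array). */
typedef struct {
  int is_array;               /* nonzero: use items/count */
  const char *scalar;         /* used when is_array == 0 */
  const char *const *items;
  int count;
} group_arg;

/* Flatten the argument into out[] (at most max entries), mirroring how
 * spread_join() either walks the array or takes the single string.
 * Returns the number of names written; the caller would then invoke
 * SP_join() once per name. */
static int collect_groups(const group_arg *arg, const char *out[], int max)
{
  int i, n = 0;

  assert(arg != NULL && out != NULL && max >= 0);
  if (arg->is_array) {
    for (i = 0; i < arg->count && n < max; i++) {
      out[n++] = arg->items[i];
    }
  } else if (max > 0 && arg->scalar != NULL) {
    out[n++] = arg->scalar;
  }
  return n;
}
```

Keeping the "which names?" question separate from the "join each one" loop is what lets the array and scalar branches share the same error handling.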

To receive data in Spread, you simply call SP_receive() on the Spread mailbox. When SP_receive() returns, it has filled in not only the message but also metadata: who sent it (the sender's private name), the groups it was sent to, and its message type. The spread_receive() function should return all of this as an associative array:

array( 'message'      => 'Message',
       'groups'       => array( 'groupA', 'groupB'),
       'message_type' => 0,  // the int16 message type set by the sender
       'sender'       => 'spread_12345');

spread_receive() is pretty straightforward. Note the retry logic around SP_receive() that grows the buffers to handle BUFFER_TOO_SHORT errors, and note how return_value is assembled:

PHP_FUNCTION(spread_receive) {
  zval *mbox_zval, *groups_zval;
  mailbox *mbox;
  int i, endmis, ret, ngrps;
  int16 mtype;
  service stype;
  /* The buffers are static so later calls can reuse them. Because they
     outlive the request, they come from malloc(), not emalloc(). */
  static int msize = (1<<15);    /* message buffer size in bytes */
  static int gsize = (1<<6);     /* number of group-name slots */
  static char *groups = NULL;
  static char *mess = NULL;
  char sender[MAX_GROUP_NAME];

  if(zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r",
                           &mbox_zval) == FAILURE) {
    return;
  }
  ZEND_FETCH_RESOURCE(mbox, mailbox *, &mbox_zval, -1, "Spread-FD", le_pconn);
try_again:
  if(!groups && !(groups = (char *) malloc(gsize * MAX_GROUP_NAME))) {
    RETURN_FALSE;
  }
  /* one extra byte so we can NUL-terminate the message */
  if(!mess && !(mess = (char *) malloc(msize + 1))) {
    RETURN_FALSE;
  }
  stype = 0;  /* no DROP_RECV: a message that does not fit stays queued */
  ret = SP_receive(*mbox, &stype, sender, gsize, &ngrps,
                   (char (*)[MAX_GROUP_NAME]) groups,
                   &mtype, &endmis, msize, mess);
  if(ret == BUFFER_TOO_SHORT) {
    /* endmis now holds -(size of the pending message) */
    free(mess);
    mess = NULL;
    msize = -endmis;
    goto try_again;
  }
  if(ret == GROUPS_TOO_SHORT) {
    /* ngrps now holds -(number of groups the message was sent to) */
    free(groups);
    groups = NULL;
    gsize = -ngrps;
    goto try_again;
  }
  if(ret < 0) {
    zend_error(E_WARNING, "SP_receive error(%d)", ret);
    RETURN_FALSE;
  }
  /* on success ret is the message length; spread does not
     null terminate the message, so we do */
  mess[ret] = '\0';
  /* we've got the answer; let's wind up our response */
  array_init(return_value);
  add_assoc_stringl(return_value, "message", mess, ret, 1);
  MAKE_STD_ZVAL(groups_zval);
  array_init(groups_zval);
  for(i = 0; i < ngrps; i++) {
    add_index_stringl(groups_zval, i, &groups[i*MAX_GROUP_NAME],
                      strlen(&groups[i*MAX_GROUP_NAME]), 1);
  }
  add_assoc_zval(return_value, "groups", groups_zval);
  add_assoc_long(return_value, "message_type", mtype);
  add_assoc_stringl(return_value, "sender", sender, strlen(sender), 1);
  return;
}
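The grow-and-retry protocol around SP_receive() can be exercised without a Spread daemon. In the sketch below, fake_receive() is a hypothetical stand-in that follows the convention spread_receive() relies on for a short buffer (a BUFFER_TOO_SHORT return, with the negated message size stored in the endian_mismatch argument and the message left queued), and DEMO_BUFFER_TOO_SHORT replaces the real constant from sp.h. receive_growing() reallocates to exactly the reported size plus one byte for the terminator, then writes the terminator at the returned message length rather than at the buffer's capacity.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for sp.h's BUFFER_TOO_SHORT; the real constant comes from
 * the Spread headers. */
#define DEMO_BUFFER_TOO_SHORT (-1)

/* Hypothetical fake receiver: if the queued message does not fit in
 * max_len bytes, report its size as -len in *endian_mismatch and leave
 * it queued; otherwise copy it (without a terminator) and return len. */
static int fake_receive(const char *queued, int *endian_mismatch,
                        int max_len, char *buf)
{
  int len = (int) strlen(queued);

  if (len > max_len) {
    *endian_mismatch = -len;
    return DEMO_BUFFER_TOO_SHORT;
  }
  memcpy(buf, queued, (size_t) len);
  *endian_mismatch = 0;
  return len;
}

/* Receive into *buf, growing it until the message fits, then
 * NUL-terminate. *cap is the usable capacity, excluding the byte kept
 * for the terminator. Returns the message length or a negative error. */
static int receive_growing(const char *queued, char **buf, int *cap)
{
  int ret, endmis;
  char *bigger;

  assert(queued != NULL && buf != NULL && *buf != NULL && cap != NULL);
  for (;;) {
    ret = fake_receive(queued, &endmis, *cap, *buf);
    if (ret != DEMO_BUFFER_TOO_SHORT) {
      break;
    }
    /* -endmis is exactly the size needed; +1 for the terminator */
    bigger = realloc(*buf, (size_t) -endmis + 1);
    if (bigger == NULL) {
      return ret;               /* the old buffer is still valid */
    }
    *buf = bigger;
    *cap = -endmis;
  }
  if (ret >= 0) {
    (*buf)[ret] = '\0';         /* index by length, not by capacity */
  }
  return ret;
}
```

Receiving "hello, spread" into a buffer with a capacity of 4 triggers one reallocation to a capacity of 13, after which the call returns 13; a later, shorter message reuses the larger buffer without reallocating.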

Finally, you need to handle sending messages. As noted earlier, Spread has two functions for this: SP_multicast(), which sends a message to a single group, and SP_multigroup_multicast(), which sends one message to multiple groups. The latter cannot be implemented in terms of the former: issuing a separate SP_multicast() per group would break the message's ordering semantics, because another client could interject a message between the transmissions to the individual groups. Here is the code for spread_multicast():

PHP_FUNCTION(spread_multicast) {
  zval *mbox_zval = NULL;
  zval **group = NULL;
  char *message;
  int message_length, sperrno;
  long service_type, mess_type;
  mailbox *mbox;
  if(zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "rlZls",
                &mbox_zval, &service_type, &group,
                &mess_type, &message, &message_length) == FAILURE)
  {
    return;
  }
  ZEND_FETCH_RESOURCE(mbox, mailbox *, &mbox_zval, -1, "Spread-FD", le_pconn);
  /* we convert the group argument in place, so work on a private copy */
  SEPARATE_ZVAL(group);
  if(Z_TYPE_PP(group) == IS_ARRAY) {
    char groupnames[100][MAX_GROUP_NAME];
    zval **tmp;
    int n = 0;

    zend_hash_internal_pointer_reset(Z_ARRVAL_PP(group));
    while(zend_hash_get_current_data(Z_ARRVAL_PP(group), (void **) &tmp)
        == SUCCESS && n < 100) {
      convert_to_string_ex(tmp);
      /* bounded copy that always terminates; a full-width memcpy()
         could read past the end of a short name */
      strncpy(groupnames[n], Z_STRVAL_PP(tmp), MAX_GROUP_NAME - 1);
      groupnames[n][MAX_GROUP_NAME - 1] = '\0';
      n++;
      zend_hash_move_forward(Z_ARRVAL_PP(group));
    }
    if((sperrno = SP_multigroup_multicast(*mbox, service_type,
        n, (const char (*)[MAX_GROUP_NAME]) groupnames, (int16) mess_type,
        message_length, message)) < 0)
    {
      zend_error(E_WARNING, "SP_multigroup_multicast error(%d)", sperrno);
      RETURN_FALSE;
    }
  }
  else {
    convert_to_string_ex(group);
    if((sperrno = SP_multicast(*mbox, service_type,
            Z_STRVAL_PP(group), (int16) mess_type,
            message_length, message)) < 0)
    {
      zend_error(E_WARNING, "SP_multicast error(%d)", sperrno);
      RETURN_FALSE;
    }
  }
  RETURN_TRUE;
}
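SP_multigroup_multicast() takes its group names as fixed-width rows of MAX_GROUP_NAME bytes, so each PHP string has to be copied into a row before the call. The sketch below shows a bounded copy for that step. pack_group_names() is a hypothetical helper, and MAX_GROUP_NAME = 32 is an assumption based on Spread's sp.h.

```c
#include <assert.h>
#include <string.h>

/* Assumption: sp.h defines MAX_GROUP_NAME as 32 (name plus NUL). */
#ifndef MAX_GROUP_NAME
#define MAX_GROUP_NAME 32
#endif

/* Copy up to max_groups names into fixed-width rows suitable for
 * SP_multigroup_multicast(). Every row is NUL-terminated; the copy
 * never reads past the end of a short name, and over-long names are
 * truncated instead of overflowing the row. Returns rows filled. */
static int pack_group_names(char rows[][MAX_GROUP_NAME], int max_groups,
                            const char *const *names, int n)
{
  int i;

  assert(rows != NULL && names != NULL);
  for (i = 0; i < n && i < max_groups; i++) {
    strncpy(rows[i], names[i], MAX_GROUP_NAME - 1);
    rows[i][MAX_GROUP_NAME - 1] = '\0';
  }
  return i;
}
```

strncpy() zero-fills the remainder of each row for short names, which is harmless here. Names longer than MAX_GROUP_NAME - 1 characters are silently truncated, so a caller that needs exact names should check their length first.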

Note

It's worth noting that as a Spread client, you do not need to join groups to send messages, only to receive them. When you join a group, Spread needs to buffer all the messages for that group that you have not yet received, so avoid joining groups you only send to.


Now all you need to do is finish registering the functions, and then you are all set. First you define the function table:

zend_function_entry spread_functions[] = {
  PHP_FE(spread_connect, NULL)
  PHP_FE(spread_multicast, NULL)
  PHP_FE(spread_disconnect, NULL)
  PHP_FE(spread_join, NULL)
  PHP_FE(spread_receive, NULL)
  {NULL, NULL, NULL}
};

Then you register the module:

zend_module_entry spread_module_entry = {
  STANDARD_MODULE_HEADER,
  "spread",
  spread_functions,
  PHP_MINIT(spread),
  NULL,
  NULL,
  NULL,
  PHP_MINFO(spread),
  "1.0",
  STANDARD_MODULE_PROPERTIES
};
#ifdef COMPILE_DL_SPREAD
ZEND_GET_MODULE(spread)
#endif

Using the Spread Module

After compiling and installing the Spread module by following the steps outlined at the beginning of the chapter, you are ready to use it. Here is a logging class that allows you to send arbitrary messages to a Spread group:

<?php
if(!extension_loaded("spread")) {
  dl("spread.so");
}
class Spread_Logger {
  public  $daemon;
  public  $group;
  private $conn;

  public function __construct($daemon, $group)
  {
    $this->daemon = $daemon;
    $this->group = $group;
    $this->conn = spread_connect($daemon);
  }

  public function send($message) {
    // arguments: connection, service type, group, message type, message
    return spread_multicast($this->conn, SP_REGULAR_MESS, $this->group,
                            0, $message);
  }
}
?>

The Spread_Logger class connects to Spread in its constructor, and send() wraps spread_multicast(). Here is a sample usage of the class, which connects to a local spread daemon and sends a test message to the test group:

<?php

$spread = new Spread_Logger("127.0.0.1:4803", "test");
$spread->send("This is a test message.");

?>

