/*
 * Logo Search packages:
 * Sourcecode: taktuk version File versions  Download package
 *
 * taktuk.c
 */

/******************************************************************************
*                                                                             *
*  TakTuk, a middleware for adaptive large scale parallel remote executions   *
*  deployment. Perl implementation, copyright(C) 2006 Guillaume Huard.        *
*                                                                             *
*  This program is free software; you can redistribute it and/or modify       *
*  it under the terms of the GNU General Public License as published by       *
*  the Free Software Foundation; either version 2 of the License, or          *
*  (at your option) any later version.                                        *
*                                                                             *
*  This program is distributed in the hope that it will be useful,            *
*  but WITHOUT ANY WARRANTY; without even the implied warranty of             *
*  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
*  GNU General Public License for more details.                               *
*                                                                             *
*  You should have received a copy of the GNU General Public License          *
*  along with this program; if not, write to the Free Software                *
*  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA *
*                                                                             *
*  Contact: Guillaume.Huard@imag.fr                                           *
*           ENSIMAG - Laboratoire ID                                          *
*           51 avenue Jean Kuntzmann                                          *
*           38330 Montbonnot Saint Martin                                     *
*                                                                             *
******************************************************************************/

#include  "taktuk.h"
#include <arpa/inet.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <sys/uio.h>
#include <unistd.h>
#include <stdio.h>
#include <pthread.h>

/* #define DEBUG */
/* Debug tracing.  When DEBUG is defined, debug() prints a printf-style message
 * prefixed with the source line number and flushes stdout; otherwise it
 * expands to an empty statement.  Both forms are wrapped in do/while(0) so
 * that a call followed by ';' is exactly one statement, which keeps the macro
 * safe inside unbraced if/else bodies. */
#ifdef DEBUG
#define debug(f, ...) do { \
                        printf("DEBUG (Line %d) : " f, __LINE__, ##__VA_ARGS__); \
                        fflush(stdout); \
                      } while (0)
#else
#define debug(f, ...) do { } while (0)
#endif

/* Human-readable text for each error code, looked up by taktuk_error_msg()
 * with the index (error code - 1).  The table and its strings are read-only,
 * hence the const qualifiers. */
static const char *const taktuk_error_messages[] =
  {
    "write failed",
    "TakTuk engine closed the communication channel",
    "read error",
    "should not occur : arg to missing",
    "should not occur : arg body missing",
    "timeouted",
    "invalid destination set specification",
    "memory allocation failed",
    "invalid buffer size",
    "cannot get connector channel",
    "invalid field for get",
    "mutex error: no memory",
    "mutex error: again",
    "internal: mutex error",
    "internal: get_data",
    "internal: from value",
    "internal: recv message",
    "internal: info value",
    "internal: get message"
  };

/* One mutex per access kind, indexed by the *_ACCESS constants below */
static pthread_mutex_t taktuk_mutex[4];
/* Non-zero once taktuk_init_threads() has been called: taktuk_lock() and
 * taktuk_unlock() only touch the mutexes when this flag is set */
static int has_threads = 0;
/* Control channel descriptors: [0] is read from, [1] is written to.
 * -1 means "not resolved yet" (see get_taktuk_fd()) */
static int taktuk_fd[2] = {-1,-1};

/* Sizes of the stack buffers used to build headers and short requests.
 * They are meant to be sufficient for logical host numbers of up to
 * 19 decimal digits. */
#define HEADER_BUFFER 64
#define SMALL_BUFFER 20
/* Number of bytes used by a message code on the wire */
#define MSG_LENGTH 1

/* Indexes into taktuk_mutex */
#define READ_ACCESS 0
#define WRITE_ACCESS 1
#define GET_INFO_ACCESS 2
#define RECV_ACCESS 3

/*
 * Writes exactly `size` bytes from `buffer` to `fd`, calling write() again
 * after short writes and after interruptions by a signal (EINTR).
 * Returns 0 once everything has been written, TAKTUK_ESWRIT on any other
 * write() failure.
 */
static int insistent_write(int fd, const void *buffer, size_t size)
{
  const char *cursor = buffer;
  size_t remaining = size;

  while (remaining > 0)
  {
    ssize_t written = write(fd, cursor, remaining);

    if (written >= 0)
    {
      /* Possibly partial: move past what went through */
      cursor += written;
      remaining -= written;
    }
    else if (errno != EINTR)
    {
      return TAKTUK_ESWRIT;
    }
    /* else: interrupted before writing anything, simply retry */
  }
  return 0;
}

/*
 * Reads exactly `size` bytes from `fd` into `buffer`, calling read() again
 * after short reads and after interruptions by a signal (EINTR).
 * Returns 0 once the buffer is full, TAKTUK_EFCLSD if end of file is met
 * before that, TAKTUK_ESREAD on any other read() failure.
 */
static int insistent_read(int fd, void *buffer, size_t size)
{
  char *cursor = buffer;
  size_t remaining = size;

  while (remaining > 0)
  {
    ssize_t received = read(fd, cursor, remaining);

    if (received == 0)
    {
      /* The other side closed the channel before sending everything */
      return TAKTUK_EFCLSD;
    }
    if (received > 0)
    {
      cursor += received;
      remaining -= received;
    }
    else if (errno != EINTR)
    {
      return TAKTUK_ESREAD;
    }
    /* else: interrupted before reading anything, simply retry */
  }

  return 0;
}

/*
 * Resolves the control channel descriptors from the environment:
 * taktuk_fd[0] from TAKTUK_CONTROL_READ and taktuk_fd[1] from
 * TAKTUK_CONTROL_WRITE.  A descriptor that has already been resolved is kept
 * as is, so the environment is only consulted until both are known.
 *
 * Only a non-negative decimal number that fits in an int is accepted and
 * cached; a missing variable, an empty/non-numeric value or a negative value
 * leaves the descriptor unresolved so that every later call fails the same
 * way instead of silently using a bogus descriptor.
 *
 * Returns 0 when both descriptors are available, TAKTUK_ENOCON otherwise.
 */
static int get_taktuk_fd(void)
  {
    static const char *const variables[2] =
      { "TAKTUK_CONTROL_READ", "TAKTUK_CONTROL_WRITE" };
    int i;

    for (i = 0; i < 2; i++)
      {
        const char *fd_string;
        char *end;
        long parsed;

        /* Already resolved by an earlier call */
        if (taktuk_fd[i] >= 0)
            continue;

        fd_string = getenv(variables[i]);
        if (fd_string == NULL)
            return TAKTUK_ENOCON;

        /* strtol (unlike atoi) lets us tell "0" apart from "no digits" */
        errno = 0;
        parsed = strtol(fd_string, &end, 10);
        if ((end == fd_string) || (errno == ERANGE) ||
            (parsed < 0) || ((long) (int) parsed != parsed))
            return TAKTUK_ENOCON;
        taktuk_fd[i] = (int) parsed;
      }
    return 0;
  }

/*
 * Acquires the mutex taktuk_mutex[num] (num is one of the *_ACCESS
 * constants).  Does nothing when has_threads is not set.
 * Returns 0 on success, TAKTUK_EINTMX if pthread_mutex_lock() fails.
 */
static int taktuk_lock(int num)
  {
    /* No thread support requested: nothing to protect */
    if (!has_threads)
        return 0;

    return pthread_mutex_lock(&taktuk_mutex[num]) ? TAKTUK_EINTMX : 0;
  }

/*
 * Releases the mutex taktuk_mutex[num] (num is one of the *_ACCESS
 * constants).  Does nothing when has_threads is not set.
 * Returns 0 on success, TAKTUK_EINTMX if pthread_mutex_unlock() fails.
 */
static int taktuk_unlock(int num)
  {
    /* No thread support requested: nothing was locked */
    if (!has_threads)
        return 0;

    return pthread_mutex_unlock(&taktuk_mutex[num]) ? TAKTUK_EINTMX : 0;
  }

/* Node of the doubly linked list holding messages that were read from the
 * control channel while another kind of message was awaited.  The message
 * body is stored in the same allocation, immediately after the node. */
typedef struct message_node {
  char code;                          /* one-byte message code */
  size_t size;                        /* number of body bytes after the node */
  struct message_node *next, *prev;
  } *message_list;

/* List head: the list is circular, an empty list has the sentinel linked to
 * itself */
static struct message_node sentinel = { 0, 0, &sentinel, &sentinel};
/* Message currently being consumed when it was taken from the list, or NULL
 * when its body is still to be read from the control channel */
static message_list message;
/* Read offset inside the body of `message` */
static size_t position;

/* Returns the first stored message, or the sentinel if the list is empty */
static message_list message_list_begin(void)
  {
    message_list first = sentinel.next;

    return first;
  }

/* Returns the past-the-end marker of the message list (the sentinel node) */
static message_list message_list_end(void)
  {
    message_list past_the_end = &sentinel;

    return past_the_end;
  }

/* Appends node `m` at the tail of the message list (just before the
 * sentinel) */
static void push_message_in_list(message_list m)
  {
    message_list tail = sentinel.prev;

    /* Link m after the current tail ... */
    tail->next = m;
    m->prev = tail;
    /* ... and make it the new last node */
    m->next = &sentinel;
    sentinel.prev = m;
  }

/* Unlinks node `m` from the message list without freeing it.
 * Returns the node that followed `m`. */
static message_list remove_message_from_list(message_list m)
  {
    message_list before = m->prev;
    message_list after = m->next;

    before->next = after;
    after->prev = before;
    return after;
  }

/*
 * Stores `value` at `pos` as 4 bytes in network byte order.  `pos` does not
 * need to be aligned.
 * Returns the address just past the bytes written.
 */
static void *put_uint32(void *pos, uint32_t value)
  {
    const uint32_t wire = htonl(value);

    memcpy(pos, &wire, sizeof wire);
    return (char *) pos + sizeof wire;
  }

/*
 * Copies `length` bytes from `mem` to `pos`.
 * Returns the address just past the bytes written.
 */
static void *put_bytes(void *pos, const void *mem, size_t length)
  {
    char *out = pos;

    memcpy(out, mem, length);
    return out + length;
  }

/*
 * Reads 4 bytes in network byte order from `pos` and stores the decoded
 * number in `*value`.  `pos` does not need to be aligned.
 * Returns the address just past the bytes read.
 */
static void *get_uint32(const void *pos, uint32_t *value)
  {
    uint32_t wire;

    memcpy(&wire, pos, sizeof wire);
    *value = ntohl(wire);
    return (char *) pos + sizeof wire;
  }

/*
 * Copies `length` bytes from `pos` to `mem`.
 * Returns the address just past the bytes read.
 */
static void *get_bytes(const void *pos, void *mem, size_t length)
  {
    const char *in = pos;

    memcpy(mem, in, length);
    return (void *) (in + length);
  }

static int wait_message(char *codes, char *code, size_t *size)
  {
    char header_buffer[sizeof(uint32_t)+MSG_LENGTH];
    uint32_t size_sent;
    char *current;
    int result;
    message_list m;

#ifdef DEBUG
    debug("Entering wait_message, waiting for %s\n", codes);
    debug("Messages in list:");
    for (m = message_list_begin(); m != message_list_end(); m = m->next)
      {
        printf(" %c", m->code);
      }
    printf("\n");
#endif

    for (m = message_list_begin(); m != message_list_end(); m = m->next)
      {
        debug("Examining list message %c\n", m->code);
        if (index(codes, m->code) != NULL)
          {
            remove_message_from_list(m);
            message = m;
            *code = m->code;
            *size = m->size;
            position = 0;
            return 0;
          }
      }
    message = NULL;
    debug("No list message matched\n");
    while (1)
      {
        result = insistent_read(taktuk_fd[0],
                                  header_buffer, sizeof(uint32_t)+MSG_LENGTH);
        if (result) return result;

        current = get_uint32(header_buffer, &size_sent);
        get_bytes(current, code, MSG_LENGTH);
        *size = size_sent - MSG_LENGTH;
        debug("Got message %c of size %ld\n", *code, *size);

        if (index(codes, *code) != NULL) return 0;

        m = malloc(sizeof(struct message_node) + *size);
        if (m == NULL) return TAKTUK_EALLOC;
        result = insistent_read(taktuk_fd[0], m+1, *size);
        if (result) return result;
        m->code = *code;
        m->size = *size;
        push_message_in_list(m);
        debug("Pushed in list\n");
      }
  }

/*
 * Copies the next `size` bytes of the current message body into `buffer`.
 *
 * When the current message was taken from the list (`message` != NULL) the
 * bytes come from the body stored after the node, starting at `position`,
 * which is then advanced; asking for more than what remains returns
 * TAKTUK_EINTGD and copies nothing.  When `message` is NULL the bytes are
 * read from taktuk_fd[0] and the insistent_read() result is returned.
 *
 * Returns 0 on success.
 */
static int get_data(void *buffer, size_t size)
  {
    const char *data;

    if (message == NULL)
        return insistent_read(taktuk_fd[0], buffer, size);

    /* position never exceeds message->size (it starts at 0 and only grows
     * after this check), so the subtraction cannot wrap, whereas
     * position + size could */
    if (size > message->size - position)
        return TAKTUK_EINTGD;

    /* The body follows the node in the same allocation */
    data = (const char *) (message + 1);
    memcpy(buffer, data + position, size);
    position += size;
    return 0;
  }

/*
 * Releases the current message if it was taken from the list (no-op when
 * `message` is NULL) and forgets it, so that a second call cannot free the
 * same node twice and get_data() cannot read from freed memory.
 */
static void free_message(void)
  {
    free(message);
    message = NULL;
  }

/*
 * Builds and writes on taktuk_fd[1] the header of a message addressed to the
 * destination set `dest`, announcing a body of `body_length` bytes (the body
 * itself is written by the caller).
 *
 * Layout, integers being 4 bytes in network order:
 *   total size (everything that follows, body included)
 *   TAKTUK_SEND_TO code, size of dest, dest
 *   TAKTUK_MESSAGE code, size of sender, sender
 * The sender is the value of the TAKTUK_RANK environment variable (empty if
 * unset); it is looked up once and cached.
 *
 * The header is built in a stack buffer when it fits in HEADER_BUFFER bytes,
 * in a temporary heap buffer otherwise.
 *
 * Returns 0 on success, the get_taktuk_fd() error if the control channel is
 * not available, TAKTUK_EALLOC if the temporary buffer cannot be allocated,
 * or the insistent_write() error.
 */
static int send_header(const char *dest, size_t body_length)
  {
    char buffer[HEADER_BUFFER];
    char *allocated_buffer = NULL;
    void *header = buffer;
    void *current;
    uint32_t header_size;
    uint32_t dest_size;
    int result;
    char send_to = TAKTUK_SEND_TO;
    char message_code = TAKTUK_MESSAGE;
    static char *taktuk_from = NULL;
    static int taktuk_from_size;

    /* Checked before anything is allocated so that this early return cannot
     * leak the heap buffer */
    if ((result = get_taktuk_fd())) return result;

    if (taktuk_from == NULL)
      {
        taktuk_from = getenv("TAKTUK_RANK");
        if (taktuk_from != NULL)
            taktuk_from_size = strlen(taktuk_from);
        else
            taktuk_from_size = 0;
      }
    dest_size = strlen(dest);
    /* Full size of the header, including the first four bytes encoding the
     * total size.  Unfortunately it has to be computed first in order to
     * allocate memory if needed ...
     */
    header_size = sizeof(uint32_t) +
                  MSG_LENGTH +
                  sizeof(uint32_t) + dest_size +
                  MSG_LENGTH +
                  sizeof(uint32_t) + taktuk_from_size;

    if (header_size > HEADER_BUFFER)
      {
        allocated_buffer = malloc(header_size);
        if (allocated_buffer == NULL)
          {
            return TAKTUK_EALLOC;
          }
        header = allocated_buffer;
      }

    current = header;
    /* The first four bytes encode the size of what follows them: the rest
     * of the header plus the body
     */
    current = put_uint32(current, header_size-sizeof(uint32_t) + body_length);
    current = put_bytes(current, &send_to, MSG_LENGTH);
    current = put_uint32(current, dest_size);
    current = put_bytes(current, dest, dest_size);
    current = put_bytes(current, &message_code, MSG_LENGTH);
    current = put_uint32(current, taktuk_from_size);
    /* taktuk_from is NULL when TAKTUK_RANK is unset: nothing to copy then */
    if (taktuk_from_size > 0)
        current = put_bytes(current, taktuk_from, taktuk_from_size);

    debug("Sending message: header_size %d, body_length %ld\n", header_size,
          body_length);
    result = insistent_write(taktuk_fd[1], header, header_size);
    free(allocated_buffer);
    return result;
  }

/*
 * Discards the `msg_length` remaining body bytes of the current message,
 * using `buffer` (of `length` bytes) as scratch space, then releases the
 * message and the READ/RECV locks taken by taktuk_wait_message().
 *
 * This is the counterpart of taktuk_read()/taktuk_readv() for a message the
 * caller gives up on, so like them it frees a message taken from the list.
 * Reading stops at the first get_data() error; no error is reported since
 * the caller is already returning one.
 */
static void purge_data(size_t msg_length, void *buffer, size_t length)
  {
    int result = 0;

    debug("Entering purge_data\n");
    /* A zero-sized scratch buffer could never make progress */
    while (msg_length && length && (result == 0))
      {
        if (msg_length < length)
            length = msg_length;
        result = get_data(buffer, length);
        msg_length -= length;
      }
    free_message();
    /* No test for return values as the caller already returns an error */
    taktuk_unlock(READ_ACCESS);
    taktuk_unlock(RECV_ACCESS);
  }

/* Implementation of public functions */

/*
 * Returns the text describing error code `msg_code`.  Error codes start at 1
 * and index taktuk_error_messages at (code - 1); a code whose index is
 * negative or not below TAKTUK_EMAXCD yields "Unknown error".
 */
const char *taktuk_error_msg(int msg_code)
  {
    const int slot = msg_code - 1;

    if ((slot >= 0) && (slot < TAKTUK_EMAXCD))
        return taktuk_error_messages[slot];
    return "Unknown error";
  }

/*
 * Enables thread support: resolves the control channel descriptors and
 * initializes the four access mutexes.
 *
 * Locking is only switched on (has_threads set) once every mutex has been
 * initialized.  If anything fails, the mutexes initialized so far are
 * destroyed and has_threads is left untouched, so the library never locks
 * a mutex that was not initialized.
 *
 * Returns 0 on success, the get_taktuk_fd() error if the control channel is
 * not available, TAKTUK_EMTXNM / TAKTUK_EMTXAG if pthread_mutex_init()
 * reports ENOMEM / EAGAIN, TAKTUK_EINTMX for any other mutex error.
 */
int taktuk_init_threads(void)
  {
    int result, i;

    if ((result = get_taktuk_fd())) return result;

    for (i=0; i<4; i++)
      {
        result = pthread_mutex_init(taktuk_mutex+i, NULL);
        if (result)
          {
            /* Undo the initializations that succeeded before this one */
            while (i-- > 0)
                pthread_mutex_destroy(taktuk_mutex+i);
            switch (result)
              {
              case ENOMEM: return TAKTUK_EMTXNM;
              case EAGAIN: return TAKTUK_EMTXAG;
              default: return TAKTUK_EINTMX;
              }
          }
      }
    has_threads = 1;
    debug("Threads initialized\n");
    return 0;
  }

/*
 * Disables thread support: destroys the four access mutexes, then clears
 * has_threads.
 * Returns 0 on success.  Returns TAKTUK_EINTMX as soon as one
 * pthread_mutex_destroy() fails, in which case the remaining mutexes are not
 * destroyed and has_threads is left set.
 */
int taktuk_leave_threads(void)
  {
    int idx = 0;

    while (idx < 4)
      {
        if (pthread_mutex_destroy(&taktuk_mutex[idx]) != 0)
            return TAKTUK_EINTMX;
        idx++;
      }
    has_threads = 0;
    debug("Threads terminated");
    return 0;
  }

/*
 * Asks the TakTuk engine for the numeric information named `field` and
 * stores it in `*value`.
 *
 * A TAKTUK_GET_INFO request carrying the field name is written on the
 * control channel, then a TAKTUK_INFO (decimal number) or TAKTUK_INVALID
 * reply is awaited.  Request and reply both go through a SMALL_BUFFER-sized
 * stack buffer, so a field name or a reply that would not fit is rejected
 * rather than written past the end of the buffer.
 *
 * Every lock taken here is released before returning, error paths included.
 *
 * Returns 0 on success, otherwise:
 *  - the get_taktuk_fd() error if the control channel is not available,
 *  - TAKTUK_EINVAL if the engine rejects the field or the name is too long
 *    to be sent,
 *  - TAKTUK_EINTIV if the reply is not a decimal number or is too long,
 *  - TAKTUK_EINTGM on an unexpected reply code,
 *  - TAKTUK_EINTMX on a mutex error,
 *  - the read/write error met while talking to the engine.
 */
int taktuk_get(char *field, long *value)
  {
    char buffer[SMALL_BUFFER];
    char *current;
    char get_info = TAKTUK_GET_INFO;
    int result;
    size_t size;
    size_t length, buffer_size;
    char code;
    char messages_expected[] = { TAKTUK_INFO, TAKTUK_INVALID, '\0' };

    if ((result = get_taktuk_fd())) return result;

    /* The request is: 4-byte size, code byte, field name.  It must fit in
     * the stack buffer */
    length = strlen(field);
    if (length > sizeof(buffer) - sizeof(uint32_t) - MSG_LENGTH)
        return TAKTUK_EINVAL;

    current = buffer;
    current = put_uint32(current, 0);
    current = put_bytes(current, &get_info, MSG_LENGTH);
    buffer_size = MSG_LENGTH;
    memcpy(current, field, length);
    buffer_size += length;
    /* Now that it is known, fill in the size reserved at the beginning */
    put_uint32(buffer, buffer_size);
    buffer_size += sizeof(uint32_t);

    if (taktuk_lock(GET_INFO_ACCESS)) return TAKTUK_EINTMX;
    if (taktuk_lock(WRITE_ACCESS))
      {
        taktuk_unlock(GET_INFO_ACCESS);
        return TAKTUK_EINTMX;
      }
    result = insistent_write(taktuk_fd[1], buffer, buffer_size);
    if (taktuk_unlock(WRITE_ACCESS) && !result) result = TAKTUK_EINTMX;

    if (result == 0)
      {
        if (taktuk_lock(READ_ACCESS))
          {
            result = TAKTUK_EINTMX;
          }
        else
          {
            if((result = wait_message(messages_expected, &code, &size)) == 0)
              {
                switch (code)
                  {
                  case TAKTUK_INFO:
                    if (size >= sizeof(buffer))
                      {
                        /* Too long to be a number we can store (one byte is
                         * needed for the terminator): consume the reply by
                         * pieces so the channel stays in sync, then fail */
                        size_t remaining = size;

                        while (remaining && (result == 0))
                          {
                            size_t chunk = remaining < sizeof(buffer) ?
                                           remaining : sizeof(buffer);
                            result = get_data(buffer, chunk);
                            remaining -= chunk;
                          }
                        if (result == 0)
                            result = TAKTUK_EINTIV;
                      }
                    else if ((result = get_data(buffer, size)) == 0)
                      {
                        buffer[size] = '\0';
                        *value = strtol(buffer, &current, 10);
                        if (*current != '\0')
                          {
                            result = TAKTUK_EINTIV;
                          }
                        else
                          {
                            debug("Got info %ld for %s\n", *value, field);
                          }
                      }
                    break;
                  case TAKTUK_INVALID:
                    result = TAKTUK_EINVAL;
                    break;
                  default:
                    result = TAKTUK_EINTGM;
                  }
                free_message();
              }
            if (taktuk_unlock(READ_ACCESS) && !result) result = TAKTUK_EINTMX;
          }
      }
    if (taktuk_unlock(GET_INFO_ACCESS) && !result) result = TAKTUK_EINTMX;
    return result;
  }

/*
 * Sends the `length` bytes at `buffer` to the destination set `dest`.
 * The header and the body are written under the WRITE_ACCESS lock.
 * Returns 0 on success, TAKTUK_EINTMX if the lock cannot be taken or
 * released, otherwise the send_header() or insistent_write() error.
 */
int taktuk_multi_send(const char *dest, const void *buffer, size_t length)
  {
    int status;

    if (taktuk_lock(WRITE_ACCESS) != 0)
        return TAKTUK_EINTMX;

    status = send_header(dest, length);
    if (!status)
        status = insistent_write(taktuk_fd[1], buffer, length);

    /* A failure to unlock takes precedence over the send status */
    return (taktuk_unlock(WRITE_ACCESS) != 0) ? TAKTUK_EINTMX : status;
  }

/*
 * Sends to the destination set `dest` a single message whose body is the
 * concatenation of the `iovcnt` segments described by `iov`.
 * The header and all segments are written under the WRITE_ACCESS lock;
 * writing stops at the first segment that fails.
 * Returns 0 on success, TAKTUK_EINTMX if the lock cannot be taken or
 * released, otherwise the send_header() or insistent_write() error.
 */
int taktuk_multi_sendv(const char *dest, const struct iovec *iov, int iovcnt)
  {
    size_t total = 0;
    int idx;
    int status;

    /* The header announces the whole body size */
    for (idx = 0; idx < iovcnt; idx++)
        total += iov[idx].iov_len;

    if (taktuk_lock(WRITE_ACCESS) != 0)
        return TAKTUK_EINTMX;

    status = send_header(dest, total);
    idx = 0;
    while ((status == 0) && (idx < iovcnt))
      {
        status = insistent_write(taktuk_fd[1], iov[idx].iov_base,
                                 iov[idx].iov_len);
        idx++;
      }

    /* A failure to unlock takes precedence over the send status */
    return (taktuk_unlock(WRITE_ACCESS) != 0) ? TAKTUK_EINTMX : status;
  }

/*
 * Sends the `length` bytes at `buffer` to the single host whose logical
 * number is `dest`: the number is formatted in decimal and handed to
 * taktuk_multi_send(), whose result is returned.
 */
int taktuk_send(long dest, const void *buffer, size_t length)
  {
    /* 3 characters per byte is more than enough for the decimal digits of a
     * long, plus one for the sign and one for the terminator (SMALL_BUFFER
     * is one byte short for the most negative 64-bit value) */
    char name[3 * sizeof(long) + 2];

    snprintf(name, sizeof(name), "%ld", dest);
    return taktuk_multi_send(name, buffer, length);
  }

/*
 * Sends the `iovcnt` segments described by `iov` as one message to the
 * single host whose logical number is `dest`: the number is formatted in
 * decimal and handed to taktuk_multi_sendv(), whose result is returned.
 */
int taktuk_sendv(long dest, const struct iovec *iov, int iovcnt)
  {
    /* 3 characters per byte is more than enough for the decimal digits of a
     * long, plus one for the sign and one for the terminator (SMALL_BUFFER
     * is one byte short for the most negative 64-bit value) */
    char name[3 * sizeof(long) + 2];

    snprintf(name, sizeof(name), "%ld", dest);
    return taktuk_multi_sendv(name, iov, iovcnt);
  }

/*
 * Receives one message: waits for it with taktuk_wait_message(), then reads
 * its whole body into `buffer` with taktuk_read().
 * `from` is passed to taktuk_wait_message(); `*length`, when `length` is not
 * NULL, receives the size of the body.
 * Returns 0 on success, otherwise the error of whichever step failed.
 *
 * NOTE(review): the body size is never compared with the capacity of
 * `buffer`; the caller presumably has to provide a buffer large enough for
 * any incoming message -- confirm against the API documentation.
 */
int taktuk_recv(long *from, void *buffer, size_t *length, int timeout)
  {
    size_t incoming;
    int status = taktuk_wait_message(from, &incoming, timeout);

    if (status != 0)
        return status;

    if (length != NULL)
        *length = incoming;
    return taktuk_read(buffer, incoming);
  }

/*
 * Receives one message into the `iovcnt` segments described by `iov`.
 * The message is waited for with taktuk_wait_message(); its body must be
 * exactly as long as the segments together, in which case it is read with
 * taktuk_readv().  Otherwise the body is discarded with purge_data() and
 * TAKTUK_EIBUFF is returned.
 * Returns 0 on success, otherwise the error of whichever step failed.
 */
int taktuk_recvv(long *from, const struct iovec *iov, int iovcnt, int timeout)
  {
    char scratch[SMALL_BUFFER];
    size_t incoming;
    size_t capacity = 0;
    int idx;
    int status;

    status = taktuk_wait_message(from, &incoming, timeout);
    if (status != 0)
        return status;

    for (idx = 0; idx < iovcnt; idx++)
        capacity += iov[idx].iov_len;

    if (incoming == capacity)
        return taktuk_readv(iov, iovcnt);

    /* Size mismatch: drop the message (this also releases the locks) */
    purge_data(incoming, scratch, SMALL_BUFFER);
    return TAKTUK_EIBUFF;
  }

/*
 * Waits for an incoming message and decodes its sender.
 *
 * A TAKTUK_WAIT_MESSAGE request is written on the control channel (carrying
 * `timeout` in decimal when it is positive), then a TAKTUK_MESSAGE or
 * TAKTUK_TIMEOUT reply is awaited.  For a message, the sender part (4-byte
 * size followed by a decimal logical number) is consumed; `*from`, when
 * `from` is not NULL, receives the sender number and `*size` the number of
 * body bytes still to be read.
 *
 * Locks:
 *  - on success RECV_ACCESS and READ_ACCESS are left LOCKED; they are
 *    released by the call that consumes the body (taktuk_read,
 *    taktuk_readv) or discards it (purge_data);
 *  - on any error every lock taken here is released and a message taken
 *    from the list is freed.
 *
 * The sender size comes from the wire: it is checked against both the
 * remaining message size and the local buffer before being used.
 *
 * Returns 0 on success, TAKTUK_ETMOUT on timeout, TAKTUK_EINTFV if the
 * sender part is malformed, TAKTUK_EINTRM on an unexpected reply code,
 * TAKTUK_EINTMX on a mutex error, otherwise the get_taktuk_fd() or
 * read/write error met while talking to the engine.
 */
int taktuk_wait_message(long *from, size_t *size, int timeout)
  {
    char buffer[SMALL_BUFFER];
    char *current;
    char wait = TAKTUK_WAIT_MESSAGE;
    int buffer_size;
    int result = 0;
    char code;
    char messages_expected[] = { TAKTUK_MESSAGE, TAKTUK_TIMEOUT, '\0' };
    uint32_t from_size;
    long long_result;

    current = buffer;
    current = put_uint32(current, 0);
    current = put_bytes(current, &wait, MSG_LENGTH);
    buffer_size = MSG_LENGTH;
    if (timeout > 0)
      {
        /* Bounded by what is left of the buffer after the size and code */
        snprintf(current, sizeof(buffer) - (size_t) (current - buffer),
                 "%d", timeout);
        buffer_size += strlen(current);
      }
    /* Now that it is known, fill in the size reserved at the beginning */
    put_uint32(buffer, buffer_size);
    buffer_size += sizeof(uint32_t);

    if (taktuk_lock(RECV_ACCESS)) return TAKTUK_EINTMX;
    if (taktuk_lock(WRITE_ACCESS))
      {
        taktuk_unlock(RECV_ACCESS);
        return TAKTUK_EINTMX;
      }
    result = get_taktuk_fd();
    if (result == 0)
        result = insistent_write(taktuk_fd[1], buffer, buffer_size);
    if (taktuk_unlock(WRITE_ACCESS) && !result) result = TAKTUK_EINTMX;

    if (result == 0)
      {
        if (taktuk_lock(READ_ACCESS))
          {
            /* READ_ACCESS is not held: only RECV_ACCESS is released below */
            result = TAKTUK_EINTMX;
          }
        else
          {
            if ((result = wait_message(messages_expected, &code, size)) == 0)
              {
                switch (code)
                  {
                  case TAKTUK_MESSAGE:
                    if (*size < sizeof(uint32_t))
                      {
                        /* Too short to even hold the sender size */
                        result = TAKTUK_EINTFV;
                      }
                    else if ((result = get_data(buffer,
                                                sizeof(uint32_t))) == 0)
                      {
                        *size -= sizeof(uint32_t);
                        get_uint32(buffer, &from_size);
                        if ((from_size >= sizeof(buffer)) ||
                            (from_size > *size))
                          {
                            /* Would overflow the buffer (terminator
                             * included) or exceed the message itself */
                            result = TAKTUK_EINTFV;
                          }
                        else if ((result = get_data(buffer, from_size)) == 0)
                          {
                            *size -= from_size;
                            buffer[from_size] = '\0';
                            long_result = strtol(buffer, &current, 10);
                            if (from != NULL)
                                *from = long_result;
                            if (*current != '\0')
                              {
                                result = TAKTUK_EINTFV;
                              }
                            else
                              {
                                debug("Got message %c from %ld (size "
                                      "remaining %ld)\n", code, long_result,
                                      *size);
                              }
                          }
                      }
                    /* Nobody will read the body after an error */
                    if (result)
                        free_message();
                    break;
                  case TAKTUK_TIMEOUT:
                    free_message();
                    result = TAKTUK_ETMOUT;
                    break;
                  default:
                    free_message();
                    result = TAKTUK_EINTRM;
                  }
              }
            if (result)
                taktuk_unlock(READ_ACCESS);
          }
      }
    if (result)
        taktuk_unlock(RECV_ACCESS);
    debug("End of taktuk_wait_message, returning %d\n", result);
    return result;
  }

/*
 * Reads `length` bytes of the body of the message announced by
 * taktuk_wait_message() into `buffer`, then releases the message and the
 * READ_ACCESS / RECV_ACCESS locks that taktuk_wait_message() left held.
 *
 * Both locks are always released, even if the read or the first unlock
 * fails, so that an error here cannot block every later receiver.
 *
 * Returns 0 on success, the get_data() error if reading failed, otherwise
 * TAKTUK_EINTMX if a lock could not be released.
 */
int taktuk_read(void* buffer, size_t length)
  {
    int result;
    int unlock_failed = 0;

    debug("Entering taktuk_read\n");
    result = get_data(buffer, length);
    free_message();
    if (taktuk_unlock(READ_ACCESS)) unlock_failed = 1;
    if (taktuk_unlock(RECV_ACCESS)) unlock_failed = 1;
    /* A read error is more informative than the unlock failure */
    if (unlock_failed && !result) result = TAKTUK_EINTMX;
    return result;
  }

/*
 * Reads the body of the message announced by taktuk_wait_message() into the
 * `iovcnt` segments described by `iov`, filling them in order and stopping
 * at the first failure, then releases the message and the READ_ACCESS /
 * RECV_ACCESS locks that taktuk_wait_message() left held.
 *
 * Both locks are always released, even if a read or the first unlock fails,
 * so that an error here cannot block every later receiver.
 *
 * Returns 0 on success, the get_data() error if reading failed, otherwise
 * TAKTUK_EINTMX if a lock could not be released.
 */
int taktuk_readv(const struct iovec *iov, int iovcnt)
  {
    int result = 0;
    int unlock_failed = 0;
    int i;

    debug("Entering taktuk_readv\n");
    for (i=0; (i<iovcnt) && !result; ++i)
      {
        result = get_data(iov[i].iov_base, iov[i].iov_len);
      }
    free_message();
    if (taktuk_unlock(READ_ACCESS)) unlock_failed = 1;
    if (taktuk_unlock(RECV_ACCESS)) unlock_failed = 1;
    /* A read error is more informative than the unlock failure */
    if (unlock_failed && !result) result = TAKTUK_EINTMX;
    return result;
  }

/* Generated by  Doxygen 1.6.0   Back to index */