Logo Search packages:      
Sourcecode: taktuk version File versions

communication5.c

/******************************************************************************
*                                                                             *
*  TakTuk, a middleware for adaptive large scale parallel remote executions   *
*  deployment. Perl implementation, copyright(C) 2006 Guillaume Huard.        *
*                                                                             *
*  This program is free software; you can redistribute it and/or modify       *
*  it under the terms of the GNU General Public License as published by       *
*  the Free Software Foundation; either version 2 of the License, or          *
*  (at your option) any later version.                                        *
*                                                                             *
*  This program is distributed in the hope that it will be useful,            *
*  but WITHOUT ANY WARRANTY; without even the implied warranty of             *
*  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
*  GNU General Public License for more details.                               *
*                                                                             *
*  You should have received a copy of the GNU General Public License          *
*  along with this program; if not, write to the Free Software                *
*  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA *
*                                                                             *
*  Contact: Guillaume.Huard@imag.fr                                           *
*           ENSIMAG - Laboratoire ID                                          *
*           51 avenue Jean Kuntzmann                                          *
*           38330 Montbonnot Saint Martin                                     *
*                                                                             *
******************************************************************************/

#include <taktuk.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>

/* THIS PROGRAM DOES LONG BLOCKING */
/* actually the two processes initiate a dispatcher that receives messages but
 * only the process of rank 1 makes send calls. So you have to wait all recv
 * timeouts in process of rank 2 */

#define NUMBER_MSG 10
#define THREADS

int get_numeric_variable(char *name)
  {
    char *value_string;

    if ((value_string = getenv(name)) != NULL)
      {
        return atoi(value_string);
      }
    else
        return -1;
  }

int rank;

void* dispatcher(void* arg)
{
  printf("Dispatcher started\n");
  fflush(stdout);
  //char from[32];
  char first_part[128];
  char second_part[128];
  struct iovec vecteur[2];
  int num = NUMBER_MSG;
  vecteur[0].iov_base = first_part;
  vecteur[0].iov_len = 128;
  vecteur[1].iov_base = second_part;
  vecteur[1].iov_len = 128;
  while (num--)
  {
    int result = taktuk_recvv(0, vecteur, 2, 5);
    if (result < 0)
      {
        printf("Error %d in recv %d : %s\n", taktuk_error, rank,
               taktuk_error_msg(taktuk_error));
        fflush(stdout);
      }
    else
      {
        printf("Received [%s] a [%s] from 1\n",
               (char *) vecteur[0].iov_base,
               (char *) vecteur[1].iov_base);
        fflush(stdout);
      }
  }
  return NULL;
}


int main()
  {
    char first_part[128];
    char second_part[128];
    struct iovec vecteur[2];
    //char from[32];
    char to[32];
    int count, result, next;
    
#ifdef THREADS
      pthread_t tid;
      int err = pthread_create( &tid, 0, &dispatcher, 0 );
      if (err !=0) {
          fprintf(stderr,"Error :%i ", errno );
          return 1;
      }
#endif
    vecteur[0].iov_base = first_part;
    vecteur[0].iov_len = 128;
    vecteur[1].iov_base = second_part;
    vecteur[1].iov_len = 128;

    if (((rank = get_numeric_variable("TAKTUK_RANK")) == -1) ||
        ((count = get_numeric_variable("TAKTUK_COUNT")) == -1))
      {
        fprintf(stderr,
            "TAKTUK_RANK or TAKTUK_COUNT not defined, do you use TakTuk ?\n");
        return 1;
      }

    printf("I'm process %d among %d\n", rank, count);
    fflush(stdout);

    // Receive for all except 1
    if (rank == 1)
    {
      strcpy(first_part, "Salut a toi ");
      strcpy(second_part, "de 1\n");

      // Send for all
      next = (rank%count)+1;
      sprintf(to, "%d", next);
      int i;
      for (i=0; i<NUMBER_MSG; ++i)
      {
        result = taktuk_sendv(to, vecteur, 2);
        printf("I'm process %d among %d message sent\n", rank, count);
        fflush(stdout);
        if (result < 0)
          {
            printf("Error in send %d : %s\n", rank, taktuk_error_msg(taktuk_error));
          }
      }
    }
    else {
#ifndef THREADS
      dispatcher(0);
#endif
    
    }
    
#ifdef THREADS
    pthread_join(tid, NULL);
#endif
    return 0;
  }

Generated by  Doxygen 1.6.0   Back to index