/*
 * Logo Search packages:
 * Sourcecode: taktuk version File versions  Download package
 *
 * communication4.c
 */

/******************************************************************************
*                                                                             *
*  TakTuk, a middleware for adaptive large scale parallel remote executions   *
*  deployment. Perl implementation, copyright(C) 2006 Guillaume Huard.        *
*                                                                             *
*  This program is free software; you can redistribute it and/or modify       *
*  it under the terms of the GNU General Public License as published by       *
*  the Free Software Foundation; either version 2 of the License, or          *
*  (at your option) any later version.                                        *
*                                                                             *
*  This program is distributed in the hope that it will be useful,            *
*  but WITHOUT ANY WARRANTY; without even the implied warranty of             *
*  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
*  GNU General Public License for more details.                               *
*                                                                             *
*  You should have received a copy of the GNU General Public License          *
*  along with this program; if not, write to the Free Software                *
*  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA *
*                                                                             *
*  Contact: Guillaume.Huard@imag.fr                                           *
*           ENSIMAG - Laboratoire d'Informatique de Grenoble                  *
*           51 avenue Jean Kuntzmann                                          *
*           38330 Montbonnot Saint Martin                                     *
*                                                                             *
******************************************************************************/

#include <taktuk.h>

#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/*
 * Arguments handed to each sender thread.
 *
 * main() fills the single file-scope instance below once, before any thread
 * is started, and passes its address to every sender thread.
 */
struct sender_parameters
{
  struct iovec vecteur[2];  /* payload pieces to transmit */
  int iovcnt;               /* number of entries of vecteur in use */
  int iter;                 /* how many messages each sender emits */
  unsigned long rank;       /* rank of this process (used in error output) */
  unsigned long count;      /* process count, as stored by main() */
  unsigned long dest;       /* rank of the destination process */
};

/* Shared instance; zero-initialised because it has static storage. */
struct sender_parameters sender_parameters;

/*
 * Arguments handed to each receiver thread.
 *
 * main() fills the single file-scope instance below once, before any thread
 * is started, and passes its address to every receiver thread.
 */
struct receiver_parameters
{
  int iter;                /* how many receive attempts each thread makes */
  unsigned long rank;      /* rank of this process, as stored by main() */
  unsigned long count;     /* process count, as stored by main() */
  struct timeval timeout;  /* timeout passed to taktuk_recvv() */
};

/* Shared instance; zero-initialised because it has static storage. */
struct receiver_parameters receiver_parameters;

/*
 * Sender thread entry point.
 *
 * args points to a struct sender_parameters. The thread builds a private
 * iovec array made of the p->iovcnt caller-supplied vectors followed by one
 * extra 32-byte element holding the text "(thread <id>)", then sends it
 * p->iter times to p->dest with taktuk_sendv(), sleeping 0 to 2 seconds
 * before each send. A failed send is reported on stdout and the loop goes on.
 *
 * The thread ends through pthread_exit(): value 0 normally, value 1 when the
 * iovec array cannot be allocated.
 */
void *sender(void *args)
  {
    struct sender_parameters *p = (struct sender_parameters *) args;
    int result;
    struct iovec *vecteur;
    int iovcnt, i;
    /* Zero-filled: the whole buffer is transmitted (iov_len below), so no
       uninitialised stack bytes may remain after the formatted text. */
    char tid[32] = {0};

    /* NOTE(review): pthread_t is an opaque type; the cast to long is kept
       from the original and only works where pthread_t is a scalar. */
    snprintf(tid, sizeof tid, "(thread %ld)", (long int) pthread_self());

    iovcnt = p->iovcnt+1;
    vecteur = malloc(sizeof *vecteur * (size_t) iovcnt);
    if (vecteur == NULL)
      {
        /* Explicit check: an assert() would disappear with NDEBUG. */
        fprintf(stderr, "Cannot allocate %d iovec entries (thread %ld)\n",
                iovcnt, (long int) pthread_self());
        pthread_exit((void *)1);
      }
    for (i=0; i<p->iovcnt; i++)
      {
        vecteur[i] = p->vecteur[i];
      }
    vecteur[p->iovcnt].iov_base = tid;
    vecteur[p->iovcnt].iov_len = sizeof tid;

    /* NOTE(review): srand()/rand() share hidden global state between
       threads; acceptable here because only the sleep jitter depends on it. */
    srand((unsigned int) (long int) pthread_self());
    for (i=0; i<p->iter; i++)
      {
        sleep(rand()%3);
        result = taktuk_sendv(p->dest, TAKTUK_TARGET_ANY, vecteur, iovcnt);
        if (result)
          {
            printf("Error %d in send %lu (thread %ld): %s\n", result,
                p->rank, (long int) pthread_self(), taktuk_error_msg(result));
          }
      }

    /* Presumably taktuk_sendv() has finished with the vectors when it
       returns (the stack buffer tid already relies on that), so the array
       can be released before the thread ends. */
    free(vecteur);
    pthread_exit(0);
  }

/*
 * Receiver thread entry point.
 *
 * args points to a struct receiver_parameters. The thread performs p->iter
 * rounds; each round queries the local rank with taktuk_get() and then waits
 * for a three-part message with taktuk_recvv(), using p->timeout. The three
 * parts are received into buffers of 128, 128 and 32 bytes and printed as
 * strings on stdout; a receive error is printed on stdout and the loop goes
 * on.
 *
 * The thread ends through pthread_exit(): value 0 normally, value 1 when the
 * rank cannot be obtained.
 */
void *receiver(void *args)
  {
    struct receiver_parameters *p = (struct receiver_parameters *) args;
    /* Zero-filled so that a message shorter than expected never exposes
       indeterminate bytes to printf(). */
    char parts[3][128] = {{0}};
    struct iovec vecteur[3];
    int i, k;
    unsigned long from, rank;
    int result;

    for (i=0; i<3; i++)
      {
        vecteur[i].iov_base = parts[i];
        vecteur[i].iov_len = sizeof parts[i];
      }
    /* The third part carries the sender's 32-byte thread tag. */
    vecteur[2].iov_len = 32;

    for (i=0; i<p->iter; i++)
      {
        result = taktuk_get("rank", &rank);
        if (result)
          {
            fprintf(stderr, "Invalid rank: %s, do you use TakTuk ?\n",
                    taktuk_error_msg(result));
            pthread_exit((void *)1);
          }
        result = taktuk_recvv(&from, vecteur, 3, &p->timeout);
        if (result)
          {
            printf("Error %d in recv %lu (thread %ld): %s\n", result,
                   rank, (long int) pthread_self(), taktuk_error_msg(result));
          }
        else
          {
            /* The payload comes from another process: force termination so
               that "%s" can never read past the end of a buffer. */
            for (k=0; k<3; k++)
              {
                parts[k][sizeof parts[k] - 1] = '\0';
              }
            printf("Received [%s], [%s] and [%s] from %lu (thread %ld)\n",
                   parts[0], parts[1], parts[2],
                   from, (long int) pthread_self());
          }
      }
    pthread_exit(0);
  }

/*
 * Parses text as a base-10 integer lying in [min, max].
 * Returns 0 and stores the value in *out on success, -1 on malformed or
 * out-of-range input (*out is then left untouched).
 */
static int parse_int_arg(const char *text, long min, long max, int *out)
  {
    char *end;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE
        || value < min || value > max)
      {
        return -1;
      }
    *out = (int) value;
    return 0;
  }

/*
 * Aborts the program with a diagnostic when status is non-zero.
 * Used instead of assert() so that the checked call is still executed when
 * the program is compiled with NDEBUG.
 */
static void check_status(int status, const char *what)
  {
    if (status)
      {
        fprintf(stderr, "%s failed with status %d\n", what, status);
        abort();
      }
  }

/*
 * Program entry point.
 *
 * Usage: <program> threads count timeout
 *   threads  number of sender/receiver thread pairs to start (>= 0)
 *   count    number of messages each thread sends or receives
 *   timeout  receive timeout in seconds
 *
 * Initialises TakTuk thread support, reads the local rank and the process
 * count, fills the shared sender/receiver parameters, starts the threads,
 * waits for all of them and leaves TakTuk thread mode.
 *
 * Returns 0 on success and 1 on bad arguments or when rank/count cannot be
 * obtained; aborts if thread setup, thread creation or joining fails.
 */
int main(int argc, char *argv[])
  {
    /* Zero-filled: the full 128 bytes of each buffer are sent, not only the
       string, so nothing uninitialised may remain behind the text. */
    char first_part[128] = {0};
    char second_part[128] = {0};
    unsigned long rank, count;
    int nb_threads, iterations, timeout_sec, i, pos;
    size_t total;
    pthread_t *threads;
    int result;

    if (argc != 4)
      {
        fprintf(stderr, "Invalid arguments number (threads, count, timeout)\n");
        return 1;
      }

    /* The thread count is capped so that 2*nb_threads fits in an int. */
    if (parse_int_arg(argv[1], 0, INT_MAX/2, &nb_threads)
        || parse_int_arg(argv[2], INT_MIN, INT_MAX, &iterations)
        || parse_int_arg(argv[3], INT_MIN, INT_MAX, &timeout_sec))
      {
        fprintf(stderr, "Invalid arguments: threads, count and timeout must "
                        "be integers, threads must not be negative\n");
        return 1;
      }

    check_status(taktuk_init_threads(), "taktuk_init_threads");
    result = taktuk_get("rank", &rank);
    if (!result)
        result = taktuk_get("count", &count);
    if (result)
      {
        fprintf(stderr, "Invalid rank or count: %s, do you use TakTuk ?\n",
                taktuk_error_msg(result));
        return 1;
      }

    sender_parameters.vecteur[0].iov_base = first_part;
    sender_parameters.vecteur[0].iov_len = sizeof first_part;
    sender_parameters.vecteur[1].iov_base = second_part;
    sender_parameters.vecteur[1].iov_len = sizeof second_part;
    sender_parameters.iovcnt = 2;

    sender_parameters.iter = iterations;
    sender_parameters.rank = rank;
    sender_parameters.count = count;
    /* Maps rank 1 to 2 and rank 2 to 1. NOTE(review): presumably the test is
       meant to run with exactly two processes; confirm before using more. */
    sender_parameters.dest = 3-rank;

    receiver_parameters.iter = iterations;
    receiver_parameters.rank = rank;
    receiver_parameters.count = count;
    receiver_parameters.timeout.tv_sec = timeout_sec;
    receiver_parameters.timeout.tv_usec = 0;

    strcpy(first_part, "Salut  toi...");
    strcpy(second_part, "...petit processus");

    printf("I'm process %lu among %lu\n", rank, count);
    fflush(stdout);

    /* One sender and one receiver per requested thread. At least one slot is
       requested so that a zero thread count does not depend on what
       calloc(0, ...) returns. */
    total = 2 * (size_t) nb_threads;
    threads = calloc(total ? total : 1, sizeof *threads);
    if (threads == NULL)
      {
        fprintf(stderr, "Cannot allocate %d thread handles\n", 2*nb_threads);
        abort();
      }
    pos = 0;

    for (i=0; i<nb_threads; i++)
      {
        check_status(pthread_create(&threads[pos++], NULL, sender,
                                    &sender_parameters),
                     "pthread_create (sender)");
        check_status(pthread_create(&threads[pos++], NULL, receiver,
                                    &receiver_parameters),
                     "pthread_create (receiver)");
      }

    printf("Threads created\n");
    fflush(stdout);

    for (i=0; i<pos; i++)
      {
        check_status(pthread_join(threads[i], NULL), "pthread_join");
      }
    check_status(taktuk_leave_threads(), "taktuk_leave_threads");

    free(threads);
    return 0;
  }

/* Generated by  Doxygen 1.6.0   Back to index */