这里用了二分法和归并排序
#include <stdio.h> #include<stdlib.h> #include<mpi.h> double* readata(double* data, int n, int my_rank, int comm_sz) { if (my_rank == 0) { FILE* fp; fp = fopen("in.txt", "r"); if (fp == NULL) { printf("open failed"); exit(0); } int i = 0; data = (double*)malloc(sizeof(double)*n); for (i = 0; i < n; i++)fscanf(fp, "%le", &data[i]); //printf("process %d readata\n", my_rank); } return data; } double* scatterdata(double* data, int n, double* local_data, int* local_n, int my_rank, int comm_sz, MPI_Comm comm) { if (n%comm_sz == 0) { //printf("zhengchu ok\n"); *local_n = n / comm_sz; local_data = (double*)malloc(sizeof(double)*(*local_n)); MPI_Scatter(data, *local_n, MPI_DOUBLE, local_data, *local_n, MPI_DOUBLE, 0, comm); } else { //printf("no zhengchu%d\n", my_rank); //printf("n is %ld\n", n); *local_n = n / comm_sz; if (my_rank == comm_sz - 1) (*local_n) += n % comm_sz; local_data = (double*)malloc(sizeof(double)*(*local_n)); int* send_count=NULL; int* displ=NULL; //output(data, n, my_rank); if (my_rank == 0) { send_count = (int*)malloc(sizeof(int)*comm_sz);//放每个进程要获得的数据量 displ = (int*)malloc(sizeof(int)*comm_sz);//放每个进程要读取的第一个数据在data中的位置 int flag = 0; for (int i = 0; i < comm_sz; i++) { send_count[i] = n / comm_sz; if (i == 0)flag = 0; else flag += send_count[i - 1]; displ[i] = flag; if (i == comm_sz - 1)send_count[i] += n % comm_sz; //printf("process%d send_count[%d] is %d\n", my_rank, i, send_count[i]); } //printf("first %d\n", send_count[0]); //output(send_count,comm_sz, my_rank); } //printf("qian%d,%d\n", my_rank,*local_n); MPI_Scatterv(data, send_count, displ, MPI_DOUBLE, local_data, *local_n, MPI_DOUBLE, 0, comm); //printf("hou%d,%d\n", my_rank, *local_n); //output(local_data, *local_n, my_rank); if (my_rank == 0) { free(send_count); free(displ); free(data); } } return local_data; } double* mergesort_parallel(double* local_data, int* local_n, double* local_partner_data, int local_partner_n) { int n = *local_n + local_partner_n, i = 0, j = 0, k = 0; double* alldata = (double*)malloc(sizeof(double)*n); while (i < *local_n && j < local_partner_n) { if (local_data[i] <= local_partner_data[j])alldata[k++] = local_data[i++]; else alldata[k++] = local_partner_data[j++]; } while (i < *local_n)alldata[k++] = local_data[i++]; while (j < local_partner_n)alldata[k++] = local_partner_data[j++]; free(local_data); free(local_partner_data); *local_n = n; return alldata; } void merge(double* a, int l1, int r1, int l2, int r2) { int i = l1, j = l2; double* temp; int index = 0; temp = (double*)malloc(sizeof(double)*(r1 - l1 + r2 - l2 + 2)); while (i <= r1 && j <= r2) { if (a[i] <= a[j]) { temp[index++] = a[i++]; } else { temp[index++] = a[j++]; } } while (i <= r1) temp[index++] = a[i++]; while (j <= r2) temp[index++] = a[j++]; for (i = 0; i < index; i++) { a[l1 + i] = temp[i]; } free(temp); } void mergesort(double *a,int left, int right) { if (left < right) { int mid = (left + right) / 2; mergesort(a, left, mid); mergesort(a, mid + 1, right); merge(a, left, mid, mid + 1, right); } } double* sortdata(double* local_data, int* local_n, int my_rank, int comm_sz, MPI_Comm comm) { mergesort(local_data, 0, *local_n - 1); //printf("process %d sortdata\n", my_rank); int last = comm_sz - 1, mid; while (last != 0) { if (last % 2 == 0) { mid = last / 2; if (my_rank > mid) { MPI_Send(local_n, 1, MPI_INT, last-my_rank, 0, comm); MPI_Send(local_data, *local_n, MPI_DOUBLE,last- my_rank , 0, comm); } else if (my_rank < mid) { int local_partner_n; MPI_Recv(&local_partner_n, 1, MPI_INT, last - my_rank, 0, comm, MPI_STATUS_IGNORE); if (my_rank == 0)printf("process %d local_partner_n is %ld\n", my_rank, local_partner_n); double* local_partner_data = (double*)malloc(sizeof(double)*local_partner_n); MPI_Recv(local_partner_data, local_partner_n, MPI_DOUBLE, last - my_rank, 0, comm, MPI_STATUS_IGNORE); local_data = mergesort_parallel(local_data, local_n, local_partner_data, local_partner_n); if (my_rank == 0)printf("new local_n is "); printf("here\n"); } last = mid; } else { mid = (last + 1) / 2; if (my_rank >= mid) { MPI_Send(local_n, 1, MPI_INT, last - my_rank, 0, comm); MPI_Send(local_data, *local_n, MPI_DOUBLE, last - my_rank, 0, comm); } else { int local_partner_n; MPI_Recv(&local_partner_n, 1, MPI_INT, last - my_rank, 0, comm, MPI_STATUS_IGNORE); //printf("my partner number is %d\n", local_partner_n); double* local_partner_data = (double*)malloc(sizeof(double)*local_partner_n); MPI_Recv(local_partner_data, local_partner_n, MPI_DOUBLE, last - my_rank, 0, comm, MPI_STATUS_IGNORE); local_data = mergesort_parallel(local_data, local_n, local_partner_data, local_partner_n); } last = mid - 1; } if (my_rank > last)break;//如果我的进程是用过的,那就不要再使用了 } //if (my_rank == 0)printf("process 0 local_n%d\n", *local_n); return local_data; } void writedata(double* local_data,int local_n, int my_rank, int comm_sz) { if (my_rank == 0) { FILE* fp; fp = fopen("output.txt", "w"); if (fp == NULL) { printf("open failed"); exit(0); } int i = 0; while (local_n--)fprintf(fp, "%f\n", local_data[i++]); //printf("process %d writedata\n", my_rank); } } void output(double* local_data, int local_n,int my_rank) { if (local_data == NULL) { printf("process %d is NULL\n", my_rank); return; } for (int i = 0; i < local_n; i++) { printf("process %d local_n-----------%d\n", my_rank,local_data[i]); //printf("%f\n", local_data[i]); } } int main() { int my_rank, comm_sz; MPI_Init(NULL, NULL); MPI_Comm_rank(MPI_COMM_WORLD, &my_rank); MPI_Comm_size(MPI_COMM_WORLD, &comm_sz); int n, local_n; double* data = NULL, *local_data = NULL; n = 100000; data=readata(data, n, my_rank, comm_sz); //printf("hehe%d\n",my_rank); MPI_Barrier(MPI_COMM_WORLD); //printf("haha%d\n",my_rank); double local_start = MPI_Wtime(); local_data=scatterdata(data, n, local_data, &local_n, my_rank, comm_sz, MPI_COMM_WORLD); //千万不要忘记传递通信子 //printf("after scatter process %d local_n is %d\n", my_rank, local_n); //output(local_data, local_n, my_rank); local_data=sortdata(local_data, &local_n, my_rank, comm_sz, MPI_COMM_WORLD); //printf("process %d sortdata\n", my_rank); //output(local_data, local_n, my_rank); double local_finish = MPI_Wtime(); double local_elapsed = local_finish - local_start; double elapsed; MPI_Reduce(&local_elapsed, &elapsed, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD); if (my_rank == 0)printf("elapsed time is %e s\n", elapsed); writedata(local_data, local_n, my_rank, comm_sz); MPI_Finalize(); return 0; }