This is the mail archive of the libc-alpha@sources.redhat.com mailing list for the glibc project.


Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]
Other format: [Raw text]

memcpy-optimized version of qsort.


-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

Hi all,

  I recently studied the implementation of the merge sort algorithm that is 
used by glibc's qsort() implementation, because I have been writing a 
program that uses qsort() very frequently, and I came across it while 
profiling my program. I use glibc-2.3.3 as shipped with SuSE-9.1 (as 
far as I know, a CVS snapshot from April 2004).

  Inside msort_with_tmp() in stdlib/msort.c I noticed that the two sorted 
subsequences are merged into a temporary array and that afterwards the 
temporary array is copied back as a whole to the original place. This way 
each array element is copied twice per iteration, which makes n*log2(n)*2 
copy operations for sorting n elements.

  I felt that there must be a way to copy each element only once per 
iteration, and after a few hours watching my children play, I got the solution. 
(Please don't blame me if this is described in the standard literature :)

  The key to my optimized memory bookkeeping is the observation that, if 
you want to merge two sorted sequences into a resulting sequence, the second 
sequence could as well be stored in the upper part of the resulting array, 
because these elements are only copied from a higher address to a lower 
address; thus the merge process does not destroy any valuable information.

  So the attached patch implements msort_with_tmp() involving the following 
steps:

given: sort sequence b with n elements, tmp is temp. storage of size n 
elements.

n1:=n/2

1) sort first half of b from b into tmp.
2) sort second half of b from b+n1 into b+n1
3) merge tmp and b+n1 into b.

The minor bookkeeping issues that have to be handled are best seen in the 
source code. I'd like to especially mention the switch for the trivial cases 
n==0 or n==1 at the start of the code. This gave a minor speedup on my 
Linux/P4 laptop compared to the 'if (n<=0) return;' version.

  Benchmarks results on my P4 2GHz laptop:

$ gcc -static qsort_test.c -o qsort_test
$ time ./qsort_test 10000000 > out

real    0m13.084s
user    0m12.197s
sys     0m0.750s
$ gcc -static -L/usr/src/packages/BUILD/glibc-2.3/cc qsort_test.c -o 
qsort_test
$ time ./qsort_test 10000000 > out2

real    0m11.033s
user    0m10.046s
sys     0m0.817s

$ gcc -static qsort_test2.c -o qsort_test2
$ time ./qsort_test2 10000000 > out

real    0m49.588s
user    0m47.356s
sys     0m1.284s
$ gcc -static -L/usr/src/packages/BUILD/glibc-2.3/cc qsort_test2.c -o 
qsort_test2
$ time ./qsort_test2 10000000 > out2

real    0m44.355s
user    0m42.948s
sys     0m1.179s

So depending on the test case I observe a speedup of 10-20% for sorting 10 
million keys. The optimized version of the lib is placed 
in /usr/src/packages/BUILD/glibc-2.3/cc. Executables linked w/o -L use the 
stock glibc from my distro. I linked the attached benchmark programs with 
-static, so I didn't have to install the new shared library during the 
development process.

  Please perform further benchmarks with my patch and consider it for 
inclusion in forthcoming glibc releases.

  Best regards,

   Wolfgang
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.2.4 (GNU/Linux)

iD8DBQFAxHBeTif4DCz+puQRAhtGAJ4/qig+Ur+SdsE5RK9BdTvlEJxCPwCg0yHH
qzaWcT02xXsMSt71Xq6GW0w=
=EX6z
-----END PGP SIGNATURE-----
--- stdlib/msort.c.orig	2004-02-09 08:08:36.000000000 +0100
+++ stdlib/msort.c	2004-06-07 14:07:38.182909160 +0200
@@ -25,46 +25,69 @@
 #include <memcopy.h>
 #include <errno.h>
 
-static void msort_with_tmp (void *b, size_t n, size_t s,
-			    __compar_fn_t cmp, char *t);
+static void msort_with_tmp (char *b, size_t n, size_t s,
+			    __compar_fn_t cmp, char *res, char *tmp);
 
 static void
-msort_with_tmp (void *b, size_t n, size_t s, __compar_fn_t cmp,
-		char *t)
+msort_with_tmp (char *b, size_t n, size_t s, __compar_fn_t cmp,
+		char *res, char *tmp)
 {
-  char *tmp;
   char *b1, *b2;
   size_t n1, n2;
+  char *r,*t;
 
-  if (n <= 1)
-    return;
+  int word_aligned =
+    (s == OPSIZ && (b - (char *) 0) % OPSIZ == 0);
+
+  switch (n)
+    {
+    case 1:
+      if (res != b)
+	{
+	  if (word_aligned)
+	    *((op_t *) res) = *((op_t *) b);
+	  else
+	    memcpy (res, b, s);
+	}
+    case 0:
+      return;
+    default:
+      break;
+    }
 
   n1 = n / 2;
   n2 = n - n1;
-  b1 = b;
+  b1 = tmp;
   b2 = (char *) b + (n1 * s);
+  t  = tmp + (n1 * s);
 
-  msort_with_tmp (b1, n1, s, cmp, t);
-  msort_with_tmp (b2, n2, s, cmp, t);
-
-  tmp = t;
+  /* sort first part from b -> tmp */
+  if (n1)
+    msort_with_tmp (b, n1, s, cmp, b1, t);
+  /* sort 2nd part from b2 -> b2 */
+  if (n2 > 1)
+    msort_with_tmp (b2, n2, s, cmp, b2, t);
+
+  /* now merge the sorted parts, where b2 could be co-located
+     with res + n1 (in the case of b == res) */
+  r  = res;
 
-  if (s == OPSIZ && (b1 - (char *) 0) % OPSIZ == 0)
+  if (word_aligned)
     /* We are operating on aligned words.  Use direct word stores.  */
     while (n1 > 0 && n2 > 0)
       {
 	if ((*cmp) (b1, b2) <= 0)
 	  {
 	    --n1;
-	    *((op_t *) tmp) = *((op_t *) b1);
-	    tmp += sizeof (op_t);
+	    *((op_t *) r) = *((op_t *) b1);
+	    r += sizeof (op_t);
 	    b1 += sizeof (op_t);
 	  }
 	else
 	  {
 	    --n2;
-	    *((op_t *) tmp) = *((op_t *) b2);
-	    tmp += sizeof (op_t);
+	    *((op_t *) r) = *((op_t *) b2);
+	    r += sizeof (op_t);
 	    b2 += sizeof (op_t);
 	  }
       }
@@ -73,20 +96,23 @@
       {
 	if ((*cmp) (b1, b2) <= 0)
 	  {
-	    tmp = (char *) __mempcpy (tmp, b1, s);
+	    r = (char *) __mempcpy (r, b1, s);
 	    b1 += s;
 	    --n1;
 	  }
 	else
 	  {
-	    tmp = (char *) __mempcpy (tmp, b2, s);
+	    r = (char *) __mempcpy (r, b2, s);
 	    b2 += s;
 	    --n2;
 	  }
       }
+  /* handle trailing bits of the sorted sub-sequences */
   if (n1 > 0)
-    memcpy (tmp, b1, n1 * s);
-  memcpy (b, t, (n - n2) * s);
+    memcpy (r, b1, n1 * s);
+  /* r and b2 could be co-located, so avoid noops here. */
+  else if (n2 > 0 && r != b2)
+    memcpy (r, b2, n2 * s);
 }
 
 void
@@ -99,7 +125,7 @@
       void *buf = __alloca (size);
 
       /* The temporary array is small, so put it on the stack.  */
-      msort_with_tmp (b, n, s, cmp, buf);
+      msort_with_tmp ((char *)b, n, s, cmp, b, buf);
     }
   else
     {
@@ -151,7 +177,7 @@
 	  else
 	    {
 	      __set_errno (save);
-	      msort_with_tmp (b, n, s, cmp, tmp);
+	      msort_with_tmp ((char *)b, n, s, cmp, b, tmp);
 	      free (tmp);
 	    }
 	}
/* An alternative to qsort, with an identical interface.
   This file is part of the GNU C Library.
   Copyright (C) 1992,95-97,99,2000,01,02,04 Free Software Foundation, Inc.
   Written by Mike Haertel, September 1988.

   The GNU C Library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation; either
   version 2.1 of the License, or (at your option) any later version.

   The GNU C Library is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   Lesser General Public License for more details.

   You should have received a copy of the GNU Lesser General Public
   License along with the GNU C Library; if not, write to the Free
   Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
   02111-1307 USA.  */

#include <alloca.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <memcopy.h>
#include <errno.h>

static void msort_with_tmp (char *b, size_t n, size_t s,
			    __compar_fn_t cmp, char *res, char *tmp);

/* Merge-sort the N elements of S bytes each that start at B, ordering
   them with CMP, and leave the sorted sequence at RES.

   RES may be B itself (sort in place) or a separate buffer; the callers
   in this file only ever pass RES == B or a RES that lies directly in
   front of TMP.  TMP is scratch space: the first half of the input is
   sorted into TMP, the second half is sorted in place inside B, and the
   two runs are then merged into RES, so every element is copied once
   per recursion level.  qsort supplies N * S bytes of scratch space.

   Elements comparing equal keep their relative order because the merge
   prefers the first run on ties.  */
static void
msort_with_tmp (char *b, size_t n, size_t s, __compar_fn_t cmp,
		char *res, char *tmp)
{
  char *b1, *b2;
  size_t n1, n2;
  char *r, *t;

  /* The word-store fast path reads and writes through B, RES and TMP
     as op_t objects, so all three have to be suitably aligned, not
     only B.  The addresses are converted to integers for the test
     instead of subtracting a null pointer.  */
  int word_aligned =
    (s == OPSIZ
     && (size_t) b % OPSIZ == 0
     && (size_t) res % OPSIZ == 0
     && (size_t) tmp % OPSIZ == 0);

  /* The trivial sizes are dispatched through a switch on purpose.  */
  switch (n)
    {
    case 1:
      /* A single element is sorted already; it only has to be moved
	 when the result is wanted somewhere other than B.  */
      if (res != b)
	{
	  if (word_aligned)
	    *((op_t *) res) = *((op_t *) b);
	  else
	    memcpy (res, b, s);
	}
      /* FALLTHROUGH */
    case 0:
      return;
    default:
      break;
    }

  n1 = n / 2;
  n2 = n - n1;
  b1 = tmp;
  b2 = b + (n1 * s);
  /* Scratch space for the recursive calls starts behind the N1
     elements that the first half is about to occupy in TMP.  */
  t  = tmp + (n1 * s);

  /* Sort the first half from B into TMP.  N >= 2 here, so N1 >= 1.  */
  msort_with_tmp (b, n1, s, cmp, b1, t);
  /* Sort the second half in place (B2 -> B2).  A single element is
     already in its place, so the call is skipped.  */
  if (n2 > 1)
    msort_with_tmp (b2, n2, s, cmp, b2, t);

  /* Now merge the sorted runs.  When B == RES the second run sits at
     RES + N1 elements, i.e. inside the destination.  That is safe: R
     has advanced by the number of elements taken from both runs while
     B2 is N1 elements plus the number taken from the second run ahead
     of RES, so R can reach B2 only once the first run is used up.  */
  r  = res;

  if (word_aligned)
    /* We are operating on aligned words.  Use direct word stores.  */
    while (n1 > 0 && n2 > 0)
      {
	if ((*cmp) (b1, b2) <= 0)
	  {
	    --n1;
	    *((op_t *) r) = *((op_t *) b1);
	    r += sizeof (op_t);
	    b1 += sizeof (op_t);
	  }
	else
	  {
	    --n2;
	    *((op_t *) r) = *((op_t *) b2);
	    r += sizeof (op_t);
	    b2 += sizeof (op_t);
	  }
      }
  else
    while (n1 > 0 && n2 > 0)
      {
	if ((*cmp) (b1, b2) <= 0)
	  {
	    r = (char *) __mempcpy (r, b1, s);
	    b1 += s;
	    --n1;
	  }
	else
	  {
	    r = (char *) __mempcpy (r, b2, s);
	    b2 += s;
	    --n2;
	  }
      }

  /* Exactly one run still has elements left; append it.  */
  if (n1 > 0)
    memcpy (r, b1, n1 * s);
  /* When sorting in place the rest of the second run is already where
     it belongs (R == B2), so the copy would be a no-op.  */
  else if (n2 > 0 && r != b2)
    memcpy (r, b2, n2 * s);
}

/* Sort the N elements of S bytes each that start at B, using CMP to
   order them.

   A merge sort with a temporary array of N * S bytes is used whenever
   that array is affordable: on the stack for less than 1024 bytes, from
   malloc otherwise.  If the array would need more than a quarter of the
   physical memory, or malloc fails, _quicksort is used instead because
   it needs no temporary array.  errno is left as it was on entry even
   when malloc fails.  */
void
qsort (void *b, size_t n, size_t s, __compar_fn_t cmp)
{
  const size_t size = n * s;

  if (size < 1024)
    {
      void *buf = __alloca (size);

      /* The temporary array is small, so put it on the stack.  */
      msort_with_tmp ((char *) b, n, s, cmp, b, buf);
    }
  else
    {
      /* We should avoid allocating too much memory since this might
	 have to be backed up by swap space.  */
      static long int phys_pages;
      static int pagesize;

      /* PAGESIZE doubles as the "initialized" flag and is stored last.
	 Guarding on PHYS_PAGES while PAGESIZE was still unset allowed a
	 second thread to skip this block and divide by a zero PAGESIZE
	 below.
	 NOTE(review): machines with weak store ordering would
	 additionally need a write barrier between the two stores.  */
      if (pagesize == 0)
	{
	  long int pages = __sysconf (_SC_PHYS_PAGES);

	  if (pages == -1)
	    /* Error while determining the memory size.  So let's
	       assume there is enough memory.  Otherwise the
	       implementer should provide a complete implementation of
	       the `sysconf' function.  */
	    pages = (long int) (~0ul >> 1);

	  /* The following determines that we will never use more than
	     a quarter of the physical memory.  Only the final value is
	     published so that no other caller sees the undivided one.  */
	  phys_pages = pages / 4;

	  pagesize = __sysconf (_SC_PAGESIZE);
	}

      /* Just a comment here.  We cannot compute
	   phys_pages * pagesize
	 and compare the needed amount of memory against this value.
	 The problem is that some systems might have more physical
	 memory than can be represented with a `size_t' value (when
	 measured in bytes).  */

      /* If the memory requirements are too high don't allocate memory.  */
      if (size / pagesize > (size_t) phys_pages)
	_quicksort (b, n, s, cmp);
      else
	{
	  /* It's somewhat large, so malloc it.  */
	  int save = errno;
	  char *tmp = malloc (size);
	  if (tmp == NULL)
	    {
	      /* Couldn't get space, so use the slower algorithm
		 that doesn't need a temporary array.  */
	      __set_errno (save);
	      _quicksort (b, n, s, cmp);
	    }
	  else
	    {
	      __set_errno (save);
	      msort_with_tmp ((char *) b, n, s, cmp, b, tmp);
	      free (tmp);
	    }
	}
    }
}
libc_hidden_def (qsort)
#include<stdio.h>
#include<stdlib.h>

/* qsort() callback: order two long values ascending.
   Returns -1 if *a < *b, 1 if *a > *b and 0 if they are equal. */
static int compare(const void *a, const void *b)
{
  const long lhs = *(const long *)a;
  const long rhs = *(const long *)b;

  /* Each comparison yields 0 or 1, so the difference is -1, 0 or 1
     without any risk of overflow. */
  return (lhs > rhs) - (lhs < rhs);
}

/* Benchmark driver: fill an array with <nvalues> results of random(),
   sort it with qsort() and print the sorted values, one per line.
   Returns 1 on a usage or allocation error, 0 otherwise. */
int main(int argc, char **argv)
{
  size_t nvalues;
  long int * values;
  size_t i;

  /* %zu is the conversion that matches size_t; %lu is only correct
     where size_t happens to be unsigned long. */
  if (argc < 2 || sscanf(argv[1],"%zu",&nvalues) != 1)
    {
      fprintf(stderr,"usage: %s <nvalues>.\n",argv[0]);
      return 1;
    }

  /* Refuse counts whose byte size would wrap around in the
     multiplication below. */
  if (nvalues > (size_t)-1 / sizeof *values)
    {
      fprintf(stderr,"%s: <nvalues> is too large.\n",argv[0]);
      return 1;
    }

  values = malloc(nvalues * sizeof *values);
  /* malloc(0) may legitimately return NULL; only a failed non-empty
     request is an error. */
  if (values == NULL && nvalues != 0)
    {
      fprintf(stderr,"%s: cannot allocate %zu values.\n",argv[0],nvalues);
      return 1;
    }

  for (i=0;i<nvalues;++i)
    values[i] = random();

  qsort(values,nvalues,sizeof *values,compare);

  for (i=0;i<nvalues;++i)
    printf("%ld\n",values[i]);

  free(values);

  return 0;
}
#include<stdio.h>
#include<stdlib.h>
#include<string.h>

/* Must be large enough to hold the string representations of long integers. */
#define VAL_LEN 12

typedef int (*compare_func_t)(const void*,const void*);

/* Benchmark driver: format <nvalues> results of random() as
   right-aligned, fixed-width strings of VAL_LEN bytes each, sort the
   strings with qsort() using strcmp as comparison, and print them one
   per line.  Returns 1 on a usage or allocation error, 0 otherwise. */
int main(int argc, char **argv)
{
  size_t nvalues;
  char (*values)[VAL_LEN];
  size_t i;

  /* %zu is the conversion that matches size_t; %lu is only correct
     where size_t happens to be unsigned long. */
  if (argc < 2 || sscanf(argv[1],"%zu",&nvalues) != 1)
    {
      fprintf(stderr,"usage: %s <nvalues>.\n",argv[0]);
      return 1;
    }

  /* Refuse counts whose byte size would wrap around in the
     multiplication below. */
  if (nvalues > (size_t)-1 / sizeof *values)
    {
      fprintf(stderr,"%s: <nvalues> is too large.\n",argv[0]);
      return 1;
    }

  values = malloc(nvalues * sizeof *values);
  /* malloc(0) may legitimately return NULL; only a failed non-empty
     request is an error. */
  if (values == NULL && nvalues != 0)
    {
      fprintf(stderr,"%s: cannot allocate %zu values.\n",argv[0],nvalues);
      return 1;
    }

  /* Right-align every number in a field of VAL_LEN-1 characters so
     that each entry fills its slot up to the terminating NUL. */
  for (i=0;i<nvalues;++i)
    snprintf(values[i],VAL_LEN,"%*ld",VAL_LEN-1,random());

  qsort(values,nvalues,VAL_LEN,(compare_func_t)strcmp);

  for (i=0;i<nvalues;++i)
    printf("%s\n",values[i]);

  free(values);

  return 0;
}

Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]