• Create a new directory where the ELDK is to be installed, say:
bash$ mkdir /opt/eldk
• Mount a CD or an ISO image with the distribution:
bash$ mount /dev/cdrom /mnt/cdrom
• Run the installation utility included on the distribution to install into that specified directory:
bash$ /mnt/cdrom/install -d /opt/eldk
• After the installation utility completes, export the CROSS_COMPILE variable:
bash$ export CROSS_COMPILE=ppc_8xx-
The trailing '-' character in the CROSS_COMPILE variable value is optional and has no effect on
the cross tools behavior.
• Add the directories /opt/eldk/usr/bin and /opt/eldk/bin to PATH:
bash$ PATH=$PATH:/opt/eldk/usr/bin:/opt/eldk/bin
• Compile a file:
bash$ ${CROSS_COMPILE}gcc -o hello_world hello_world.c
You can also call the cross tools using the generic prefix ppc-linux- for example:
bash$ ppc-linux-gcc -o hello_world hello_world.c
3.5. Working with ELDK 14
• or, equivalently:
bash$ /opt/eldk/usr/ppc-linux/bin/gcc -o hello_world hello_world.c
The value of the CROSS_COMPILE variable must correspond to the target CPU family you want the cross
tools to work for. Refer to the table below for the supported CROSS_COMPILE variable values:
3.5.A Table of possible values for $CROSS_COMPILE
CROSS_COMPILE Value | Predefined Compiler Flag | FPU present or not
ppc_4xx- | -mcpu=403 | No
ppc_4xxFP- | -mcpu=405fp | Yes
ppc_6xx- | -mcpu=603 | Yes
ppc_74xx- | -mcpu=7400 | Yes
ppc_8xx- | -mcpu=860 | No
ppc_85xx- | -mcpu=8540 | Yes
For compatibility with older versions of the ELDK and with other toolkits the following values for
$CROSS_COMPILE can be used, too: ppc_7xx- and ppc_82xx-. These are synonyms for ppc_6xx.
Saturday, December 25, 2010
Thursday, December 23, 2010
An Example of Creating a Compressed RAM Disk
----------------------------------------------
To create a RAM disk image, you will need a spare block device to
construct it on. This can be the RAM disk device itself, or an
unused disk partition (such as an unmounted swap partition). For this
example, we will use the RAM disk device, "/dev/ram".
Note: This technique should not be done on a machine with less than 8 MB
of RAM. If using a spare disk partition instead of /dev/ram, then this
restriction does not apply.
a) Decide on the RAM disk size that you want. Say 2 MB for this example.
Create it by writing to the RAM disk device. (This step is not currently
required, but may be in the future.) It is wise to zero out the
area (esp. for disks) so that maximal compression is achieved for
the unused blocks of the image that you are about to create.
dd if=/dev/zero of=/dev/ram bs=1k count=2048
b) Make a filesystem on it. Say ext2fs for this example.
mke2fs -vm0 /dev/ram 2048
c) Mount it, copy the files you want to it (eg: /etc/* /dev/* ...)
and unmount it again.
d) Compress the contents of the RAM disk. The level of compression
will be approximately 50% of the space used by the files. Unused
space on the RAM disk will compress to almost nothing.
dd if=/dev/ram bs=1k count=2048 | gzip -v9 > /tmp/ram_image.gz
e) Put the kernel onto the floppy
dd if=zImage of=/dev/fd0 bs=1k
f) Put the RAM disk image onto the floppy, after the kernel. Use an offset
that is slightly larger than the kernel, so that you can put another
(possibly larger) kernel onto the same floppy later without overlapping
the RAM disk image. An offset of 400 kB for kernels about 350 kB in
size would be reasonable. Make sure offset+size of ram_image.gz is
not larger than the total space on your floppy (usually 1440 kB).
dd if=/tmp/ram_image.gz of=/dev/fd0 bs=1k seek=400
g) Use "rdev" to set the boot device, RAM disk offset, prompt flag, etc.
For prompt_ramdisk=1, load_ramdisk=1, ramdisk_start=400, one would
have 2^15 + 2^14 + 400 = 49552.
rdev /dev/fd0 /dev/fd0
rdev -r /dev/fd0 49552
That is it. You now have your boot/root compressed RAM disk floppy. Some
users may wish to combine steps (d) and (f) by using a pipe.
To create a RAM disk image, you will need a spare block device to
construct it on. This can be the RAM disk device itself, or an
unused disk partition (such as an unmounted swap partition). For this
example, we will use the RAM disk device, "/dev/ram".
Note: This technique should not be done on a machine with less than 8 MB
of RAM. If using a spare disk partition instead of /dev/ram, then this
restriction does not apply.
a) Decide on the RAM disk size that you want. Say 2 MB for this example.
Create it by writing to the RAM disk device. (This step is not currently
required, but may be in the future.) It is wise to zero out the
area (esp. for disks) so that maximal compression is achieved for
the unused blocks of the image that you are about to create.
dd if=/dev/zero of=/dev/ram bs=1k count=2048
b) Make a filesystem on it. Say ext2fs for this example.
mke2fs -vm0 /dev/ram 2048
c) Mount it, copy the files you want to it (eg: /etc/* /dev/* ...)
and unmount it again.
d) Compress the contents of the RAM disk. The level of compression
will be approximately 50% of the space used by the files. Unused
space on the RAM disk will compress to almost nothing.
dd if=/dev/ram bs=1k count=2048 | gzip -v9 > /tmp/ram_image.gz
e) Put the kernel onto the floppy
dd if=zImage of=/dev/fd0 bs=1k
f) Put the RAM disk image onto the floppy, after the kernel. Use an offset
that is slightly larger than the kernel, so that you can put another
(possibly larger) kernel onto the same floppy later without overlapping
the RAM disk image. An offset of 400 kB for kernels about 350 kB in
size would be reasonable. Make sure offset+size of ram_image.gz is
not larger than the total space on your floppy (usually 1440 kB).
dd if=/tmp/ram_image.gz of=/dev/fd0 bs=1k seek=400
g) Use "rdev" to set the boot device, RAM disk offset, prompt flag, etc.
For prompt_ramdisk=1, load_ramdisk=1, ramdisk_start=400, one would
have 2^15 + 2^14 + 400 = 49552.
rdev /dev/fd0 /dev/fd0
rdev -r /dev/fd0 49552
That is it. You now have your boot/root compressed RAM disk floppy. Some
users may wish to combine steps (d) and (f) by using a pipe.
Manually building a custom initial RAM disk
Because there is no hard drive in many embedded systems based on Linux, the initrd also serves as the permanent root file system.I'm using a standard Linux desktop so you can follow along without an embedded target. Other than cross-compilation, the concepts (as they apply to initrd construction) are the same for an embedded target.
Utility (mkird) to create a custom initrd
# Create an empty ramdisk image
dd if=/dev/zero of=/tmp/ramdisk.img bs=$BLKSIZE count=$RDSIZE
# Make it an ext2 mountable file system
/sbin/mke2fs -F -m 0 -b $BLKSIZE /tmp/ramdisk.img $RDSIZE
# Mount it so that we can populate
mount /tmp/ramdisk.img /mnt/initrd -t ext2 -o loop=/dev/loop0
Utility (mkird) to create a custom initrd
# Create an empty ramdisk image
dd if=/dev/zero of=/tmp/ramdisk.img bs=$BLKSIZE count=$RDSIZE
# Make it an ext2 mountable file system
/sbin/mke2fs -F -m 0 -b $BLKSIZE /tmp/ramdisk.img $RDSIZE
# Mount it so that we can populate
mount /tmp/ramdisk.img /mnt/initrd -t ext2 -o loop=/dev/loop0
Saturday, December 18, 2010
Sasken Tech Round F2F Interview Questions181210 (channi)
1. What is difference between pointers and arrays?
Is it possible to create an array using pointers?
2. What is the circular linked list?
How to find whether a linked list is a circular linked list or not?
3. What is the difference between USB 1.0 and 2.0?
4. How we can insert module in to the kernel dynamically?
1. What is difference between pointers and arrays?
Is it possible to create an array using pointers?
2. What is the circular linked list?
How to find whether a linked list is a circular linked list or not?
3. What is the difference between USB 1.0 and 2.0?
4. How we can insert module in to the kernel dynamically?
Friday, December 10, 2010
Wednesday, December 1, 2010
NAME
memory: memccpy(), memchr(), memcmp(), memcpy(), memmove(), memset(),
bcmp(), bcopy(), bzero(), ffs() - memory operations
SYNOPSIS
#include
void *memccpy(void *__restrict s1, const void *__restrict s2, int c,
size_t n);
void *memchr(const void *s, int c, size_t n);
int memcmp(const void *s1, const void *s2, size_t n);
void *memcpy(void *__restrict s1, const void *__restrict s2, size_t n);
void *memmove(void *s1, const void *s2, size_t n);
void *memset(void *s, int c, size_t n);
#include
int bcmp(const void *s1, const void *s2, size_t n);
void bcopy(const void *s1, void *s2, size_t n);
void bzero(void *s, size_t n);
int ffs(int i);
Remarks
bcmp(), bcopy(), bzero(), ffs(), and are provided solely
for portability of BSD applications, and are not recommended for new
applications where portability is important. For portable
applications, use memcmp(), memmove(), and memset(), respectively.
ffs() has no portable equivalent.
DESCRIPTION
These functions operate as efficiently as possible on memory areas
(arrays of bytes bounded by a count, not terminated by a null byte).
They do not check for the overflow of any receiving memory area.
Definitions for all these functions, the type size_t, and the constant
NULL are provided in the header file.
memccpy() Copy bytes from the object pointed to by s2 into the
object pointed to by s1, stopping after the first
occurrence of byte c has been copied, or after n bytes
have been copied, whichever comes first. If copying
takes place between objects that overlap, the behavior
is undefined. memccpy() returns a pointer to the byte
after the copy of c in s1, or a NULL pointer if c was
not found in the first n bytes of s2.
memchr() Locate the first occurrence of c (converted to an
unsigned char) in the initial n bytes (each interpreted
as unsigned char) of the object pointed to by s.
memchr() returns a pointer to the located byte, or a
NULL pointer if the byte does not occur in the object.
memcmp() Compare the first n bytes of the object pointed to by
s1 to the first n bytes of the object pointed to by s2.
memcmp() returns an integer greater than, equal to, or
less than zero, according to whether the object pointed
to by s1 is greater than, equal to, or less than the
object pointed to by s2. The sign of a non-zero return
value is determined by the sign of the difference
between the values of the first pair of bytes (both
interpreted as unsigned char) that differ in the
objects being compared.
memcpy() Copy n bytes from the object pointed to by s2 into the
object pointed to by s1. If copying takes place
between objects that overlap, the behavior is
undefined. memcpy() returns the value of s1.
memmove() Copy n bytes from the object pointed to by s2 into the
object pointed to by s1. Copying takes place as if the
n bytes from the object pointed to by s2 are first
copied into a temporary array of n bytes that does not
overlap the objects pointed to by s1 and s2, and then
the n bytes from the temporary array are copied into
the object pointed to by s1. memmove() returns the
value of s1.
memset() Copy the value of c (converted to an unsigned char)
into each of the first n bytes of the object pointed to
by s. memset() returns the value of s.
bcopy() copies n bytes from the area pointed to by s1 to the
area pointed to by s2.
bcmp() Compare the first n bytes of the area pointed to by s1
with the area pointed to by s2. bcmp() returns zero if
they are identical; non-zero otherwise. Both areas are
assumed to be n bytes in length.
bzero() Clear n bytes in the area pointed to by s by setting
them to zero.
ffs() Find the first bit set (beginning with the least
significant bit) and return the index of that bit.
Bits are numbered starting at one. A return value of 0
indicates that i is zero.
memory: memccpy(), memchr(), memcmp(), memcpy(), memmove(), memset(),
bcmp(), bcopy(), bzero(), ffs() - memory operations
SYNOPSIS
#include
void *memccpy(void *__restrict s1, const void *__restrict s2, int c,
size_t n);
void *memchr(const void *s, int c, size_t n);
int memcmp(const void *s1, const void *s2, size_t n);
void *memcpy(void *__restrict s1, const void *__restrict s2, size_t n);
void *memmove(void *s1, const void *s2, size_t n);
void *memset(void *s, int c, size_t n);
#include
int bcmp(const void *s1, const void *s2, size_t n);
void bcopy(const void *s1, void *s2, size_t n);
void bzero(void *s, size_t n);
int ffs(int i);
Remarks
bcmp(), bcopy(), bzero(), ffs(), and
for portability of BSD applications, and are not recommended for new
applications where portability is important. For portable
applications, use memcmp(), memmove(), and memset(), respectively.
ffs() has no portable equivalent.
DESCRIPTION
These functions operate as efficiently as possible on memory areas
(arrays of bytes bounded by a count, not terminated by a null byte).
They do not check for the overflow of any receiving memory area.
Definitions for all these functions, the type size_t, and the constant
NULL are provided in the
memccpy() Copy bytes from the object pointed to by s2 into the
object pointed to by s1, stopping after the first
occurrence of byte c has been copied, or after n bytes
have been copied, whichever comes first. If copying
takes place between objects that overlap, the behavior
is undefined. memccpy() returns a pointer to the byte
after the copy of c in s1, or a NULL pointer if c was
not found in the first n bytes of s2.
memchr() Locate the first occurrence of c (converted to an
unsigned char) in the initial n bytes (each interpreted
as unsigned char) of the object pointed to by s.
memchr() returns a pointer to the located byte, or a
NULL pointer if the byte does not occur in the object.
memcmp() Compare the first n bytes of the object pointed to by
s1 to the first n bytes of the object pointed to by s2.
memcmp() returns an integer greater than, equal to, or
less than zero, according to whether the object pointed
to by s1 is greater than, equal to, or less than the
object pointed to by s2. The sign of a non-zero return
value is determined by the sign of the difference
between the values of the first pair of bytes (both
interpreted as unsigned char) that differ in the
objects being compared.
memcpy() Copy n bytes from the object pointed to by s2 into the
object pointed to by s1. If copying takes place
between objects that overlap, the behavior is
undefined. memcpy() returns the value of s1.
memmove() Copy n bytes from the object pointed to by s2 into the
object pointed to by s1. Copying takes place as if the
n bytes from the object pointed to by s2 are first
copied into a temporary array of n bytes that does not
overlap the objects pointed to by s1 and s2, and then
the n bytes from the temporary array are copied into
the object pointed to by s1. memmove() returns the
value of s1.
memset() Copy the value of c (converted to an unsigned char)
into each of the first n bytes of the object pointed to
by s. memset() returns the value of s.
bcopy() copies n bytes from the area pointed to by s1 to the
area pointed to by s2.
bcmp() Compare the first n bytes of the area pointed to by s1
with the area pointed to by s2. bcmp() returns zero if
they are identical; non-zero otherwise. Both areas are
assumed to be n bytes in length.
bzero() Clear n bytes in the area pointed to by s by setting
them to zero.
ffs() Find the first bit set (beginning with the least
significant bit) and return the index of that bit.
Bits are numbered starting at one. A return value of 0
indicates that i is zero.
the ‘Volatile’ Keyword in C
One of my favourite interview questions for novice programmers is: “What is the use of the ‘volatile’ keyword?” For experienced programmers, I ask: “Can we qualify a variable as both ‘const’ and ‘volatile’—if so, what is its meaning?” I bet most of you don’t know the answer, right?
The keyword ‘volatile’ is to do with compiler optimisation. Consider the following code:
long *timer = 0x0000ABCD;
// assume that at location 0x0000ABCD the current time is available
long curr_time = *timer;
// initialize curr_time to value from ‘timer’
// wait in while for 1 sec (i.e. 1000 millisec)
while( (curr_time - *timer) < 1000 )
{
curr_time = *timer; // update current time
}
print_time(curr_time);
// this function prints the current time from the
// passed long variable
Usually, hardware has a timer that can be accessed from a memory location. Here, assume that it’s 0×0000ABCD and is accessed using a long * variable ‘timer’ (in the UNIX tradition, time can be represented as a long variable and increments are done in milliseconds). The loop is meant to wait one second (or 1,000 milliseconds) by repeatedly updating curr_time with the new value from the timer. After a one second delay, the program prints the new time. Looks fine, right?
However, from the compiler point of view, what the loop does is stupid—it repeatedly assigns curr_time with *timer, which is equivalent to doing it once outside the loop. Also, the variable ‘timer’ is de-referenced repeatedly in the loop—when it is enough to do it once. So, to make the code more efficient (i.e., to optimise it), it may modify loop code as follows:
curr_time = *timer; // update current time
long temp_time = *timer;
while( (curr_time - temp_timer) < 1000 )
{ /* do nothing here */
}
As you can see, the result of this transformation is disastrous: the loop will never terminate because neither is curr_time updated nor is the timer de-referenced repeatedly to get new (updated time) values.
What we need is a way to tell the compiler not to ‘play around’ with such variables by declaring them volatile, as in:
volatile long * timer = 0x0000ABCD;
volatile curr_time = *timer;
Now, the compiler will not do any optimisation on these variables. This, essentially, is the meaning of the ‘volatile’ keyword: It declares the variables as ‘asynchronous’ variables, i.e., variables that are ‘not-modified-sequentially’. Implicitly, all variables that are not declared volatile are ‘synchronous variables’.
How about qualifying a variable as both const and volatile? As we know, when we declare a variable as const, we mean it’s a ‘read-only’ variable—once we initialise it, we will not change it again, and will only read its value. Here is a modified version of the example:
long * const timer = 0x0000ABCD;
// rest of the code as it was before..
We will never change the address of a timer, so we can put it as a const variable. Now, remember what we did to declare the timer as volatile:
volatile long * timer = 0x0000ABCD;
We can now combine const and volatile together:
volatile long * const timer = 0x0000ABCD;
It reads as follows: the timer is a const pointer to a long volatile variable. In plain English, it means that the timer is a variable that I will not change; it points to a value that can be changed without the knowledge of the compiler!
One of my favourite interview questions for novice programmers is: “What is the use of the ‘volatile’ keyword?” For experienced programmers, I ask: “Can we qualify a variable as both ‘const’ and ‘volatile’—if so, what is its meaning?” I bet most of you don’t know the answer, right?
The keyword ‘volatile’ is to do with compiler optimisation. Consider the following code:
long *timer = 0x0000ABCD;
// assume that at location 0x0000ABCD the current time is available
long curr_time = *timer;
// initialize curr_time to value from ‘timer’
// wait in while for 1 sec (i.e. 1000 millisec)
while( (curr_time - *timer) < 1000 )
{
curr_time = *timer; // update current time
}
print_time(curr_time);
// this function prints the current time from the
// passed long variable
Usually, hardware has a timer that can be accessed from a memory location. Here, assume that it’s 0×0000ABCD and is accessed using a long * variable ‘timer’ (in the UNIX tradition, time can be represented as a long variable and increments are done in milliseconds). The loop is meant to wait one second (or 1,000 milliseconds) by repeatedly updating curr_time with the new value from the timer. After a one second delay, the program prints the new time. Looks fine, right?
However, from the compiler point of view, what the loop does is stupid—it repeatedly assigns curr_time with *timer, which is equivalent to doing it once outside the loop. Also, the variable ‘timer’ is de-referenced repeatedly in the loop—when it is enough to do it once. So, to make the code more efficient (i.e., to optimise it), it may modify loop code as follows:
curr_time = *timer; // update current time
long temp_time = *timer;
while( (curr_time - temp_timer) < 1000 )
{ /* do nothing here */
}
As you can see, the result of this transformation is disastrous: the loop will never terminate because neither is curr_time updated nor is the timer de-referenced repeatedly to get new (updated time) values.
What we need is a way to tell the compiler not to ‘play around’ with such variables by declaring them volatile, as in:
volatile long * timer = 0x0000ABCD;
volatile curr_time = *timer;
Now, the compiler will not do any optimisation on these variables. This, essentially, is the meaning of the ‘volatile’ keyword: It declares the variables as ‘asynchronous’ variables, i.e., variables that are ‘not-modified-sequentially’. Implicitly, all variables that are not declared volatile are ‘synchronous variables’.
How about qualifying a variable as both const and volatile? As we know, when we declare a variable as const, we mean it’s a ‘read-only’ variable—once we initialise it, we will not change it again, and will only read its value. Here is a modified version of the example:
long * const timer = 0x0000ABCD;
// rest of the code as it was before..
We will never change the address of a timer, so we can put it as a const variable. Now, remember what we did to declare the timer as volatile:
volatile long * timer = 0x0000ABCD;
We can now combine const and volatile together:
volatile long * const timer = 0x0000ABCD;
It reads as follows: the timer is a const pointer to a long volatile variable. In plain English, it means that the timer is a variable that I will not change; it points to a value that can be changed without the knowledge of the compiler!
the ‘Volatile’ Keyword in C
One of my favourite interview questions for novice programmers is: “What is the use of the ‘volatile’ keyword?” For experienced programmers, I ask: “Can we qualify a variable as both ‘const’ and ‘volatile’—if so, what is its meaning?” I bet most of you don’t know the answer, right?
The keyword ‘volatile’ is to do with compiler optimisation. Consider the following code:
long *timer = 0x0000ABCD;
// assume that at location 0x0000ABCD the current time is available
long curr_time = *timer;
// initialize curr_time to value from ‘timer’
// wait in while for 1 sec (i.e. 1000 millisec)
while( (curr_time - *timer) < 1000 )
{
curr_time = *timer; // update current time
}
print_time(curr_time);
// this function prints the current time from the
// passed long variable
Usually, hardware has a timer that can be accessed from a memory location. Here, assume that it’s 0×0000ABCD and is accessed using a long * variable ‘timer’ (in the UNIX tradition, time can be represented as a long variable and increments are done in milliseconds). The loop is meant to wait one second (or 1,000 milliseconds) by repeatedly updating curr_time with the new value from the timer. After a one second delay, the program prints the new time. Looks fine, right?
However, from the compiler point of view, what the loop does is stupid—it repeatedly assigns curr_time with *timer, which is equivalent to doing it once outside the loop. Also, the variable ‘timer’ is de-referenced repeatedly in the loop—when it is enough to do it once. So, to make the code more efficient (i.e., to optimise it), it may modify loop code as follows:
curr_time = *timer; // update current time
long temp_time = *timer;
while( (curr_time - temp_timer) < 1000 )
{ /* do nothing here */
}
As you can see, the result of this transformation is disastrous: the loop will never terminate because neither is curr_time updated nor is the timer de-referenced repeatedly to get new (updated time) values.
What we need is a way to tell the compiler not to ‘play around’ with such variables by declaring them volatile, as in:
volatile long * timer = 0x0000ABCD;
volatile curr_time = *timer;
Now, the compiler will not do any optimisation on these variables. This, essentially, is the meaning of the ‘volatile’ keyword: It declares the variables as ‘asynchronous’ variables, i.e., variables that are ‘not-modified-sequentially’. Implicitly, all variables that are not declared volatile are ‘synchronous variables’.
How about qualifying a variable as both const and volatile? As we know, when we declare a variable as const, we mean it’s a ‘read-only’ variable—once we initialise it, we will not change it again, and will only read its value. Here is a modified version of the example:
long * const timer = 0x0000ABCD;
// rest of the code as it was before..
We will never change the address of a timer, so we can put it as a const variable. Now, remember what we did to declare the timer as volatile:
volatile long * timer = 0x0000ABCD;
We can now combine const and volatile together:
volatile long * const timer = 0x0000ABCD;
It reads as follows: the timer is a const pointer to a long volatile variable. In plain English, it means that the timer is a variable that I will not change; it points to a value that can be changed without the knowledge of the compiler!
One of my favourite interview questions for novice programmers is: “What is the use of the ‘volatile’ keyword?” For experienced programmers, I ask: “Can we qualify a variable as both ‘const’ and ‘volatile’—if so, what is its meaning?” I bet most of you don’t know the answer, right?
The keyword ‘volatile’ is to do with compiler optimisation. Consider the following code:
long *timer = 0x0000ABCD;
// assume that at location 0x0000ABCD the current time is available
long curr_time = *timer;
// initialize curr_time to value from ‘timer’
// wait in while for 1 sec (i.e. 1000 millisec)
while( (curr_time - *timer) < 1000 )
{
curr_time = *timer; // update current time
}
print_time(curr_time);
// this function prints the current time from the
// passed long variable
Usually, hardware has a timer that can be accessed from a memory location. Here, assume that it’s 0×0000ABCD and is accessed using a long * variable ‘timer’ (in the UNIX tradition, time can be represented as a long variable and increments are done in milliseconds). The loop is meant to wait one second (or 1,000 milliseconds) by repeatedly updating curr_time with the new value from the timer. After a one second delay, the program prints the new time. Looks fine, right?
However, from the compiler point of view, what the loop does is stupid—it repeatedly assigns curr_time with *timer, which is equivalent to doing it once outside the loop. Also, the variable ‘timer’ is de-referenced repeatedly in the loop—when it is enough to do it once. So, to make the code more efficient (i.e., to optimise it), it may modify loop code as follows:
curr_time = *timer; // update current time
long temp_time = *timer;
while( (curr_time - temp_timer) < 1000 )
{ /* do nothing here */
}
As you can see, the result of this transformation is disastrous: the loop will never terminate because neither is curr_time updated nor is the timer de-referenced repeatedly to get new (updated time) values.
What we need is a way to tell the compiler not to ‘play around’ with such variables by declaring them volatile, as in:
volatile long * timer = 0x0000ABCD;
volatile curr_time = *timer;
Now, the compiler will not do any optimisation on these variables. This, essentially, is the meaning of the ‘volatile’ keyword: It declares the variables as ‘asynchronous’ variables, i.e., variables that are ‘not-modified-sequentially’. Implicitly, all variables that are not declared volatile are ‘synchronous variables’.
How about qualifying a variable as both const and volatile? As we know, when we declare a variable as const, we mean it’s a ‘read-only’ variable—once we initialise it, we will not change it again, and will only read its value. Here is a modified version of the example:
long * const timer = 0x0000ABCD;
// rest of the code as it was before..
We will never change the address of a timer, so we can put it as a const variable. Now, remember what we did to declare the timer as volatile:
volatile long * timer = 0x0000ABCD;
We can now combine const and volatile together:
volatile long * const timer = 0x0000ABCD;
It reads as follows: the timer is a const pointer to a long volatile variable. In plain English, it means that the timer is a variable that I will not change; it points to a value that can be changed without the knowledge of the compiler!
Monday, November 29, 2010
Module
Definition
A module is a set of software that is not part of the core kernel yet can be integrated with it through the kernel kbuild procedure or at runtime. Modules always operate in "kernel space."
Properties
Some properties of all kernel modules:
*
Modules can either be "tightly bound" to the kernel, as part of the linux image, or they can be "loosely bound" and added later, however they all must conform to the standard Linux module interface.
*
Modules are limited to using kernel-provided functions and types. For example they cannot use GNU C-library calls, since the C-Libraries are user-space libraries.
*
Modules can be added and removed without restarting the operating system.
Device Drivers
Device Drivers are special modules that provide one or more files in the /dev filesystem.
Definition
A module is a set of software that is not part of the core kernel yet can be integrated with it through the kernel kbuild procedure or at runtime. Modules always operate in "kernel space."
Properties
Some properties of all kernel modules:
*
Modules can either be "tightly bound" to the kernel, as part of the linux image, or they can be "loosely bound" and added later, however they all must conform to the standard Linux module interface.
*
Modules are limited to using kernel-provided functions and types. For example they cannot use GNU C-library calls, since the C-Libraries are user-space libraries.
*
Modules can be added and removed without restarting the operating system.
Device Drivers
Device Drivers are special modules that provide one or more files in the /dev filesystem.
Sunday, October 24, 2010
Conditional Compilation
The third useful facility provided by the preprocessor is conditional compilation; i.e. the selection of lines of source code to be compiled and those to be ignored. While conditional compilation can be used for many purposes, we will illustrate its use with debug statements. In our previous programming examples, we have discussed the usefulness of printf() statements inserted in the code for the purpose of displaying debug information during program testing. Once the program is debugged and accepted as ``working'', it is desirable to remove these debug statements to use the program. Of course, if later an undetected bug appears during program use, we would like to put some or all debug statements back in the code to pinpoint and fix the bug. One approach to this is to simply ``comment out'' the debug statements; i.e. surround them with comment markers, so that if they are needed again, they can be ``uncommented''. This is a vast improvement over removing them and later having to type them back. However, this approach does require going through the entire source file(s) to find all of the debug statements and comment or uncomment them.
The C preprocessor provides a better alternative, namely conditional compilation. Lines of source code that may be sometimes desired in the program and other times not, are surrounded by #ifdef, #endif directive pairs as follows:
#ifdef DEBUG
printf("debug:x = %d, y = %f\n", x, y);
...
#endif
The #ifdef directive specifies that if DEBUG exists as a defined macro, i.e. is defined by means of a #define directive, then the statements between the #ifdef directive and the #endif directive are retained in the source file passed to the compiler. If DEBUG does not exist as a macro, then these statements are not passed on to the compiler.
Thus to ``turn on'' debugging statements, we simply include a definition:
#define DEBUG 1
in the source file; and to ``turn off'' debug we remove (or comment) the definition. In fact, the replacement string of the macro, DEBUG is not important; all that matters is the fact that its definition exists. Therefore,
#define DEBUG
is a sufficient definition for conditional compilation purposes. During the debug phase, we define DEBUG at the head of a source file, and compile the program. All statements appearing anywhere between #ifdef and matching #endif directives will be compiled as part of the program. When the program has been debugged, we take out the DEBUG definition, and recompile the program. The program will be compiled excluding the debug statements. The advantage is that debug statements do not have to be physically tracked down and removed. Also, if a program needs modification, the debug statements are in place and can simply be reactivated.
In general, conditional compilation directives begin with an if-part and end with an endif-part. Optionally, an else-part or an elseif-part may be present before the endif-part. The keywords for the different parts are:
* <711>> if-part: if, ifdef, ifndef
* else-part: else
* elseif-part: elif
* endif-part: endif
The syntax is:
* #
*
* [ #
* ]
* [ #
* ]
* #
If the if-part is True, then all the statements until the next, or are compiled; otherwise, if the is present, the statements between the and the are compiled.
We have already discussed the keyword ifdef. The keyword ifndef means ``if not defined''. If the identifier following it is NOT defined, then the statements until the next, or are compiled.
The keyword if must be followed by a constant expression, i.e. an expression made up of constants and operators. If the constant expression is True, the statements until the next else-part, elseif-part or endif-part are compiled. In fact, the keyword ifdef is just a special case of the if form. The directive:
#ifdef ident
is equivalent to:
#if defined ident
We can also use #if to test for the presence of a device, for example, so that if it is present, we can include an appropriate header file.
#if DEVICE == MOUSE
#include mouse.h
#endif
Here, both DEVICE and MOUSE are assumed to be constant identifiers.
The #elif provides a multiway branching in conditional compilation analogous to else ... if in C. Suppose, we wish to write a program that must work with any one of a variety of printers. We need to include in the program a header file to support the use of a specific printer. Let us assume that the specific printer used in an installation is defined by a macro DEVICE. We can then write conditional compilation directives to include the appropriate header file.
#if DEVICE == IBM
#include ibmdrv.h
#elif DEVICE == HP
#include hpdrv.h
#else
#include gendrv.h
#endif
Only constant expressions are allowed in conditional compilation directives. Therefore, in the above code, DEVICE, IBM, and HP must be be defined constants.
Exaples:
EX1:
#define VAL 10
int main () {
#if ( (VAL%2) == 0)
printf("Okay , I am executing the printf statement \n ");
#endif
}
The third useful facility provided by the preprocessor is conditional compilation; i.e. the selection of lines of source code to be compiled and those to be ignored. While conditional compilation can be used for many purposes, we will illustrate its use with debug statements. In our previous programming examples, we have discussed the usefulness of printf() statements inserted in the code for the purpose of displaying debug information during program testing. Once the program is debugged and accepted as ``working'', it is desirable to remove these debug statements to use the program. Of course, if later an undetected bug appears during program use, we would like to put some or all debug statements back in the code to pinpoint and fix the bug. One approach to this is to simply ``comment out'' the debug statements; i.e. surround them with comment markers, so that if they are needed again, they can be ``uncommented''. This is a vast improvement over removing them and later having to type them back. However, this approach does require going through the entire source file(s) to find all of the debug statements and comment or uncomment them.
The C preprocessor provides a better alternative, namely conditional compilation. Lines of source code that may be sometimes desired in the program and other times not, are surrounded by #ifdef, #endif directive pairs as follows:
#ifdef DEBUG
printf("debug:x = %d, y = %f\n", x, y);
...
#endif
The #ifdef directive specifies that if DEBUG exists as a defined macro, i.e. is defined by means of a #define directive, then the statements between the #ifdef directive and the #endif directive are retained in the source file passed to the compiler. If DEBUG does not exist as a macro, then these statements are not passed on to the compiler.
Thus to ``turn on'' debugging statements, we simply include a definition:
#define DEBUG 1
in the source file; and to ``turn off'' debug we remove (or comment) the definition. In fact, the replacement string of the macro, DEBUG is not important; all that matters is the fact that its definition exists. Therefore,
#define DEBUG
is a sufficient definition for conditional compilation purposes. During the debug phase, we define DEBUG at the head of a source file, and compile the program. All statements appearing anywhere between #ifdef and matching #endif directives will be compiled as part of the program. When the program has been debugged, we take out the DEBUG definition, and recompile the program. The program will be compiled excluding the debug statements. The advantage is that debug statements do not have to be physically tracked down and removed. Also, if a program needs modification, the debug statements are in place and can simply be reactivated.
In general, conditional compilation directives begin with an if-part and end with an endif-part. Optionally, an else-part or an elseif-part may be present before the endif-part. The keywords for the different parts are:
* <711>> if-part: if, ifdef, ifndef
* else-part: else
* elseif-part: elif
* endif-part: endif
The syntax is:
* #
*
* [ #
*
* [ #
*
* #
If the if-part is True, then all the statements until the next
We have already discussed the keyword ifdef. The keyword ifndef means ``if not defined''. If the identifier following it is NOT defined, then the statements until the next
The keyword if must be followed by a constant expression, i.e. an expression made up of constants and operators. If the constant expression is True, the statements until the next else-part, elseif-part or endif-part are compiled. In fact, the keyword ifdef is just a special case of the if form. The directive:
#ifdef ident
is equivalent to:
#if defined ident
We can also use #if to test for the presence of a device, for example, so that if it is present, we can include an appropriate header file.
#if DEVICE == MOUSE
#include mouse.h
#endif
Here, both DEVICE and MOUSE are assumed to be constant identifiers.
The #elif provides a multiway branching in conditional compilation analogous to else ... if in C. Suppose, we wish to write a program that must work with any one of a variety of printers. We need to include in the program a header file to support the use of a specific printer. Let us assume that the specific printer used in an installation is defined by a macro DEVICE. We can then write conditional compilation directives to include the appropriate header file.
#if DEVICE == IBM
#include ibmdrv.h
#elif DEVICE == HP
#include hpdrv.h
#else
#include gendrv.h
#endif
Only constant expressions are allowed in conditional compilation directives. Therefore, in the above code, DEVICE, IBM, and HP must be be defined constants.
Exaples:
EX1:
#define VAL 10
int main () {
#if ( (VAL%2) == 0)
printf("Okay , I am executing the printf statement \n ");
#endif
}
Thursday, October 21, 2010
Interrupts
Bottom halves and deferring work
Basic idea
ISR must run fast, with current IRQ disabled (and maybe others too),
cannot sleep.
But servicing an interrupt might require lots of work and may require
sleeping.
Defer work to after the ISR finishes.
Only put in ISR:
time-sensitive work
work related directly to the hardware device that interrupted
work that must exclude another interrupt from same device
Can delay either until all ISRs are done (bottom half) or a given time
(kernel timer, Chapter 9)
History
Initially, Linux only had what we will call "BH": static list of 32
bottom halves, globally exclusive. BHs were removed in 2.5.
Later, "task queues" were added, but not lightweight enough for
networking. Task queues were removed in 2.5 (some became tasklets,
others work queues)
2.3 series introduced "softirqs" and "tasklets", which still remain.
2.5 series introduced "work queues", which still remain.
Softirqs
Purpose: high-frequency, highly-threaded work
Representation:
Statically allocated at compile time.
32-entry array: struct softirq_action softirq_vec[32]
in kernel/softirq.c:44
Only 6 are used: HI_SOFTIRQ and friends.
raising softirq:
raise_softirq(number) or raise_softirq_irqoff(number)
sets bit in softirq_pending(cpu)
pending softirqs are checked
during return from interrupt
by ksoftirqd kernel thread
explicitly in a few places, such as networking code
pending softirqs are invoked by do_softirq() -> __do_softirq() [trace]
-> theSoftirq->action(theSoftirq)
Handler execution:
Handler can be preempted by ISR (interrupts are enabled)
Other cpus can run softirqs at the same time, even the same one.
So shared data must be locked.
Typically, softirqs use per-process data to avoid needing locking.
Handler must not sleep (no semaphores, for instance)
to add a new softirq
Place it after HI_SOFTIRQ and before TASKLET_SOFTIRQ.
register by open_softirq(number, action, data)
Tasklets
Purpose: Most cases of deferred work. Preferable to softirqs.
Two categories, represented by TASKLET_SOFTIRQ and HI_SOFTIRQ
Representation: tasklet_struct
field state is one of TASKLET_STATE_SCHED, TASKLET_STATE_RUN.
(latter only used on SMP)
field count: disables tasklet if > 0.
raising (here called "scheduling") tasklet tl:
tasklet_schedule(tl) or tasklet_hi_schedule(tl)
appends to list tasklet_vec or tasklet_hi_vec
kernel/softirq.c:202
then raises appropriate softirq.
if tasklet scheduled again before it runs, it runs only once.
if tasklet already running, it is scheduled to run again.
tasklet only runs on cpu that scheduled it.
associated handler for TASKLET_SOFTIRQ or HI_SOFTIRQ is
tasklet_action() and tasklet_hi_action()
established in softirq_init()
trace tasklet_action()
[what happens to disabled or currently running ones? dropped?]
tasklet execution
only one of any given tasklet runs at a time, but others can run
simultaneously (on other cpus)
must not sleep (no semaphores, for instance)
Handler can be preempted by ISR (interrupts are enabled)
shared data (with other tasklets or interrupts) must be locked
to add a new tasklet
static
DECLARE_TASKLET(name, func, data)
DECLARE_TASKLET_DISABLED(name, func, data)
dynamic
tasklet_init(tl, func, data)
The func should be void func(unsigned long data)
to disable and enable a tasklet
tasklet_disable(tl) tasklet_disable_nosync(tl)
ordinary disable doesn't return until tl finishes, if running.
tasklet_enable(tl) -- re-enables (or enables for the first time)
tasklet_kill(tl) -- unschedules from its pending queue.
may sleep, so don't call tasklet_kill() from interrupt context.
ksoftirqd/n
Purpose: when softirqs are so frequent they interfere with interactive
tasks
Alternative: let kernel not handle reactivated softirqs until next
interrupt.
Each cpu c has a kernel task called ksoftirqd/c running ksoftirqd().
[trace]
Awakened by do_softirq() when a softirq reactivates itself.
Work queues
Deferred work that runs in process context, in a kernel task.
Such work is scheduled, initially holds no locks, and may sleep.
eg to allocate memory, obtain a semaphore, perform blocking I/O.
But must not access user space (which is only borrowed)
Alternative: special-purpose kernel task (deprecated).
Representation (see book 100)
Each type of work queue has its own workqueue_struct
which has one queue per cpu, called cpu_workqueue_struct
the queue is linked by field worklist.
Individual units of work (w) are of struct work_struct,
linked via field entry
running one instance of worker_thread() per cpu.
worker_thread() [trace] is a loop -> run_workqueue() [trace]
The generic kernel task is events/c
"events" is one type of work queue, created in init_workqueues()
Usually use "events". But XFS creates 2 new types of work queue.
To create work w
static
DECLARE_WORK(w, func, data)
dynamic
INIT_WORK(w, func, data)
The func should be void func(void *data)
To create a new type of work queue q
q = create_workqueue(name)
To schedule or cancel work w; where q not given, it is "events".
schedule_work(w) queue_work(q, w)
schedule_delayed_work(w, delay) queue_delayed_work(q, w, delay)
will not run until at least delay ticks have elapsed
flush_scheduled_work() flush_workqueue(q)
sleeps until all work on q completed
cancel_delayed_work(w) (from whatever q it is on)
Which bottom half to use?
If it must sleep, use task queues.
If it must scale well for multiple cpus and run at high frequency,
use softirqs and per-cpu data structures.
Otherwise, use tasklets.
Disabling bottom halves
Needed if KSPC code and a bottom half share data.
Only need to disasble softirqs and tasklets.
Generally also need to grab a lock.
local_bh_disable() local_bh_enable()
Both work only on local processor.
They may be nested.
Basic idea
ISR must run fast, with current IRQ disabled (and maybe others too),
cannot sleep.
But servicing an interrupt might require lots of work and may require
sleeping.
Defer work to after the ISR finishes.
Only put in ISR:
time-sensitive work
work related directly to the hardware device that interrupted
work that must exclude another interrupt from same device
Can delay either until all ISRs are done (bottom half) or a given time
(kernel timer, Chapter 9)
History
Initially, Linux only had what we will call "BH": static list of 32
bottom halves, globally exclusive. BHs were removed in 2.5.
Later, "task queues" were added, but not lightweight enough for
networking. Task queues were removed in 2.5 (some became tasklets,
others work queues)
2.3 series introduced "softirqs" and "tasklets", which still remain.
2.5 series introduced "work queues", which still remain.
Softirqs
Purpose: high-frequency, highly-threaded work
Representation:
Statically allocated at compile time.
32-entry array: struct softirq_action softirq_vec[32]
in kernel/softirq.c:44
Only 6 are used: HI_SOFTIRQ and friends.
raising softirq:
raise_softirq(number) or raise_softirq_irqoff(number)
sets bit in softirq_pending(cpu)
pending softirqs are checked
during return from interrupt
by ksoftirqd kernel thread
explicitly in a few places, such as networking code
pending softirqs are invoked by do_softirq() -> __do_softirq() [trace]
-> theSoftirq->action(theSoftirq)
Handler execution:
Handler can be preempted by ISR (interrupts are enabled)
Other cpus can run softirqs at the same time, even the same one.
So shared data must be locked.
Typically, softirqs use per-process data to avoid needing locking.
Handler must not sleep (no semaphores, for instance)
to add a new softirq
Place it after HI_SOFTIRQ and before TASKLET_SOFTIRQ.
register by open_softirq(number, action, data)
Tasklets
Purpose: Most cases of deferred work. Preferable to softirqs.
Two categories, represented by TASKLET_SOFTIRQ and HI_SOFTIRQ
Representation: tasklet_struct
field state is one of TASKLET_STATE_SCHED, TASKLET_STATE_RUN.
(latter only used on SMP)
field count: disables tasklet if > 0.
raising (here called "scheduling") tasklet tl:
tasklet_schedule(tl) or tasklet_hi_schedule(tl)
appends to list tasklet_vec or tasklet_hi_vec
kernel/softirq.c:202
then raises appropriate softirq.
if tasklet scheduled again before it runs, it runs only once.
if tasklet already running, it is scheduled to run again.
tasklet only runs on cpu that scheduled it.
associated handler for TASKLET_SOFTIRQ or HI_SOFTIRQ is
tasklet_action() and tasklet_hi_action()
established in softirq_init()
trace tasklet_action()
[what happens to disabled or currently running ones? dropped?]
tasklet execution
only one of any given tasklet runs at a time, but others can run
simultaneously (on other cpus)
must not sleep (no semaphores, for instance)
Handler can be preempted by ISR (interrupts are enabled)
shared data (with other tasklets or interrupts) must be locked
to add a new tasklet
static
DECLARE_TASKLET(name, func, data)
DECLARE_TASKLET_DISABLED(name, func, data)
dynamic
tasklet_init(tl, func, data)
The func should be void func(unsigned long data)
to disable and enable a tasklet
tasklet_disable(tl) tasklet_disable_nosync(tl)
ordinary disable doesn't return until tl finishes, if running.
tasklet_enable(tl) -- re-enables (or enables for the first time)
tasklet_kill(tl) -- unschedules from its pending queue.
may sleep, so don't call tasklet_kill() from interrupt context.
ksoftirqd/n
Purpose: when softirqs are so frequent they interfere with interactive
tasks
Alternative: let kernel not handle reactivated softirqs until next
interrupt.
Each cpu c has a kernel task called ksoftirqd/c running ksoftirqd().
[trace]
Awakened by do_softirq() when a softirq reactivates itself.
Work queues
Deferred work that runs in process context, in a kernel task.
Such work is scheduled, initially holds no locks, and may sleep.
eg to allocate memory, obtain a semaphore, perform blocking I/O.
But must not access user space (which is only borrowed)
Alternative: special-purpose kernel task (deprecated).
Representation (see book 100)
Each type of work queue has its own workqueue_struct
which has one queue per cpu, called cpu_workqueue_struct
the queue is linked by field worklist.
Individual units of work (w) are of struct work_struct,
linked via field entry
running one instance of worker_thread() per cpu.
worker_thread() [trace] is a loop -> run_workqueue() [trace]
The generic kernel task is events/c
"events" is one type of work queue, created in init_workqueues()
Usually use "events". But XFS creates 2 new types of work queue.
To create work w
static
DECLARE_WORK(w, func, data)
dynamic
INIT_WORK(w, func, data)
The func should be void func(void *data)
To create a new type of work queue q
q = create_workqueue(name)
To schedule or cancel work w; where q not given, it is "events".
schedule_work(w) queue_work(q, w)
schedule_delayed_work(w, delay) queue_delayed_work(q, w, delay)
will not run until at least delay ticks have elapsed
flush_scheduled_work() flush_workqueue(q)
sleeps until all work on q completed
cancel_delayed_work(w) (from whatever q it is on)
Which bottom half to use?
If it must sleep, use task queues.
If it must scale well for multiple cpus and run at high frequency,
use softirqs and per-cpu data structures.
Otherwise, use tasklets.
Disabling bottom halves
Needed if KSPC code and a bottom half share data.
Only need to disasble softirqs and tasklets.
Generally also need to grab a lock.
local_bh_disable() local_bh_enable()
Both work only on local processor.
They may be nested.
Friday, October 15, 2010
DOUBLE STACK IMPLEMENTATION
(with structures)
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction.
Structure has the following advantages.
*
Clarity
As programs get bigger, they get more confusing if they're not structured. Restricting programming logic to structures that have only one point of exit and entry makes it much easier to track the 'flow' of logic.
*
Efficiency
*
Maintenance
Programs that follow a consistent set of clearly defined structures (having single points of entry and exit) are much easier to read and debug. Especially when maintenance issues arise long after the program is written.
In this program, the pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack1
Code :
#include
#include
#define minstacksize 5
typedef struct stackrecord
{
int *a;
int top1,top2,capacity;
}*stack;
void makeempty(stack s, int n)
{
if(n==1)
s->top1=-1;
else
s->top2=s->capacity;
}
stack createstack(int max)
{
stack s;
if(max
{
printf("\n Stack size is too small");
return NULL;
}
else
{
s=(stack)malloc(sizeof(stack));
if(s==NULL)
{
printf("\n Out of space");
return NULL;
}
s->a=(int*)malloc(max*sizeof(int));
if(s->a==NULL)
{
printf("\n Out of space");
return NULL;
}
s->capacity=max;
makeempty(s,1);
makeempty(s,2);
return s;
}
}
void display(stack s,int n)
{
int i;
if(n==1)
{
printf("\n Stack 1 :\n");
for(i=s->top1;i>=0;i--)
printf("%d\t",s->a[i]);
}
else
{
printf("\n Stack 2 :\n");
for(i=s->top2;icapacity-1;i++)
printf("%d\t",s->a[i]);
}
}
void dispose(stack s)
{
if(s!=NULL)
{
free(s->a);
free(s);
}
}
int isfull(stack s)
{
return (s->top1==s->top2-1);
}
int isempty(stack s,int n)
{
return (n==1?s->top1==-1:s->top2==s->capacity);
}
void push(stack s,int n,int x)
{
if(isfull(s))
printf("\n Overflow");
else
{
if(n==1)
s->a[++s->top1]=x;
else
s->a[--s->top2]=x;
}
}
void pop(stack s,int n)
{
if(isempty(s,n))
printf("\n Underflow");
else
{
if(n==1)
s->top1--;
else
s->top2++;
}
}
int top(stack s,int n)
{
if(isempty(s,n))
{
printf("\n Underflow");
return -1;
}
else
{
if(n==1)
return (s->a[s->top1]);
else
return (s->a[s->top2]);
}
}
int topandpop(stack s,int n)
{
if(isempty(s,n))
{
printf("\n Underflow");
return -1;
}
else
{
if(n==1)
return(s->a[s->top1--]);
else
return (s->a[s->top2++]);
}
}
void main()
{
stack s;
s=createstack(10);
int ch,a=1,j,n;
while(a)
{
printf("\n Options :\n1.Push\n2.Topandpop\n3.Pop\n4.Top\n5.Exit\n Enter choice : ");
scanf("%d",&ch);
switch(ch)
{
case 1:printf("\n Enter the stack :");
scanf("%d",&n);
printf("\n Enter value to be inserted : ");
scanf("%d",&j);
push(s,n,j);
display(s,n);
break;
case 2:printf("\n Enter the stack : ");
scanf("%d",&n);
printf("\n Top of stack %d is %d",n,topandpop(s,n));
display(s,n);
break;
case 3:printf("\n Enter the stack : ");
scanf("%d",&n);
pop(s,n);
display(s,n);
break;
case 4:printf("\n Enter the stack : ");
scanf("%d",&n);
printf("\n Top of stack %d is %d",n,top(s,n));
display(s,n);
break;
case 5:printf("\n End");
exit(0);
default:printf("\n Invalid choice");
}
}
dispose(s);
}
Goal of the code :
The aim of this code is to implement two stacks with a single array using structures.
Variables used :
a - The array which contains elements of the double-stack
capacity- The maximum size of the array implementing the 2 stacks
top1 - This points to the top-most element of stack 1
top2 - This points to the top-most element of stack 2
i - Counter variable
s - This points to the stack
n - The variable which holds the stack number (i.e. 1 or 2 indicating stack 1 or stack 2 )
Functions used :
Functions
Use
isfull()
To check whether the array containing the stacks is full
isempty()
To check whether the stack is empty
push
To insert an element into stack
pop()
To delete the top-most element of the stack
top()
To return the top-most element of the stack
topandpop()
To return and delete the top-most element of the stack
makeempty()
To empty the stack
display()
To display contents of stack
main()
To implement the double stack
Implementation :
Initial conditions :
capacity = 10
The array is allocated a default size of 10.
top1 = -1
top2 = 10 (capacity)
In main() :
*
List of options is displayed as menu and the user's choice is stored in the variable 'ch'.
*
Using switch-case the control is directed to the specified function.
*
For all the function, the stack to be worked upon is got as input from user and stored in 'n' variable.
*
In case of push, the value to be inserted is got from the user.
*
In case of pop, the top-most element is printed.
*
In each case, the stack which is being worked upon is displayed.
Test cases :
The output of the code is as below.
OPTIONS :
1.Push
2.Topandpop
3.Pop
4.Top
5.Exit
Enter choice : 1
Enter the stack : 1
Enter the value to be inserted : 1
Stack 1 :
1
OPTIONS :
1.Push
2.Topandpop
3.Pop
4.Top
5.Exit
Enter choice : 1
Enter the stack : 2
Enter the value to be inserted : 8
Stack 2 :
8
OPTIONS :
1.Push
2.Topandpop
3.Pop
4.Top
5.Exit
Enter choice : 4
Enter the stack : 1
Top of stack 1 is 1
Stack 1 :
1
OPTIONS :
1.Push
2.Topandpop
3.Pop
4.Top
5.Exit
Enter choice : 2
Enter the stack : 2
Top of stack 2 is 8
Stack 2 :
OPTIONS :
1.Push
2.Topandpop
3.Pop
4.Top
5.Exit
Enter choice : 3
Enter the stack : 1
Stack 1 :
OPTIONS :
1.Push
2.Topandpop
3.Pop
4.Top
5.Exit
Enter choice : 5
End
Compatibility :
Compatible with all computers which is capable of running gcc-4.4.0. The program has been tested with the above version of gcc as of June 6.
***********
(with structures)
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction.
Structure has the following advantages.
*
Clarity
As programs get bigger, they get more confusing if they're not structured. Restricting programming logic to structures that have only one point of exit and entry makes it much easier to track the 'flow' of logic.
*
Efficiency
*
Maintenance
Programs that follow a consistent set of clearly defined structures (having single points of entry and exit) are much easier to read and debug. Especially when maintenance issues arise long after the program is written.
In this program, the pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack1
Code :
#include
#include
#define minstacksize 5
typedef struct stackrecord
{
int *a;
int top1,top2,capacity;
}*stack;
void makeempty(stack s, int n)
{
if(n==1)
s->top1=-1;
else
s->top2=s->capacity;
}
stack createstack(int max)
{
stack s;
if(max
{
printf("\n Stack size is too small");
return NULL;
}
else
{
s=(stack)malloc(sizeof(stack));
if(s==NULL)
{
printf("\n Out of space");
return NULL;
}
s->a=(int*)malloc(max*sizeof(int));
if(s->a==NULL)
{
printf("\n Out of space");
return NULL;
}
s->capacity=max;
makeempty(s,1);
makeempty(s,2);
return s;
}
}
void display(stack s,int n)
{
int i;
if(n==1)
{
printf("\n Stack 1 :\n");
for(i=s->top1;i>=0;i--)
printf("%d\t",s->a[i]);
}
else
{
printf("\n Stack 2 :\n");
for(i=s->top2;icapacity-1;i++)
printf("%d\t",s->a[i]);
}
}
void dispose(stack s)
{
if(s!=NULL)
{
free(s->a);
free(s);
}
}
int isfull(stack s)
{
return (s->top1==s->top2-1);
}
int isempty(stack s,int n)
{
return (n==1?s->top1==-1:s->top2==s->capacity);
}
void push(stack s,int n,int x)
{
if(isfull(s))
printf("\n Overflow");
else
{
if(n==1)
s->a[++s->top1]=x;
else
s->a[--s->top2]=x;
}
}
void pop(stack s,int n)
{
if(isempty(s,n))
printf("\n Underflow");
else
{
if(n==1)
s->top1--;
else
s->top2++;
}
}
int top(stack s,int n)
{
if(isempty(s,n))
{
printf("\n Underflow");
return -1;
}
else
{
if(n==1)
return (s->a[s->top1]);
else
return (s->a[s->top2]);
}
}
int topandpop(stack s,int n)
{
if(isempty(s,n))
{
printf("\n Underflow");
return -1;
}
else
{
if(n==1)
return(s->a[s->top1--]);
else
return (s->a[s->top2++]);
}
}
void main()
{
stack s;
s=createstack(10);
int ch,a=1,j,n;
while(a)
{
printf("\n Options :\n1.Push\n2.Topandpop\n3.Pop\n4.Top\n5.Exit\n Enter choice : ");
scanf("%d",&ch);
switch(ch)
{
case 1:printf("\n Enter the stack :");
scanf("%d",&n);
printf("\n Enter value to be inserted : ");
scanf("%d",&j);
push(s,n,j);
display(s,n);
break;
case 2:printf("\n Enter the stack : ");
scanf("%d",&n);
printf("\n Top of stack %d is %d",n,topandpop(s,n));
display(s,n);
break;
case 3:printf("\n Enter the stack : ");
scanf("%d",&n);
pop(s,n);
display(s,n);
break;
case 4:printf("\n Enter the stack : ");
scanf("%d",&n);
printf("\n Top of stack %d is %d",n,top(s,n));
display(s,n);
break;
case 5:printf("\n End");
exit(0);
default:printf("\n Invalid choice");
}
}
dispose(s);
}
Goal of the code :
The aim of this code is to implement two stacks with a single array using structures.
Variables used :
a - The array which contains elements of the double-stack
capacity- The maximum size of the array implementing the 2 stacks
top1 - This points to the top-most element of stack 1
top2 - This points to the top-most element of stack 2
i - Counter variable
s - This points to the stack
n - The variable which holds the stack number (i.e. 1 or 2 indicating stack 1 or stack 2 )
Functions used :
Functions
Use
isfull()
To check whether the array containing the stacks is full
isempty()
To check whether the stack is empty
push
To insert an element into stack
pop()
To delete the top-most element of the stack
top()
To return the top-most element of the stack
topandpop()
To return and delete the top-most element of the stack
makeempty()
To empty the stack
display()
To display contents of stack
main()
To implement the double stack
Implementation :
Initial conditions :
capacity = 10
The array is allocated a default size of 10.
top1 = -1
top2 = 10 (capacity)
In main() :
*
List of options is displayed as menu and the user's choice is stored in the variable 'ch'.
*
Using switch-case the control is directed to the specified function.
*
For all the function, the stack to be worked upon is got as input from user and stored in 'n' variable.
*
In case of push, the value to be inserted is got from the user.
*
In case of pop, the top-most element is printed.
*
In each case, the stack which is being worked upon is displayed.
Test cases :
The output of the code is as below.
OPTIONS :
1.Push
2.Topandpop
3.Pop
4.Top
5.Exit
Enter choice : 1
Enter the stack : 1
Enter the value to be inserted : 1
Stack 1 :
1
OPTIONS :
1.Push
2.Topandpop
3.Pop
4.Top
5.Exit
Enter choice : 1
Enter the stack : 2
Enter the value to be inserted : 8
Stack 2 :
8
OPTIONS :
1.Push
2.Topandpop
3.Pop
4.Top
5.Exit
Enter choice : 4
Enter the stack : 1
Top of stack 1 is 1
Stack 1 :
1
OPTIONS :
1.Push
2.Topandpop
3.Pop
4.Top
5.Exit
Enter choice : 2
Enter the stack : 2
Top of stack 2 is 8
Stack 2 :
OPTIONS :
1.Push
2.Topandpop
3.Pop
4.Top
5.Exit
Enter choice : 3
Enter the stack : 1
Stack 1 :
OPTIONS :
1.Push
2.Topandpop
3.Pop
4.Top
5.Exit
Enter choice : 5
End
Compatibility :
Compatible with all computers which is capable of running gcc-4.4.0. The program has been tested with the above version of gcc as of June 6.
***********
DOUBLE STACK - ARRAY IMPLEMENTATION
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction. The pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack
Code :
#include
#include
int s1[30],s2[30],capacity=5;
int top1=-1,top2=5;
int isfull()
{
return (top1==top2-1);
}
int isempty()
{
return (top1==-1 && top2==capacity);
}
void pushs1(int x)
{
if(!isfull())
s1[++top1]=x;
else
printf("\n Overflow");
}
void displays1()
{
int i;
printf("\n Stack 1 is\n");
for(i=top1;i>-1;i--)
printf("%d\t",s1[i]);
}
void displays2()
{
int i;
printf("\n Stack 2 is\n");
for(i=capacity-1;i>=top2;i--)
printf("%d\t",s2[i]);
}
void pushs2(int x)
{
if(!isfull())
s2[--top2]=x;
else
printf("\n Overflow");
}
int pops1()
{
if(top1==-1)
printf("\n Error");
else
{
int t;
t=s1[top1];
top1--;
return t;
}
return 0;
}
int pops2()
{
if(top2==capacity)
printf("\n Error");
else
{
int t;
t=s2[top2];
top2++;
return t;
}
return 0;
}
void makeempty()
{
top1=-1;
top2=5;
}
void main()
{
int ch,a=1,j;
while(a)
{
printf("\nOPTIONS :\n1.Push into stack 1 \n2.Push into stack 2 \n3.Topandpop stack 1 \n4.Topandpop stack 2 \n5.Makeempty \n6.Exit \n Enter choice : ");
scanf("%d",&ch);
switch(ch)
{
case 1:printf("\n Enter value to push into stack 1 : ");
scanf("%d",&j);
pushs1(j);
displays1();
break;
case 2:printf("\n Enter value to push into stack 2 : ");
scanf("%d",&j);
pushs2(j);
displays2();
break;
case 3:j=pops1();
printf("\n Top of stack 1 : %d is popped",j);
displays1();
break;
case 4:j=pops2();
printf("\n Top of stack 2 : %d is popped",j);
displays2();
break;
case 5:makeempty();
printf(“\n Both stacks are emptied”);
break;
case 6:a=0;
exit(0);
default:printf("\nInvalid choice");
}
}
}
Goal of the code :
The aim of this code is to implement two stacks using a single array.
Variables used :
s1 - The array which contains elements of stack 1
s2 - The array which contains elements of stack 2
capacity- The maximum size of the array implementing the 2 stacks
top1 -This points to the top-most element of stack 1
top2 -This points to the top-most element of stack 2
I - counter variable
t -temporary variable to hold value momentarily
Functions used :
Functions
Use
isfull()
To check whether the array containing the stacks is full
isempty()
To check whether the array containing the stacks is empty
pushs1()
To insert element into stack 1
pushs2()
To insert element into stack 2
pops1()
To return and delete the top-most element in stack 1
pops2()
To return and delete the top-most element in stack 2
makeempty()
To empty both the stacks
displays1()
To display contents of stack 1
displays2()
To display contents of stack 2
main()
To implement the double stack
Implementation :
Initial condition :
capacity = 5
The array is allocated a default size of 5.
top1=-1
top2=5(capacity)
In main() :DOUBLE STACK IMPLEMENTATION
(with structures)
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction.
Structure has the following advantages.
*
Clarity
As programs get bigger, they get more confusing if they're not structured. Restricting programming logic to structures that have only one point of exit and entry makes it much easier to track the 'flow' of logic.
*
Efficiency
*
Maintenance
Programs that follow a consistent set of clearly defined structures (having single points of entry and exit) are much easier to read and debug. Especially when maintenance issues arise long after the program is written.
In this program, the pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.DOUBLE STACK IMPLEMENTATION
(with structures)
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction.
Structure has the following advantages.
*
Clarity
As programs get bigger, they get more confusing if they're not structured. Restricting programming logic to structures that have only one point of exit and entry makes it much easier to track the 'flow' of logic.
*
Efficiency
*
Maintenance
Programs that follow a consistent set of clearly defined structures (having single points of entry and exit) are much easier to read and debug. Especially when maintenance issues arise long after the program is written.
In this program, the pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack1
DOUBLE STACK - ARRAY IMPLEMENTATION
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction. The pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements DOUBLE STACK IMPLEMENTATION
(with structures)
Introduction :DOUBLE STACK IMPLEMENTATION
(with structures)
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction.
Structure has the following advantages.
*
Clarity
As programs get bigger, they get more confusing if they're not structured. Restricting programming logic to structures that have only one point of exit and entry makes it much easier to track the 'flow' of logic.
*
Efficiency
*
Maintenance
Programs that follow a consistent set of clearly defined structures (having single points of entry and exit) are much easier to read and debug. Especially when maintenance issues arise long after the program is written.
In this program, the pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack1
DOUBLE STACK - ARRAY IMPLEMENTATIONDOUBLE STACK IMPLEMENTATION
DOUBLE STACK IMPLEMENTATION
(with structures)
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction.
Structure has the following advantages.
*
Clarity
As programs get bigger, they get more confusing if they're not structured. Restricting programming logic to structures that have only one point of exit and entry makes it much easier to track the 'flow' of logic.
*
Efficiency
*
Maintenance
Programs that follow a consistent set of clearly defined structures (having single points of entry and exit) are much easier to read and debug. Especially when maintenance issues arise long after the program is written.
In this program, the pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack1
DOUBLE STACK - ARRAY IMPLEMENTATION
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction. The pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack
(with structures)
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction.
Structure has the following advantages.
*
Clarity
As programs get bigger, they get more confusing if they're not structured. Restricting programming logic to structures that have only one point of exit and entry makes it much easier to track the 'flow' of logic.
*
Efficiency
*
Maintenance
Programs that follow a consistent set of clearly defined structures (having single points of entry and exit) are much easier to read and debug. Especially when maintenance issues arise long after the program is written.
In this program, the pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack1
DOUBLE STACK - ARRAY IMPLEMENTATION
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction. The pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction. The pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction.
Structure has the following advantages.
*
Clarity
As programs get bigger, they get more confusing if they're not structured. Restricting programming logic to structures that have only one point of exit and entry makes it much easier to track the 'flow' of logic.
*
Efficiency
*
Maintenance
Programs that follow a consistent set of clearly defined structures (having single points of entry and exit) are much easier to read and debug. Especially when maintenance issues arise long after the program is written.
In this program, the pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack1
DOUBLE STACK - ARRAY IMPLEMENTATION
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction. The pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestackare pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack
doublestack1
DOUBLE STACK - ARRAY IMPLEMENTATION
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction. The pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack
*
List of options is displayed as menu and the user's choice is stored in the variable 'ch'.
*
Using switch-case the control is directed to the specified function.
*
In case of push, the value to be inserted is got from the user.
*
In case of pop, the top-most element is printed.
*
In each case, the stack which is being worked upon is displayed.
Test case :
The output of the code is :
OPTIONS :
1.Push into stack 1
2.Push into stack 2
3.Topandpop stack1
4.Topandpop stack 2
5.Make empty
6.Exit
Enter choice : 1
Enter value to push into stack 1 : 1
Stack 1 is
1
OPTIONS :
1.Push into stack 1
2.Push into stack 2
3.Topandpop stack1
4.Topandpop stack 2
5.Make empty
6.Exit
Enter choice : 1
Enter value to push into stack 1 : 2
Stack 1 is
2 1
OPTIONS :
1.Push into stack 1
2.Push into stack 2
3.Topandpop stack1
4.Topandpop stack 2
5.Make empty
6.Exit
Enter choice : 2
Enter value to push into stack 1 : 9
Stack 2 is
9
OPTIONS :
1.Push into stack 1
2.Push into stack 2
3.Topandpop stack1
4.Topandpop stack 2
5.Make empty
6.Exit
Enter choice : 2
Enter value to push into stack 1 : 8
Stack 1 is
9 8
OPTIONS :
1.Push into stack 1
2.Push into stack 2
3.Topandpop stack1
4.Topandpop stack 2
5.Make empty
6.Exit
Enter choice : 3
Top of stack 1: 2 is popped
Stack 1 is
1
OPTIONS :
1.Push into stack 1
2.Push into stack 2
3.Topandpop stack1
4.Topandpop stack 2
5.Make empty
6.Exit
Enter choice : 4
Top of stack 2: 8 is popped
Stack 2 is
9
OPTIONS :
1.Push into stack 1
2.Push into stack 2
3.Topandpop stack1
4.Topandpop stack 2
5.Make empty
6.Exit
Enter choice : 5
Both stacks are emptied.
Compatibility :
Compatible with all computers which is capable of running gcc-4.4.0. The program has been tested with the above version of gcc as of June 3.
**********
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction. The pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack
Code :
#include
#include
int s1[30],s2[30],capacity=5;
int top1=-1,top2=5;
int isfull()
{
return (top1==top2-1);
}
int isempty()
{
return (top1==-1 && top2==capacity);
}
void pushs1(int x)
{
if(!isfull())
s1[++top1]=x;
else
printf("\n Overflow");
}
void displays1()
{
int i;
printf("\n Stack 1 is\n");
for(i=top1;i>-1;i--)
printf("%d\t",s1[i]);
}
void displays2()
{
int i;
printf("\n Stack 2 is\n");
for(i=capacity-1;i>=top2;i--)
printf("%d\t",s2[i]);
}
void pushs2(int x)
{
if(!isfull())
s2[--top2]=x;
else
printf("\n Overflow");
}
int pops1()
{
if(top1==-1)
printf("\n Error");
else
{
int t;
t=s1[top1];
top1--;
return t;
}
return 0;
}
int pops2()
{
if(top2==capacity)
printf("\n Error");
else
{
int t;
t=s2[top2];
top2++;
return t;
}
return 0;
}
void makeempty()
{
top1=-1;
top2=5;
}
void main()
{
int ch,a=1,j;
while(a)
{
printf("\nOPTIONS :\n1.Push into stack 1 \n2.Push into stack 2 \n3.Topandpop stack 1 \n4.Topandpop stack 2 \n5.Makeempty \n6.Exit \n Enter choice : ");
scanf("%d",&ch);
switch(ch)
{
case 1:printf("\n Enter value to push into stack 1 : ");
scanf("%d",&j);
pushs1(j);
displays1();
break;
case 2:printf("\n Enter value to push into stack 2 : ");
scanf("%d",&j);
pushs2(j);
displays2();
break;
case 3:j=pops1();
printf("\n Top of stack 1 : %d is popped",j);
displays1();
break;
case 4:j=pops2();
printf("\n Top of stack 2 : %d is popped",j);
displays2();
break;
case 5:makeempty();
printf(“\n Both stacks are emptied”);
break;
case 6:a=0;
exit(0);
default:printf("\nInvalid choice");
}
}
}
Goal of the code :
The aim of this code is to implement two stacks using a single array.
Variables used :
s1 - The array which contains elements of stack 1
s2 - The array which contains elements of stack 2
capacity- The maximum size of the array implementing the 2 stacks
top1 -This points to the top-most element of stack 1
top2 -This points to the top-most element of stack 2
I - counter variable
t -temporary variable to hold value momentarily
Functions used :
Functions
Use
isfull()
To check whether the array containing the stacks is full
isempty()
To check whether the array containing the stacks is empty
pushs1()
To insert element into stack 1
pushs2()
To insert element into stack 2
pops1()
To return and delete the top-most element in stack 1
pops2()
To return and delete the top-most element in stack 2
makeempty()
To empty both the stacks
displays1()
To display contents of stack 1
displays2()
To display contents of stack 2
main()
To implement the double stack
Implementation :
Initial condition :
capacity = 5
The array is allocated a default size of 5.
top1=-1
top2=5(capacity)
In main() :DOUBLE STACK IMPLEMENTATION
(with structures)
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction.
Structure has the following advantages.
*
Clarity
As programs get bigger, they get more confusing if they're not structured. Restricting programming logic to structures that have only one point of exit and entry makes it much easier to track the 'flow' of logic.
*
Efficiency
*
Maintenance
Programs that follow a consistent set of clearly defined structures (having single points of entry and exit) are much easier to read and debug. Especially when maintenance issues arise long after the program is written.
In this program, the pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.DOUBLE STACK IMPLEMENTATION
(with structures)
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction.
Structure has the following advantages.
*
Clarity
As programs get bigger, they get more confusing if they're not structured. Restricting programming logic to structures that have only one point of exit and entry makes it much easier to track the 'flow' of logic.
*
Efficiency
*
Maintenance
Programs that follow a consistent set of clearly defined structures (having single points of entry and exit) are much easier to read and debug. Especially when maintenance issues arise long after the program is written.
In this program, the pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack1
DOUBLE STACK - ARRAY IMPLEMENTATION
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction. The pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements DOUBLE STACK IMPLEMENTATION
(with structures)
Introduction :DOUBLE STACK IMPLEMENTATION
(with structures)
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction.
Structure has the following advantages.
*
Clarity
As programs get bigger, they get more confusing if they're not structured. Restricting programming logic to structures that have only one point of exit and entry makes it much easier to track the 'flow' of logic.
*
Efficiency
*
Maintenance
Programs that follow a consistent set of clearly defined structures (having single points of entry and exit) are much easier to read and debug. Especially when maintenance issues arise long after the program is written.
In this program, the pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack1
DOUBLE STACK - ARRAY IMPLEMENTATIONDOUBLE STACK IMPLEMENTATION
DOUBLE STACK IMPLEMENTATION
(with structures)
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction.
Structure has the following advantages.
*
Clarity
As programs get bigger, they get more confusing if they're not structured. Restricting programming logic to structures that have only one point of exit and entry makes it much easier to track the 'flow' of logic.
*
Efficiency
*
Maintenance
Programs that follow a consistent set of clearly defined structures (having single points of entry and exit) are much easier to read and debug. Especially when maintenance issues arise long after the program is written.
In this program, the pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack1
DOUBLE STACK - ARRAY IMPLEMENTATION
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction. The pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack
(with structures)
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction.
Structure has the following advantages.
*
Clarity
As programs get bigger, they get more confusing if they're not structured. Restricting programming logic to structures that have only one point of exit and entry makes it much easier to track the 'flow' of logic.
*
Efficiency
*
Maintenance
Programs that follow a consistent set of clearly defined structures (having single points of entry and exit) are much easier to read and debug. Especially when maintenance issues arise long after the program is written.
In this program, the pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack1
DOUBLE STACK - ARRAY IMPLEMENTATION
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction. The pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction. The pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction.
Structure has the following advantages.
*
Clarity
As programs get bigger, they get more confusing if they're not structured. Restricting programming logic to structures that have only one point of exit and entry makes it much easier to track the 'flow' of logic.
*
Efficiency
*
Maintenance
Programs that follow a consistent set of clearly defined structures (having single points of entry and exit) are much easier to read and debug. Especially when maintenance issues arise long after the program is written.
In this program, the pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack1
DOUBLE STACK - ARRAY IMPLEMENTATION
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction. The pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestackare pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack
doublestack1
DOUBLE STACK - ARRAY IMPLEMENTATION
Introduction :
Double stack means two stacks which are implemented using a single array. To prevent memory wastage, the two stacks are grown in opposite direction. The pointer tops1 and tops2 points to top-most element of stack 1 and stack 2 respectively. Initially, tops1 is initialized as -1 and tops2 is initialized the capacity. As the elements are pushed into stack 1, tops1 is incremented. Similarly, as the elements are pushed into stack 2, tops2 is decremented. So, the array is full when tops1=tops2-1. Beyond this, pushing an element into any stack will lead to overflow condition. The pictorial representation of double stack is shown below.
doublestack
*
List of options is displayed as menu and the user's choice is stored in the variable 'ch'.
*
Using switch-case the control is directed to the specified function.
*
In case of push, the value to be inserted is got from the user.
*
In case of pop, the top-most element is printed.
*
In each case, the stack which is being worked upon is displayed.
Test case :
The output of the code is :
OPTIONS :
1.Push into stack 1
2.Push into stack 2
3.Topandpop stack1
4.Topandpop stack 2
5.Make empty
6.Exit
Enter choice : 1
Enter value to push into stack 1 : 1
Stack 1 is
1
OPTIONS :
1.Push into stack 1
2.Push into stack 2
3.Topandpop stack1
4.Topandpop stack 2
5.Make empty
6.Exit
Enter choice : 1
Enter value to push into stack 1 : 2
Stack 1 is
2 1
OPTIONS :
1.Push into stack 1
2.Push into stack 2
3.Topandpop stack1
4.Topandpop stack 2
5.Make empty
6.Exit
Enter choice : 2
Enter value to push into stack 1 : 9
Stack 2 is
9
OPTIONS :
1.Push into stack 1
2.Push into stack 2
3.Topandpop stack1
4.Topandpop stack 2
5.Make empty
6.Exit
Enter choice : 2
Enter value to push into stack 1 : 8
Stack 1 is
9 8
OPTIONS :
1.Push into stack 1
2.Push into stack 2
3.Topandpop stack1
4.Topandpop stack 2
5.Make empty
6.Exit
Enter choice : 3
Top of stack 1: 2 is popped
Stack 1 is
1
OPTIONS :
1.Push into stack 1
2.Push into stack 2
3.Topandpop stack1
4.Topandpop stack 2
5.Make empty
6.Exit
Enter choice : 4
Top of stack 2: 8 is popped
Stack 2 is
9
OPTIONS :
1.Push into stack 1
2.Push into stack 2
3.Topandpop stack1
4.Topandpop stack 2
5.Make empty
6.Exit
Enter choice : 5
Both stacks are emptied.
Compatibility :
Compatible with all computers which is capable of running gcc-4.4.0. The program has been tested with the above version of gcc as of June 3.
**********
Implementation of Stack using Singly Linked Lists
Stacks are linear data structures which means the data is stored in what looks like a line (although vertically). In simple words we can say
A stack is a last in, first out (LIFO) abstract data type and data structure.
Basic usage of stack at the Architecture level is as a means of allocating and accessing memory.
Stack
file:///root/Desktop/Stack_4.jpg
We can only perform two fundamental operations on a stack: push and pop.
The push operation adds to the top of the list, hiding any items already on the stack, or initializing the stack if it is empty. The pop operation removes an item from the top of the list, and returns this value to the caller. A pop either reveals previously concealed items, or results in an empty list.
A stack is a restricted data structure, because only a small number of operations are performed on it.
Let us 'C' code for implementing a stack using Singly Linked Lists.
view plainprint?
1. #include
2. #include
3.
4. //Structure containing a Data part & a
5. //Link part to the next node in the List
6. struct Node
7. {
8. int Data;
9. struct Node *Next;
10. }*Head;
11.
12. //Poping from a Stack
13. void popStack()
14. {
15. struct Node *tmp_ptr=NULL, *cur_ptr=Head;
16.
17. if(cur_ptr)
18. {
19. Head = Head->Next;
20. free(cur_ptr);
21. }
22. else
23. printf("\nStack is Empty");
24. }
25.
26. //Pushing into Stack
27. void pushIntoStack(int num)
28. {
29. struct Node *tmp_ptr=NULL;
30.
31. if((tmp_ptr=(struct Node *)malloc(sizeof(struct Node))) != NULL)
32. tmp_ptr->Data=num;
33. else
34. printf("\nMemory Allocation Failed");
35.
36. if(Head)
37. {
38. tmp_ptr->Next=Head;
39. Head=tmp_ptr;
40. }
41. else { //List is Empty
42. Head=tmp_ptr;
43. Head->Next=NULL;
44. }
45. }
46.
47. //Displaying Stack
48. void displayStack()
49. {
50. struct Node *cur_ptr=Head;
51.
52. if(cur_ptr)
53. {
54. printf("\nElements in Stack:\n");
55. while(cur_ptr)
56. {
57. printf("\t%d\n",cur_ptr->Data);
58. cur_ptr=cur_ptr->Next;
59. }
60. printf("\n");
61. }
62. else
63. printf("\nStack is Empty");
64. }
65.
66. int main(int argc, char *argv[])
67. {
68. int i=0;
69.
70. //Set HEAD as NULL
71. Head=NULL;
72.
73. while(1)
74. {
75. printf("\n####################################################\n");
76. printf("MAIN MENU\n");
77. printf("####################################################\n");
78. printf(" \n1. Push into stack");
79. printf(" \n2. Pop from Stack");
80. printf(" \n3. Display Stack");
81. printf(" \n4. Exit\n");
82. printf(" \nChoose Option: ");
83. scanf("%d",&i);
84.
85. switch(i)
86. {
87. case 1:
88. {
89. int num;
90. printf("\nEnter a Number to push into Stack: ");
91. scanf("%d",&num);
92. pushIntoStack(num);
93. displayStack();
94. break;
95. }
96.
97. case 2:
98. {
99. popStack();
100. displayStack();
101. break;
102. }
103.
104. case 3:
105. {
106. displayStack();
107. break;
108. }
109.
110. case 4:
111. {
112. struct Node *temp;
113.
114. while(Head!=NULL)
115. {
116. temp = Head->Next;
117. free(Head);
118. Head=temp;
119. }
120. exit(0);
121. }
122.
123. default:
124. {
125. printf("\nWrong Option choosen");
126. }
127. }/* end if switch */
128. }/* end of while */
129. }/* end of main */
#include
#include
//Structure containing a Data part & a
//Link part to the next node in the List
struct Node
{
int Data;
struct Node *Next;
}*Head;
//Poping from a Stack
void popStack()
{
struct Node *tmp_ptr=NULL, *cur_ptr=Head;
if(cur_ptr)
{
Head = Head->Next;
free(cur_ptr);
}
else
printf("\nStack is Empty");
}
//Pushing into Stack
void pushIntoStack(int num)
{
struct Node *tmp_ptr=NULL;
if((tmp_ptr=(struct Node *)malloc(sizeof(struct Node))) != NULL)
tmp_ptr->Data=num;
else
printf("\nMemory Allocation Failed");
if(Head)
{
tmp_ptr->Next=Head;
Head=tmp_ptr;
}
else { //List is Empty
Head=tmp_ptr;
Head->Next=NULL;
}
}
//Displaying Stack
void displayStack()
{
struct Node *cur_ptr=Head;
if(cur_ptr)
{
printf("\nElements in Stack:\n");
while(cur_ptr)
{
printf("\t%d\n",cur_ptr->Data);
cur_ptr=cur_ptr->Next;
}
printf("\n");
}
else
printf("\nStack is Empty");
}
int main(int argc, char *argv[])
{
int i=0;
//Set HEAD as NULL
Head=NULL;
while(1)
{
printf("\n####################################################\n");
printf("MAIN MENU\n");
printf("####################################################\n");
printf(" \n1. Push into stack");
printf(" \n2. Pop from Stack");
printf(" \n3. Display Stack");
printf(" \n4. Exit\n");
printf(" \nChoose Option: ");
scanf("%d",&i);
switch(i)
{
case 1:
{
int num;
printf("\nEnter a Number to push into Stack: ");
scanf("%d",&num);
pushIntoStack(num);
displayStack();
break;
}
case 2:
{
popStack();
displayStack();
break;
}
case 3:
{
displayStack();
break;
}
case 4:
{
struct Node *temp;
while(Head!=NULL)
{
temp = Head->Next;
free(Head);
Head=temp;
}
exit(0);
}
default:
{
printf("\nWrong Option choosen");
}
}/* end if switch */
}/* end of while */
}/* end of main */
References:
Stacks are linear data structures which means the data is stored in what looks like a line (although vertically). In simple words we can say
A stack is a last in, first out (LIFO) abstract data type and data structure.
Basic usage of stack at the Architecture level is as a means of allocating and accessing memory.
Stack
file:///root/Desktop/Stack_4.jpg
We can only perform two fundamental operations on a stack: push and pop.
The push operation adds to the top of the list, hiding any items already on the stack, or initializing the stack if it is empty. The pop operation removes an item from the top of the list, and returns this value to the caller. A pop either reveals previously concealed items, or results in an empty list.
A stack is a restricted data structure, because only a small number of operations are performed on it.
Let us 'C' code for implementing a stack using Singly Linked Lists.
view plainprint?
1. #include
2. #include
3.
4. //Structure containing a Data part & a
5. //Link part to the next node in the List
6. struct Node
7. {
8. int Data;
9. struct Node *Next;
10. }*Head;
11.
12. //Poping from a Stack
13. void popStack()
14. {
15. struct Node *tmp_ptr=NULL, *cur_ptr=Head;
16.
17. if(cur_ptr)
18. {
19. Head = Head->Next;
20. free(cur_ptr);
21. }
22. else
23. printf("\nStack is Empty");
24. }
25.
26. //Pushing into Stack
27. void pushIntoStack(int num)
28. {
29. struct Node *tmp_ptr=NULL;
30.
31. if((tmp_ptr=(struct Node *)malloc(sizeof(struct Node))) != NULL)
32. tmp_ptr->Data=num;
33. else
34. printf("\nMemory Allocation Failed");
35.
36. if(Head)
37. {
38. tmp_ptr->Next=Head;
39. Head=tmp_ptr;
40. }
41. else { //List is Empty
42. Head=tmp_ptr;
43. Head->Next=NULL;
44. }
45. }
46.
47. //Displaying Stack
48. void displayStack()
49. {
50. struct Node *cur_ptr=Head;
51.
52. if(cur_ptr)
53. {
54. printf("\nElements in Stack:\n");
55. while(cur_ptr)
56. {
57. printf("\t%d\n",cur_ptr->Data);
58. cur_ptr=cur_ptr->Next;
59. }
60. printf("\n");
61. }
62. else
63. printf("\nStack is Empty");
64. }
65.
66. int main(int argc, char *argv[])
67. {
68. int i=0;
69.
70. //Set HEAD as NULL
71. Head=NULL;
72.
73. while(1)
74. {
75. printf("\n####################################################\n");
76. printf("MAIN MENU\n");
77. printf("####################################################\n");
78. printf(" \n1. Push into stack");
79. printf(" \n2. Pop from Stack");
80. printf(" \n3. Display Stack");
81. printf(" \n4. Exit\n");
82. printf(" \nChoose Option: ");
83. scanf("%d",&i);
84.
85. switch(i)
86. {
87. case 1:
88. {
89. int num;
90. printf("\nEnter a Number to push into Stack: ");
91. scanf("%d",&num);
92. pushIntoStack(num);
93. displayStack();
94. break;
95. }
96.
97. case 2:
98. {
99. popStack();
100. displayStack();
101. break;
102. }
103.
104. case 3:
105. {
106. displayStack();
107. break;
108. }
109.
110. case 4:
111. {
112. struct Node *temp;
113.
114. while(Head!=NULL)
115. {
116. temp = Head->Next;
117. free(Head);
118. Head=temp;
119. }
120. exit(0);
121. }
122.
123. default:
124. {
125. printf("\nWrong Option choosen");
126. }
127. }/* end if switch */
128. }/* end of while */
129. }/* end of main */
#include
#include
//Structure containing a Data part & a
//Link part to the next node in the List
struct Node
{
int Data;
struct Node *Next;
}*Head;
//Poping from a Stack
void popStack()
{
struct Node *tmp_ptr=NULL, *cur_ptr=Head;
if(cur_ptr)
{
Head = Head->Next;
free(cur_ptr);
}
else
printf("\nStack is Empty");
}
//Pushing into Stack
void pushIntoStack(int num)
{
struct Node *tmp_ptr=NULL;
if((tmp_ptr=(struct Node *)malloc(sizeof(struct Node))) != NULL)
tmp_ptr->Data=num;
else
printf("\nMemory Allocation Failed");
if(Head)
{
tmp_ptr->Next=Head;
Head=tmp_ptr;
}
else { //List is Empty
Head=tmp_ptr;
Head->Next=NULL;
}
}
//Displaying Stack
void displayStack()
{
struct Node *cur_ptr=Head;
if(cur_ptr)
{
printf("\nElements in Stack:\n");
while(cur_ptr)
{
printf("\t%d\n",cur_ptr->Data);
cur_ptr=cur_ptr->Next;
}
printf("\n");
}
else
printf("\nStack is Empty");
}
int main(int argc, char *argv[])
{
int i=0;
//Set HEAD as NULL
Head=NULL;
while(1)
{
printf("\n####################################################\n");
printf("MAIN MENU\n");
printf("####################################################\n");
printf(" \n1. Push into stack");
printf(" \n2. Pop from Stack");
printf(" \n3. Display Stack");
printf(" \n4. Exit\n");
printf(" \nChoose Option: ");
scanf("%d",&i);
switch(i)
{
case 1:
{
int num;
printf("\nEnter a Number to push into Stack: ");
scanf("%d",&num);
pushIntoStack(num);
displayStack();
break;
}
case 2:
{
popStack();
displayStack();
break;
}
case 3:
{
displayStack();
break;
}
case 4:
{
struct Node *temp;
while(Head!=NULL)
{
temp = Head->Next;
free(Head);
Head=temp;
}
exit(0);
}
default:
{
printf("\nWrong Option choosen");
}
}/* end if switch */
}/* end of while */
}/* end of main */
References:
Thursday, October 14, 2010
Detect deadlocks
A deadlock condition can occur when a job holds a lock on an object, object A, and is waiting to obtain a lock on another object, object B. At the same time, another job or transaction currently holds a lock on object B and is waiting to obtain a lock on object A.
Do the following steps to find out if a deadlock condition has occurred and fix it if it has:
1. Locate the suspended job in the list of active jobs. Determining the status of a job to determine if a job is suspended.
2. Look at the objects the job is waiting to lock. For transactions with transaction-scoped locks see Display locked objects for a transaction for steps. For transactions with job-scoped locks see Details: active job properties.
3. For all the objects the job is waiting to lock, look at the list of lock holders (transactions or jobs) and try to find a conflicting lock corresponding to the level requested by the suspended job.
4. If a transaction is holding a conflicting lock, display the jobs associated with this transaction and see if one of them is waiting to lock.
5. Determine if this waiting job is trying to lock one of the objects locked by the initial suspended job. When you find the job that is trying to lock on of the objects locked by the initial suspended job, you can identify the objects in question as the trouble spots.
6. Investigate the transaction in order to determine the appropriate course of action.
1. Look at the transaction properties to find out what application initiated it and then look at the application code.
2. Or trace the transaction's actions up to this point by finding the Commit cycle ID in the transaction properties and then searching in a journal for entries containing this ID. To do this, you can use the Retrieve Journal Entry (RTVJRNE) command and specify the CMTCYCID parameter.
3. After obtaining relevant information the user may choose to force a rollback or commit operation.
A deadlock condition can occur when a job holds a lock on an object, object A, and is waiting to obtain a lock on another object, object B. At the same time, another job or transaction currently holds a lock on object B and is waiting to obtain a lock on object A.
Do the following steps to find out if a deadlock condition has occurred and fix it if it has:
1. Locate the suspended job in the list of active jobs. Determining the status of a job to determine if a job is suspended.
2. Look at the objects the job is waiting to lock. For transactions with transaction-scoped locks see Display locked objects for a transaction for steps. For transactions with job-scoped locks see Details: active job properties.
3. For all the objects the job is waiting to lock, look at the list of lock holders (transactions or jobs) and try to find a conflicting lock corresponding to the level requested by the suspended job.
4. If a transaction is holding a conflicting lock, display the jobs associated with this transaction and see if one of them is waiting to lock.
5. Determine if this waiting job is trying to lock one of the objects locked by the initial suspended job. When you find the job that is trying to lock on of the objects locked by the initial suspended job, you can identify the objects in question as the trouble spots.
6. Investigate the transaction in order to determine the appropriate course of action.
1. Look at the transaction properties to find out what application initiated it and then look at the application code.
2. Or trace the transaction's actions up to this point by finding the Commit cycle ID in the transaction properties and then searching in a journal for entries containing this ID. To do this, you can use the Retrieve Journal Entry (RTVJRNE) command and specify the CMTCYCID parameter.
3. After obtaining relevant information the user may choose to force a rollback or commit operation.
Inline assembly for x86 in Linux
If you're a Linux kernel developer, you probably find yourself coding highly architecture-dependent functions or optimizing a code path pretty often. And you probably do this by inserting assembly language instructions into the middle of C statements (a method otherwise known as inline assembly). Let's take a look at the specific usage of inline assembly in Linux. (We'll limit our discussion to the IA32 assembly.)
GNU assembler syntax in brief
Let's first look at the basic assembler syntax used in Linux. GCC, the GNU C Compiler for Linux, uses AT&T assembly syntax. Some of the basic rules of this syntax are listed below. (The list is by no means complete; I've included only those rules pertinent to inline assembly.)
Register naming
Register names are prefixed by %. That is, if eax has to be used, it should be used as %eax.
Source and destination ordering
In any instruction, source comes first and destination follows. This differs from Intel syntax, where source comes after destination.
mov %eax, %ebx, transfers the contents of eax to ebx.
Size of operand
The instructions are suffixed by b, w, or l, depending on whether the operand is a byte, word, or long. This is not mandatory; GCC tries provide the appropriate suffix by reading the operands. But specifying the suffixes manually improves the code readability and eliminates the possibility of the compilers guessing incorrectly.
movb %al, %bl -- Byte move
movw %ax, %bx -- Word move
movl %eax, %ebx -- Longword move
Immediate operand
An immediate operand is specified by using $.
movl $0xffff, %eax -- will move the value of 0xffff into eax register.
Indirect memory reference
Any indirect references to memory are done by using ( ).
movb (%esi), %al -- will transfer the byte in the memory
pointed by esi into al register
Back to top
Inline assembly
GCC provides the special construct "asm" for inline assembly, which has the following format:
asm ( assembler template
: output operands (optional)
: input operands (optional)
: list of clobbered registers (optional)
);
In this example, the assembler template consists of assembly instructions. The input operands are the C expressions that serve as input operands to the instructions. The output operands are the C expressions on which the output of the assembly instructions will be performed.
asm ("movl %%cr3, %0\n" :"=r"(cr3val));
a %eax
b %ebx
c %ecx
d %edx
S %esi
D %edi
Memory operand constraint(m)
When the operands are in the memory, any operations performed on them will occur directly in the memory location, as opposed to register constraints, which first store the value in a register to be modified and then write it back to the memory location. But register constraints are usually used only when they are absolutely necessary for an instruction or they significantly speed up the process. Memory constraints can be used most efficiently in cases where a C variable needs to be updated inside "asm" and you really don't want to use a register to hold its value. For example, the value of idtr is stored in the memory location loc:
("sidt %0\n" : :"m"(loc));
Matching(Digit) constraints
In some cases, a single variable may serve as both the input and the output operand. Such cases may be specified in "asm" by using matching constraints.
asm ("incl %0" :"=a"(var):"0"(var));
In our example for matching constraints, the register %eax is used as both the input and the output variable. var input is read to %eax and updated %eax is stored in var again after increment. "0" here specifies the same constraint as the 0th output variable. That is, it specifies that the output instance of var should be stored in %eax only. This constraint can be used:
* In cases where input is read from a variable or the variable is modified and modification is written back to the same variable
* In cases where separate instances of input and output operands are not necessary
The most important effect of using matching restraints is that they lead to the efficient use of available registers.
Back to top
Examples of common inline assembly usage
The following examples illustrate usage through different operand constraints. There are too many constraints to give examples for each one, but these are the most frequently used constraint types.
"asm" and the register constraint "r"
Let's first take a look at "asm" with the register constraint 'r'. Our example shows how GCC allocates registers, and how it updates the value of output variables.
int main(void)
{
int x = 10, y;
asm ("movl %1, %%eax;
"movl %%eax, %0;"
:"=r"(y) /* y is output operand */
:"r"(x) /* x is input operand */
:"%eax"); /* %eax is clobbered register */
}
In this example, the value of x is copied to y inside "asm". x and y are passed to "asm" by being stored in registers. The assembly code generated for this example looks like this:
main:
pushl %ebp
movl %esp,%ebp
subl $8,%esp
movl $10,-4(%ebp)
movl -4(%ebp),%edx /* x=10 is stored in %edx */
#APP /* asm starts here */
movl %edx, %eax /* x is moved to %eax */
movl %eax, %edx /* y is allocated in edx and updated */
#NO_APP /* asm ends here */
movl %edx,-8(%ebp) /* value of y in stack is updated with
the value in %edx */
GCC is free here to allocate any register when the "r" constraint is used. In our example it chose %edx for storing x. After reading the value of x in %edx, it allocated the same register for y.
Since y is specified in the output operand section, the updated value in %edx is stored in -8(%ebp), the location of y on stack. If y were specified in the input section, the value of y on stack would not be updated, even though it does get updated in the temporary register storage of y(%edx).
And since %eax is specified in the clobbered list, GCC doesn't use it anywhere else to store data.
Both input x and output y were allocated in the same %edx register, assuming that inputs are consumed before outputs are produced. Note that if you have a lot of instructions, this may not be the case. To make sure that input and output are allocated in different registers, we can specify the & constraint modifier. Here is our example with the constraint modifier added.
int main(void)
{
int x = 10, y;
asm ("movl %1, %%eax;
"movl %%eax, %0;"
:"=&r"(y) /* y is output operand, note the
& constraint modifier. */
:"r"(x) /* x is input operand */
:"%eax"); /* %eax is clobbered register */
}
And here is the assembly code generated for this example, from which it is evident that x and y have been stored in different registers across "asm".
main:
pushl %ebp
movl %esp,%ebp
subl $8,%esp
movl $10,-4(%ebp)
movl -4(%ebp),%ecx /* x, the input is in %ecx */
#APP
movl %ecx, %eax
movl %eax, %edx /* y, the output is in %edx */
#NO_APP
movl %edx,-8(%ebp)
Back to top
Use of specific register constraints
Now let's take a look at how to specify individual registers as constraints for the operands. In the following example, the cpuid instruction takes the input in the %eax register and gives output in four registers: %eax, %ebx, %ecx, %edx. The input to cpuid (the variable "op") is passed to "asm" in the eax register, as cpuid expects it to. The a, b, c, and d constraints are used in the output to collect the values in the four registers, respectively.
asm ("cpuid"
: "=a" (_eax),
"=b" (_ebx),
"=c" (_ecx),
"=d" (_edx)
: "a" (op));
And below you can see the generated assembly code for this (assuming the _eax, _ebx, etc.... variables are stored on stack):
movl -20(%ebp),%eax /* store 'op' in %eax -- input */
#APP
cpuid
#NO_APP
movl %eax,-4(%ebp) /* store %eax in _eax -- output */
movl %ebx,-8(%ebp) /* store other registers in
movl %ecx,-12(%ebp) respective output variables */
movl %edx,-16(%ebp)
The strcpy function can be implemented using the "S" and "D" constraints in the following manner:
asm ("cld\n
rep\n
movsb"
: /* no input */
:"S"(src), "D"(dst), "c"(count));
The source pointer src is put into %esi by using the "S" constraint, and the destination pointer dst is put into %edi using the "D" constraint. The count value is put into %ecx as it is needed by rep prefix.
And here you can see another constraint that uses the two registers %eax and %edx to combine two 32-bit values and generate a 64-bit value:
#define rdtscll(val) \
__asm__ __volatile__ ("rdtsc" : "=A" (val))
The generated assembly looks like this (if val has a 64 bit memory space).
#APP
rdtsc
#NO_APP
movl %eax,-8(%ebp) /* As a result of A constraint
movl %edx,-4(%ebp) %eax and %edx serve as outputs */
Note here that the values in %edx:%eax serve as 64 bit output.
Back to top
Using matching constraints
Here you can see the code for the system call, with four parameters:
#define _syscall4(type,name,type1,arg1,type2,arg2,type3,arg3,type4,arg4) \
type name (type1 arg1, type2 arg2, type3 arg3, type4 arg4) \
{ \
long __res; \
__asm__ volatile ("int $0x80" \
: "=a" (__res) \
: "0" (__NR_##name),"b" ((long)(arg1)),"c" ((long)(arg2)), \
"d" ((long)(arg3)),"S" ((long)(arg4))); \
__syscall_return(type,__res); \
}
In the above example, four arguments to the system call are put into %ebx, %ecx, %edx, and %esi by using the constraints b, c, d, and S. Note that the "=a" constraint is used in the output so that the return value of the system call, which is in %eax, is put into the variable __res. By using the matching constraint "0" as the first operand constraint in the input section, the syscall number __NR_##name is put into %eax and serves as the input to the system call. Thus %eax serves here as both input and output register. No separate registers are used for this purpose. Note also that the input (syscall number) is consumed (used) before the output (the return value of syscall) is produced.
Back to top
Use of memory operand constraint
Consider the following atomic decrement operation:
__asm__ __volatile__(
"lock; decl %0"
:"=m" (counter)
:"m" (counter));
The generated assembly for this would look something like this:
#APP
lock
decl -24(%ebp) /* counter is modified on its memory location */
#NO_APP.
You might think of using the register constraint here for the counter. If you do, the value of the counter must first be copied on to a register, decremented, and then updated to its memory. But then you lose the whole purpose of locking and atomicity, which clearly shows the necessity of using the memory constraint.
Back to top
Using clobbered registers
Consider an elementary implementation of memory copy.
asm ("movl $count, %%ecx;
up: lodsl;
stosl;
loop up;"
: /* no output */
:"S"(src), "D"(dst) /* input */
:"%ecx", "%eax" ); /* clobbered list */
While lodsl modifies %eax, the lodsl and stosl instructions use it implicitly. And the %ecx register explicitly loads the count. But GCC won't know this unless we inform it, which is exactly what we do by including %eax and %ecx in the clobbered register set. Unless this is done, GCC assumes that %eax and %ecx are free, and it may decide to use them for storing other data. Note here that %esi and %edi are used by "asm", and are not in the clobbered list. This is because it has been declared that "asm" will use them in the input operand list. The bottom line here is that if a register is used inside "asm" (implicitly or explicitly), and it is not present in either the input or output operand list, you must list it as a clobbered register.
Back to top
Conclusion
On the whole, inline assembly is huge and provides a lot of features that we did not even touch on here. But with a basic grasp of the material in this article, you should be able to start coding inline assembly on your own.
If you're a Linux kernel developer, you probably find yourself coding highly architecture-dependent functions or optimizing a code path pretty often. And you probably do this by inserting assembly language instructions into the middle of C statements (a method otherwise known as inline assembly). Let's take a look at the specific usage of inline assembly in Linux. (We'll limit our discussion to the IA32 assembly.)
GNU assembler syntax in brief
Let's first look at the basic assembler syntax used in Linux. GCC, the GNU C Compiler for Linux, uses AT&T assembly syntax. Some of the basic rules of this syntax are listed below. (The list is by no means complete; I've included only those rules pertinent to inline assembly.)
Register naming
Register names are prefixed by %. That is, if eax has to be used, it should be used as %eax.
Source and destination ordering
In any instruction, source comes first and destination follows. This differs from Intel syntax, where source comes after destination.
mov %eax, %ebx, transfers the contents of eax to ebx.
Size of operand
The instructions are suffixed by b, w, or l, depending on whether the operand is a byte, word, or long. This is not mandatory; GCC tries provide the appropriate suffix by reading the operands. But specifying the suffixes manually improves the code readability and eliminates the possibility of the compilers guessing incorrectly.
movb %al, %bl -- Byte move
movw %ax, %bx -- Word move
movl %eax, %ebx -- Longword move
Immediate operand
An immediate operand is specified by using $.
movl $0xffff, %eax -- will move the value of 0xffff into eax register.
Indirect memory reference
Any indirect references to memory are done by using ( ).
movb (%esi), %al -- will transfer the byte in the memory
pointed by esi into al register
Back to top
Inline assembly
GCC provides the special construct "asm" for inline assembly, which has the following format:
asm ( assembler template
: output operands (optional)
: input operands (optional)
: list of clobbered registers (optional)
);
In this example, the assembler template consists of assembly instructions. The input operands are the C expressions that serve as input operands to the instructions. The output operands are the C expressions on which the output of the assembly instructions will be performed.
asm ("movl %%cr3, %0\n" :"=r"(cr3val));
a %eax
b %ebx
c %ecx
d %edx
S %esi
D %edi
Memory operand constraint(m)
When the operands are in the memory, any operations performed on them will occur directly in the memory location, as opposed to register constraints, which first store the value in a register to be modified and then write it back to the memory location. But register constraints are usually used only when they are absolutely necessary for an instruction or they significantly speed up the process. Memory constraints can be used most efficiently in cases where a C variable needs to be updated inside "asm" and you really don't want to use a register to hold its value. For example, the value of idtr is stored in the memory location loc:
("sidt %0\n" : :"m"(loc));
Matching(Digit) constraints
In some cases, a single variable may serve as both the input and the output operand. Such cases may be specified in "asm" by using matching constraints.
asm ("incl %0" :"=a"(var):"0"(var));
In our example for matching constraints, the register %eax is used as both the input and the output variable. var input is read to %eax and updated %eax is stored in var again after increment. "0" here specifies the same constraint as the 0th output variable. That is, it specifies that the output instance of var should be stored in %eax only. This constraint can be used:
* In cases where input is read from a variable or the variable is modified and modification is written back to the same variable
* In cases where separate instances of input and output operands are not necessary
The most important effect of using matching restraints is that they lead to the efficient use of available registers.
Back to top
Examples of common inline assembly usage
The following examples illustrate usage through different operand constraints. There are too many constraints to give examples for each one, but these are the most frequently used constraint types.
"asm" and the register constraint "r"
Let's first take a look at "asm" with the register constraint 'r'. Our example shows how GCC allocates registers, and how it updates the value of output variables.
int main(void)
{
int x = 10, y;
asm ("movl %1, %%eax;
"movl %%eax, %0;"
:"=r"(y) /* y is output operand */
:"r"(x) /* x is input operand */
:"%eax"); /* %eax is clobbered register */
}
In this example, the value of x is copied to y inside "asm". x and y are passed to "asm" by being stored in registers. The assembly code generated for this example looks like this:
main:
pushl %ebp
movl %esp,%ebp
subl $8,%esp
movl $10,-4(%ebp)
movl -4(%ebp),%edx /* x=10 is stored in %edx */
#APP /* asm starts here */
movl %edx, %eax /* x is moved to %eax */
movl %eax, %edx /* y is allocated in edx and updated */
#NO_APP /* asm ends here */
movl %edx,-8(%ebp) /* value of y in stack is updated with
the value in %edx */
GCC is free here to allocate any register when the "r" constraint is used. In our example it chose %edx for storing x. After reading the value of x in %edx, it allocated the same register for y.
Since y is specified in the output operand section, the updated value in %edx is stored in -8(%ebp), the location of y on stack. If y were specified in the input section, the value of y on stack would not be updated, even though it does get updated in the temporary register storage of y(%edx).
And since %eax is specified in the clobbered list, GCC doesn't use it anywhere else to store data.
Both input x and output y were allocated in the same %edx register, assuming that inputs are consumed before outputs are produced. Note that if you have a lot of instructions, this may not be the case. To make sure that input and output are allocated in different registers, we can specify the & constraint modifier. Here is our example with the constraint modifier added.
int main(void)
{
int x = 10, y;
asm ("movl %1, %%eax;
"movl %%eax, %0;"
:"=&r"(y) /* y is output operand, note the
& constraint modifier. */
:"r"(x) /* x is input operand */
:"%eax"); /* %eax is clobbered register */
}
And here is the assembly code generated for this example, from which it is evident that x and y have been stored in different registers across "asm".
main:
pushl %ebp
movl %esp,%ebp
subl $8,%esp
movl $10,-4(%ebp)
movl -4(%ebp),%ecx /* x, the input is in %ecx */
#APP
movl %ecx, %eax
movl %eax, %edx /* y, the output is in %edx */
#NO_APP
movl %edx,-8(%ebp)
Back to top
Use of specific register constraints
Now let's take a look at how to specify individual registers as constraints for the operands. In the following example, the cpuid instruction takes the input in the %eax register and gives output in four registers: %eax, %ebx, %ecx, %edx. The input to cpuid (the variable "op") is passed to "asm" in the eax register, as cpuid expects it to. The a, b, c, and d constraints are used in the output to collect the values in the four registers, respectively.
asm ("cpuid"
: "=a" (_eax),
"=b" (_ebx),
"=c" (_ecx),
"=d" (_edx)
: "a" (op));
And below you can see the generated assembly code for this (assuming the _eax, _ebx, etc.... variables are stored on stack):
movl -20(%ebp),%eax /* store 'op' in %eax -- input */
#APP
cpuid
#NO_APP
movl %eax,-4(%ebp) /* store %eax in _eax -- output */
movl %ebx,-8(%ebp) /* store other registers in
movl %ecx,-12(%ebp) respective output variables */
movl %edx,-16(%ebp)
The strcpy function can be implemented using the "S" and "D" constraints in the following manner:
asm ("cld\n
rep\n
movsb"
: /* no input */
:"S"(src), "D"(dst), "c"(count));
The source pointer src is put into %esi by using the "S" constraint, and the destination pointer dst is put into %edi using the "D" constraint. The count value is put into %ecx as it is needed by rep prefix.
And here you can see another constraint that uses the two registers %eax and %edx to combine two 32-bit values and generate a 64-bit value:
#define rdtscll(val) \
__asm__ __volatile__ ("rdtsc" : "=A" (val))
The generated assembly looks like this (if val has a 64 bit memory space).
#APP
rdtsc
#NO_APP
movl %eax,-8(%ebp) /* As a result of A constraint
movl %edx,-4(%ebp) %eax and %edx serve as outputs */
Note here that the values in %edx:%eax serve as 64 bit output.
Back to top
Using matching constraints
Here you can see the code for the system call, with four parameters:
#define _syscall4(type,name,type1,arg1,type2,arg2,type3,arg3,type4,arg4) \
type name (type1 arg1, type2 arg2, type3 arg3, type4 arg4) \
{ \
long __res; \
__asm__ volatile ("int $0x80" \
: "=a" (__res) \
: "0" (__NR_##name),"b" ((long)(arg1)),"c" ((long)(arg2)), \
"d" ((long)(arg3)),"S" ((long)(arg4))); \
__syscall_return(type,__res); \
}
In the above example, four arguments to the system call are put into %ebx, %ecx, %edx, and %esi by using the constraints b, c, d, and S. Note that the "=a" constraint is used in the output so that the return value of the system call, which is in %eax, is put into the variable __res. By using the matching constraint "0" as the first operand constraint in the input section, the syscall number __NR_##name is put into %eax and serves as the input to the system call. Thus %eax serves here as both input and output register. No separate registers are used for this purpose. Note also that the input (syscall number) is consumed (used) before the output (the return value of syscall) is produced.
Back to top
Use of memory operand constraint
Consider the following atomic decrement operation:
__asm__ __volatile__(
"lock; decl %0"
:"=m" (counter)
:"m" (counter));
The generated assembly for this would look something like this:
#APP
lock
decl -24(%ebp) /* counter is modified on its memory location */
#NO_APP.
You might think of using the register constraint here for the counter. If you do, the value of the counter must first be copied on to a register, decremented, and then updated to its memory. But then you lose the whole purpose of locking and atomicity, which clearly shows the necessity of using the memory constraint.
Back to top
Using clobbered registers
Consider an elementary implementation of memory copy.
asm ("movl $count, %%ecx;
up: lodsl;
stosl;
loop up;"
: /* no output */
:"S"(src), "D"(dst) /* input */
:"%ecx", "%eax" ); /* clobbered list */
While lodsl modifies %eax, the lodsl and stosl instructions use it implicitly. And the %ecx register explicitly loads the count. But GCC won't know this unless we inform it, which is exactly what we do by including %eax and %ecx in the clobbered register set. Unless this is done, GCC assumes that %eax and %ecx are free, and it may decide to use them for storing other data. Note here that %esi and %edi are used by "asm", and are not in the clobbered list. This is because it has been declared that "asm" will use them in the input operand list. The bottom line here is that if a register is used inside "asm" (implicitly or explicitly), and it is not present in either the input or output operand list, you must list it as a clobbered register.
Back to top
Conclusion
On the whole, inline assembly is huge and provides a lot of features that we did not even touch on here. But with a basic grasp of the material in this article, you should be able to start coding inline assembly on your own.
Inline assembly for x86 in Linux
If you're a Linux kernel developer, you probably find yourself coding highly architecture-dependent functions or optimizing a code path pretty often. And you probably do this by inserting assembly language instructions into the middle of C statements (a method otherwise known as inline assembly). Let's take a look at the specific usage of inline assembly in Linux. (We'll limit our discussion to the IA32 assembly.)
GNU assembler syntax in brief
Let's first look at the basic assembler syntax used in Linux. GCC, the GNU C Compiler for Linux, uses AT&T assembly syntax. Some of the basic rules of this syntax are listed below. (The list is by no means complete; I've included only those rules pertinent to inline assembly.)
Register naming
Register names are prefixed by %. That is, if eax has to be used, it should be used as %eax.
Source and destination ordering
In any instruction, source comes first and destination follows. This differs from Intel syntax, where source comes after destination.
mov %eax, %ebx, transfers the contents of eax to ebx.
Size of operand
The instructions are suffixed by b, w, or l, depending on whether the operand is a byte, word, or long. This is not mandatory; GCC tries provide the appropriate suffix by reading the operands. But specifying the suffixes manually improves the code readability and eliminates the possibility of the compilers guessing incorrectly.
movb %al, %bl -- Byte move
movw %ax, %bx -- Word move
movl %eax, %ebx -- Longword move
Immediate operand
An immediate operand is specified by using $.
movl $0xffff, %eax -- will move the value of 0xffff into eax register.
Indirect memory reference
Any indirect references to memory are done by using ( ).
movb (%esi), %al -- will transfer the byte in the memory
pointed by esi into al register
Back to top
Inline assembly
GCC provides the special construct "asm" for inline assembly, which has the following format:
asm ( assembler template
: output operands (optional)
: input operands (optional)
: list of clobbered registers (optional)
);
In this example, the assembler template consists of assembly instructions. The input operands are the C expressions that serve as input operands to the instructions. The output operands are the C expressions on which the output of the assembly instructions will be performed.
asm ("movl %%cr3, %0\n" :"=r"(cr3val));
a %eax
b %ebx
c %ecx
d %edx
S %esi
D %edi
Memory operand constraint(m)
When the operands are in the memory, any operations performed on them will occur directly in the memory location, as opposed to register constraints, which first store the value in a register to be modified and then write it back to the memory location. But register constraints are usually used only when they are absolutely necessary for an instruction or they significantly speed up the process. Memory constraints can be used most efficiently in cases where a C variable needs to be updated inside "asm" and you really don't want to use a register to hold its value. For example, the value of idtr is stored in the memory location loc:
("sidt %0\n" : :"m"(loc));
Matching(Digit) constraints
In some cases, a single variable may serve as both the input and the output operand. Such cases may be specified in "asm" by using matching constraints.
asm ("incl %0" :"=a"(var):"0"(var));
In our example for matching constraints, the register %eax is used as both the input and the output variable. var input is read to %eax and updated %eax is stored in var again after increment. "0" here specifies the same constraint as the 0th output variable. That is, it specifies that the output instance of var should be stored in %eax only. This constraint can be used:
* In cases where input is read from a variable or the variable is modified and modification is written back to the same variable
* In cases where separate instances of input and output operands are not necessary
The most important effect of using matching restraints is that they lead to the efficient use of available registers.
Back to top
Examples of common inline assembly usage
The following examples illustrate usage through different operand constraints. There are too many constraints to give examples for each one, but these are the most frequently used constraint types.
"asm" and the register constraint "r"
Let's first take a look at "asm" with the register constraint 'r'. Our example shows how GCC allocates registers, and how it updates the value of output variables.
int main(void)
{
int x = 10, y;
asm ("movl %1, %%eax;
"movl %%eax, %0;"
:"=r"(y) /* y is output operand */
:"r"(x) /* x is input operand */
:"%eax"); /* %eax is clobbered register */
}
In this example, the value of x is copied to y inside "asm". x and y are passed to "asm" by being stored in registers. The assembly code generated for this example looks like this:
main:
pushl %ebp
movl %esp,%ebp
subl $8,%esp
movl $10,-4(%ebp)
movl -4(%ebp),%edx /* x=10 is stored in %edx */
#APP /* asm starts here */
movl %edx, %eax /* x is moved to %eax */
movl %eax, %edx /* y is allocated in edx and updated */
#NO_APP /* asm ends here */
movl %edx,-8(%ebp) /* value of y in stack is updated with
the value in %edx */
GCC is free here to allocate any register when the "r" constraint is used. In our example it chose %edx for storing x. After reading the value of x in %edx, it allocated the same register for y.
Since y is specified in the output operand section, the updated value in %edx is stored in -8(%ebp), the location of y on stack. If y were specified in the input section, the value of y on stack would not be updated, even though it does get updated in the temporary register storage of y(%edx).
And since %eax is specified in the clobbered list, GCC doesn't use it anywhere else to store data.
Both input x and output y were allocated in the same %edx register, assuming that inputs are consumed before outputs are produced. Note that if you have a lot of instructions, this may not be the case. To make sure that input and output are allocated in different registers, we can specify the & constraint modifier. Here is our example with the constraint modifier added.
int main(void)
{
int x = 10, y;
asm ("movl %1, %%eax;
"movl %%eax, %0;"
:"=&r"(y) /* y is output operand, note the
& constraint modifier. */
:"r"(x) /* x is input operand */
:"%eax"); /* %eax is clobbered register */
}
And here is the assembly code generated for this example, from which it is evident that x and y have been stored in different registers across "asm".
main:
pushl %ebp
movl %esp,%ebp
subl $8,%esp
movl $10,-4(%ebp)
movl -4(%ebp),%ecx /* x, the input is in %ecx */
#APP
movl %ecx, %eax
movl %eax, %edx /* y, the output is in %edx */
#NO_APP
movl %edx,-8(%ebp)
Back to top
Use of specific register constraints
Now let's take a look at how to specify individual registers as constraints for the operands. In the following example, the cpuid instruction takes the input in the %eax register and gives output in four registers: %eax, %ebx, %ecx, %edx. The input to cpuid (the variable "op") is passed to "asm" in the eax register, as cpuid expects it to. The a, b, c, and d constraints are used in the output to collect the values in the four registers, respectively.
asm ("cpuid"
: "=a" (_eax),
"=b" (_ebx),
"=c" (_ecx),
"=d" (_edx)
: "a" (op));
And below you can see the generated assembly code for this (assuming the _eax, _ebx, etc.... variables are stored on stack):
movl -20(%ebp),%eax /* store 'op' in %eax -- input */
#APP
cpuid
#NO_APP
movl %eax,-4(%ebp) /* store %eax in _eax -- output */
movl %ebx,-8(%ebp) /* store other registers in
movl %ecx,-12(%ebp) respective output variables */
movl %edx,-16(%ebp)
The strcpy function can be implemented using the "S" and "D" constraints in the following manner:
asm ("cld\n
rep\n
movsb"
: /* no input */
:"S"(src), "D"(dst), "c"(count));
The source pointer src is put into %esi by using the "S" constraint, and the destination pointer dst is put into %edi using the "D" constraint. The count value is put into %ecx as it is needed by rep prefix.
And here you can see another constraint that uses the two registers %eax and %edx to combine two 32-bit values and generate a 64-bit value:
#define rdtscll(val) \
__asm__ __volatile__ ("rdtsc" : "=A" (val))
The generated assembly looks like this (if val has a 64 bit memory space).
#APP
rdtsc
#NO_APP
movl %eax,-8(%ebp) /* As a result of A constraint
movl %edx,-4(%ebp) %eax and %edx serve as outputs */
Note here that the values in %edx:%eax serve as 64 bit output.
Back to top
Using matching constraints
Here you can see the code for the system call, with four parameters:
#define _syscall4(type,name,type1,arg1,type2,arg2,type3,arg3,type4,arg4) \
type name (type1 arg1, type2 arg2, type3 arg3, type4 arg4) \
{ \
long __res; \
__asm__ volatile ("int $0x80" \
: "=a" (__res) \
: "0" (__NR_##name),"b" ((long)(arg1)),"c" ((long)(arg2)), \
"d" ((long)(arg3)),"S" ((long)(arg4))); \
__syscall_return(type,__res); \
}
In the above example, four arguments to the system call are put into %ebx, %ecx, %edx, and %esi by using the constraints b, c, d, and S. Note that the "=a" constraint is used in the output so that the return value of the system call, which is in %eax, is put into the variable __res. By using the matching constraint "0" as the first operand constraint in the input section, the syscall number __NR_##name is put into %eax and serves as the input to the system call. Thus %eax serves here as both input and output register. No separate registers are used for this purpose. Note also that the input (syscall number) is consumed (used) before the output (the return value of syscall) is produced.
Back to top
Use of memory operand constraint
Consider the following atomic decrement operation:
__asm__ __volatile__(
"lock; decl %0"
:"=m" (counter)
:"m" (counter));
The generated assembly for this would look something like this:
#APP
lock
decl -24(%ebp) /* counter is modified on its memory location */
#NO_APP.
You might think of using the register constraint here for the counter. If you do, the value of the counter must first be copied on to a register, decremented, and then updated to its memory. But then you lose the whole purpose of locking and atomicity, which clearly shows the necessity of using the memory constraint.
Back to top
Using clobbered registers
Consider an elementary implementation of memory copy.
asm ("movl $count, %%ecx;
up: lodsl;
stosl;
loop up;"
: /* no output */
:"S"(src), "D"(dst) /* input */
:"%ecx", "%eax" ); /* clobbered list */
While lodsl modifies %eax, the lodsl and stosl instructions use it implicitly. And the %ecx register explicitly loads the count. But GCC won't know this unless we inform it, which is exactly what we do by including %eax and %ecx in the clobbered register set. Unless this is done, GCC assumes that %eax and %ecx are free, and it may decide to use them for storing other data. Note here that %esi and %edi are used by "asm", and are not in the clobbered list. This is because it has been declared that "asm" will use them in the input operand list. The bottom line here is that if a register is used inside "asm" (implicitly or explicitly), and it is not present in either the input or output operand list, you must list it as a clobbered register.
Back to top
Conclusion
On the whole, inline assembly is huge and provides a lot of features that we did not even touch on here. But with a basic grasp of the material in this article, you should be able to start coding inline assembly on your own.
If you're a Linux kernel developer, you probably find yourself coding highly architecture-dependent functions or optimizing a code path pretty often. And you probably do this by inserting assembly language instructions into the middle of C statements (a method otherwise known as inline assembly). Let's take a look at the specific usage of inline assembly in Linux. (We'll limit our discussion to the IA32 assembly.)
GNU assembler syntax in brief
Let's first look at the basic assembler syntax used in Linux. GCC, the GNU C Compiler for Linux, uses AT&T assembly syntax. Some of the basic rules of this syntax are listed below. (The list is by no means complete; I've included only those rules pertinent to inline assembly.)
Register naming
Register names are prefixed by %. That is, if eax has to be used, it should be used as %eax.
Source and destination ordering
In any instruction, source comes first and destination follows. This differs from Intel syntax, where source comes after destination.
mov %eax, %ebx, transfers the contents of eax to ebx.
Size of operand
The instructions are suffixed by b, w, or l, depending on whether the operand is a byte, word, or long. This is not mandatory; GCC tries provide the appropriate suffix by reading the operands. But specifying the suffixes manually improves the code readability and eliminates the possibility of the compilers guessing incorrectly.
movb %al, %bl -- Byte move
movw %ax, %bx -- Word move
movl %eax, %ebx -- Longword move
Immediate operand
An immediate operand is specified by using $.
movl $0xffff, %eax -- will move the value of 0xffff into eax register.
Indirect memory reference
Any indirect references to memory are done by using ( ).
movb (%esi), %al -- will transfer the byte in the memory
pointed by esi into al register
Back to top
Inline assembly
GCC provides the special construct "asm" for inline assembly, which has the following format:
asm ( assembler template
: output operands (optional)
: input operands (optional)
: list of clobbered registers (optional)
);
In this example, the assembler template consists of assembly instructions. The input operands are the C expressions that serve as input operands to the instructions. The output operands are the C expressions on which the output of the assembly instructions will be performed.
asm ("movl %%cr3, %0\n" :"=r"(cr3val));
a %eax
b %ebx
c %ecx
d %edx
S %esi
D %edi
Memory operand constraint(m)
When the operands are in the memory, any operations performed on them will occur directly in the memory location, as opposed to register constraints, which first store the value in a register to be modified and then write it back to the memory location. But register constraints are usually used only when they are absolutely necessary for an instruction or they significantly speed up the process. Memory constraints can be used most efficiently in cases where a C variable needs to be updated inside "asm" and you really don't want to use a register to hold its value. For example, the value of idtr is stored in the memory location loc:
("sidt %0\n" : :"m"(loc));
Matching(Digit) constraints
In some cases, a single variable may serve as both the input and the output operand. Such cases may be specified in "asm" by using matching constraints.
asm ("incl %0" :"=a"(var):"0"(var));
In our example for matching constraints, the register %eax is used as both the input and the output variable. var input is read to %eax and updated %eax is stored in var again after increment. "0" here specifies the same constraint as the 0th output variable. That is, it specifies that the output instance of var should be stored in %eax only. This constraint can be used:
* In cases where input is read from a variable or the variable is modified and modification is written back to the same variable
* In cases where separate instances of input and output operands are not necessary
The most important effect of using matching restraints is that they lead to the efficient use of available registers.
Back to top
Examples of common inline assembly usage
The following examples illustrate usage through different operand constraints. There are too many constraints to give examples for each one, but these are the most frequently used constraint types.
"asm" and the register constraint "r"
Let's first take a look at "asm" with the register constraint 'r'. Our example shows how GCC allocates registers, and how it updates the value of output variables.
int main(void)
{
int x = 10, y;
asm ("movl %1, %%eax;
"movl %%eax, %0;"
:"=r"(y) /* y is output operand */
:"r"(x) /* x is input operand */
:"%eax"); /* %eax is clobbered register */
}
In this example, the value of x is copied to y inside "asm". x and y are passed to "asm" by being stored in registers. The assembly code generated for this example looks like this:
main:
pushl %ebp
movl %esp,%ebp
subl $8,%esp
movl $10,-4(%ebp)
movl -4(%ebp),%edx /* x=10 is stored in %edx */
#APP /* asm starts here */
movl %edx, %eax /* x is moved to %eax */
movl %eax, %edx /* y is allocated in edx and updated */
#NO_APP /* asm ends here */
movl %edx,-8(%ebp) /* value of y in stack is updated with
the value in %edx */
GCC is free here to allocate any register when the "r" constraint is used. In our example it chose %edx for storing x. After reading the value of x in %edx, it allocated the same register for y.
Since y is specified in the output operand section, the updated value in %edx is stored in -8(%ebp), the location of y on stack. If y were specified in the input section, the value of y on stack would not be updated, even though it does get updated in the temporary register storage of y(%edx).
And since %eax is specified in the clobbered list, GCC doesn't use it anywhere else to store data.
Both input x and output y were allocated in the same %edx register, assuming that inputs are consumed before outputs are produced. Note that if you have a lot of instructions, this may not be the case. To make sure that input and output are allocated in different registers, we can specify the & constraint modifier. Here is our example with the constraint modifier added.
int main(void)
{
int x = 10, y;
asm ("movl %1, %%eax;
"movl %%eax, %0;"
:"=&r"(y) /* y is output operand, note the
& constraint modifier. */
:"r"(x) /* x is input operand */
:"%eax"); /* %eax is clobbered register */
}
And here is the assembly code generated for this example, from which it is evident that x and y have been stored in different registers across "asm".
main:
pushl %ebp
movl %esp,%ebp
subl $8,%esp
movl $10,-4(%ebp)
movl -4(%ebp),%ecx /* x, the input is in %ecx */
#APP
movl %ecx, %eax
movl %eax, %edx /* y, the output is in %edx */
#NO_APP
movl %edx,-8(%ebp)
Back to top
Use of specific register constraints
Now let's take a look at how to specify individual registers as constraints for the operands. In the following example, the cpuid instruction takes the input in the %eax register and gives output in four registers: %eax, %ebx, %ecx, %edx. The input to cpuid (the variable "op") is passed to "asm" in the eax register, as cpuid expects it to. The a, b, c, and d constraints are used in the output to collect the values in the four registers, respectively.
asm ("cpuid"
: "=a" (_eax),
"=b" (_ebx),
"=c" (_ecx),
"=d" (_edx)
: "a" (op));
And below you can see the generated assembly code for this (assuming the _eax, _ebx, etc.... variables are stored on stack):
movl -20(%ebp),%eax /* store 'op' in %eax -- input */
#APP
cpuid
#NO_APP
movl %eax,-4(%ebp) /* store %eax in _eax -- output */
movl %ebx,-8(%ebp) /* store other registers in
movl %ecx,-12(%ebp) respective output variables */
movl %edx,-16(%ebp)
The strcpy function can be implemented using the "S" and "D" constraints in the following manner:
asm ("cld\n
rep\n
movsb"
: /* no input */
:"S"(src), "D"(dst), "c"(count));
The source pointer src is put into %esi by using the "S" constraint, and the destination pointer dst is put into %edi using the "D" constraint. The count value is put into %ecx as it is needed by rep prefix.
And here you can see another constraint that uses the two registers %eax and %edx to combine two 32-bit values and generate a 64-bit value:
#define rdtscll(val) \
__asm__ __volatile__ ("rdtsc" : "=A" (val))
The generated assembly looks like this (if val has a 64 bit memory space).
#APP
rdtsc
#NO_APP
movl %eax,-8(%ebp) /* As a result of A constraint
movl %edx,-4(%ebp) %eax and %edx serve as outputs */
Note here that the values in %edx:%eax serve as 64 bit output.
Back to top
Using matching constraints
Here you can see the code for the system call, with four parameters:
#define _syscall4(type,name,type1,arg1,type2,arg2,type3,arg3,type4,arg4) \
type name (type1 arg1, type2 arg2, type3 arg3, type4 arg4) \
{ \
long __res; \
__asm__ volatile ("int $0x80" \
: "=a" (__res) \
: "0" (__NR_##name),"b" ((long)(arg1)),"c" ((long)(arg2)), \
"d" ((long)(arg3)),"S" ((long)(arg4))); \
__syscall_return(type,__res); \
}
In the above example, four arguments to the system call are put into %ebx, %ecx, %edx, and %esi by using the constraints b, c, d, and S. Note that the "=a" constraint is used in the output so that the return value of the system call, which is in %eax, is put into the variable __res. By using the matching constraint "0" as the first operand constraint in the input section, the syscall number __NR_##name is put into %eax and serves as the input to the system call. Thus %eax serves here as both input and output register. No separate registers are used for this purpose. Note also that the input (syscall number) is consumed (used) before the output (the return value of syscall) is produced.
Back to top
Use of memory operand constraint
Consider the following atomic decrement operation:
__asm__ __volatile__(
"lock; decl %0"
:"=m" (counter)
:"m" (counter));
The generated assembly for this would look something like this:
#APP
lock
decl -24(%ebp) /* counter is modified on its memory location */
#NO_APP.
You might think of using the register constraint here for the counter. If you do, the value of the counter must first be copied on to a register, decremented, and then updated to its memory. But then you lose the whole purpose of locking and atomicity, which clearly shows the necessity of using the memory constraint.
Back to top
Using clobbered registers
Consider an elementary implementation of memory copy.
asm ("movl $count, %%ecx;
up: lodsl;
stosl;
loop up;"
: /* no output */
:"S"(src), "D"(dst) /* input */
:"%ecx", "%eax" ); /* clobbered list */
While lodsl modifies %eax, the lodsl and stosl instructions use it implicitly. And the %ecx register explicitly loads the count. But GCC won't know this unless we inform it, which is exactly what we do by including %eax and %ecx in the clobbered register set. Unless this is done, GCC assumes that %eax and %ecx are free, and it may decide to use them for storing other data. Note here that %esi and %edi are used by "asm", and are not in the clobbered list. This is because it has been declared that "asm" will use them in the input operand list. The bottom line here is that if a register is used inside "asm" (implicitly or explicitly), and it is not present in either the input or output operand list, you must list it as a clobbered register.
Back to top
Conclusion
On the whole, inline assembly is huge and provides a lot of features that we did not even touch on here. But with a basic grasp of the material in this article, you should be able to start coding inline assembly on your own.
Spinlocks and Read-Write Locks
Most parallel programming in some way will involve the use of locking at the lowest levels. Locks are primitives that provide mutual exclusion that allow data structures to remain in consistent states. Without locking, multiple threads of execution may simultaneously modify a data structure. Without a carefully thought out (and usually complex) lock-free algorithm, the result is usually a crash of hang as unintended program states are entered. Since the creation of a lock-free algorithm is extremely difficult, most programs use locks.
If updating a data structure is slow, the lock of choice is a mutex of some kind. These will transfer control to the operating system when they block. This allows another thread to run, and perhaps make progress whilst the first thread sleeps. This transfer of control consists of a pair of context switches, which are quite a slow operation. Thus, if the lock-hold time is expected to be short, then this may not be the fastest method.
Spinlocks
Instead of context switches, a spinlock will "spin", and repeatedly check to see if the lock is unlocked. Spinning is very fast, so the latency between an unlock-lock pair is small. However, spinning doesn't accomplish any work, so may not be as efficient as a sleeping mutex if the time spent becomes significant.
Before we describe the implementation of spin locks, we first need a set of atomic primitives. Fortunately, gcc provides some of these as built-in functions:
#define atomic_xadd(P, V) __sync_fetch_and_add((P), (V))
#define cmpxchg(P, O, N) __sync_val_compare_and_swap((P), (O), (N))
#define atomic_inc(P) __sync_add_and_fetch((P), 1)
#define atomic_dec(P) __sync_add_and_fetch((P), -1)
#define atomic_add(P, V) __sync_add_and_fetch((P), (V))
#define atomic_set_bit(P, V) __sync_or_and_fetch((P), 1<<(V))
#define atomic_clear_bit(P, V) __sync_and_and_fetch((P), ~(1<<(V)))
Unfortunately, we will require a few others that are not, and so must be implemented in assembly
/* Compile read-write barrier */
#define barrier() asm volatile("": : :"memory")
/* Pause instruction to prevent excess processor bus usage */
#define cpu_relax() asm volatile("pause\n": : :"memory")
/* Atomic exchange (of various sizes) */
static inline void *xchg_64(void *ptr, void *x)
{
__asm__ __volatile__("xchgq %0,%1"
:"=r" ((unsigned long long) x)
:"m" (*(volatile long long *)ptr), "0" ((unsigned long long) x)
:"memory");
return x;
}
static inline unsigned xchg_32(void *ptr, unsigned x)
{
__asm__ __volatile__("xchgl %0,%1"
:"=r" ((unsigned) x)
:"m" (*(volatile unsigned *)ptr), "0" (x)
:"memory");
return x;
}
static inline unsigned short xchg_16(void *ptr, unsigned short x)
{
__asm__ __volatile__("xchgw %0,%1"
:"=r" ((unsigned short) x)
:"m" (*(volatile unsigned short *)ptr), "0" (x)
:"memory");
return x;
}
/* Test and set a bit */
static inline char atomic_bitsetandtest(void *ptr, int x)
{
char out;
__asm__ __volatile__("lock; bts %2,%1\n"
"sbb %0,%0\n"
:"=r" (out), "=m" (*(volatile long long *)ptr)
:"Ir" (x)
:"memory");
return out;
}
A spinlock can be implemented in an obvious way, using the atomic exchange primitive.
#define EBUSY 1
typedef unsigned spinlock;
static void spin_lock(spinlock *lock)
{
while (1)
{
if (!xchg_32(lock, EBUSY)) return;
while (*lock) cpu_relax();
}
}
static void spin_unlock(spinlock *lock)
{
barrier();
*lock = 0;
}
static int spin_trylock(spinlock *lock)
{
return xchg_32(lock, EBUSY);
}
So how fast is the above code? A simple benchmark to test the overhead of a lock is to have a given number of threads attempting to lock and unlock it, doing a fixed amount of work each time. If the total number of lock-unlock pairs is maintained as the number of threads is increased, it is possible to measure the affect of contention on performance. A good spinlock implementation will be as fast as possible for any given number of threads attempting to use that lock simultaneously.
The results for the above spinlock implementation are:
Threads 1 2 3 4 5
Time (s) 5.5 5.6 5.7 5.7 5.7
These results are pretty good, but can be improved. The problem is that if there are multiple threads contending, then they all attempt to take the lock at the same time once it is released. This results in a huge amount of processor bus traffic, which is a huge performance killer. Thus, if we somehow order the lock-takers so that they know who is next in line for the resource we can vastly reduce the amount of bus traffic.
One spinlock algorithm that does this is called the MCS lock. This uses a list to maintain the order of acquirers.
typedef struct mcs_lock_t mcs_lock_t;
struct mcs_lock_t
{
mcs_lock_t *next;
int spin;
};
typedef struct mcs_lock_t *mcs_lock;
static void lock_mcs(mcs_lock *m, mcs_lock_t *me)
{
mcs_lock_t *tail;
me->next = NULL;
me->spin = 0;
tail = xchg_64(m, me);
/* No one there? */
if (!tail) return;
/* Someone there, need to link in */
tail->next = me;
/* Make sure we do the above setting of next. */
barrier();
/* Spin on my spin variable */
while (!me->spin) cpu_relax();
return;
}
static void unlock_mcs(mcs_lock *m, mcs_lock_t *me)
{
/* No successor yet? */
if (!me->next)
{
/* Try to atomically unlock */
if (cmpxchg(m, me, NULL) == me) return;
/* Wait for successor to appear */
while (!me->next) cpu_relax();
}
/* Unlock next one */
me->next->spin = 1;
}
static int trylock_mcs(mcs_lock *m, mcs_lock_t *me)
{
mcs_lock_t *tail;
me->next = NULL;
me->spin = 0;
/* Try to lock */
tail = cmpxchg(m, NULL, &me);
/* No one was there - can quickly return */
if (!tail) return 0;
return EBUSY;
}
This has quite different timings:
Threads 1 2 3 4 5
Time (s) 3.6 4.4 4.5 4.8 >1min
The MCS lock takes a hugely longer time when the number of threads is greater than the number of processors (four in this case). This is because if the next thread in the queue isn't active when the lock is unlocked, then everyone must wait until the operating system scheduler decides to run it. Every "fair" lock algorithm has this problem. Thus, the simple unfair spinlock still can be quite useful when you don't know that the number of threads is bounded by the number of cpus.
A bigger problem with the MCS lock is its API. It requires a second structure to be passed in addition to the address of the lock. The algorithm uses this second structure to store the information which describes the queue of threads waiting for the lock. Unfortunately, most code written using spinlocks doesn't have this extra information, so the fact that the MCS algorithm isn't a drop-in replacement to a standard spin lock is a problem.
An IBM working group found a way to improve the MCS algorithm to remove the need to pass the extra structure as a parameter. Instead, on-stack information was used instead. The result is the K42 lock algorithm:
typedef struct k42lock k42lock;
struct k42lock
{
k42lock *next;
k42lock *tail;
};
static void k42_lock(k42lock *l)
{
k42lock me;
k42lock *pred, *succ;
me.next = NULL;
barrier();
pred = xchg_64(&l->tail, &me);
if (pred)
{
me.tail = (void *) 1;
barrier();
pred->next = &me;
barrier();
while (me.tail) cpu_relax();
}
succ = me.next;
if (!succ)
{
barrier();
l->next = NULL;
if (cmpxchg(&l->tail, &me, &l->next) != &me)
{
while (!me.next) cpu_relax();
l->next = me.next;
}
}
else
{
l->next = succ;
}
}
static void k42_unlock(k42lock *l)
{
k42lock *succ = l->next;
barrier();
if (!succ)
{
if (cmpxchg(&l->tail, &l->next, NULL) == (void *) &l->next) return;
while (!l->next) cpu_relax();
succ = l->next;
}
succ->tail = NULL;
}
static int k42_trylock(k42lock *l)
{
if (!cmpxchg(&l->tail, NULL, &l->next)) return 0;
return EBUSY;
}
The timings of the K42 algorithm are as good as, if not better than the MCS lock:
Threads 1 2 3 4 5
Time (s) 3.7 4.8 4.5 4.9 >1min
Unfortunately, the K42 algorithm has another problem. It appears that it may be patented by IBM. Thus it cannot be used either. (Without perhaps paying royalties to IBM.)
One way around this is to use a different type of list. The K42 and MCS locks use lists ordered so that finding the next thread to run is easy, and adding to the end is hard. What about flipping the direction of the pointers so that finding the end is easy, and find who's next hard? The result is the following algorithm:
typedef struct listlock_t listlock_t;
struct listlock_t
{
listlock_t *next;
int spin;
};
typedef struct listlock_t *listlock;
#define LLOCK_FLAG (void *)1
static void listlock_lock(listlock *l)
{
listlock_t me;
listlock_t *tail;
/* Fast path - no users */
if (!cmpxchg(l, NULL, LLOCK_FLAG)) return;
me.next = LLOCK_FLAG;
me.spin = 0;
/* Convert into a wait list */
tail = xchg_64(l, &me);
if (tail)
{
/* Add myself to the list of waiters */
if (tail == LLOCK_FLAG) tail = NULL;
me.next = tail;
/* Wait for being able to go */
while (!me.spin) cpu_relax();
return;
}
/* Try to convert to an exclusive lock */
if (cmpxchg(l, &me, LLOCK_FLAG) == &me) return;
/* Failed - there is now a wait list */
tail = *l;
/* Scan to find who is after me */
while (1)
{
/* Wait for them to enter their next link */
while (tail->next == LLOCK_FLAG) cpu_relax();
if (tail->next == &me)
{
/* Fix their next pointer */
tail->next = NULL;
return;
}
tail = tail->next;
}
}
static void listlock_unlock(listlock *l)
{
listlock_t *tail;
listlock_t *tp;
while (1)
{
tail = *l;
barrier();
/* Fast path */
if (tail == LLOCK_FLAG)
{
if (cmpxchg(l, LLOCK_FLAG, NULL) == LLOCK_FLAG) return;
continue;
}
tp = NULL;
/* Wait for partially added waiter */
while (tail->next == LLOCK_FLAG) cpu_relax();
/* There is a wait list */
if (tail->next) break;
/* Try to convert to a single-waiter lock */
if (cmpxchg(l, tail, LLOCK_FLAG) == tail)
{
/* Unlock */
tail->spin = 1;
return;
}
cpu_relax();
}
/* A long list */
tp = tail;
tail = tail->next;
/* Scan wait list */
while (1)
{
/* Wait for partially added waiter */
while (tail->next == LLOCK_FLAG) cpu_relax();
if (!tail->next) break;
tp = tail;
tail = tail->next;
}
tp->next = NULL;
barrier();
/* Unlock */
tail->spin = 1;
}
static int listlock_trylock(listlock *l)
{
/* Simple part of a spin-lock */
if (!cmpxchg(l, NULL, LLOCK_FLAG)) return 0;
/* Failure! */
return EBUSY;
}
This unfortunately is extremely complex, and doesn't perform well either:
Threads 1 2 3 4 5
Time (s) 3.6 5.1 5.8 6.3 >1min
It is still faster than the standard spinlock when contention is low, but once more than two threads are attempting to lock at the same time it is worse, and gets slower from there on.
Another possible trick is to use a spinlock within a spinlock. The first lock can be very light weight since we know it will only be held for a short time. It can then control the locking for the wait list describing the acquirers of the real spinlock. If done right, the number of waiters on the sub-lock can be kept low, and thus minimize bus traffic. The result is:
typedef struct bitlistlock_t bitlistlock_t;
struct bitlistlock_t
{
bitlistlock_t *next;
int spin;
};
typedef bitlistlock_t *bitlistlock;
#define BLL_USED ((bitlistlock_t *) -2LL)
static void bitlistlock_lock(bitlistlock *l)
{
bitlistlock_t me;
bitlistlock_t *tail;
/* Grab control of list */
while (atomic_bitsetandtest(l, 0)) cpu_relax();
/* Remove locked bit */
tail = (bitlistlock_t *) ((uintptr_t) *l & ~1LL);
/* Fast path, no waiters */
if (!tail)
{
/* Set to be a flag value */
*l = BLL_USED;
return;
}
if (tail == BLL_USED) tail = NULL;
me.next = tail;
me.spin = 0;
barrier();
/* Unlock, and add myself to the wait list */
*l = &me;
/* Wait for the go-ahead */
while (!me.spin) cpu_relax();
}
static void bitlistlock_unlock(bitlistlock *l)
{
bitlistlock_t *tail;
bitlistlock_t *tp;
/* Fast path - no wait list */
if (cmpxchg(l, BLL_USED, NULL) == BLL_USED) return;
/* Grab control of list */
while (atomic_bitsetandtest(l, 0)) cpu_relax();
tp = *l;
barrier();
/* Get end of list */
tail = (bitlistlock_t *) ((uintptr_t) tp & ~1LL);
/* Actually no users? */
if (tail == BLL_USED)
{
barrier();
*l = NULL;
return;
}
/* Only one entry on wait list? */
if (!tail->next)
{
barrier();
/* Unlock bitlock */
*l = BLL_USED;
barrier();
/* Unlock lock */
tail->spin = 1;
return;
}
barrier();
/* Unlock bitlock */
*l = tail;
barrier();
/* Scan wait list for start */
do
{
tp = tail;
tail = tail->next;
}
while (tail->next);
tp->next = NULL;
barrier();
/* Unlock */
tail->spin = 1;
}
static int bitlistlock_trylock(bitlistlock *l)
{
if (!*l && (cmpxchg(l, NULL, BLL_USED) == NULL)) return 0;
return EBUSY;
}
Unfortunately, this is even worse than the previous listlock algorithm. This is only good for the uncontended case.
Threads 1 2 3 4 5
Time (s) 3.6 5.3 6.3 6.8 >1min
Another possibility is to modify some other type of locking algorithm to be a spinlock. The read-write locks from Reactos are designed to be scale extremely well. If the "read" part of them is removed, then the mutual exclusion between the writers will act just like a spinlock. Doing this yields:
/* Bit-lock for editing the wait block */
#define SLOCK_LOCK 1
#define SLOCK_LOCK_BIT 0
/* Has an active user */
#define SLOCK_USED 2
#define SLOCK_BITS 3
typedef struct slock slock;
struct slock
{
uintptr_t p;
};
typedef struct slock_wb slock_wb;
struct slock_wb
{
/*
* last points to the last wait block in the chain.
* The value is only valid when read from the first wait block.
*/
slock_wb *last;
/* next points to the next wait block in the chain. */
slock_wb *next;
/* Wake up? */
int wake;
};
/* Wait for control of wait block */
static slock_wb *slockwb(slock *s)
{
uintptr_t p;
/* Spin on the wait block bit lock */
while (atomic_bitsetandtest(&s->p, SLOCK_LOCK_BIT))
{
cpu_relax();
}
p = s->p;
if (p <= SLOCK_BITS)
{
/* Oops, looks like the wait block was removed. */
atomic_dec(&s->p);
return NULL;
}
return (slock_wb *)(p - SLOCK_LOCK);
}
static void slock_lock(slock *s)
{
slock_wb swblock;
/* Fastpath - no other readers or writers */
if (!s->p && (cmpxchg(&s->p, 0, SLOCK_USED) == 0)) return;
/* Initialize wait block */
swblock.next = NULL;
swblock.last = &swblock;
swblock.wake = 0;
while (1)
{
uintptr_t p = s->p;
cpu_relax();
/* Fastpath - no other readers or writers */
if (!p)
{
if (cmpxchg(&s->p, 0, SLOCK_USED) == 0) return;
continue;
}
if (p > SLOCK_BITS)
{
slock_wb *first_wb, *last;
first_wb = slockwb(s);
if (!first_wb) continue;
last = first_wb->last;
last->next = &swblock;
first_wb->last = &swblock;
/* Unlock */
barrier();
s->p &= ~SLOCK_LOCK;
break;
}
/* Try to add the first wait block */
if (cmpxchg(&s->p, p, (uintptr_t)&swblock) == p) break;
}
/* Wait to acquire exclusive lock */
while (!swblock.wake) cpu_relax();
}
static void slock_unlock(slock *s)
{
slock_wb *next;
slock_wb *wb;
uintptr_t np;
while (1)
{
uintptr_t p = s->p;
/* This is the fast path, we can simply clear the SRWLOCK_USED bit. */
if (p == SLOCK_USED)
{
if (cmpxchg(&s->p, SLOCK_USED, 0) == SLOCK_USED) return;
continue;
}
/* There's a wait block, we need to wake the next pending user */
wb = slockwb(s);
if (wb) break;
cpu_relax();
}
next = wb->next;
if (next)
{
/*
* There's more blocks chained, we need to update the pointers
* in the next wait block and update the wait block pointer.
*/
np = (uintptr_t) next;
next->last = wb->last;
}
else
{
/* Convert the lock to a simple lock. */
np = SLOCK_USED;
}
barrier();
/* Also unlocks lock bit */
s->p = np;
barrier();
/* Notify the next waiter */
wb->wake = 1;
/* We released the lock */
}
static int slock_trylock(slock *s)
{
/* No other readers or writers? */
if (!s->p && (cmpxchg(&s->p, 0, SLOCK_USED) == 0)) return 0;
return EBUSY;
}
Again, this algorithm disappoints. The results are similar to the bitlistlock algorithm. This isn't surprising, as the wait-block that controls the waiter list is synchronized by a bit lock.
Threads 1 2 3 4 5
Time (s) 3.7 5.1 5.8 6.5 >1min
Time to think laterally. One of the problems with the above algorithms is synchronization of the wait list. The core issue is that we need some way to recognize the head and tail of that list. The head of the list is needed to add a new waiter. The tail is needed to decide who is to go next. The MCS lock used the extra structure information so that the list tail could be quickly found. The K42 Lock used the patented method of storing the tail in a second list pointer within the lock itself.
There is another trick we can do though. If the extra information is allocated on the stack, then it may be possible to recognize that a pointer is pointing within our own stack frame. If so, then we can use that information within the algorithm to decide where the wait list ends. The result is the stack-lock algorithm:
typedef struct stlock_t stlock_t;
struct stlock_t
{
stlock_t *next;
};
typedef struct stlock_t *stlock;
static __attribute__((noinline)) void stlock_lock(stlock *l)
{
stlock_t *me = NULL;
barrier();
me = xchg_64(l, &me);
/* Wait until we get the lock */
while (me) cpu_relax();
}
#define MAX_STACK_SIZE (1<<12)
static __attribute__((noinline)) int on_stack(void *p)
{
int x;
uintptr_t u = (uintptr_t) &x;
return ((u - (uintptr_t)p + MAX_STACK_SIZE) < MAX_STACK_SIZE * 2);
}
static __attribute__((noinline)) void stlock_unlock(stlock *l)
{
stlock_t *tail = *l;
barrier();
/* Fast case */
if (on_stack(tail))
{
/* Try to remove the wait list */
if (cmpxchg(l, tail, NULL) == tail) return;
tail = *l;
}
/* Scan wait list */
while (1)
{
/* Wait for partially added waiter */
while (!tail->next) cpu_relax();
if (on_stack(tail->next)) break;
tail = tail->next;
}
barrier();
/* Unlock */
tail->next = NULL;
}
static int stlock_trylock(stlock *l)
{
stlock_t me;
if (!cmpxchg(l, NULL, &me)) return 0;
return EBUSY;
}
This algorithm is quite a bit simpler if you know that a thread's stack is aligned a certain way. (Then the stack-check turns into an XOR and a mask operation.) Unfortunately, it is still quite slow.
Threads 1 2 3 4 5
Time (s) 3.6 5.3 5.7 6.2 >1min
The lock operation above looks to be fairly efficient, it is the unlock routine that is slow and complex. Perhaps if we save a little more information within the lock itself, then the unlock operation can be made faster. Since quite a bit of time seems to be spent finding the previous node to ourselves (which is the one to wake up), it might be better to do that while we are spinning waiting for our turn to take the lock. If we save this previous point within the lock, we then will not need to calculate it within the unlock routine.
typedef struct plock_t plock_t;
struct plock_t
{
plock_t *next;
};
typedef struct plock plock;
struct plock
{
plock_t *next;
plock_t *prev;
plock_t *last;
};
static void plock_lock(plock *l)
{
plock_t *me = NULL;
plock_t *prev;
barrier();
me = xchg_64(l, &me);
prev = NULL;
/* Wait until we get the lock */
while (me)
{
/* Scan wait list for my previous */
if (l->next != (plock_t *) &me)
{
plock_t *t = l->next;
while (me)
{
if (t->next == (plock_t *) &me)
{
prev = t;
while (me) cpu_relax();
goto done;
}
if (t->next) t = t->next;
cpu_relax();
}
}
cpu_relax();
}
done:
l->prev = prev;
l->last = (plock_t *) &me;
}
static void plock_unlock(plock *l)
{
plock_t *tail;
/* Do I know my previous? */
if (l->prev)
{
/* Unlock */
l->prev->next = NULL;
return;
}
tail = l->next;
barrier();
/* Fast case */
if (tail == l->last)
{
/* Try to remove the wait list */
if (cmpxchg(&l->next, tail, NULL) == tail) return;
tail = l->next;
}
/* Scan wait list */
while (1)
{
/* Wait for partially added waiter */
while (!tail->next) cpu_relax();
if (tail->next == l->last) break;
tail = tail->next;
}
barrier();
/* Unlock */
tail->next = NULL;
}
static int plock_trylock(plock *l)
{
plock_t me;
if (!cmpxchg(&l->next, NULL, &me))
{
l->last = &me;
return 0;
}
return EBUSY;
}
This starts regaining some of the speed we have lost, but still isn't quite as good as the K42 algorithm. (It is however, always faster than the original naive spinlock provided that the number of threads is less than the number of processors.)
Threads 1 2 3 4 5
Time (s) 3.7 5.1 5.3 5.4 >1min
A careful reading of the plock algorithm shows that it can be improved even more. We don't actually need to know the pointer value of the next waiter. Some other unique value will do instead. Instead of saving a pointer, we can use a counter that we increment. If a waiter knows which counter value corresponds to its turn, then it just needs to wait until that value appears. The result is called the ticket lock algorithm:
typedef union ticketlock ticketlock;
union ticketlock
{
unsigned u;
struct
{
unsigned short ticket;
unsigned short users;
} s;
};
static void ticket_lock(ticketlock *t)
{
unsigned short me = atomic_xadd(&t->s.users, 1);
while (t->s.ticket != me) cpu_relax();
}
static void ticket_unlock(ticketlock *t)
{
barrier();
t->s.ticket++;
}
static int ticket_trylock(ticketlock *t)
{
unsigned short me = t->s.users;
unsigned short menew = me + 1;
unsigned cmp = ((unsigned) me << 16) + me;
unsigned cmpnew = ((unsigned) me << 16) + menew;
if (cmpxchg(&t->u, cmp, cmpnew) == cmp) return 0;
return EBUSY;
}
static int ticket_lockable(ticketlock *t)
{
ticketlock u = *t;
barrier();
return (u.s.ticket == u.s.users);
}
The above algorithm is extremely fast, and beats all the other fair-locks described.
Threads 1 2 3 4 5
Time (s) 3.6 4.4 4.5 4.8 >1min
In fact, this is the spinlock algorithm used in the Linux kernel, although for extra speed, the kernel version is written in assembly language rather than the semi-portable C shown above. Also note that the above code depends on the endianness of the computer architecture. It is designed for little-endian machines. Big endian processors will require a swap of the two fields within the structure in the union.
The ticket lock shows that an oft-repeated fallacy is untrue. Many of the above fair-lock algorithms are meant to scale well because the waiters are spinning on different memory locations. This is meant to reduce bus traffic and thus increase performance. However, it appears that that effect is small. The more important thing is to make sure that the waiters are ordered by who gets to take the lock next. This is what the ticket lock does admirably. The fact that multiple waiters are spinning on the same ticket lock location does not seem to be a performance drain.
Read Write Locks
Quite often, some users of a data structure will make no modifications to it. They just require read access to its fields to do their work. If multiple threads require read access to the same data, there is no reason why they should not be able to execute simultaneously. Spinlocks don't differentiate between read and read/write access. Thus spinlocks do not exploit this potential parallelism. To do so, read-write locks are required.
The simplest read-write lock uses a spinlock to control write access, and a counter field for the readers.
typedef struct dumbrwlock dumbrwlock;
struct dumbrwlock
{
spinlock lock;
unsigned readers;
};
static void dumb_wrlock(dumbrwlock *l)
{
/* Get write lock */
spin_lock(&l->lock);
/* Wait for readers to finish */
while (l->readers) cpu_relax();
}
static void dumb_wrunlock(dumbrwlock *l)
{
spin_unlock(&l->lock);
}
static int dumb_wrtrylock(dumbrwlock *l)
{
/* Want no readers */
if (l->readers) return EBUSY;
/* Try to get write lock */
if (spin_trylock(&l->lock)) return EBUSY;
if (l->readers)
{
/* Oops, a reader started */
spin_unlock(&l->lock);
return EBUSY;
}
/* Success! */
return 0;
}
static void dumb_rdlock(dumbrwlock *l)
{
while (1)
{
/* Speculatively take read lock */
atomic_inc(&l->readers);
/* Success? */
if (!l->lock) return;
/* Failure - undo, and wait until we can try again */
atomic_dec(&l->readers);
while (l->lock) cpu_relax();
}
}
static void dumb_rdunlock(dumbrwlock *l)
{
atomic_dec(&l->readers);
}
static int dumb_rdtrylock(dumbrwlock *l)
{
/* Speculatively take read lock */
atomic_inc(&l->readers);
/* Success? */
if (!l->lock) return 0;
/* Failure - undo */
atomic_dec(&l->readers);
return EBUSY;
}
static int dumb_rdupgradelock(dumbrwlock *l)
{
/* Try to convert into a write lock */
if (spin_trylock(&l->lock)) return EBUSY;
/* I'm no longer a reader */
atomic_dec(&l->readers);
/* Wait for all other readers to finish */
while (l->readers) cpu_relax();
return 0;
}
The benchmark the above code, we need a little more information than the spinlock case. The fraction of readers is important. The more readers, the more parallelism we should get, and the faster the code should run. It is also important to have a random distribution of readers and writers, just like real-world situations. Thus a parallel random number generator is used. By selecting a random byte, and choosing 1, 25, 128, or 250 out of 256 possibilities to be a writer we can explore the mostly-reader case through to where most users of the lock are writers. Finally, it is important to find out the effects of contention. In general, read-write locks tend to be used where contention is high, so we will mostly look at the case where the number of threads is equal to the number of processors.
The dumb lock above performs fairly poorly when there is no contention. If one thread is used we get:
Writers per 256 1 25 128 250
Time (s) 3.7 3.8 4.6 5.4
As expected, we asymptote to the relatively slow timings of the standard spinlock algorithm as the write fraction increases. If there is contention, however, the dumb lock actually performs quite well. Using four threads:
Writers per 256 1 25 128 250
Time (s) 1.1 1.9 4.4 5.7
The obvious thing to do to try to gain speed would be to replace the slow spinlock with a ticketlock algorithm. If this is done, we have:
typedef struct dumbtrwlock dumbtrwlock;
struct dumbtrwlock
{
ticketlock lock;
unsigned readers;
};
static void dumbt_wrlock(dumbtrwlock *l)
{
/* Get lock */
ticket_lock(&l->lock);
/* Wait for readers to finish */
while (l->readers) cpu_relax();
}
static void dumbt_wrunlock(dumbtrwlock *l)
{
ticket_unlock(&l->lock);
}
static int dumbt_wrtrylock(dumbtrwlock *l)
{
/* Want no readers */
if (l->readers) return EBUSY;
/* Try to get write lock */
if (ticket_trylock(&l->lock)) return EBUSY;
if (l->readers)
{
/* Oops, a reader started */
ticket_unlock(&l->lock);
return EBUSY;
}
/* Success! */
return 0;
}
static void dumbt_rdlock(dumbtrwlock *l)
{
while (1)
{
/* Success? */
if (ticket_lockable(&l->lock))
{
/* Speculatively take read lock */
atomic_inc(&l->readers);
/* Success? */
if (ticket_lockable(&l->lock)) return;
/* Failure - undo, and wait until we can try again */
atomic_dec(&l->readers);
}
while (!ticket_lockable(&l->lock)) cpu_relax();
}
}
static void dumbt_rdunlock(dumbtrwlock *l)
{
atomic_dec(&l->readers);
}
static int dumbt_rdtrylock(dumbtrwlock *l)
{
/* Speculatively take read lock */
atomic_inc(&l->readers);
/* Success? */
if (ticket_lockable(&l->lock)) return 0;
/* Failure - undo */
atomic_dec(&l->readers);
return EBUSY;
}
static int dumbt_rdupgradelock(dumbtrwlock *l)
{
/* Try to convert into a write lock */
if (ticket_trylock(&l->lock)) return EBUSY;
/* I'm no longer a reader */
atomic_dec(&l->readers);
/* Wait for all other readers to finish */
while (l->readers) cpu_relax();
return 0;
}
This performs much better in the uncontended case, taking 3.7 seconds for all write fractions. It is surprising that it doesn't beat the contended case though:
Writers per 256 1 25 128 250
Time (s) 2.0 2.5 3.7 4.5
This is slower for low write fractions, and faster for large write fractions. Since most of the time we use a read-write lock when the write fraction is low, this is really bad for this algorithm, which can be twice as slow as its competitor.
To try to reduce contention, and to gain speed, lets explore the rather complex algorithm used in Reactos to emulate Microsoft Window's slim read-write (SRW) locks. This uses a wait list, with a bitlock to control access to the wait list data structure. It is designed so that waiters will spin on separate memory locations for extra scalability.
/* Have a wait block */
#define SRWLOCK_WAIT 1
/* Users are readers */
#define SRWLOCK_SHARED 2
/* Bit-lock for editing the wait block */
#define SRWLOCK_LOCK 4
#define SRWLOCK_LOCK_BIT 2
/* Mask for the above bits */
#define SRWLOCK_MASK 7
/* Number of current users * 8 */
#define SRWLOCK_USERS 8
typedef struct srwlock srwlock;
struct srwlock
{
uintptr_t p;
};
typedef struct srw_sw srw_sw;
struct srw_sw
{
uintptr_t spin;
srw_sw *next;
};
typedef struct srw_wb srw_wb;
struct srw_wb
{
/* s_count is the number of shared acquirers * SRWLOCK_USERS. */
uintptr_t s_count;
/* Last points to the last wait block in the chain. The value
is only valid when read from the first wait block. */
srw_wb *last;
/* Next points to the next wait block in the chain. */
srw_wb *next;
/* The wake chain is only valid for shared wait blocks */
srw_sw *wake;
srw_sw *last_shared;
int ex;
};
/* Wait for control of wait block */
static srw_wb *lock_wb(srwlock *l)
{
uintptr_t p;
/* Spin on the wait block bit lock */
while (atomic_bitsetandtest(&l->p, SRWLOCK_LOCK_BIT)) cpu_relax();
p = l->p;
barrier();
if (!(p & SRWLOCK_WAIT))
{
/* Oops, looks like the wait block was removed. */
atomic_clear_bit(&l->p, SRWLOCK_LOCK_BIT);
return NULL;
}
return (srw_wb *)(p & ~SRWLOCK_MASK);
}
static void srwlock_init(srwlock *l)
{
l->p = 0;
}
static void srwlock_rdlock(srwlock *l)
{
srw_wb swblock;
srw_sw sw;
uintptr_t p;
srw_wb *wb, *shared;
while (1)
{
barrier();
p = l->p;
cpu_relax();
if (!p)
{
/* This is a fast path, we can simply try to set the shared count to 1 */
if (!cmpxchg(&l->p, 0, SRWLOCK_USERS | SRWLOCK_SHARED)) return;
continue;
}
/* Don't interfere with locking */
if (p & SRWLOCK_LOCK) continue;
if (p & SRWLOCK_SHARED)
{
if (!(p & SRWLOCK_WAIT))
{
/* This is a fast path, just increment the number of current shared locks */
if (cmpxchg(&l->p, p, p + SRWLOCK_USERS) == p) return;
}
else
{
/* There's other waiters already, lock the wait blocks and increment the shared count */
wb = lock_wb(l);
if (wb) break;
}
continue;
}
/* Initialize wait block */
swblock.ex = FALSE;
swblock.next = NULL;
swblock.last = &swblock;
swblock.wake = &sw;
sw.next = NULL;
sw.spin = 0;
if (!(p & SRWLOCK_WAIT))
{
/*
* We need to setup the first wait block.
* Currently an exclusive lock is held, change the lock to contended mode.
*/
swblock.s_count = SRWLOCK_USERS;
swblock.last_shared = &sw;
if (cmpxchg(&l->p, p, (uintptr_t)&swblock | SRWLOCK_WAIT) == p)
{
while (!sw.spin) cpu_relax();
return;
}
continue;
}
/* Handle the contended but not shared case */
/*
* There's other waiters already, lock the wait blocks and increment the shared count.
* If the last block in the chain is an exclusive lock, add another block.
*/
swblock.s_count = 0;
wb = lock_wb(l);
if (!wb) continue;
shared = wb->last;
if (shared->ex)
{
shared->next = &swblock;
wb->last = &swblock;
shared = &swblock;
}
else
{
shared->last_shared->next = &sw;
}
shared->s_count += SRWLOCK_USERS;
shared->last_shared = &sw;
/* Unlock */
barrier();
l->p &= ~SRWLOCK_LOCK;
/* Wait to be woken */
while (!sw.spin) cpu_relax();
return;
}
/* The contended and shared case */
sw.next = NULL;
sw.spin = 0;
if (wb->ex)
{
/*
* We need to setup a new wait block.
* Although we're currently in a shared lock and we're acquiring
* a shared lock, there are exclusive locks queued in between.
* We need to wait until those are released.
*/
shared = wb->last;
if (shared->ex)
{
swblock.ex = FALSE;
swblock.s_count = SRWLOCK_USERS;
swblock.next = NULL;
swblock.last = &swblock;
swblock.wake = &sw;
swblock.last_shared = &sw;
shared->next = &swblock;
wb->last = &swblock;
}
else
{
shared->last_shared->next = &sw;
shared->s_count += SRWLOCK_USERS;
shared->last_shared = &sw;
}
}
else
{
wb->last_shared->next = &sw;
wb->s_count += SRWLOCK_USERS;
wb->last_shared = &sw;
}
/* Unlock */
barrier();
l->p &= ~SRWLOCK_LOCK;
/* Wait to be woken */
while (!sw.spin) cpu_relax();
}
static void srwlock_rdunlock(srwlock *l)
{
uintptr_t p, np;
srw_wb *wb;
srw_wb *next;
while (1)
{
barrier();
p = l->p;
cpu_relax();
if (p & SRWLOCK_WAIT)
{
/*
* There's a wait block, we need to wake a pending exclusive acquirer,
* if this is the last shared release.
*/
wb = lock_wb(l);
if (wb) break;
continue;
}
/* Don't interfere with locking */
if (p & SRWLOCK_LOCK) continue;
/*
* This is a fast path, we can simply decrement the shared
* count and store the pointer
*/
np = p - SRWLOCK_USERS;
/* If we are the last reader, then the lock is unused */
if (np == SRWLOCK_SHARED) np = 0;
/* Try to release the lock */
if (cmpxchg(&l->p, p, np) == p) return;
}
wb->s_count -= SRWLOCK_USERS;
if (wb->s_count)
{
/* Unlock */
barrier();
l->p &= ~SRWLOCK_LOCK;
return;
}
next = wb->next;
if (next)
{
/*
* There's more blocks chained, we need to update the pointers
* in the next wait block and update the wait block pointer.
*/
np = (uintptr_t)next | SRWLOCK_WAIT;
next->last = wb->last;
}
else
{
/* Convert the lock to a simple exclusive lock. */
np = SRWLOCK_USERS;
}
barrier();
/* This also unlocks wb lock bit */
l->p = np;
barrier();
wb->wake = (void *) 1;
barrier();
/* We released the lock */
}
static int srwlock_rdtrylock(srwlock *s)
{
uintptr_t p = s->p;
barrier();
/* This is a fast path, we can simply try to set the shared count to 1 */
if (!p && (cmpxchg(&s->p, 0, SRWLOCK_USERS | SRWLOCK_SHARED) == 0)) return 0;
if ((p & (SRWLOCK_SHARED | SRWLOCK_WAIT)) == SRWLOCK_SHARED)
{
/* This is a fast path, just increment the number of current shared locks */
if (cmpxchg(&s->p, p, p + SRWLOCK_USERS) == p) return 0;
}
return EBUSY;
}
static void srwlock_wrlock(srwlock *l)
{
srw_wb swblock;
uintptr_t p, np;
/* Fastpath - no other readers or writers */
if (!l->p && (!cmpxchg(&l->p, 0, SRWLOCK_USERS))) return;
/* Initialize wait block */
swblock.ex = TRUE;
swblock.next = NULL;
swblock.last = &swblock;
swblock.wake = NULL;
while (1)
{
barrier();
p = l->p;
cpu_relax();
if (p & SRWLOCK_WAIT)
{
srw_wb *wb = lock_wb(l);
if (!wb) continue;
/* Complete Initialization of block */
swblock.s_count = 0;
wb->last->next = &swblock;
wb->last = &swblock;
/* Unlock */
barrier();
l->p &= ~SRWLOCK_LOCK;
/* Has our wait block became the first one in the chain? */
while (!swblock.wake) cpu_relax();
return;
}
/* Fastpath - no other readers or writers */
if (!p)
{
if (!cmpxchg(&l->p, 0, SRWLOCK_USERS)) return;
continue;
}
/* Don't interfere with locking */
if (p & SRWLOCK_LOCK) continue;
/* There are no wait blocks so far, we need to add ourselves as the first wait block. */
if (p & SRWLOCK_SHARED)
{
swblock.s_count = p & ~SRWLOCK_MASK;
np = (uintptr_t)&swblock | SRWLOCK_SHARED | SRWLOCK_WAIT;
}
else
{
swblock.s_count = 0;
np = (uintptr_t)&swblock | SRWLOCK_WAIT;
}
/* Try to make change */
if (cmpxchg(&l->p, p, np) == p) break;
}
/* Has our wait block became the first one in the chain? */
while (!swblock.wake) cpu_relax();
}
static void srwlock_wrunlock(srwlock *l)
{
uintptr_t p, np;
srw_wb *wb;
srw_wb *next;
srw_sw *wake, *wake_next;
while (1)
{
barrier();
p = l->p;
cpu_relax();
if (p == SRWLOCK_USERS)
{
/*
* This is the fast path, we can simply clear the SRWLOCK_USERS bit.
* All other bits should be 0 now because this is a simple exclusive lock,
* and no one else is waiting.
*/
if (cmpxchg(&l->p, SRWLOCK_USERS, 0) == SRWLOCK_USERS) return;
continue;
}
/* There's a wait block, we need to wake the next pending acquirer */
wb = lock_wb(l);
if (wb) break;
}
next = wb->next;
if (next)
{
/*
* There's more blocks chained, we need to update the pointers
* in the next wait block and update the wait block pointer.
*/
np = (uintptr_t)next | SRWLOCK_WAIT;
if (!wb->ex)
{
/* Save the shared count */
next->s_count = wb->s_count;
np |= SRWLOCK_SHARED;
}
next->last = wb->last;
}
else
{
/* Convert the lock to a simple lock. */
if (wb->ex)
{
np = SRWLOCK_USERS;
}
else
{
np = wb->s_count | SRWLOCK_SHARED;
}
}
barrier();
/* Also unlocks lock bit */
l->p = np;
barrier();
if (wb->ex)
{
barrier();
/* Notify the next waiter */
wb->wake = (void *) 1;
barrier();
return;
}
/* We now need to wake all others required. */
for (wake = wb->wake; wake; wake = wake_next)
{
barrier();
wake_next = wake->next;
barrier();
wake->spin = 1;
barrier();
}
}
static int srwlock_wrtrylock(srwlock *s)
{
/* No other readers or writers? */
if (!s->p && (cmpxchg(&s->p, 0, SRWLOCK_USERS) == 0)) return 0;
return EBUSY;
}
The above code is not exactly the code in Reactos. It has been simplified and cleaned up somewhat. One of the controlling bit flags has been removed, and replaced with altered control flow. So how does it perform? In the uncontended case, it is just like the dumb ticket-based read-write lock, and takes 3.7 seconds for all cases. For the contended case with four threads:
Writers per 256 1 25 128 250
Time (s) 2.2 3.2 5.7 6.4
This is quite bad, slower than the dumb lock in all contended cases. The extra complexity simply isn't worth any performance gain.
Another possibility is to combine the reader count with some bits describing the state of the writers. A similar technique is used by the Linux kernel to describe its (reader-preferring) read-write locks. Making the lock starvation-proof for writers instead, yields something like the following:
#define RW_WAIT_BIT 0
#define RW_WRITE_BIT 1
#define RW_READ_BIT 2
#define RW_WAIT 1
#define RW_WRITE 2
#define RW_READ 4
typedef unsigned rwlock;
static void wrlock(rwlock *l)
{
while (1)
{
unsigned state = *l;
/* No readers or writers? */
if (state < RW_WRITE)
{
/* Turn off RW_WAIT, and turn on RW_WRITE */
if (cmpxchg(l, state, RW_WRITE) == state) return;
/* Someone else got there... time to wait */
state = *l;
}
/* Turn on writer wait bit */
if (!(state & RW_WAIT)) atomic_set_bit(l, RW_WAIT_BIT);
/* Wait until can try to take the lock */
while (*l > RW_WAIT) cpu_relax();
}
}
static void wrunlock(rwlock *l)
{
atomic_add(l, -RW_WRITE);
}
static int wrtrylock(rwlock *l)
{
unsigned state = *l;
if ((state < RW_WRITE) && (cmpxchg(l, state, state + RW_WRITE) == state)) return 0;
return EBUSY;
}
static void rdlock(rwlock *l)
{
while (1)
{
/* A writer exists? */
while (*l & (RW_WAIT | RW_WRITE)) cpu_relax();
/* Try to get read lock */
if (!(atomic_xadd(l, RW_READ) & (RW_WAIT | RW_WRITE))) return;
/* Undo */
atomic_add(l, -RW_READ);
}
}
static void rdunlock(rwlock *l)
{
atomic_add(l, -RW_READ);
}
static int rdtrylock(rwlock *l)
{
/* Try to get read lock */
unsigned state = atomic_xadd(l, RW_READ);
if (!(state & (RW_WAIT | RW_WRITE))) return 0;
/* Undo */
atomic_add(l, -RW_READ);
return EBUSY;
}
/* Get a read lock, even if a writer is waiting */
static int rdforcelock(rwlock *l)
{
/* Try to get read lock */
unsigned state = atomic_xadd(l, RW_READ);
/* We succeed even if a writer is waiting */
if (!(state & RW_WRITE)) return 0;
/* Undo */
atomic_add(l, -RW_READ);
return EBUSY;
}
/* Try to upgrade from a read to a write lock atomically */
static int rdtryupgradelock(rwlock *l)
{
/* Someone else is trying (and will succeed) to upgrade to a write lock? */
if (atomic_bitsetandtest(l, RW_WRITE_BIT)) return EBUSY;
/* Don't count myself any more */
atomic_add(l, -RW_READ);
/* Wait until there are no more readers */
while (*l > (RW_WAIT | RW_WRITE)) cpu_relax();
return 0;
}
This lock unfortunately, has a similar performance to the dumb lock using a ticket lock as its spinlock.
Writers per 256 1 25 128 250
Time (s) 2.0 3.4 3.9 4.6
The version in the Linux kernel is written in assembler, so may be a fair bit faster. It uses the fact that the atomic add instruction can set the zero flag. This means that the slower add-and-test method isn't needed, and a two-instruction fast path is used instead.
Sticking to semi-portable C code, we can still do a little better. There exists a form of the ticket lock that is designed for read-write locks. An example written in assembly was posted to the Linux kernel mailing list in 2002 by David Howells from RedHat. This was a highly optimized version of a read-write ticket lock developed at IBM in the early 90's by Joseph Seigh. Note that a similar (but not identical) algorithm was published by John Mellor-Crummey and Michael Scott in their landmark paper "Scalable Reader-Writer Synchronization for Shared-Memory Multiprocessors". Converting the algorithm from assembly language to C yields:
typedef union rwticket rwticket;
union rwticket
{
unsigned u;
unsigned short us;
__extension__ struct
{
unsigned char write;
unsigned char read;
unsigned char users;
} s;
};
static void rwticket_wrlock(rwticket *l)
{
unsigned me = atomic_xadd(&l->u, (1<<16));
unsigned char val = me >> 16;
while (val != l->s.write) cpu_relax();
}
static void rwticket_wrunlock(rwticket *l)
{
rwticket t = *l;
barrier();
t.s.write++;
t.s.read++;
*(unsigned short *) l = t.us;
}
static int rwticket_wrtrylock(rwticket *l)
{
unsigned me = l->s.users;
unsigned char menew = me + 1;
unsigned read = l->s.read << 8;
unsigned cmp = (me << 16) + read + me;
unsigned cmpnew = (me << 16) + read + menew;
if (cmpxchg(&l->u, cmp, cmpnew) == cmp) return 0;
return EBUSY;
}
static void rwticket_rdlock(rwticket *l)
{
unsigned me = atomic_xadd(&l->u, (1<<16));
unsigned char val = me >> 16;
while (val != l->s.read) cpu_relax();
l->s.read++;
}
static void rwticket_rdunlock(rwticket *l)
{
atomic_inc(&l->s.write);
}
static int rwticket_rdtrylock(rwticket *l)
{
unsigned me = l->s.users;
unsigned write = l->s.write;
unsigned char menew = me + 1;
unsigned cmp = (me << 16) + (me << 8) + write;
unsigned cmpnew = ((unsigned) menew << 16) + (menew << 8) + write;
if (cmpxchg(&l->u, cmp, cmpnew) == cmp) return 0;
return EBUSY;
}
This read-write lock performs extremely well. It is as fast as the dumb spinlock rwlock for low writer fraction, and nearly as fast as the dumb ticketlock rwlock for large number of writers. It also doesn't suffer any slowdown when there is no contention, taking 3.7 seconds for all cases. With contention:
Writers per 256 1 25 128 250
Time (s) 1.1 1.8 3.9 4.7
This algorithm is five times faster than using a simple spin lock for the reader-dominated case. Its only drawback is that it is difficult to upgrade read locks into write locks atomically. (It can be done, but then rwticket_wrunlock() needs to use an atomic instruction, and the resulting code becomes quite a bit slower.) This drawback is the reason why this algorithm is not used within the Linux kernel. Some parts depend on the fact that if you have a read lock, then acquiring a new read lock recursively will always succeed. However, if that requirement were to be removed, this algorithm probably would be a good replacement.
One final thing to note is that the read-write ticket lock is not optimal. The problem is the situation where readers and writers alternate in the wait queue: writer (executing), reader 1, writer, reader 2. The two reader threads can be shuffled so that they execute in parallel. i.e. the second reader should probably not have to wait until the second writer finishes to execute. Fortunately, this situation is encountered rarely when the thread count is low. For four processors and threads, this happens one time in 16 if readers and writers are equally likely, and less often otherwise. Unfortunately, as the number of threads increases this will lead to an asymptotic factor of two slowdown compared with the optimal ordering.
The obvious thing to do to fix this is to add a test to see if readers should be reordered in wait-order. However, since the effect is so rare with four concurrent threads, it is extremely hard (if not impossible) to add the check with a low enough overhead that the result is a performance win. Thus it seems that the problem of exactly which algorithm is best will need to be revisited when larger multicore machines become common.
Most parallel programming in some way will involve the use of locking at the lowest levels. Locks are primitives that provide mutual exclusion that allow data structures to remain in consistent states. Without locking, multiple threads of execution may simultaneously modify a data structure. Without a carefully thought out (and usually complex) lock-free algorithm, the result is usually a crash of hang as unintended program states are entered. Since the creation of a lock-free algorithm is extremely difficult, most programs use locks.
If updating a data structure is slow, the lock of choice is a mutex of some kind. These will transfer control to the operating system when they block. This allows another thread to run, and perhaps make progress whilst the first thread sleeps. This transfer of control consists of a pair of context switches, which are quite a slow operation. Thus, if the lock-hold time is expected to be short, then this may not be the fastest method.
Spinlocks
Instead of context switches, a spinlock will "spin", and repeatedly check to see if the lock is unlocked. Spinning is very fast, so the latency between an unlock-lock pair is small. However, spinning doesn't accomplish any work, so may not be as efficient as a sleeping mutex if the time spent becomes significant.
Before we describe the implementation of spin locks, we first need a set of atomic primitives. Fortunately, gcc provides some of these as built-in functions:
#define atomic_xadd(P, V) __sync_fetch_and_add((P), (V))
#define cmpxchg(P, O, N) __sync_val_compare_and_swap((P), (O), (N))
#define atomic_inc(P) __sync_add_and_fetch((P), 1)
#define atomic_dec(P) __sync_add_and_fetch((P), -1)
#define atomic_add(P, V) __sync_add_and_fetch((P), (V))
#define atomic_set_bit(P, V) __sync_or_and_fetch((P), 1<<(V))
#define atomic_clear_bit(P, V) __sync_and_and_fetch((P), ~(1<<(V)))
Unfortunately, we will require a few others that are not, and so must be implemented in assembly
/* Compile read-write barrier */
#define barrier() asm volatile("": : :"memory")
/* Pause instruction to prevent excess processor bus usage */
#define cpu_relax() asm volatile("pause\n": : :"memory")
/* Atomic exchange (of various sizes) */
static inline void *xchg_64(void *ptr, void *x)
{
__asm__ __volatile__("xchgq %0,%1"
:"=r" ((unsigned long long) x)
:"m" (*(volatile long long *)ptr), "0" ((unsigned long long) x)
:"memory");
return x;
}
static inline unsigned xchg_32(void *ptr, unsigned x)
{
__asm__ __volatile__("xchgl %0,%1"
:"=r" ((unsigned) x)
:"m" (*(volatile unsigned *)ptr), "0" (x)
:"memory");
return x;
}
static inline unsigned short xchg_16(void *ptr, unsigned short x)
{
__asm__ __volatile__("xchgw %0,%1"
:"=r" ((unsigned short) x)
:"m" (*(volatile unsigned short *)ptr), "0" (x)
:"memory");
return x;
}
/* Test and set a bit */
static inline char atomic_bitsetandtest(void *ptr, int x)
{
char out;
__asm__ __volatile__("lock; bts %2,%1\n"
"sbb %0,%0\n"
:"=r" (out), "=m" (*(volatile long long *)ptr)
:"Ir" (x)
:"memory");
return out;
}
A spinlock can be implemented in an obvious way, using the atomic exchange primitive.
#define EBUSY 1
typedef unsigned spinlock;
static void spin_lock(spinlock *lock)
{
while (1)
{
if (!xchg_32(lock, EBUSY)) return;
while (*lock) cpu_relax();
}
}
static void spin_unlock(spinlock *lock)
{
barrier();
*lock = 0;
}
static int spin_trylock(spinlock *lock)
{
return xchg_32(lock, EBUSY);
}
So how fast is the above code? A simple benchmark to test the overhead of a lock is to have a given number of threads attempting to lock and unlock it, doing a fixed amount of work each time. If the total number of lock-unlock pairs is maintained as the number of threads is increased, it is possible to measure the affect of contention on performance. A good spinlock implementation will be as fast as possible for any given number of threads attempting to use that lock simultaneously.
The results for the above spinlock implementation are:
Threads 1 2 3 4 5
Time (s) 5.5 5.6 5.7 5.7 5.7
These results are pretty good, but can be improved. The problem is that if there are multiple threads contending, then they all attempt to take the lock at the same time once it is released. This results in a huge amount of processor bus traffic, which is a huge performance killer. Thus, if we somehow order the lock-takers so that they know who is next in line for the resource we can vastly reduce the amount of bus traffic.
One spinlock algorithm that does this is called the MCS lock. This uses a list to maintain the order of acquirers.
typedef struct mcs_lock_t mcs_lock_t;
struct mcs_lock_t
{
mcs_lock_t *next;
int spin;
};
typedef struct mcs_lock_t *mcs_lock;
static void lock_mcs(mcs_lock *m, mcs_lock_t *me)
{
mcs_lock_t *tail;
me->next = NULL;
me->spin = 0;
tail = xchg_64(m, me);
/* No one there? */
if (!tail) return;
/* Someone there, need to link in */
tail->next = me;
/* Make sure we do the above setting of next. */
barrier();
/* Spin on my spin variable */
while (!me->spin) cpu_relax();
return;
}
static void unlock_mcs(mcs_lock *m, mcs_lock_t *me)
{
/* No successor yet? */
if (!me->next)
{
/* Try to atomically unlock */
if (cmpxchg(m, me, NULL) == me) return;
/* Wait for successor to appear */
while (!me->next) cpu_relax();
}
/* Unlock next one */
me->next->spin = 1;
}
static int trylock_mcs(mcs_lock *m, mcs_lock_t *me)
{
mcs_lock_t *tail;
me->next = NULL;
me->spin = 0;
/* Try to lock */
tail = cmpxchg(m, NULL, &me);
/* No one was there - can quickly return */
if (!tail) return 0;
return EBUSY;
}
This has quite different timings:
Threads 1 2 3 4 5
Time (s) 3.6 4.4 4.5 4.8 >1min
The MCS lock takes a hugely longer time when the number of threads is greater than the number of processors (four in this case). This is because if the next thread in the queue isn't active when the lock is unlocked, then everyone must wait until the operating system scheduler decides to run it. Every "fair" lock algorithm has this problem. Thus, the simple unfair spinlock still can be quite useful when you don't know that the number of threads is bounded by the number of cpus.
A bigger problem with the MCS lock is its API. It requires a second structure to be passed in addition to the address of the lock. The algorithm uses this second structure to store the information which describes the queue of threads waiting for the lock. Unfortunately, most code written using spinlocks doesn't have this extra information, so the fact that the MCS algorithm isn't a drop-in replacement to a standard spin lock is a problem.
An IBM working group found a way to improve the MCS algorithm to remove the need to pass the extra structure as a parameter. Instead, on-stack information was used instead. The result is the K42 lock algorithm:
typedef struct k42lock k42lock;
struct k42lock
{
k42lock *next;
k42lock *tail;
};
static void k42_lock(k42lock *l)
{
k42lock me;
k42lock *pred, *succ;
me.next = NULL;
barrier();
pred = xchg_64(&l->tail, &me);
if (pred)
{
me.tail = (void *) 1;
barrier();
pred->next = &me;
barrier();
while (me.tail) cpu_relax();
}
succ = me.next;
if (!succ)
{
barrier();
l->next = NULL;
if (cmpxchg(&l->tail, &me, &l->next) != &me)
{
while (!me.next) cpu_relax();
l->next = me.next;
}
}
else
{
l->next = succ;
}
}
static void k42_unlock(k42lock *l)
{
k42lock *succ = l->next;
barrier();
if (!succ)
{
if (cmpxchg(&l->tail, &l->next, NULL) == (void *) &l->next) return;
while (!l->next) cpu_relax();
succ = l->next;
}
succ->tail = NULL;
}
static int k42_trylock(k42lock *l)
{
if (!cmpxchg(&l->tail, NULL, &l->next)) return 0;
return EBUSY;
}
The timings of the K42 algorithm are as good as, if not better than the MCS lock:
Threads 1 2 3 4 5
Time (s) 3.7 4.8 4.5 4.9 >1min
Unfortunately, the K42 algorithm has another problem. It appears that it may be patented by IBM. Thus it cannot be used either. (Without perhaps paying royalties to IBM.)
One way around this is to use a different type of list. The K42 and MCS locks use lists ordered so that finding the next thread to run is easy, and adding to the end is hard. What about flipping the direction of the pointers so that finding the end is easy, and find who's next hard? The result is the following algorithm:
typedef struct listlock_t listlock_t;
struct listlock_t
{
listlock_t *next;
int spin;
};
typedef struct listlock_t *listlock;
#define LLOCK_FLAG (void *)1
static void listlock_lock(listlock *l)
{
listlock_t me;
listlock_t *tail;
/* Fast path - no users */
if (!cmpxchg(l, NULL, LLOCK_FLAG)) return;
me.next = LLOCK_FLAG;
me.spin = 0;
/* Convert into a wait list */
tail = xchg_64(l, &me);
if (tail)
{
/* Add myself to the list of waiters */
if (tail == LLOCK_FLAG) tail = NULL;
me.next = tail;
/* Wait for being able to go */
while (!me.spin) cpu_relax();
return;
}
/* Try to convert to an exclusive lock */
if (cmpxchg(l, &me, LLOCK_FLAG) == &me) return;
/* Failed - there is now a wait list */
tail = *l;
/* Scan to find who is after me */
while (1)
{
/* Wait for them to enter their next link */
while (tail->next == LLOCK_FLAG) cpu_relax();
if (tail->next == &me)
{
/* Fix their next pointer */
tail->next = NULL;
return;
}
tail = tail->next;
}
}
static void listlock_unlock(listlock *l)
{
listlock_t *tail;
listlock_t *tp;
while (1)
{
tail = *l;
barrier();
/* Fast path */
if (tail == LLOCK_FLAG)
{
if (cmpxchg(l, LLOCK_FLAG, NULL) == LLOCK_FLAG) return;
continue;
}
tp = NULL;
/* Wait for partially added waiter */
while (tail->next == LLOCK_FLAG) cpu_relax();
/* There is a wait list */
if (tail->next) break;
/* Try to convert to a single-waiter lock */
if (cmpxchg(l, tail, LLOCK_FLAG) == tail)
{
/* Unlock */
tail->spin = 1;
return;
}
cpu_relax();
}
/* A long list */
tp = tail;
tail = tail->next;
/* Scan wait list */
while (1)
{
/* Wait for partially added waiter */
while (tail->next == LLOCK_FLAG) cpu_relax();
if (!tail->next) break;
tp = tail;
tail = tail->next;
}
tp->next = NULL;
barrier();
/* Unlock */
tail->spin = 1;
}
static int listlock_trylock(listlock *l)
{
/* Simple part of a spin-lock */
if (!cmpxchg(l, NULL, LLOCK_FLAG)) return 0;
/* Failure! */
return EBUSY;
}
This unfortunately is extremely complex, and doesn't perform well either:
Threads 1 2 3 4 5
Time (s) 3.6 5.1 5.8 6.3 >1min
It is still faster than the standard spinlock when contention is low, but once more than two threads are attempting to lock at the same time it is worse, and gets slower from there on.
Another possible trick is to use a spinlock within a spinlock. The first lock can be very light weight since we know it will only be held for a short time. It can then control the locking for the wait list describing the acquirers of the real spinlock. If done right, the number of waiters on the sub-lock can be kept low, and thus minimize bus traffic. The result is:
typedef struct bitlistlock_t bitlistlock_t;
struct bitlistlock_t
{
bitlistlock_t *next;
int spin;
};
typedef bitlistlock_t *bitlistlock;
#define BLL_USED ((bitlistlock_t *) -2LL)
static void bitlistlock_lock(bitlistlock *l)
{
bitlistlock_t me;
bitlistlock_t *tail;
/* Grab control of list */
while (atomic_bitsetandtest(l, 0)) cpu_relax();
/* Remove locked bit */
tail = (bitlistlock_t *) ((uintptr_t) *l & ~1LL);
/* Fast path, no waiters */
if (!tail)
{
/* Set to be a flag value */
*l = BLL_USED;
return;
}
if (tail == BLL_USED) tail = NULL;
me.next = tail;
me.spin = 0;
barrier();
/* Unlock, and add myself to the wait list */
*l = &me;
/* Wait for the go-ahead */
while (!me.spin) cpu_relax();
}
static void bitlistlock_unlock(bitlistlock *l)
{
bitlistlock_t *tail;
bitlistlock_t *tp;
/* Fast path - no wait list */
if (cmpxchg(l, BLL_USED, NULL) == BLL_USED) return;
/* Grab control of list */
while (atomic_bitsetandtest(l, 0)) cpu_relax();
tp = *l;
barrier();
/* Get end of list */
tail = (bitlistlock_t *) ((uintptr_t) tp & ~1LL);
/* Actually no users? */
if (tail == BLL_USED)
{
barrier();
*l = NULL;
return;
}
/* Only one entry on wait list? */
if (!tail->next)
{
barrier();
/* Unlock bitlock */
*l = BLL_USED;
barrier();
/* Unlock lock */
tail->spin = 1;
return;
}
barrier();
/* Unlock bitlock */
*l = tail;
barrier();
/* Scan wait list for start */
do
{
tp = tail;
tail = tail->next;
}
while (tail->next);
tp->next = NULL;
barrier();
/* Unlock */
tail->spin = 1;
}
static int bitlistlock_trylock(bitlistlock *l)
{
if (!*l && (cmpxchg(l, NULL, BLL_USED) == NULL)) return 0;
return EBUSY;
}
Unfortunately, this is even worse than the previous listlock algorithm. This is only good for the uncontended case.
Threads 1 2 3 4 5
Time (s) 3.6 5.3 6.3 6.8 >1min
Another possibility is to modify some other type of locking algorithm to be a spinlock. The read-write locks from Reactos are designed to be scale extremely well. If the "read" part of them is removed, then the mutual exclusion between the writers will act just like a spinlock. Doing this yields:
/* Bit-lock for editing the wait block */
#define SLOCK_LOCK 1
#define SLOCK_LOCK_BIT 0
/* Has an active user */
#define SLOCK_USED 2
#define SLOCK_BITS 3
typedef struct slock slock;
struct slock
{
uintptr_t p;
};
typedef struct slock_wb slock_wb;
struct slock_wb
{
/*
* last points to the last wait block in the chain.
* The value is only valid when read from the first wait block.
*/
slock_wb *last;
/* next points to the next wait block in the chain. */
slock_wb *next;
/* Wake up? */
int wake;
};
/* Wait for control of wait block */
static slock_wb *slockwb(slock *s)
{
uintptr_t p;
/* Spin on the wait block bit lock */
while (atomic_bitsetandtest(&s->p, SLOCK_LOCK_BIT))
{
cpu_relax();
}
p = s->p;
if (p <= SLOCK_BITS)
{
/* Oops, looks like the wait block was removed. */
atomic_dec(&s->p);
return NULL;
}
return (slock_wb *)(p - SLOCK_LOCK);
}
static void slock_lock(slock *s)
{
slock_wb swblock;
/* Fastpath - no other readers or writers */
if (!s->p && (cmpxchg(&s->p, 0, SLOCK_USED) == 0)) return;
/* Initialize wait block */
swblock.next = NULL;
swblock.last = &swblock;
swblock.wake = 0;
while (1)
{
uintptr_t p = s->p;
cpu_relax();
/* Fastpath - no other readers or writers */
if (!p)
{
if (cmpxchg(&s->p, 0, SLOCK_USED) == 0) return;
continue;
}
if (p > SLOCK_BITS)
{
slock_wb *first_wb, *last;
first_wb = slockwb(s);
if (!first_wb) continue;
last = first_wb->last;
last->next = &swblock;
first_wb->last = &swblock;
/* Unlock */
barrier();
s->p &= ~SLOCK_LOCK;
break;
}
/* Try to add the first wait block */
if (cmpxchg(&s->p, p, (uintptr_t)&swblock) == p) break;
}
/* Wait to acquire exclusive lock */
while (!swblock.wake) cpu_relax();
}
static void slock_unlock(slock *s)
{
slock_wb *next;
slock_wb *wb;
uintptr_t np;
while (1)
{
uintptr_t p = s->p;
/* This is the fast path, we can simply clear the SRWLOCK_USED bit. */
if (p == SLOCK_USED)
{
if (cmpxchg(&s->p, SLOCK_USED, 0) == SLOCK_USED) return;
continue;
}
/* There's a wait block, we need to wake the next pending user */
wb = slockwb(s);
if (wb) break;
cpu_relax();
}
next = wb->next;
if (next)
{
/*
* There's more blocks chained, we need to update the pointers
* in the next wait block and update the wait block pointer.
*/
np = (uintptr_t) next;
next->last = wb->last;
}
else
{
/* Convert the lock to a simple lock. */
np = SLOCK_USED;
}
barrier();
/* Also unlocks lock bit */
s->p = np;
barrier();
/* Notify the next waiter */
wb->wake = 1;
/* We released the lock */
}
static int slock_trylock(slock *s)
{
/* No other readers or writers? */
if (!s->p && (cmpxchg(&s->p, 0, SLOCK_USED) == 0)) return 0;
return EBUSY;
}
Again, this algorithm disappoints. The results are similar to the bitlistlock algorithm. This isn't surprising, as the wait-block that controls the waiter list is synchronized by a bit lock.
Threads 1 2 3 4 5
Time (s) 3.7 5.1 5.8 6.5 >1min
Time to think laterally. One of the problems with the above algorithms is synchronization of the wait list. The core issue is that we need some way to recognize the head and tail of that list. The head of the list is needed to add a new waiter. The tail is needed to decide who is to go next. The MCS lock used the extra structure information so that the list tail could be quickly found. The K42 Lock used the patented method of storing the tail in a second list pointer within the lock itself.
There is another trick we can do though. If the extra information is allocated on the stack, then it may be possible to recognize that a pointer is pointing within our own stack frame. If so, then we can use that information within the algorithm to decide where the wait list ends. The result is the stack-lock algorithm:
typedef struct stlock_t stlock_t;
struct stlock_t
{
stlock_t *next;
};
typedef struct stlock_t *stlock;
static __attribute__((noinline)) void stlock_lock(stlock *l)
{
stlock_t *me = NULL;
barrier();
me = xchg_64(l, &me);
/* Wait until we get the lock */
while (me) cpu_relax();
}
#define MAX_STACK_SIZE (1<<12)
static __attribute__((noinline)) int on_stack(void *p)
{
int x;
uintptr_t u = (uintptr_t) &x;
return ((u - (uintptr_t)p + MAX_STACK_SIZE) < MAX_STACK_SIZE * 2);
}
static __attribute__((noinline)) void stlock_unlock(stlock *l)
{
stlock_t *tail = *l;
barrier();
/* Fast case */
if (on_stack(tail))
{
/* Try to remove the wait list */
if (cmpxchg(l, tail, NULL) == tail) return;
tail = *l;
}
/* Scan wait list */
while (1)
{
/* Wait for partially added waiter */
while (!tail->next) cpu_relax();
if (on_stack(tail->next)) break;
tail = tail->next;
}
barrier();
/* Unlock */
tail->next = NULL;
}
static int stlock_trylock(stlock *l)
{
stlock_t me;
if (!cmpxchg(l, NULL, &me)) return 0;
return EBUSY;
}
This algorithm is quite a bit simpler if you know that a thread's stack is aligned a certain way. (Then the stack-check turns into an XOR and a mask operation.) Unfortunately, it is still quite slow.
Threads 1 2 3 4 5
Time (s) 3.6 5.3 5.7 6.2 >1min
The lock operation above looks to be fairly efficient, it is the unlock routine that is slow and complex. Perhaps if we save a little more information within the lock itself, then the unlock operation can be made faster. Since quite a bit of time seems to be spent finding the previous node to ourselves (which is the one to wake up), it might be better to do that while we are spinning waiting for our turn to take the lock. If we save this previous point within the lock, we then will not need to calculate it within the unlock routine.
typedef struct plock_t plock_t;
struct plock_t
{
plock_t *next;
};
typedef struct plock plock;
struct plock
{
plock_t *next;
plock_t *prev;
plock_t *last;
};
static void plock_lock(plock *l)
{
plock_t *me = NULL;
plock_t *prev;
barrier();
me = xchg_64(l, &me);
prev = NULL;
/* Wait until we get the lock */
while (me)
{
/* Scan wait list for my previous */
if (l->next != (plock_t *) &me)
{
plock_t *t = l->next;
while (me)
{
if (t->next == (plock_t *) &me)
{
prev = t;
while (me) cpu_relax();
goto done;
}
if (t->next) t = t->next;
cpu_relax();
}
}
cpu_relax();
}
done:
l->prev = prev;
l->last = (plock_t *) &me;
}
static void plock_unlock(plock *l)
{
plock_t *tail;
/* Do I know my previous? */
if (l->prev)
{
/* Unlock */
l->prev->next = NULL;
return;
}
tail = l->next;
barrier();
/* Fast case */
if (tail == l->last)
{
/* Try to remove the wait list */
if (cmpxchg(&l->next, tail, NULL) == tail) return;
tail = l->next;
}
/* Scan wait list */
while (1)
{
/* Wait for partially added waiter */
while (!tail->next) cpu_relax();
if (tail->next == l->last) break;
tail = tail->next;
}
barrier();
/* Unlock */
tail->next = NULL;
}
static int plock_trylock(plock *l)
{
plock_t me;
if (!cmpxchg(&l->next, NULL, &me))
{
l->last = &me;
return 0;
}
return EBUSY;
}
This starts regaining some of the speed we have lost, but still isn't quite as good as the K42 algorithm. (It is however, always faster than the original naive spinlock provided that the number of threads is less than the number of processors.)
Threads 1 2 3 4 5
Time (s) 3.7 5.1 5.3 5.4 >1min
A careful reading of the plock algorithm shows that it can be improved even more. We don't actually need to know the pointer value of the next waiter. Some other unique value will do instead. Instead of saving a pointer, we can use a counter that we increment. If a waiter knows which counter value corresponds to its turn, then it just needs to wait until that value appears. The result is called the ticket lock algorithm:
typedef union ticketlock ticketlock;
union ticketlock
{
unsigned u;
struct
{
unsigned short ticket;
unsigned short users;
} s;
};
static void ticket_lock(ticketlock *t)
{
unsigned short me = atomic_xadd(&t->s.users, 1);
while (t->s.ticket != me) cpu_relax();
}
static void ticket_unlock(ticketlock *t)
{
barrier();
t->s.ticket++;
}
static int ticket_trylock(ticketlock *t)
{
unsigned short me = t->s.users;
unsigned short menew = me + 1;
unsigned cmp = ((unsigned) me << 16) + me;
unsigned cmpnew = ((unsigned) me << 16) + menew;
if (cmpxchg(&t->u, cmp, cmpnew) == cmp) return 0;
return EBUSY;
}
static int ticket_lockable(ticketlock *t)
{
ticketlock u = *t;
barrier();
return (u.s.ticket == u.s.users);
}
The above algorithm is extremely fast, and beats all the other fair-locks described.
Threads 1 2 3 4 5
Time (s) 3.6 4.4 4.5 4.8 >1min
In fact, this is the spinlock algorithm used in the Linux kernel, although for extra speed, the kernel version is written in assembly language rather than the semi-portable C shown above. Also note that the above code depends on the endianness of the computer architecture. It is designed for little-endian machines. Big endian processors will require a swap of the two fields within the structure in the union.
The ticket lock shows that an oft-repeated fallacy is untrue. Many of the above fair-lock algorithms are meant to scale well because the waiters are spinning on different memory locations. This is meant to reduce bus traffic and thus increase performance. However, it appears that that effect is small. The more important thing is to make sure that the waiters are ordered by who gets to take the lock next. This is what the ticket lock does admirably. The fact that multiple waiters are spinning on the same ticket lock location does not seem to be a performance drain.
Read Write Locks
Quite often, some users of a data structure will make no modifications to it. They just require read access to its fields to do their work. If multiple threads require read access to the same data, there is no reason why they should not be able to execute simultaneously. Spinlocks don't differentiate between read and read/write access. Thus spinlocks do not exploit this potential parallelism. To do so, read-write locks are required.
The simplest read-write lock uses a spinlock to control write access, and a counter field for the readers.
typedef struct dumbrwlock dumbrwlock;
struct dumbrwlock
{
spinlock lock;
unsigned readers;
};
static void dumb_wrlock(dumbrwlock *l)
{
/* Get write lock */
spin_lock(&l->lock);
/* Wait for readers to finish */
while (l->readers) cpu_relax();
}
static void dumb_wrunlock(dumbrwlock *l)
{
spin_unlock(&l->lock);
}
static int dumb_wrtrylock(dumbrwlock *l)
{
/* Want no readers */
if (l->readers) return EBUSY;
/* Try to get write lock */
if (spin_trylock(&l->lock)) return EBUSY;
if (l->readers)
{
/* Oops, a reader started */
spin_unlock(&l->lock);
return EBUSY;
}
/* Success! */
return 0;
}
static void dumb_rdlock(dumbrwlock *l)
{
while (1)
{
/* Speculatively take read lock */
atomic_inc(&l->readers);
/* Success? */
if (!l->lock) return;
/* Failure - undo, and wait until we can try again */
atomic_dec(&l->readers);
while (l->lock) cpu_relax();
}
}
static void dumb_rdunlock(dumbrwlock *l)
{
atomic_dec(&l->readers);
}
static int dumb_rdtrylock(dumbrwlock *l)
{
/* Speculatively take read lock */
atomic_inc(&l->readers);
/* Success? */
if (!l->lock) return 0;
/* Failure - undo */
atomic_dec(&l->readers);
return EBUSY;
}
static int dumb_rdupgradelock(dumbrwlock *l)
{
/* Try to convert into a write lock */
if (spin_trylock(&l->lock)) return EBUSY;
/* I'm no longer a reader */
atomic_dec(&l->readers);
/* Wait for all other readers to finish */
while (l->readers) cpu_relax();
return 0;
}
The benchmark the above code, we need a little more information than the spinlock case. The fraction of readers is important. The more readers, the more parallelism we should get, and the faster the code should run. It is also important to have a random distribution of readers and writers, just like real-world situations. Thus a parallel random number generator is used. By selecting a random byte, and choosing 1, 25, 128, or 250 out of 256 possibilities to be a writer we can explore the mostly-reader case through to where most users of the lock are writers. Finally, it is important to find out the effects of contention. In general, read-write locks tend to be used where contention is high, so we will mostly look at the case where the number of threads is equal to the number of processors.
The dumb lock above performs fairly poorly when there is no contention. If one thread is used we get:
Writers per 256 1 25 128 250
Time (s) 3.7 3.8 4.6 5.4
As expected, we asymptote to the relatively slow timings of the standard spinlock algorithm as the write fraction increases. If there is contention, however, the dumb lock actually performs quite well. Using four threads:
Writers per 256 1 25 128 250
Time (s) 1.1 1.9 4.4 5.7
The obvious thing to do to try to gain speed would be to replace the slow spinlock with a ticketlock algorithm. If this is done, we have:
typedef struct dumbtrwlock dumbtrwlock;
struct dumbtrwlock
{
ticketlock lock;
unsigned readers;
};
static void dumbt_wrlock(dumbtrwlock *l)
{
/* Get lock */
ticket_lock(&l->lock);
/* Wait for readers to finish */
while (l->readers) cpu_relax();
}
static void dumbt_wrunlock(dumbtrwlock *l)
{
ticket_unlock(&l->lock);
}
static int dumbt_wrtrylock(dumbtrwlock *l)
{
/* Want no readers */
if (l->readers) return EBUSY;
/* Try to get write lock */
if (ticket_trylock(&l->lock)) return EBUSY;
if (l->readers)
{
/* Oops, a reader started */
ticket_unlock(&l->lock);
return EBUSY;
}
/* Success! */
return 0;
}
static void dumbt_rdlock(dumbtrwlock *l)
{
while (1)
{
/* Success? */
if (ticket_lockable(&l->lock))
{
/* Speculatively take read lock */
atomic_inc(&l->readers);
/* Success? */
if (ticket_lockable(&l->lock)) return;
/* Failure - undo, and wait until we can try again */
atomic_dec(&l->readers);
}
while (!ticket_lockable(&l->lock)) cpu_relax();
}
}
static void dumbt_rdunlock(dumbtrwlock *l)
{
atomic_dec(&l->readers);
}
static int dumbt_rdtrylock(dumbtrwlock *l)
{
/* Speculatively take read lock */
atomic_inc(&l->readers);
/* Success? */
if (ticket_lockable(&l->lock)) return 0;
/* Failure - undo */
atomic_dec(&l->readers);
return EBUSY;
}
static int dumbt_rdupgradelock(dumbtrwlock *l)
{
/* Try to convert into a write lock */
if (ticket_trylock(&l->lock)) return EBUSY;
/* I'm no longer a reader */
atomic_dec(&l->readers);
/* Wait for all other readers to finish */
while (l->readers) cpu_relax();
return 0;
}
This performs much better in the uncontended case, taking 3.7 seconds for all write fractions. It is surprising that it doesn't beat the contended case though:
Writers per 256 1 25 128 250
Time (s) 2.0 2.5 3.7 4.5
This is slower for low write fractions, and faster for large write fractions. Since most of the time we use a read-write lock when the write fraction is low, this is really bad for this algorithm, which can be twice as slow as its competitor.
To try to reduce contention, and to gain speed, lets explore the rather complex algorithm used in Reactos to emulate Microsoft Window's slim read-write (SRW) locks. This uses a wait list, with a bitlock to control access to the wait list data structure. It is designed so that waiters will spin on separate memory locations for extra scalability.
/* Have a wait block */
#define SRWLOCK_WAIT 1
/* Users are readers */
#define SRWLOCK_SHARED 2
/* Bit-lock for editing the wait block */
#define SRWLOCK_LOCK 4
#define SRWLOCK_LOCK_BIT 2
/* Mask for the above bits */
#define SRWLOCK_MASK 7
/* Number of current users * 8 */
#define SRWLOCK_USERS 8
typedef struct srwlock srwlock;
struct srwlock
{
uintptr_t p;
};
typedef struct srw_sw srw_sw;
struct srw_sw
{
uintptr_t spin;
srw_sw *next;
};
typedef struct srw_wb srw_wb;
struct srw_wb
{
/* s_count is the number of shared acquirers * SRWLOCK_USERS. */
uintptr_t s_count;
/* Last points to the last wait block in the chain. The value
is only valid when read from the first wait block. */
srw_wb *last;
/* Next points to the next wait block in the chain. */
srw_wb *next;
/* The wake chain is only valid for shared wait blocks */
srw_sw *wake;
srw_sw *last_shared;
int ex;
};
/* Wait for control of wait block */
static srw_wb *lock_wb(srwlock *l)
{
uintptr_t p;
/* Spin on the wait block bit lock */
while (atomic_bitsetandtest(&l->p, SRWLOCK_LOCK_BIT)) cpu_relax();
p = l->p;
barrier();
if (!(p & SRWLOCK_WAIT))
{
/* Oops, looks like the wait block was removed. */
atomic_clear_bit(&l->p, SRWLOCK_LOCK_BIT);
return NULL;
}
return (srw_wb *)(p & ~SRWLOCK_MASK);
}
static void srwlock_init(srwlock *l)
{
l->p = 0;
}
static void srwlock_rdlock(srwlock *l)
{
srw_wb swblock;
srw_sw sw;
uintptr_t p;
srw_wb *wb, *shared;
while (1)
{
barrier();
p = l->p;
cpu_relax();
if (!p)
{
/* This is a fast path, we can simply try to set the shared count to 1 */
if (!cmpxchg(&l->p, 0, SRWLOCK_USERS | SRWLOCK_SHARED)) return;
continue;
}
/* Don't interfere with locking */
if (p & SRWLOCK_LOCK) continue;
if (p & SRWLOCK_SHARED)
{
if (!(p & SRWLOCK_WAIT))
{
/* This is a fast path, just increment the number of current shared locks */
if (cmpxchg(&l->p, p, p + SRWLOCK_USERS) == p) return;
}
else
{
/* There's other waiters already, lock the wait blocks and increment the shared count */
wb = lock_wb(l);
if (wb) break;
}
continue;
}
/* Initialize wait block */
swblock.ex = FALSE;
swblock.next = NULL;
swblock.last = &swblock;
swblock.wake = &sw;
sw.next = NULL;
sw.spin = 0;
if (!(p & SRWLOCK_WAIT))
{
/*
* We need to setup the first wait block.
* Currently an exclusive lock is held, change the lock to contended mode.
*/
swblock.s_count = SRWLOCK_USERS;
swblock.last_shared = &sw;
if (cmpxchg(&l->p, p, (uintptr_t)&swblock | SRWLOCK_WAIT) == p)
{
while (!sw.spin) cpu_relax();
return;
}
continue;
}
/* Handle the contended but not shared case */
/*
* There's other waiters already, lock the wait blocks and increment the shared count.
* If the last block in the chain is an exclusive lock, add another block.
*/
swblock.s_count = 0;
wb = lock_wb(l);
if (!wb) continue;
shared = wb->last;
if (shared->ex)
{
shared->next = &swblock;
wb->last = &swblock;
shared = &swblock;
}
else
{
shared->last_shared->next = &sw;
}
shared->s_count += SRWLOCK_USERS;
shared->last_shared = &sw;
/* Unlock */
barrier();
l->p &= ~SRWLOCK_LOCK;
/* Wait to be woken */
while (!sw.spin) cpu_relax();
return;
}
/* The contended and shared case */
sw.next = NULL;
sw.spin = 0;
if (wb->ex)
{
/*
* We need to setup a new wait block.
* Although we're currently in a shared lock and we're acquiring
* a shared lock, there are exclusive locks queued in between.
* We need to wait until those are released.
*/
shared = wb->last;
if (shared->ex)
{
swblock.ex = FALSE;
swblock.s_count = SRWLOCK_USERS;
swblock.next = NULL;
swblock.last = &swblock;
swblock.wake = &sw;
swblock.last_shared = &sw;
shared->next = &swblock;
wb->last = &swblock;
}
else
{
shared->last_shared->next = &sw;
shared->s_count += SRWLOCK_USERS;
shared->last_shared = &sw;
}
}
else
{
wb->last_shared->next = &sw;
wb->s_count += SRWLOCK_USERS;
wb->last_shared = &sw;
}
/* Unlock */
barrier();
l->p &= ~SRWLOCK_LOCK;
/* Wait to be woken */
while (!sw.spin) cpu_relax();
}
static void srwlock_rdunlock(srwlock *l)
{
uintptr_t p, np;
srw_wb *wb;
srw_wb *next;
while (1)
{
barrier();
p = l->p;
cpu_relax();
if (p & SRWLOCK_WAIT)
{
/*
* There's a wait block, we need to wake a pending exclusive acquirer,
* if this is the last shared release.
*/
wb = lock_wb(l);
if (wb) break;
continue;
}
/* Don't interfere with locking */
if (p & SRWLOCK_LOCK) continue;
/*
* This is a fast path, we can simply decrement the shared
* count and store the pointer
*/
np = p - SRWLOCK_USERS;
/* If we are the last reader, then the lock is unused */
if (np == SRWLOCK_SHARED) np = 0;
/* Try to release the lock */
if (cmpxchg(&l->p, p, np) == p) return;
}
wb->s_count -= SRWLOCK_USERS;
if (wb->s_count)
{
/* Unlock */
barrier();
l->p &= ~SRWLOCK_LOCK;
return;
}
next = wb->next;
if (next)
{
/*
* There's more blocks chained, we need to update the pointers
* in the next wait block and update the wait block pointer.
*/
np = (uintptr_t)next | SRWLOCK_WAIT;
next->last = wb->last;
}
else
{
/* Convert the lock to a simple exclusive lock. */
np = SRWLOCK_USERS;
}
barrier();
/* This also unlocks wb lock bit */
l->p = np;
barrier();
wb->wake = (void *) 1;
barrier();
/* We released the lock */
}
static int srwlock_rdtrylock(srwlock *s)
{
uintptr_t p = s->p;
barrier();
/* This is a fast path, we can simply try to set the shared count to 1 */
if (!p && (cmpxchg(&s->p, 0, SRWLOCK_USERS | SRWLOCK_SHARED) == 0)) return 0;
if ((p & (SRWLOCK_SHARED | SRWLOCK_WAIT)) == SRWLOCK_SHARED)
{
/* This is a fast path, just increment the number of current shared locks */
if (cmpxchg(&s->p, p, p + SRWLOCK_USERS) == p) return 0;
}
return EBUSY;
}
static void srwlock_wrlock(srwlock *l)
{
srw_wb swblock;
uintptr_t p, np;
/* Fastpath - no other readers or writers */
if (!l->p && (!cmpxchg(&l->p, 0, SRWLOCK_USERS))) return;
/* Initialize wait block */
swblock.ex = TRUE;
swblock.next = NULL;
swblock.last = &swblock;
swblock.wake = NULL;
while (1)
{
barrier();
p = l->p;
cpu_relax();
if (p & SRWLOCK_WAIT)
{
srw_wb *wb = lock_wb(l);
if (!wb) continue;
/* Complete Initialization of block */
swblock.s_count = 0;
wb->last->next = &swblock;
wb->last = &swblock;
/* Unlock */
barrier();
l->p &= ~SRWLOCK_LOCK;
/* Has our wait block became the first one in the chain? */
while (!swblock.wake) cpu_relax();
return;
}
/* Fastpath - no other readers or writers */
if (!p)
{
if (!cmpxchg(&l->p, 0, SRWLOCK_USERS)) return;
continue;
}
/* Don't interfere with locking */
if (p & SRWLOCK_LOCK) continue;
/* There are no wait blocks so far, we need to add ourselves as the first wait block. */
if (p & SRWLOCK_SHARED)
{
swblock.s_count = p & ~SRWLOCK_MASK;
np = (uintptr_t)&swblock | SRWLOCK_SHARED | SRWLOCK_WAIT;
}
else
{
swblock.s_count = 0;
np = (uintptr_t)&swblock | SRWLOCK_WAIT;
}
/* Try to make change */
if (cmpxchg(&l->p, p, np) == p) break;
}
/* Has our wait block became the first one in the chain? */
while (!swblock.wake) cpu_relax();
}
static void srwlock_wrunlock(srwlock *l)
{
uintptr_t p, np;
srw_wb *wb;
srw_wb *next;
srw_sw *wake, *wake_next;
while (1)
{
barrier();
p = l->p;
cpu_relax();
if (p == SRWLOCK_USERS)
{
/*
* This is the fast path, we can simply clear the SRWLOCK_USERS bit.
* All other bits should be 0 now because this is a simple exclusive lock,
* and no one else is waiting.
*/
if (cmpxchg(&l->p, SRWLOCK_USERS, 0) == SRWLOCK_USERS) return;
continue;
}
/* There's a wait block, we need to wake the next pending acquirer */
wb = lock_wb(l);
if (wb) break;
}
next = wb->next;
if (next)
{
/*
* There's more blocks chained, we need to update the pointers
* in the next wait block and update the wait block pointer.
*/
np = (uintptr_t)next | SRWLOCK_WAIT;
if (!wb->ex)
{
/* Save the shared count */
next->s_count = wb->s_count;
np |= SRWLOCK_SHARED;
}
next->last = wb->last;
}
else
{
/* Convert the lock to a simple lock. */
if (wb->ex)
{
np = SRWLOCK_USERS;
}
else
{
np = wb->s_count | SRWLOCK_SHARED;
}
}
barrier();
/* Also unlocks lock bit */
l->p = np;
barrier();
if (wb->ex)
{
barrier();
/* Notify the next waiter */
wb->wake = (void *) 1;
barrier();
return;
}
/* We now need to wake all others required. */
for (wake = wb->wake; wake; wake = wake_next)
{
barrier();
wake_next = wake->next;
barrier();
wake->spin = 1;
barrier();
}
}
static int srwlock_wrtrylock(srwlock *s)
{
/* No other readers or writers? */
if (!s->p && (cmpxchg(&s->p, 0, SRWLOCK_USERS) == 0)) return 0;
return EBUSY;
}
The above code is not exactly the code in Reactos. It has been simplified and cleaned up somewhat. One of the controlling bit flags has been removed, and replaced with altered control flow. So how does it perform? In the uncontended case, it is just like the dumb ticket-based read-write lock, and takes 3.7 seconds for all cases. For the contended case with four threads:
Writers per 256 1 25 128 250
Time (s) 2.2 3.2 5.7 6.4
This is quite bad, slower than the dumb lock in all contended cases. The extra complexity simply isn't worth any performance gain.
Another possibility is to combine the reader count with some bits describing the state of the writers. A similar technique is used by the Linux kernel to describe its (reader-preferring) read-write locks. Making the lock starvation-proof for writers instead, yields something like the following:
#define RW_WAIT_BIT 0
#define RW_WRITE_BIT 1
#define RW_READ_BIT 2
#define RW_WAIT 1
#define RW_WRITE 2
#define RW_READ 4
typedef unsigned rwlock;
static void wrlock(rwlock *l)
{
while (1)
{
unsigned state = *l;
/* No readers or writers? */
if (state < RW_WRITE)
{
/* Turn off RW_WAIT, and turn on RW_WRITE */
if (cmpxchg(l, state, RW_WRITE) == state) return;
/* Someone else got there... time to wait */
state = *l;
}
/* Turn on writer wait bit */
if (!(state & RW_WAIT)) atomic_set_bit(l, RW_WAIT_BIT);
/* Wait until can try to take the lock */
while (*l > RW_WAIT) cpu_relax();
}
}
static void wrunlock(rwlock *l)
{
atomic_add(l, -RW_WRITE);
}
static int wrtrylock(rwlock *l)
{
unsigned state = *l;
if ((state < RW_WRITE) && (cmpxchg(l, state, state + RW_WRITE) == state)) return 0;
return EBUSY;
}
static void rdlock(rwlock *l)
{
while (1)
{
/* A writer exists? */
while (*l & (RW_WAIT | RW_WRITE)) cpu_relax();
/* Try to get read lock */
if (!(atomic_xadd(l, RW_READ) & (RW_WAIT | RW_WRITE))) return;
/* Undo */
atomic_add(l, -RW_READ);
}
}
static void rdunlock(rwlock *l)
{
atomic_add(l, -RW_READ);
}
static int rdtrylock(rwlock *l)
{
/* Try to get read lock */
unsigned state = atomic_xadd(l, RW_READ);
if (!(state & (RW_WAIT | RW_WRITE))) return 0;
/* Undo */
atomic_add(l, -RW_READ);
return EBUSY;
}
/* Get a read lock, even if a writer is waiting */
static int rdforcelock(rwlock *l)
{
/* Try to get read lock */
unsigned state = atomic_xadd(l, RW_READ);
/* We succeed even if a writer is waiting */
if (!(state & RW_WRITE)) return 0;
/* Undo */
atomic_add(l, -RW_READ);
return EBUSY;
}
/* Try to upgrade from a read to a write lock atomically */
static int rdtryupgradelock(rwlock *l)
{
/* Someone else is trying (and will succeed) to upgrade to a write lock? */
if (atomic_bitsetandtest(l, RW_WRITE_BIT)) return EBUSY;
/* Don't count myself any more */
atomic_add(l, -RW_READ);
/* Wait until there are no more readers */
while (*l > (RW_WAIT | RW_WRITE)) cpu_relax();
return 0;
}
This lock unfortunately, has a similar performance to the dumb lock using a ticket lock as its spinlock.
Writers per 256 1 25 128 250
Time (s) 2.0 3.4 3.9 4.6
The version in the Linux kernel is written in assembler, so may be a fair bit faster. It uses the fact that the atomic add instruction can set the zero flag. This means that the slower add-and-test method isn't needed, and a two-instruction fast path is used instead.
Sticking to semi-portable C code, we can still do a little better. There exists a form of the ticket lock that is designed for read-write locks. An example written in assembly was posted to the Linux kernel mailing list in 2002 by David Howells from RedHat. This was a highly optimized version of a read-write ticket lock developed at IBM in the early 90's by Joseph Seigh. Note that a similar (but not identical) algorithm was published by John Mellor-Crummey and Michael Scott in their landmark paper "Scalable Reader-Writer Synchronization for Shared-Memory Multiprocessors". Converting the algorithm from assembly language to C yields:
typedef union rwticket rwticket;
union rwticket
{
unsigned u;
unsigned short us;
__extension__ struct
{
unsigned char write;
unsigned char read;
unsigned char users;
} s;
};
static void rwticket_wrlock(rwticket *l)
{
unsigned me = atomic_xadd(&l->u, (1<<16));
unsigned char val = me >> 16;
while (val != l->s.write) cpu_relax();
}
static void rwticket_wrunlock(rwticket *l)
{
rwticket t = *l;
barrier();
t.s.write++;
t.s.read++;
*(unsigned short *) l = t.us;
}
static int rwticket_wrtrylock(rwticket *l)
{
unsigned me = l->s.users;
unsigned char menew = me + 1;
unsigned read = l->s.read << 8;
unsigned cmp = (me << 16) + read + me;
unsigned cmpnew = (me << 16) + read + menew;
if (cmpxchg(&l->u, cmp, cmpnew) == cmp) return 0;
return EBUSY;
}
static void rwticket_rdlock(rwticket *l)
{
unsigned me = atomic_xadd(&l->u, (1<<16));
unsigned char val = me >> 16;
while (val != l->s.read) cpu_relax();
l->s.read++;
}
static void rwticket_rdunlock(rwticket *l)
{
atomic_inc(&l->s.write);
}
static int rwticket_rdtrylock(rwticket *l)
{
unsigned me = l->s.users;
unsigned write = l->s.write;
unsigned char menew = me + 1;
unsigned cmp = (me << 16) + (me << 8) + write;
unsigned cmpnew = ((unsigned) menew << 16) + (menew << 8) + write;
if (cmpxchg(&l->u, cmp, cmpnew) == cmp) return 0;
return EBUSY;
}
This read-write lock performs extremely well. It is as fast as the dumb spinlock rwlock for low writer fraction, and nearly as fast as the dumb ticketlock rwlock for large number of writers. It also doesn't suffer any slowdown when there is no contention, taking 3.7 seconds for all cases. With contention:
Writers per 256 1 25 128 250
Time (s) 1.1 1.8 3.9 4.7
This algorithm is five times faster than using a simple spin lock for the reader-dominated case. Its only drawback is that it is difficult to upgrade read locks into write locks atomically. (It can be done, but then rwticket_wrunlock() needs to use an atomic instruction, and the resulting code becomes quite a bit slower.) This drawback is the reason why this algorithm is not used within the Linux kernel. Some parts depend on the fact that if you have a read lock, then acquiring a new read lock recursively will always succeed. However, if that requirement were to be removed, this algorithm probably would be a good replacement.
One final thing to note is that the read-write ticket lock is not optimal. The problem is the situation where readers and writers alternate in the wait queue: writer (executing), reader 1, writer, reader 2. The two reader threads can be shuffled so that they execute in parallel. i.e. the second reader should probably not have to wait until the second writer finishes to execute. Fortunately, this situation is encountered rarely when the thread count is low. For four processors and threads, this happens one time in 16 if readers and writers are equally likely, and less often otherwise. Unfortunately, as the number of threads increases this will lead to an asymptotic factor of two slowdown compared with the optimal ordering.
The obvious thing to do to fix this is to add a test to see if readers should be reordered in wait-order. However, since the effect is so rare with four concurrent threads, it is extremely hard (if not impossible) to add the check with a low enough overhead that the result is a performance win. Thus it seems that the problem of exactly which algorithm is best will need to be revisited when larger multicore machines become common.
Subscribe to:
Comments (Atom)