Exchanger

An Exchanger offers a simple way for two threads to communicate: at a synchronization point, each thread hands an object to the other and receives the other thread's object in return. The <V> after the class name is the type of the object being exchanged.

import java.util.*;
import java.util.concurrent.*;
 
/**
 * Demonstrates {@link Exchanger} with one producer thread and one consumer
 * thread that swap a pair of buffers back and forth.
 *
 * The producer fills its buffer and, once it holds {@code FULL} items, trades
 * it for the consumer's buffer. The consumer drains its buffer and, once it is
 * empty, trades it for the producer's buffer. {@code main} waits for both
 * workers on a latch and then prints the sum of every consumed item.
 */
public class ExchangerTest
{

    /** Number of items a buffer holds when the producer hands it over. */
    private static final int FULL = 10;

    /** Number of loop iterations (items added / items removed) per worker. */
    private static final int COUNT = FULL * 20;

    /** Source of the random items; used by main and by the producer. */
    private static final Random random = new Random();

    /**
     * Running total of the consumed items. Only the consumer thread writes it,
     * so the non-atomic {@code +=} cannot lose updates; main reads it only
     * after both workers have counted the latch down.
     */
    private static volatile int sum = 0;

    // The constructor simply identifies the type of object to exchange.
    private static final Exchanger<List<Integer>>
        exchanger = new Exchanger<List<Integer>>();

    /** Buffer the producer starts with; assigned in main before the threads start. */
    private static List<Integer> initiallyEmptyBuffer;

    /** Buffer the consumer starts with; assigned and pre-filled in main. */
    private static List<Integer> initiallyFullBuffer;

    /** Counted down once by each worker when it finishes, however it finishes. */
    private static final CountDownLatch stopLatch = new CountDownLatch(2);

    /** Producer: adds random items and hands over each buffer once it is full. */
    private static class FillingLoop implements Runnable
    {
        public void run()
        {
            List<Integer> currentBuffer = initiallyEmptyBuffer;
            try
            {
                for (int i = 0; i < COUNT; i++)
                {
                    if (currentBuffer == null)
                    {
                        break; // stop on null
                    }
                    Integer item = random.nextInt(100);
                    System.out.println("Added: " + item);
                    currentBuffer.add(item);
                    if (currentBuffer.size() == FULL)
                    {
                        // When the buffer is full, exchange it with the consumer.
                        currentBuffer = exchanger.exchange(currentBuffer);
                    }
                }
            } catch (InterruptedException ex)
            {
                System.out.println("Bad exchange on filling side");
                // Restore the interrupt status so it is not silently lost.
                Thread.currentThread().interrupt();
            } finally
            {
                // Always release the latch, even if an unchecked exception
                // escapes the loop, so this worker never keeps main waiting.
                stopLatch.countDown();
            }
        }
    }

    /** Consumer: removes items, adds them to the sum, and hands over each emptied buffer. */
    private static class EmptyingLoop implements Runnable
    {
        public void run()
        {
            List<Integer> currentBuffer = initiallyFullBuffer;
            try
            {
                for (int i = 0; i < COUNT; i++)
                {
                    if (currentBuffer == null)
                    {
                        break; // stop on null
                    }
                    Integer item = currentBuffer.remove(0);
                    System.out.println("Got: " + item);
                    sum += item.intValue();
                    if (currentBuffer.isEmpty())
                    {
                        // When the buffer is empty, exchange it with the producer.
                        currentBuffer = exchanger.exchange(currentBuffer);
                    }
                }
            } catch (InterruptedException ex)
            {
                System.out.println("Bad exchange on emptying side");
                // Restore the interrupt status so it is not silently lost.
                Thread.currentThread().interrupt();
            } finally
            {
                // Always release the latch, even if an unchecked exception
                // escapes the loop, so this worker never keeps main waiting.
                stopLatch.countDown();
            }
        }
    }

    /**
     * Prepares the two buffers, starts the producer and the consumer, waits
     * for both to finish and prints the sum of the consumed items.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String args[])
    {
        initiallyEmptyBuffer = new ArrayList<Integer>();
        initiallyFullBuffer = new ArrayList<Integer>(FULL);
        // Pre-fill the consumer's buffer so it has work before the first exchange.
        for (int i = 0; i < FULL; i++)
        {
            initiallyFullBuffer.add(random.nextInt(100));
        }
        new Thread(new FillingLoop()).start();
        new Thread(new EmptyingLoop()).start();
        try
        {
            stopLatch.await();
        } catch (InterruptedException ex)
        {
            ex.printStackTrace();
            // Restore the interrupt status for whoever called main.
            Thread.currentThread().interrupt();
        }
        System.out.println("Sum of all items is.... " + sum);
    }
}
Unless otherwise stated, the content of this page is licensed under Creative Commons Attribution-Share Alike 2.5 License.