Java Program to Implement SynchronosQueue API

This Java program Implements SynchronousQueue API.A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The head of the queue is the element that the first queued inserting thread is trying to add to the queue; if there is no such queued thread then no element is available for removal and poll() will return null.

Here is the source code of the Java Program to Implement SynchronosQueue. The Java program is successfully compiled and run on a Linux system. The program output is also shown below.

  1. import java.util.Collection;
  2. import java.util.Iterator;
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.SynchronousQueue;
  5. import java.util.concurrent.TimeUnit;
  6.  
  7. public class SynchronousQueueImpl<E>
  8. {
  9.     private SynchronousQueue<E> synchronousQueue;
  10.  
  11.     /** Creates a SynchronousQueue with nonfair access policy. **/
  12.     public SynchronousQueueImpl()
  13.     {
  14.         synchronousQueue = new SynchronousQueue<E>();
  15.     }
  16.  
  17.     /** Creates a SynchronousQueue with the specified fairness policy. **/
  18.     public SynchronousQueueImpl(boolean fair)
  19.     {
  20.         synchronousQueue = new SynchronousQueue<E>(fair);
  21.     }
  22.  
  23.     /** Inserts the specified element at the tail of this queue. **/
  24.     public boolean add(E e)
  25.     {
  26.         return synchronousQueue.add(e);
  27.     }
  28.  
  29.     /** Atomically removes all of the elements from this queue. **/
  30.     public void clear()
  31.     {
  32.         synchronousQueue.clear();
  33.     }
  34.  
  35.     /** Returns true if this queue contains the specified element. **/
  36.     public boolean contains(Object o)
  37.     {
  38.         return synchronousQueue.contains(o);
  39.     }
  40.  
  41.     /**
  42.      * Removes all available elements from this queue and adds them to the given
  43.      * collection.
  44.      **/
  45.     public int drainTo(Collection<? super E> c)
  46.     {
  47.         return synchronousQueue.drainTo(c);
  48.     }
  49.  
  50.     /**
  51.      * Removes at most the given number of available elements from this queue
  52.      * and adds them to the given collection.
  53.      **/
  54.     public int drainTo(Collection<? super E> c, int maxElements)
  55.     {
  56.         return synchronousQueue.drainTo(c, maxElements);
  57.     }
  58.  
  59.     /** Returns an iterator over the elements in this queue in proper sequence. **/
  60.     public Iterator<E> iterator()
  61.     {
  62.         return synchronousQueue.iterator();
  63.     }
  64.  
  65.     /**
  66.      * Inserts the specified element at the tail of this queue if it is possible
  67.      * to do so immediately without exceeding the queue's capacity, returning
  68.      * true upon success and false if this queue is full.
  69.      **/
  70.     public boolean offer(E e)
  71.     {
  72.         return synchronousQueue.offer(e);
  73.     }
  74.  
  75.     /**
  76.      * Inserts the specified element at the tail of this queue, waiting up to
  77.      * the specified wait time for space to become available if the queue is
  78.      * full.
  79.      **/
  80.     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
  81.     {
  82.         return synchronousQueue.offer(e, timeout, unit);
  83.     }
  84.  
  85.     /**
  86.      * Retrieves, but does not remove, the head of this queue, or returns null
  87.      * if this queue is empty.
  88.      **/
  89.     public E peek()
  90.     {
  91.         return synchronousQueue.peek();
  92.     }
  93.  
  94.     /**
  95.      * Retrieves and removes the head of this queue, or returns null if this
  96.      * queue is empty.
  97.      **/
  98.     public E poll()
  99.     {
  100.         return synchronousQueue.poll();
  101.     }
  102.  
  103.     /**
  104.      * Retrieves and removes the head of this queue, waiting up to the specified
  105.      * wait time if necessary for an element to become available.
  106.      **/
  107.     public E poll(long timeout, TimeUnit unit) throws InterruptedException
  108.     {
  109.         return synchronousQueue.poll(timeout, unit);
  110.     }
  111.  
  112.     /**
  113.      * Inserts the specified element at the tail of this queue, waiting for
  114.      * space to become available if the queue is full.
  115.      **/
  116.     public void put(E e) throws InterruptedException
  117.     {
  118.         synchronousQueue.put(e);
  119.     }
  120.  
  121.     /**
  122.      * Always returns Integer.MAX_VALUE because a PriorityBlockingQueue is not
  123.      * capacity constrained.
  124.      **/
  125.     public int remainingCapacity()
  126.     {
  127.         return synchronousQueue.remainingCapacity();
  128.     }
  129.  
  130.     /**
  131.      * Removes a single instance of the specified element from this queue, if it
  132.      * is present.
  133.      **/
  134.     public boolean remove(Object o)
  135.     {
  136.         return synchronousQueue.remove(o);
  137.     }
  138.  
  139.     /** Returns the number of elements in this queue. **/
  140.     public int size()
  141.     {
  142.         return synchronousQueue.size();
  143.     }
  144.  
  145.     /**
  146.      * Retrieves and removes the head of this queue, waiting if necessary until
  147.      * an element becomes available
  148.      **/
  149.     public E take() throws InterruptedException
  150.     {
  151.         return synchronousQueue.take();
  152.     }
  153.  
  154.     /**
  155.      * Returns an array containing all of the elements in this queue, in proper
  156.      * sequence.
  157.      **/
  158.     public Object[] toArray()
  159.     {
  160.         return synchronousQueue.toArray();
  161.     }
  162.  
  163.     /**
  164.      * Returns an array containing all of the elements in this queue, in proper
  165.      * sequence; the runtime type of the returned array is that of the specified
  166.      * array.
  167.      **/
  168.     public <T> T[] toArray(T[] a)
  169.     {
  170.         return synchronousQueue.toArray(a);
  171.     }
  172.  
  173.     /** Returns a string representation of this collection. **/
  174.     public String toString()
  175.     {
  176.         return synchronousQueue.toString();
  177.     }
  178.  
  179.     public static void main(String... arg) throws InterruptedException
  180.     {
  181.         SynchronousQueueImpl<Integer> synchronousQueue = new SynchronousQueueImpl<Integer>();
  182.         new Thread(new SynchronousQueueImpl<>().new PutThread(
  183.         synchronousQueue.synchronousQueue)).start();
  184.         new Thread(new SynchronousQueueImpl<>().new TakeThread(
  185.         synchronousQueue.synchronousQueue)).start();
  186.     }
  187.  
  188.     class PutThread implements Runnable
  189.     {
  190.         BlockingQueue SynchronousQueue;
  191.  
  192.         public PutThread(BlockingQueue q)
  193.         {
  194.             this.SynchronousQueue = q;
  195.         }
  196.  
  197.         @Override
  198.         public void run()
  199.         {
  200.             try
  201.             {
  202.                 SynchronousQueue.put(19);
  203.                 System.out.println("19 added to synchronous queue by PutThread");
  204.                 Thread.sleep(1000);
  205.             } catch (InterruptedException e)
  206.             {
  207.                 e.printStackTrace();
  208.             }
  209.         }
  210.     }
  211.  
  212.     class TakeThread implements Runnable
  213.     {
  214.         BlockingQueue SynchronousQueue;
  215.  
  216.         public TakeThread(BlockingQueue q)
  217.         {
  218.             this.SynchronousQueue = q;
  219.         }
  220.  
  221.         @Override
  222.         public void run()
  223.         {
  224.             try
  225.             {
  226.                 this.SynchronousQueue.take();
  227.                 System.out.println("19 removed from synchronous queue by TakeThread");
  228.             } catch (InterruptedException e)
  229.             {
  230.                 e.printStackTrace();
  231.             }
  232.         }
  233.     }
  234. }

$ javac SynchronousQueueImpl.java
$ java SynchronousQueueImpl
19 added to synchronous queue by PutThread
19 removed from synchronous queue by TakeThread

Sanfoundry Global Education & Learning Series – 1000 Java Programs.

advertisement
advertisement
If you wish to look at all Java Programming examples, go to Java Programs.

If you find any mistake above, kindly email to [email protected]

advertisement
advertisement
Subscribe to our Newsletters (Subject-wise). Participate in the Sanfoundry Certification contest to get free Certificate of Merit. Join our social networks below and stay updated with latest contests, videos, internships and jobs!

Youtube | Telegram | LinkedIn | Instagram | Facebook | Twitter | Pinterest
Manish Bhojasia - Founder & CTO at Sanfoundry
Manish Bhojasia, a technology veteran with 20+ years @ Cisco & Wipro, is Founder and CTO at Sanfoundry. He lives in Bangalore, and focuses on development of Linux Kernel, SAN Technologies, Advanced C, Data Structures & Alogrithms. Stay connected with him at LinkedIn.

Subscribe to his free Masterclasses at Youtube & discussions at Telegram SanfoundryClasses.