Thursday, 17 April 2008

ConcurrentLinkedQueue On A Diet

News reaches me slowly, I know: this overview of atomic variables on the IBM website dates from 2004.

The article implies that when contention is high (multiple threads competing for access to the same data structures), we can save a lot of the CPU cycles the JVM would otherwise spend on scheduling threads by using atomic variables instead of traditional synchronized locking.
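Here is a minimal sketch of that contrast, using a counter rather than a queue. LockedCounter, AtomicCounter and CounterDemo are hypothetical names and do not come from the article. When a thread finds the lock taken, the synchronized version may park it. The atomic version never blocks: a thread that loses the compareAndSet race simply rereads and retries. AtomicInteger.incrementAndGet() does the same job; the loop is spelled out only to show the retry. The demo checks correctness, not speed.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Counter guarded by the object's monitor lock. */
class LockedCounter {
    private int value;

    /** Under contention, a thread that cannot get the lock may be parked by the JVM. */
    synchronized int increment() {
        return ++value;
    }

    synchronized int get() {
        return value;
    }
}

/** Counter updated with a compare-and-set retry loop; no thread ever blocks. */
class AtomicCounter {
    private final AtomicInteger value = new AtomicInteger();

    int increment() {
        for (;;) {
            final int current = value.get();
            final int next = current + 1;
            /* succeeds only if no other thread changed value since get() */
            if (value.compareAndSet(current, next)) {
                return next;
            }
            /* lost the race: re-read and retry instead of waiting */
        }
    }

    int get() {
        return value.get();
    }
}

class CounterDemo {
    /** Calls 'increment' perThread times from each of 'threads' threads, then waits for all. */
    static void hammer(final Runnable increment, final int threads, final int perThread) {
        final Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    increment.run();
                }
            });
            workers[i].start();
        }
        for (final Thread worker : workers) {
            try {
                worker.join();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(final String[] args) {
        final LockedCounter locked = new LockedCounter();
        final AtomicCounter atomic = new AtomicCounter();
        hammer(locked::increment, 4, 100_000);
        hammer(atomic::increment, 4, 100_000);
        System.out.println(locked.get() + " " + atomic.get()); // prints "400000 400000"
    }
}
```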

As I understand the Java memory model, each read or write of an atomic variable (as well as of a volatile one) introduces a memory barrier, forcing the CPUs to discard their read caches or flush their write caches.
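The Java memory model itself states this guarantee in terms of happens-before rather than caches. A plain write made before a volatile (or atomic) write becomes visible to any thread whose later volatile read of the same variable sees that write. The queue code below depends on this. Producers fill in ordinary fields of an item before publishing it with compareAndSet, and the consumer reads those fields after a get(). Here is a minimal sketch with a volatile flag; Publication and PublicationDemo are hypothetical names.

```java
/** A plain field published to another thread through a volatile flag. */
class Publication {
    private int payload;             // ordinary field, no synchronization of its own
    private volatile boolean ready;  // the volatile write/read pair is the synch point

    void publish(final int value) {
        payload = value;  // (1) plain write
        ready = true;     // (2) volatile write: (1) happens-before any read that sees 'true'
    }

    int awaitPayload() {
        while (!ready) {   // volatile read; without volatile this loop might never exit
            Thread.yield();
        }
        return payload;    // guaranteed to observe the value written in (1)
    }
}

class PublicationDemo {
    /** Publishes 'value' from the calling thread and returns what a reader thread saw. */
    static int run(final int value) {
        final Publication publication = new Publication();
        final int[] seen = new int[1];
        final Thread reader = new Thread(() -> seen[0] = publication.awaitPayload());
        reader.start();
        publication.publish(value);
        try {
            reader.join();  // join() is also a happens-before edge, so seen[0] is visible here
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(final String[] args) {
        System.out.println(run(42)); // prints 42
    }
}
```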

Following this line of reasoning, I concluded that for highly contended data structures it pays not only to synchronize access with atomic variables rather than locks, but also to do as little of that synchronization as possible.

Here's my thought experiment. Suppose we want to optimize one particular use case: a number of threads put tasks into a queue, and a single worker thread takes tasks from the queue and executes them. It turns out we need only one atomic variable to do this reliably. Here's the code:

package org.apache.atagunov.mr.queue;

public class MrLinked<T>
{
    /**
     * Managed exclusively by {@link MrQueue}:
     * <tt>prev</tt> is written by the producer before the item is published,
     * <tt>next</tt> is written only by the consumer thread.
     */
    T prev, next;
}

package org.apache.atagunov.mr.queue;

public interface MrInternalIterator<T>
{
    void process(T t);
}

package org.apache.atagunov.mr.queue;

import java.util.concurrent.atomic.AtomicReference;

import org.apache.log4j.Logger;

public class MrQueue<T extends MrLinked<T>>
{
    /** The only atomic variable: the most recently added item, or null when empty */
    private final AtomicReference<T> last = new AtomicReference<T>();
    
    private final Logger logger = Logger.getLogger(MrQueue.class);
    
    /**
     * Add <tt>item</tt> to this queue.
     * May be called from multiple threads.
     * The item must not be re-added before the processAndClear call
     * that processes it has returned.
     * @return <tt>true</tt> if we have gone from empty to non-empty state
     */
    public boolean add(final T item)
    {
        T oldLast;
        
        do
        {
            oldLast = last.get();
            /* item is still private to this thread, so a plain write is safe;
             * the successful compareAndSet below publishes it to the consumer.
             * We never write to oldLast: other producers may be racing for it. */
            item.prev = oldLast;
        }
        while (!last.compareAndSet(oldLast, item));
        
        return oldLast == null;
    }
    
    /**
     * Run <tt>ii</tt> for every item in the queue, then clear the queue.
     * Items added while we are processing are processed too.
     * Even if <tt>ii</tt> throws an exception we consider the item as processed.
     * Run from one thread at a time please!
     * @return number of items processed
     */
    public int processAndClear(final MrInternalIterator<T> ii)
    {
        T stopHere = last.get();
        if (stopHere == null) {
            /* don't expect this in context of MrUniverse */
            return 0;
        }
        
        int count = 0;
        /* last item of the segment already processed; null for the first segment */
        T boundary = null;
        
        for(;;)
        {
            T t = linkForward(stopHere, boundary);
            
            for(;;)
            {
                doProcess(t, ii);
                count++;
                
                if (t == stopHere) {
                    break;
                }
                t = t.next;
            }
            
            if (last.compareAndSet(stopHere, null))
            {
                /* done! */
                return count;
            }
            
            /* somebody has added items; their prev chain leads back to stopHere,
             * and get() is the synch point that makes their prev writes visible */
            boundary = stopHere;
            stopHere = last.get();
        }
    }
    
    /**
     * Walk back from <tt>end</tt> along <tt>prev</tt> until <tt>boundary</tt>,
     * filling in <tt>next</tt> so the segment can be processed in FIFO order.
     * Called only from the consumer thread.
     * @return the oldest item of the segment
     */
    private T linkForward(final T end, final T boundary)
    {
        T t = end;
        while (t.prev != boundary) {
            t.prev.next = t;
            t = t.prev;
        }
        /* detach from already processed items so GC can collect them */
        t.prev = null;
        return t;
    }
    
    private void doProcess(final T t, final MrInternalIterator<T> ii)
    {
        try
        {
            ii.process(t);
        }
        catch (Error e)
        {
            /* sort of recovery - discard queue */
            logger.fatal("Iterator threw " + e.getClass().getName()
                    + ", queue discarded");
            last.set(null);
            
            /* let it go through, maybe killing the thread */
            throw e;
        }
        catch (Throwable thr)
        {
            /* consider item as processed; more handling may be added later,
             * at least better logging */
            logger.error("Problem processing queued item", thr);
        }
    }
}


Only one atomic variable per queue! Contrast this with ConcurrentLinkedQueue, which uses atomic access for each "next" pointer in its backing linked list. This is our payoff for coding a specialized solution for a specific use case.
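For comparison, here is a sketch of the same use case built on the standard ConcurrentLinkedQueue; ClqBaseline and run are hypothetical names. In the OpenJDK implementation, each offer links a freshly allocated node with a compare-and-set on the predecessor's next pointer. Each successful poll claims its item with another compare-and-set. The worker therefore pays at least one atomic operation per item, whereas the single-variable queue's consumer pays one per batch. To keep the sketch short, the worker spins when the queue is momentarily empty; a real worker would sleep instead.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

/** Many producers, one worker, using the standard JDK queue. */
class ClqBaseline {
    /**
     * Starts 'producers' threads that each offer 0..perProducer-1, drains everything
     * on the calling thread (the single worker) and returns the sum of drained values.
     */
    static long run(final int producers, final int perProducer) {
        final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        final Thread[] threads = new Thread[producers];
        for (int i = 0; i < producers; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perProducer; j++) {
                    queue.offer(j);  // allocates a node and links it in with a CAS
                }
            });
            threads[i].start();
        }

        final int expected = producers * perProducer;
        long sum = 0;
        int drained = 0;
        while (drained < expected) {
            final Integer task = queue.poll();  // claims the head item with a CAS
            if (task == null) {
                Thread.yield();  // momentarily empty: give the producers a chance
            } else {
                sum += task;
                drained++;
            }
        }

        for (final Thread t : threads) {
            try {
                t.join();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sum;
    }

    public static void main(final String[] args) {
        System.out.println(run(4, 1000)); // prints 1998000
    }
}
```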

I was also so concerned about performance that I decided to merge the backing linked-list element with the actual data item, much as we used to do in the good old C days when coding lists by hand. The class whose instances are stored in MrQueue has to extend MrLinked<its-own-type>. The gain is that the GC has to manage one object per queued item rather than two.
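To show the layout on its own, here is a deliberately single-threaded sketch of the same idea. Node, Job and IntrusiveFifo are hypothetical names, and MrQueue's thread safety is not reproduced. The self-referential bound T extends Node<T> lets the list reach the link field directly on the data item. As a result, adding an item allocates no separate wrapper object. By contrast, java.util.LinkedList allocates an internal node for every element.

```java
/** Link field lives in the item itself; the self-referential bound keeps it typed. */
abstract class Node<T extends Node<T>> {
    T next;  // managed by the list only
}

/** A data item that is also its own list element. */
final class Job extends Node<Job> {
    final String name;

    Job(final String name) {
        this.name = name;
    }
}

/** Single-threaded FIFO over intrusive nodes: add() allocates nothing. */
final class IntrusiveFifo<T extends Node<T>> {
    private T head;
    private T tail;

    void add(final T item) {
        item.next = null;
        if (tail == null) {
            head = item;        // list was empty
        } else {
            tail.next = item;   // link after the current tail
        }
        tail = item;
    }

    /** Removes and returns the oldest item, or null if the list is empty. */
    T poll() {
        final T item = head;
        if (item != null) {
            head = item.next;
            if (head == null) {
                tail = null;
            }
            item.next = null;   // unlink so the item can be added again later
        }
        return item;
    }

    public static void main(final String[] args) {
        final IntrusiveFifo<Job> fifo = new IntrusiveFifo<>();
        fifo.add(new Job("a"));
        fifo.add(new Job("b"));
        System.out.println(fifo.poll().name + fifo.poll().name); // prints "ab"
    }
}
```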

I have tested this little bit of code as part of a bigger application on a big multiprocessor box. The tests ran fine, and it appears that the code actually works as designed.