This project has moved and is read-only. For the latest updates, please go here.

Use of Disruptor

Jul 24, 2013 at 5:03 PM
I noticed a brief reference to the 'Disruptor' in one of the GPA web sites. I've been very interested in this software, and am interested in your experience using it. Both performance improvements, and any problems implementing these codes.
Jul 25, 2013 at 2:38 PM
Edited Jul 25, 2013 at 2:44 PM
I think the Java version is very stable, however, we have had varied results with the .NET version. Keep in mind that the .NET version is not actively maintained like its Java counterpart.

In our testing the Disruptor was always faster than .NET's BlockingCollection, but the Grid Solutions Framework GSF.Collections.AsyncQueue and especially the GSF.Collections.AsyncDoubleBufferedQueue in GSF.Core.dll (see http://gsf.codeplex.com) can beat the .NET Disruptor in almost every case, e.g., multi-producer/multi-consumer and single-producer/multi-consumer scenarios, using all variation of Disruptor claim/wait strategies. There was a single scenario where we got the .NET Disruptor to actually beat GSF, but only marginally - this was a multi-producer, single-consumer scenario and only when Disruptor was configured for single threaded claim strategy - other claim strategies, e.g., multi-threaded, made it worse. Even in this mode however, we found stability under load to be a real issue.

We found an interesting comment from the .NET Disruptor author here:

http://stackoverflow.com/questions/13350342/should-i-synchronize-access-to-disruptor-next-publish-methods

My suggestion (especially following his) is that if you are looking into using this technology, use the Java version - it's stable and used in production environments. If you are using .NET, use one of the GSF Async queues.

Thanks,
Ritchie

FYI - Stephen developed a simple test program you can use for reference to compare performance of the .NET Disruptor, GSF Async queues and .NET's blocking collection - you will need to tweak the code to adjust Disruptor's ring buffer size, claim/wait strategies, number of producers/consumers, etc.:
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;
using GSF.Collections;

namespace AsyncQueueTest
{
    class Program
    {
        private class LongHandler : IEventHandler<LongEntry>
        {
            public void OnNext(LongEntry data, long sequence, bool endOfBatch)
            {
                HandleMessage(data.Value);
            }
        }

        private class LongEntry
        {
            public long Value
            {
                get;
                set;
            }
        }

        private const long NumberOfMessages = 100000;
        private const int NumberOfProducers = 5;
        private const int NumberOfConsumers = 10;

        static void Main(string[] args)
        {
            s_stopwatch = new Stopwatch();
            RunAsyncQueueTest();
            Console.ReadLine();
        }

        private static void RunBlockingCollectionTest()
        {
            BlockingCollection<long>[] blockingCollections = new BlockingCollection<long>[NumberOfConsumers];
            Thread t;

            for (int i = 0; i < NumberOfConsumers; i++)
            {
                BlockingCollection<long> blockingCollection = new BlockingCollection<long>();

                blockingCollections[i] = blockingCollection;

                t = new Thread(() =>
                {
                    long message;

                    while (s_finalMessagesReceived < NumberOfProducers * NumberOfConsumers)
                    {
                        if (blockingCollection.TryTake(out message))
                            HandleMessage(message);
                    }
                });

                t.Start();
            }

            s_stopwatch.Start();

            for (int i = 0; i < NumberOfProducers; i++)
            {
                t = new Thread(() =>
                {
                    foreach (BlockingCollection<long> blockingCollection in blockingCollections)
                    {
                        for (long message = 1L; message <= NumberOfMessages; message++)
                            blockingCollection.TryAdd(message);
                    }
                });

                t.Start();
            }
        }

        private static void RunAsyncQueueTest()
        {
            AsyncQueue<long>[] asyncQueues = new AsyncQueue<long>[NumberOfConsumers];

            for (int i = 0; i < NumberOfConsumers; i++)
            {
                AsyncQueue<long> asyncQueue = new AsyncQueue<long>();
                asyncQueue.ProcessItemFunction = HandleMessage;
                asyncQueues[i] = asyncQueue;
            }
            s_stopwatch.Start();

            for (int i = 0; i < NumberOfProducers; i++)
            {
                Thread t = new Thread(() =>
                {
                    foreach (AsyncQueue<long> asyncQueue in asyncQueues)
                    {
                        for (long message = 1L; message <= NumberOfMessages; message++)
                            asyncQueue.Enqueue(message);
                    }
                });

                t.Start();
            }
        }

        private static void RunDisruptorTest()
        {
            const int ringBufferSize = 2 << 21;
            RingBuffer<LongEntry>[] buffers;
            Thread t;

            buffers = new RingBuffer<LongEntry>[NumberOfConsumers];

            for (int i = 0; i < NumberOfConsumers; i++)
            {
                IClaimStrategy claimStrategy = new MultiThreadedClaimStrategy(ringBufferSize);
                IWaitStrategy waitStrategy = new SleepingWaitStrategy();
                Disruptor<LongEntry> disruptor = new Disruptor<LongEntry>(() => new LongEntry(), claimStrategy, waitStrategy, TaskScheduler.Default);
                disruptor.HandleEventsWith(new LongHandler());
                buffers[i] = disruptor.Start();
            }

            s_stopwatch.Start();

            for (int i = 0; i < NumberOfProducers; i++)
            {
                t = new Thread(() =>
                {
                    for (long message = 1L; message <= NumberOfMessages; message++)
                    {
                        foreach (RingBuffer<LongEntry> buffer in buffers)
                        {
                            long nextSequence = buffer.Next();
                            buffer[nextSequence].Value = message;
                            buffer.Publish(nextSequence);
                        }
                    }
                });

                t.Start();
            }
        }

        private static void HandleMessage(long message)
        {
            if (message == NumberOfMessages)
            {
                if (Interlocked.Increment(ref s_finalMessagesReceived) == NumberOfProducers * NumberOfConsumers)
                {
                    s_stopwatch.Stop();
                    Console.WriteLine("Time elapsed: {0}", s_stopwatch.Elapsed);
                }
            }
        }

        private static Stopwatch s_stopwatch;
        private static int s_finalMessagesReceived;
    }
}
Jul 25, 2013 at 3:18 PM
Thanks. These issues are very interesting to me. I have read elsewhere that the Disruptor seems to have a very narrow area of use. Great work!