Processing Multiple Threads with Unique Keys

No Comments July 29, 2013

A requirement cropped up the other day that required a way of allowing multiple processes to run at the same time, but only one instance of a given key at any time.

In other words, if four processes were started with the keys “foo”, “bar”, “baz” and “foo” at exactly the same time, only the first three instances would be allowed and the second “foo” would be blocked (assuming that the first “foo” hadn’t been completed by the time that the second “foo” was dealt with).

After pondering the problem for a short while, this was the result:

public class LockProcessor<T> {
    ConcurrentDictionary<T, Guid> _processDictionary;

    public LockProcessor() {
        _processDictionary = new ConcurrentDictionary<T, Guid>();
    }

    public void Process(T key, Action successAction, Action failureAction = null) {
        Guid localGuid = Guid.NewGuid();

        if (_processDictionary.GetOrAdd(key, localGuid).Equals(localGuid)) {
            try {
                successAction();
            }
            finally {
                Guid oldGuid;
                _processDictionary.TryRemove(key, out oldGuid);
            }
        }
        else {
            if (failureAction != null) {
                failureAction();
            }
        }
    }
}

So, what’s going on here?

  1. When a client calls the Process method a new Guid is generated
  2. An attempt is made to insert the new Guid into the dictionary given the key specified via the thread-safe GetOrAdd method. If the dictionary doesn’t hold a value for the given key, the Guid is inserted otherwise the original value is retained. The method returns with the value stored against the key. This therefore means that if two threads attempt to start a process with the same key, the first thread will add a Guid to the dictionary and the second thread will fail to add its value and read the value from the first thread instead. In this way, we can guarantee only one thread will enter the ‘if’ block.
    if (_processDictionary.GetOrAdd(key, localGuid).Equals(localGuid)) {
    // ...
    }
  3. The thread that successfully entered the ‘if’ block then executes the Action that was passed into it, otherwise the failure Action is invoked (if it exists).
  4. Once the success Action has been invoked, the finally block will be executed to remove the specified key from the concurrent dictionary. This means that even if the success Action throws an exception, the dictionary will be restored to the correct state.

And there you have it. A simple way to allow multiple processes on multiple threads to run ensuring that only unique keys are processed simultaneously.

var lp = new LockProcessor<string>();

Action<int> success = (n) => {
    Console.WriteLine("#{0} Success", n);
    Thread.Sleep(TimeSpan.FromSeconds(5));
};

Action<int> failure = (n) => {
    Console.WriteLine("#{0} Failure", n);
}; 

for (int i = 0; i < 10; i++) {
    int j = i;
    new Thread(() => {
        lp.Process("12345",
        () => {
            success(j);
        },
        () => {
            failure(j);
        });
    }).Start();
}

The above code fragment demonstrates using the LockProcessor to ensure that only one of the ten threads started will invoke the success Action.

Now, if only I could think of a more appropriate name than “LockProcessor”. Any suggestions gratefully received…


No Comments