Building Our Own ESB - Publish / Subscribe (Part 3)

And now the code...

So now we are ready to start coding our framework. To start with, we are going to need a few classes. Here's a class diagram of our core:



We've already talked at a high level, so we'll dive straight into the core code now. Feel free to ask questions if you have them, since I'm going to let the code do most of the talking today.

MessageDelivery: Encapsulates information about a message. In addition to the message data, we need a place to store things like the destination of the message, how many attempts we've made to deliver it, and the maximum number of delivery attempts. We don't want to force everyone to stuff this information into their message classes, since this is really information that the service bus needs to track for its own purposes.

    [Serializable]
    public class MessageDelivery 
    {
        public MessageDelivery(SubscriptionEndpoint endpoint, string action, object message)
        {
            _messageId = Guid.NewGuid().ToString();
            _endpoint = endpoint;
            _action = action;
            _message = message;
        }

        public MessageDelivery(string messageId, SubscriptionEndpoint endpoint, string action, object message, int retryCount, DateTime? timeToProcess)
        {
            _messageId = messageId;
            _endpoint = endpoint;
            _action = action;
            _message = message;
            _retryCount = retryCount;
            _timeToProcess = timeToProcess;
        }

        private readonly string _messageId;

        public string MessageId
        {
            get { return _messageId; }
        }

        private readonly SubscriptionEndpoint _endpoint;

        public SubscriptionEndpoint Endpoint
        {
            get { return _endpoint; }
        } 

        private readonly string _action;

        public string Action
        {
            get { return _action; }
        }

        private readonly object _message;

        public object Message
        {
            get { return _message; }
        } 

        private readonly int _retryCount;

        public int RetryCount
        {
            get { return _retryCount; }
        } 

        private readonly DateTime? _timeToProcess;

        public DateTime? TimeToProcess
        {
            get { return _timeToProcess; }
        }

        private readonly int _maxRetries = 10;

        public int MaxRetries
        {
            get { return _maxRetries; }
        } 


        public bool RetriesMaxed
        {
            get
            {
                return _maxRetries < _retryCount;
            }
        }

        const int RETRY_DELAY_MS = 30000;

        public MessageDelivery CreateRetry()
        {
            int retryCount = (_retryCount + 1);
            
            return new MessageDelivery(_messageId, _endpoint, _action, _message, retryCount, DateTime.Now.AddMilliseconds(RETRY_DELAY_MS * retryCount * retryCount)); 
            
        }
      
    }
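
To make the retry timing concrete, here is a small sketch that prints the delay CreateRetry() would schedule before each retry. It only mirrors the arithmetic above (RETRY_DELAY_MS multiplied by the square of the retry count); the RetryScheduleDemo class and DelayForRetry method are names made up for this illustration and are not part of the bus.

```csharp
using System;

public static class RetryScheduleDemo
{
    // Same value as MessageDelivery.RETRY_DELAY_MS above.
    const int RetryDelayMs = 30000;

    // Delay scheduled before the given retry (1-based), mirroring the
    // RETRY_DELAY_MS * retryCount * retryCount expression in CreateRetry().
    public static TimeSpan DelayForRetry(int retryCount)
    {
        return TimeSpan.FromMilliseconds((double)RetryDelayMs * retryCount * retryCount);
    }

    public static void Main()
    {
        for (int retry = 1; retry <= 10; retry++)
        {
            Console.WriteLine("Retry {0}: wait {1}", retry, DelayForRetry(retry));
        }
    }
}
```

The first retry waits 30 seconds, the second 2 minutes, and the tenth 50 minutes, so a subscriber that stays down is contacted less and less often.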

MessageDeliveryQueue: Abstracts our message queueing infrastructure. We don't want to take a direct dependency on MSMQ by coding directly against the MessageQueue class in System.Messaging, since we want each part of our bus to be fully replaceable. Note the addition of the Dequeue-by-id overload, which we will need at a later point when we want to pull specific messages out of the dead-letter queue for reprocessing.

    public interface MessageDeliveryQueue 
    {
        void Enqueue(MessageDelivery value);
        MessageDelivery Dequeue(TimeSpan timeout);
        MessageDelivery Dequeue(string id, TimeSpan timeout);
    }
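
Because the queue is just an abstraction, it helps to see how small an implementation can be. The following is a rough in-memory sketch, the kind of thing that might be handy in unit tests. It is an assumption of mine and not part of the bus: DemoDelivery stands in for MessageDelivery so the sample compiles on its own, InMemoryDeliveryQueue is a made-up name, and unlike MSMQ it is neither durable nor aware of TransactionScope.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Stand-in for MessageDelivery; only the message id matters for this sketch.
public class DemoDelivery
{
    public DemoDelivery(string messageId) { MessageId = messageId; }
    public string MessageId { get; private set; }
}

// Hypothetical in-memory queue offering the same three operations as MessageDeliveryQueue.
public class InMemoryDeliveryQueue
{
    readonly LinkedList<DemoDelivery> _items = new LinkedList<DemoDelivery>();
    readonly object _lock = new object();

    public void Enqueue(DemoDelivery value)
    {
        lock (_lock)
        {
            _items.AddLast(value);
            Monitor.PulseAll(_lock); // wake any Dequeue calls that are waiting
        }
    }

    // Returns the oldest item, or null if nothing arrives before the timeout.
    public DemoDelivery Dequeue(TimeSpan timeout)
    {
        return Take(null, timeout);
    }

    // Returns the item with the given id, or null if it does not show up before the timeout.
    public DemoDelivery Dequeue(string id, TimeSpan timeout)
    {
        return Take(id, timeout);
    }

    DemoDelivery Take(string id, TimeSpan timeout)
    {
        DateTime deadline = DateTime.UtcNow + timeout;
        lock (_lock)
        {
            while (true)
            {
                // Scan from the oldest item; a null id matches the first item found.
                for (LinkedListNode<DemoDelivery> node = _items.First; node != null; node = node.Next)
                {
                    if (id == null || node.Value.MessageId == id)
                    {
                        _items.Remove(node);
                        return node.Value;
                    }
                }
                TimeSpan remaining = deadline - DateTime.UtcNow;
                if (remaining <= TimeSpan.Zero || !Monitor.Wait(_lock, remaining))
                {
                    return null; // timed out without finding a match
                }
            }
        }
    }
}

public static class QueueDemo
{
    public static void Main()
    {
        InMemoryDeliveryQueue queue = new InMemoryDeliveryQueue();
        queue.Enqueue(new DemoDelivery("a"));
        queue.Enqueue(new DemoDelivery("b"));

        // Pull a specific message first, then whatever is oldest.
        Console.WriteLine(queue.Dequeue("b", TimeSpan.FromMilliseconds(50)).MessageId);
        Console.WriteLine(queue.Dequeue(TimeSpan.FromMilliseconds(50)).MessageId);
    }
}
```

Returning null on timeout matches how DeliverOne in the runtime treats an empty queue: it checks the result for null and simply does nothing.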

Endpoint: At this point, our ESB will contain two types of endpoints: listener endpoints and subscription endpoints. Listener endpoints will be hosted by the ESB, while subscription endpoints are where our ESB will be sending its messages.

    [Serializable]
    public abstract class Endpoint
    {
        Guid _endpointId = Guid.NewGuid();
        public Guid Id
        {
            get
            {
                return _endpointId;
            }
            set
            {
                _endpointId = value;
            }
        }


        Type _contractType;
        public Type ContractType
        {
            get
            {
                return _contractType;
            }
            set
            {
                _contractType = value;
            }
        }

        string _address;
        public string Address
        {
            get
            {
                return _address;
            }
            set
            {
                _address = value;
            }
        }

        string _configurationName;
        public string ConfigurationName
        {
            get
            {
                return _configurationName;
            }
            set
            {
                _configurationName = value;
            }
        }

        string _name;
        public string Name
        {
            get
            {
                return _name;
            }
            set
            {
                _name = value;
            }
        }
    }

Dispatcher: Once a message is ready to be delivered, we need something to send it to its endpoint. Dispatchers abstract the dispatch process, so we aren't bound tightly to WCF.

    public abstract class Dispatcher
    {
        [ThreadStatic]
        static DispatchContext _dispatchContext;
        public static DispatchContext DispatchContext
        {
            get
            {
                return _dispatchContext;
            }
        }

        internal void DispatchInternal(MessageDelivery delivery)
        {
            _dispatchContext = new DispatchContext(delivery);
            try
            {
                Dispatch(delivery.Endpoint, delivery.Action, delivery.Message);
            }
            finally
            {
                _dispatchContext = null;
            }

        }

        protected abstract void Dispatch(SubscriptionEndpoint endpoint, string action, object message);
    }

RuntimeService: We want to keep the core of the service bus as clean as possible, so things like persistence of subscribers or hosting of specific types of listeners will be provided by RuntimeServices. The service bus will pass messages to each registered runtime service when significant events happen and the runtime service can use these events to do things like start up a WCF ServiceHost or communicate with a SQL server.

    public class RuntimeService
    {
        protected ServiceBusRuntime Runtime
        {
            get;
            private set;
        }

        internal void SetRuntime(ServiceBusRuntime runtime)
        {
            Runtime = runtime;
        }

        volatile bool _started;
        protected bool Started
        {
            get
            {
                return _started;
            }
        }
        protected internal void Start()
        {
            if (!_started)
            {
                OnStart();
                _started = true;
            }
        }
        protected internal void Stop()
        {
            if (_started)
            {
                _started = false;
                OnStop();
            }
        }

        protected virtual void OnStart()
        {
        }

        protected virtual void OnStop()
        {
        }

        /// <remarks>
        /// Warning: Unhandled exception here could be fatal. Handle exceptions in this method carefully.
        /// </remarks>
        protected virtual internal void OnUnhandledException(Exception ex, bool terminating)
        {

        }

        protected virtual internal void OnListenerAdded(ListenerEndpoint endpoint)
        {
        }

        protected virtual internal void OnListenerRemoved(ListenerEndpoint endpoint)
        {
        }

        protected virtual internal void OnSubscriptionAdded(SubscriptionEndpoint endpoint)
        {
        }

        protected virtual internal void OnSubscriptionRemoved(SubscriptionEndpoint endpoint)
        {
        }

        protected virtual internal void OnMessageDelivered(MessageDelivery delivery)
        {            
        }

        protected virtual internal void OnMessageDeliveryFailed(MessageDelivery delivery, bool permanent)
        {
        }
    }

ServiceBusRuntime: ServiceBusRuntime is the heart of our ESB. It handles essential functions like managing worker threads, scheduling deliveries, and providing a way for us to register subscriptions and listeners. Again, we will try to keep the runtime as slim as possible, letting additional functionality be packaged as runtime services. This is a big class, so I'll point out a few significant things. First, multiple threads might be trying to access our service bus at the same time. In fact, the bus itself hosts multiple threads of its own, so there is a bit of code devoted to keeping things thread-safe. Most of this can be handled by simple lock statements, but for collections that are read far more often than they change, like the subscription list, we use a reader/writer lock so that reading threads don't block each other. A reader/writer lock lets any number of threads read at the same time, but only one thread at a time can be in write mode, and it must wait until it can gain exclusive access. Additionally, we have provided some helper functions that execute code blocks safely and automatically send unhandled exceptions to the unhandled exception handler, so that our runtime won't terminate unexpectedly and we don't just swallow exceptions.

    public class ServiceBusRuntime
    {
        public ServiceBusRuntime(MessageDeliveryQueue deliveryQueue, MessageDeliveryQueue retryQueue, MessageDeliveryQueue failureQueue)
        {
            _messageDeliveryQueue = deliveryQueue;
            _retryQueue = retryQueue;            
            _failureQueue = failureQueue;
        }
        
        object _startLock = new object();
        
        List<Thread> _workerThreads = new List<Thread>();
        object _workerThreadsLock = new object();
        
        public void RegisterService(RuntimeService service)
        {
            lock(_startLock)
            {
                if (_starting || _started)
                {
                    throw new InvalidOperationException("Services cannot be registered or unregistered while the bus is running.");
                }
                _runtimeServicesRWLock.EnterWriteLock();
                try
                {
                    _runtimeServices.Add(service);
                    service.SetRuntime(this);    
                }
                finally
                {
                    _runtimeServicesRWLock.ExitWriteLock();
                }
            }
        }
        
        public void UnregisterService(RuntimeService service)
        {
            lock(_startLock)
            {
                if (_starting || _started)
                {
                    throw new InvalidOperationException("Services cannot be registered or unregistered while the bus is running.");
                }

                _runtimeServicesRWLock.EnterWriteLock();
                try
                {
                    _runtimeServices.Remove(service);
                }
                finally
                {
                    _runtimeServicesRWLock.ExitWriteLock();
                }
            }
        }                
        
        public T GetRuntimeService<T>() where T : RuntimeService
        {
            _runtimeServicesRWLock.EnterReadLock();
            try
            {
                foreach(RuntimeService service in _runtimeServices)
                {
                    T s = service as T;
                    if(s != null)
                    {
                        return s;
                    }
                }
            }           
            finally
            {
                _runtimeServicesRWLock.ExitReadLock();
            }
            return null;
        }
        
        public IEnumerable<T> GetRuntimeServices<T>() where T : RuntimeService
        {
            _runtimeServicesRWLock.EnterReadLock();
            try
            {
                List<T> matching = new List<T>();
                foreach(RuntimeService service in _runtimeServices)
                {
                    T match = service as T;
                    if(match != null)
                    {
                        matching.Add(match);
                    }
                }
                return matching;
            }
            finally
            {
                _runtimeServicesRWLock.ExitReadLock();
            }            
        }
        
        List<RuntimeService> _runtimeServices = new List<RuntimeService>();
        ReaderWriterLockSlim _runtimeServicesRWLock = new ReaderWriterLockSlim();
        
        public void Start()
        {
            lock (_startLock)
            {
                _starting = true;
                _stopping = false; // reset so workers keep running if the bus is restarted after Stop()
            
                _runtimeServicesRWLock.EnterReadLock();
                try
                {

                    int i = 0;
                    try
                    {
                        for (; i < _runtimeServices.Count; i++)
                        {
                            _runtimeServices[i].Start();
                        }
                    }
                    catch(Exception ex)
                    {
                        DoSafely(() =>
                            {
                                notifyUnhandledException(ex, true);
                            });
                        // try to stop any started services since we couldn't start all of them
                        for (int j = 0; j <= i; j++)
                        {
                            try
                            {
                                _runtimeServices[j].Stop();
                            }
                            catch(Exception ex2)
                            {
                                DoSafely(() =>
                                {
                                    notifyUnhandledException(ex2, true);
                                });
                            }
                        }
                        throw;
                    }
                    addWorker(deliveryWorker, "Delivery worker {0}");
                    addWorker(retryWorker, "Retry worker {0}");

                    InvokeSafely(Started, this, EventArgs.Empty);

                    _started = true;
                }
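                finally
                {
                    // Release the read lock on success as well as on failure, and let
                    // any exception rethrown above propagate to the caller.
                    _runtimeServicesRWLock.ExitReadLock();
                    _starting = false;
                }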
            }
        }
        
        void addWorker(ParameterizedThreadStart start, string name)
        {
            lock(_workerThreadsLock)
            {
                Thread thread = new Thread(start);            
                thread.IsBackground = true;
                // Index this thread will occupy once it is added below
                int threadIndex = _workerThreads.Count;
                if (name != null)
                {
                    thread.Name = String.Format(name, threadIndex);
                }
                _workerThreads.Add(thread);
                _stopWaitHandles.Add(new AutoResetEvent(false));
                thread.Start(threadIndex);
            }
        }
        void deliveryWorker(object param)
        {
            int threadIndex = (int)param;
            while(true)
            {
                DoSafely(() =>
                {
                    using (TransactionScope ts = new TransactionScope())
                    {
                        DeliverOne(_messageDeliveryQueue, _retryQueue);
                       
                        ts.Complete();
                    }
                });

                if (_stopping)
                {
                    _stopWaitHandles[threadIndex].Set();
                    break;
                }

            }
        }
        
        void retryWorker(object param)
        {
            int threadIndex = (int)param;
            while(true)
            {
                DoSafely(() =>
                {
                    using (TransactionScope ts = new TransactionScope())
                    {
                        DeliverOne(_retryQueue, _retryQueue);
                        ts.Complete();
                    }
                });
                if (_stopping)
                {
                    _stopWaitHandles[threadIndex].Set();
                    break;
                }
                Thread.Sleep(RETRY_SLEEP_MS); 
            }
        }
                
        public bool Stop()
        {
            bool clean = true;
            lock(_startLock)
            {        
                _stopping = true;

                _runtimeServicesRWLock.EnterReadLock();
                try
                {
                    clean = ForEachSafely(_runtimeServices, service =>
                    {
                        service.Stop();
                    });
                }
                finally
                {
                    _runtimeServicesRWLock.ExitReadLock();
                }

                lock(_workerThreadsLock)
                {
                    for(int i = 0; i < _stopWaitHandles.Count; i++)
                    {
                        _stopWaitHandles[i].WaitOne();
                    }
                    _workerThreads.Clear();
                    _stopWaitHandles.Clear();
                }

                _started = false;
            }

            InvokeSafely(Stopped, this, EventArgs.Empty);                
            
            return clean;
        }

        volatile bool _starting;
        volatile bool _started;
        volatile bool _stopping = false;

        List<AutoResetEvent> _stopWaitHandles = new List<AutoResetEvent>();        
         
        public event EventHandler Started;
        public event EventHandler Stopped;        

        public event EventHandler<EndpointEventArgs> Subscribed;
        public event EventHandler<EndpointEventArgs> Unsubscribed;
        
        public event EventHandler<EndpointEventArgs> ListenerAdded;
        public event EventHandler<EndpointEventArgs> ListenerRemoved;
        
        List<Endpoint> _listenerEndpoints = new List<Endpoint>();
        object _listenerEndpointsLock = new object();

        public IEnumerable<Endpoint> ListeningEndpoints
        {
            get
            {
                lock (_listenerEndpointsLock)
                {
                    return _listenerEndpoints.ToArray();
                }
            }
        }
        ReaderWriterLockSlim _subscriptionsRWLock = new ReaderWriterLockSlim();
        List<SubscriptionEndpoint> _subscriptions = new List<SubscriptionEndpoint>();
                        
        public void AddListener(ListenerEndpoint endpoint)
        {
            bool added = false;
            try
            {
                using (TransactionScope ts = new TransactionScope())
                {
                    lock (_listenerEndpointsLock)
                    {
                        _listenerEndpoints.Add(endpoint);
                        added = true;
                    }
                    
                    _runtimeServicesRWLock.EnterReadLock();
                    try
                    {
                        foreach (RuntimeService service in _runtimeServices)
                        {
                            service.OnListenerAdded(endpoint);
                        }
                    }
                    finally
                    {
                        _runtimeServicesRWLock.ExitReadLock();
                    }

                    EventHandler<EndpointEventArgs> listenEvent = ListenerAdded;
                    if (listenEvent != null) listenEvent(this, new EndpointEventArgs(endpoint));

                    ts.Complete();
                }
            }
            catch
            {
                if (added)
                {
                    // remove on failure
                    lock (_listenerEndpointsLock)
                    {
                        _listenerEndpoints.Remove(endpoint);
                    }
                }
                throw;                
            }            
        }
        
        public void RemoveListener(ListenerEndpoint endpoint)
        {
            bool removed = false;
            try
            {
                using(TransactionScope ts = new TransactionScope())
                {
                    lock(_listenerEndpointsLock)
                    {
                        // Remember whether anything was removed so the catch block can undo it
                        removed = _listenerEndpoints.Remove(endpoint);
                    }

                    _runtimeServicesRWLock.EnterReadLock();
                    try
                    {
                        foreach (RuntimeService service in _runtimeServices)
                        {
                            service.OnListenerRemoved(endpoint);
                        }
                    }
                    finally
                    {
                        _runtimeServicesRWLock.ExitReadLock();
                    }

                    EventHandler<EndpointEventArgs> unlistenEvent = ListenerRemoved;
                    if (unlistenEvent != null) unlistenEvent(this, new EndpointEventArgs(endpoint));

                    ts.Complete();
                }
            }
            catch
            {
                if (removed)
                {
                    // re-add on failure
                    lock (_listenerEndpointsLock)
                    {
                        _listenerEndpoints.Add(endpoint);
                    }
                }
                throw;
            }

        }

        public event UnhandledExceptionEventHandler UnhandledException;
        
        MessageDeliveryQueue _messageDeliveryQueue;        
        MessageDeliveryQueue _retryQueue;
        MessageDeliveryQueue _failureQueue;        
                
        protected void QueueDelivery(SubscriptionEndpoint endpoint, string action, object message)
        {    
            MessageDelivery delivery = new MessageDelivery(endpoint, action, message);
            _messageDeliveryQueue.Enqueue(delivery);
        }

        const int RETRY_SLEEP_MS = 1000;
        const int FUTURE_SLEEP_MS = 1000;

        
        protected void DeliverOne(MessageDeliveryQueue fromQueue, MessageDeliveryQueue retryQueue)
        {
            MessageDelivery delivery = fromQueue.Dequeue(TimeSpan.FromSeconds(1));
            if(delivery != null)
            {
                if (delivery.TimeToProcess != null)
                {
                    int mDelay = (int)(delivery.TimeToProcess.Value - DateTime.Now).TotalMilliseconds;
                    if (mDelay > 0)
                    {
                        System.Diagnostics.Debug.WriteLine("Time to process is " + mDelay + " milliseconds away. Requeuing in " + FUTURE_SLEEP_MS);
                        Thread.Sleep(FUTURE_SLEEP_MS); // Sleep briefly in case we are in a loop of future messages, should be a little smarter
                        fromQueue.Enqueue(delivery);
                        return;
                    }
                }
                
                try
                {
                    Dispatcher dispatcher;
                    // Subscribe/Unsubscribe modify _dispatchers under this lock
                    lock (_dispatchers)
                    {
                        _dispatchers.TryGetValue(delivery.Endpoint.DispatcherType, out dispatcher);
                    }
                    
                    if(dispatcher != null)
                    {
                        dispatcher.DispatchInternal(delivery);
                        
                        ForEachSafely(_runtimeServices, service=> service.OnMessageDelivered(delivery));                                
                        InvokeSafely(_messageDelivered, this, new MessageDeliveryEventArgs() { MessageDelivery = delivery });                                                    
                    }
                }
                catch(Exception ex)
                {
                    MessageDelivery retryDelivery = delivery.CreateRetry();
                    if (!delivery.RetriesMaxed)
                    {
                        System.Diagnostics.Trace.TraceError("Sending to retry queue due to unhandled exception: " + ex.Message);
                        notifyUnhandledException(ex);
                        retryQueue.Enqueue(retryDelivery);

                        ForEachSafely(_runtimeServices, service => service.OnMessageDeliveryFailed(delivery, false));                            
                        InvokeSafely(_messageDeliveryFailed, this, new MessageDeliveryFailedEventArgs() { MessageDelivery = delivery, Permanent = false });                        
                    }
                    else
                    {
                        System.Diagnostics.Trace.TraceError("Sending to failure queue due to unhandled exception: " + ex.Message);
                        notifyUnhandledException(ex);
                        _failureQueue.Enqueue(retryDelivery);

                        ForEachSafely(_runtimeServices, service => service.OnMessageDeliveryFailed(delivery, true));                            
                        InvokeSafely(_messageDeliveryFailed, this, new MessageDeliveryFailedEventArgs() { MessageDelivery = delivery, Permanent = true });
                    }
                }
            }
        }
        
        Dictionary<Type, Dispatcher> _dispatchers = new Dictionary<Type, Dispatcher>();

        /// <summary>
        /// Execute a block of code, passing any unhandled exceptions to the unhandled exception handler
        /// </summary>
        /// <param name="action"></param>
        protected void DoSafely(Action action)
        {
            try
            {
                action();
            }
            catch(Exception ex)
            {
                System.Diagnostics.Trace.TraceError("UNHANDLED EXCEPTION: "+ex.Message);
                notifyUnhandledException(ex);                
            }
        }


        private void notifyUnhandledException(Exception ex)
        {
            notifyUnhandledException(ex, false);
        }
        private void notifyUnhandledException(Exception ex, bool isTerminating)
        {
            // Warning: unhandled exception inside unhandled exception event handlers could cause bad things to happen
            foreach (RuntimeService service in _runtimeServices)
            {
                service.OnUnhandledException(ex, isTerminating);
            }

            UnhandledExceptionEventHandler ueHandler = UnhandledException;
            if (ueHandler != null) ueHandler(this, new UnhandledExceptionEventArgs(ex, isTerminating));            
        }

        protected bool DoSafely<T>(Action<T> action, T value)
        {
            bool clean = false;
            try
            {
                action(value);
                clean = true;
            }
            catch (Exception ex)
            {
                // Route through the shared handler: it null-checks the event and notifies runtime services
                System.Diagnostics.Trace.TraceError("UNHANDLED EXCEPTION: " + ex.Message);
                notifyUnhandledException(ex);
            }
            return clean;
        }
        protected bool ForEachSafely<T>(IEnumerable<T> collection, Action<T> action)
        {
            bool clean = true;
            foreach (T value in collection)
            {
                // Call DoSafely first so one failure does not skip the remaining items
                clean = DoSafely(action, value) && clean;
            }
            return clean;
        }

        protected bool InvokeSafely(Delegate handler, params object[] parameters)
        {
            if(handler == null) return true;
            Delegate[] list = handler.GetInvocationList();
            return ForEachSafely(list, d => d.DynamicInvoke(parameters));
        }
        
        public void Publish(Type contract, string action, object message)
        {                                 
            _subscriptionsRWLock.EnterReadLock();
            try
            {
                using (TransactionScope ts = new TransactionScope())
                {
                    foreach (SubscriptionEndpoint subscription in _subscriptions)
                    {
                        bool include;
                        if (subscription.Filter != null)
                        {
                            include = subscription.Filter.Include(action, message);
                        }
                        else
                        {
                            include = true;
                        }

                        if (include)
                        {
                            QueueDelivery(subscription, action, message);
                        }
                    }
                    ts.Complete();
                }
            }
            finally
            {
                _subscriptionsRWLock.ExitReadLock();
            }
        }
        
        public bool IsListeningFor(Type contractType)
        {
            lock (_listenerEndpointsLock)
            {
                foreach (ListenerEndpoint le in _listenerEndpoints)
                {
                    if (le.ContractType == contractType)
                    {
                        return true;
                    }
                }
            }
            return false;
        }
                
        public void Subscribe(SubscriptionEndpoint subscription)
        {
            bool added = false;
            try
            {
                using (TransactionScope ts = new TransactionScope())
                {
                    if (!IsListeningFor(subscription.ContractType))
                    {
                        throw new InvalidOperationException("The bus is not currently listening for this type of subscription.");
                    }
                    _subscriptionsRWLock.EnterWriteLock();
                    try
                    {
                        _subscriptions.Add(subscription);
                        added = true;
                    }
                    finally
                    {
                        _subscriptionsRWLock.ExitWriteLock();
                    }

                    _runtimeServicesRWLock.EnterReadLock();
                    try
                    {
                        foreach (RuntimeService service in _runtimeServices)
                        {
                            service.OnSubscriptionAdded(subscription);
                        }
                    }
                    finally
                    {
                        _runtimeServicesRWLock.ExitReadLock();
                    }

                    EventHandler<EndpointEventArgs> subscribedEvent = Subscribed;
                    if (subscribedEvent != null) subscribedEvent(this, new EndpointEventArgs(subscription));

                    lock (_dispatchers)
                    {
                        if (!_dispatchers.ContainsKey(subscription.DispatcherType))
                        {
                            _dispatchers.Add(subscription.DispatcherType, (Dispatcher)Activator.CreateInstance(subscription.DispatcherType));
                        }
                    }

                    ts.Complete();
                }
            }
            catch
            {
                if (added)
                {                    
                    // Remove subscription on failure
                    _subscriptionsRWLock.EnterWriteLock();
                    try
                    {
                        _subscriptions.Remove(subscription);
                    }
                    finally
                    {
                        _subscriptionsRWLock.ExitWriteLock();
                    }                 
                }
                throw;                
            }
        }
        
        public void Unsubscribe(SubscriptionEndpoint subscription)
        {
            bool removed = false;
            try
            {
                using (TransactionScope ts = new TransactionScope())
                {
                    _subscriptionsRWLock.EnterWriteLock();
                    try
                    {
                        _subscriptions.Remove(subscription);
                        removed = true;
                    }
                    finally
                    {
                        _subscriptionsRWLock.ExitWriteLock();
                    }


                    _runtimeServicesRWLock.EnterReadLock();
                    try
                    {
                        foreach (RuntimeService service in _runtimeServices)
                        {
                            service.OnSubscriptionRemoved(subscription);
                        }
                    }
                    finally
                    {
                        _runtimeServicesRWLock.ExitReadLock();
                    }

                    EventHandler<EndpointEventArgs> unsubscribedEvent = Unsubscribed;
                    if (unsubscribedEvent != null) unsubscribedEvent(this, new EndpointEventArgs(subscription));


                    lock (_dispatchers)
                    {
                        // Dispatchers are keyed by DispatcherType (see Subscribe), not ContractType
                        _dispatchers.Remove(subscription.DispatcherType);
                    }

                    ts.Complete();
                }
            }
            catch
            {
                if (removed)
                {
                    // Re-add subscription on failure
                    _subscriptionsRWLock.EnterWriteLock();
                    try
                    {
                        _subscriptions.Add(subscription);
                    }
                    finally
                    {
                        _subscriptionsRWLock.ExitWriteLock();
                    }
                }
                throw;          
            }
        }

        object _messageDeliveredEventLock = new object();
        event EventHandler<MessageDeliveryEventArgs> _messageDelivered;
        public event EventHandler<MessageDeliveryEventArgs> MessageDelivered
        {
            add
            {
                lock (_messageDeliveredEventLock)
                {
                    _messageDelivered += value;
                }
            }
            remove
            {
                lock (_messageDeliveredEventLock)
                {
                    _messageDelivered -= value;
                }
            }
        }

        object _messageDeliveryFailedEventLock = new object();
        event EventHandler<MessageDeliveryFailedEventArgs> _messageDeliveryFailed;
        public event EventHandler<MessageDeliveryFailedEventArgs> MessageDeliveryFailed
        {
            add
            {
                lock (_messageDeliveryFailedEventLock)
                {
                    _messageDeliveryFailed += value;
                }
            }
            remove
            {
                lock (_messageDeliveryFailedEventLock)
                {
                    _messageDeliveryFailed -= value;
                }
            }
        }
    }
            
    
    public class EndpointEventArgs : EventArgs
    {
        public EndpointEventArgs()
        {
        }
        
        public EndpointEventArgs(Endpoint endpoint)
        {
            Endpoint = endpoint;
        }
        
        public Endpoint Endpoint { get; set; }
    }

    public class MessageDeliveryEventArgs : EventArgs
    {
        public MessageDelivery MessageDelivery
        {
            get;
            set;
        }
    }

    public class MessageDeliveryFailedEventArgs : MessageDeliveryEventArgs
    {
        public bool Permanent
        {
            get;
            set;
        }        
    }
}

Now you've seen the guts. We'll jump back up to a higher level next and talk about making our bus work with WCF as we build functionality on top of the core and turn this into a working implementation.

Reader Comments

  • shawn says Is this in the Stop method just weird formatting or is there something to it?

    int i = 0;
    for (; i < _runtimeServices.Count; i++)
  • Jesse Ezell says You would do this if you wanted the index to stay around after the for loop. That way, if an unhandled exception occurs, you still know the last index that was used.

    I updated the sample to clarify the intent by using ForEachSafely. However, I've left Start using this type of for loop, since it tries to roll back any started services if there is an unhandled exception while starting up and it needs to know which index it stopped at to do so.
  • Eyal Lantzman says I would split the ServiceBusRuntime into several partial classes, each responsible for a group of logical commands, e.g. ServiceBusRuntimeListner, ServiceBusRuntimePublishSubscribe, ServiceBusRuntimeService etc.
  • Eyal Lantzman says For readability and reusability I would wrap the
    _subscriptionsRWLock.EnterWriteLock();
    try
    {
    ...
    }
    finally
    {
    _subscriptionsRWLock.ExitWriteLock();
    }

    with:


    public class RWLock : IDisposable
    {
        private readonly bool _read;
        private readonly ReaderWriterLockSlim _lock;

        // Acquires the lock on construction; Dispose releases it
        private RWLock(ReaderWriterLockSlim rwLock, bool read)
        {
            _read = read;
            _lock = rwLock;

            if (read)
                _lock.EnterReadLock();
            else
                _lock.EnterWriteLock();
        }

        public static RWLock AcquireReadLock(ReaderWriterLockSlim rwLock)
        {
            return new RWLock(rwLock, true);
        }

        public static RWLock AcquireWriteLock(ReaderWriterLockSlim rwLock)
        {
            return new RWLock(rwLock, false);
        }

        public void Dispose()
        {
            if (_read)
                _lock.ExitReadLock();
            else
                _lock.ExitWriteLock();
        }
    }

    And the usage will be a lot cleaner:
    using (var locker = RWLock.AcquireReadLock(...))
    {
        // do something
    }
  • Jesse Ezell says Interesting idea with the wrapper for reader writer lock. I'd probably opt for using a lambda instead of allocating objects every time and creating a new class (semantically it reads a little better I think, like the ForEachSafely blocks), but I think your idea is a great one. Repeated try/finally blocks are always a good candidate for refactoring into something reusable.
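
The lambda-based alternative mentioned in the last comment could look roughly like the sketch below. The extension methods `Read` and `Write`, and the `Example` class, are hypothetical names made up for illustration; they are not part of the framework code above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper, not part of the service bus code in this post.
public static class ReaderWriterLockSlimExtensions
{
    // Runs the action while holding the read lock, releasing it even if the action throws.
    public static void Read(this ReaderWriterLockSlim rwLock, Action action)
    {
        rwLock.EnterReadLock();
        try
        {
            action();
        }
        finally
        {
            rwLock.ExitReadLock();
        }
    }

    // Runs the action while holding the write lock, releasing it even if the action throws.
    public static void Write(this ReaderWriterLockSlim rwLock, Action action)
    {
        rwLock.EnterWriteLock();
        try
        {
            action();
        }
        finally
        {
            rwLock.ExitWriteLock();
        }
    }
}

public static class Example
{
    // Adds one item under the write lock, then reads the count under the read lock.
    public static int Run()
    {
        var rwLock = new ReaderWriterLockSlim();
        var subscriptions = new List<string>();

        rwLock.Write(() => subscriptions.Add("endpoint-1"));

        int count = 0;
        rwLock.Read(() => count = subscriptions.Count);
        return count;
    }
}
```

One caveat on the design choice: a lambda that captures local variables still allocates a closure and a delegate on each call, so this form is mainly a readability win over the repeated try/finally blocks and does not necessarily save allocations.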