The ServerProcess class contains only a single piece of shared data: the collection of client information. Unfortunately, this is enough to cause trouble because the collection object is not intrinsically thread-safe. If more than one thread attempts to perform work with the collection at the same time, it's possible that the collection won't be properly updated. For example, if two users are registered at the same time, only one update might persist, leading to an unregistered user. Even worse, iterating through a collection isn't a thread-safe operation, which means that trying to register a new user and look up an existing user for a message delivery at the same time could conceivably cause a problem. These errors could be rare, but they're never worth the risk because they tend to grow increasingly more significant and frequent as an application becomes more successful and is used by a larger and larger user base.
Resolving concurrency problems is fairly easy. The Hashtable collection provides a thread-safe wrapper that you can use with a minimum of fuss, or you can take control of the situation yourself with Visual Basic's SyncLock statement. Both of these techniques ensure that only one client can access the collection at a time. However, these approaches can reduce performance. Every time you use locking, you force some code to execute synchronously, meaning that other clients attempting the same task or requiring access to the same resource could be stalled. If your locks are too coarse, held too long, and applied too often, the overall performance of your application may be unacceptable for a large number of users. This is the key compromise with multithreaded programming, and it requires an experienced developer to strike the right balance.
The next few sections show how you can add locking to the ServerProcess class, and how to do so while minimizing the performance overhead.
The easiest methods to deal with are the AddUser() and RemoveUser() methods, which manage the user registration process. There are three ways you could apply a lock, and we'll consider the trade-offs and advantages of each one.
First, you can create what's known as a critical section by locking the entire ServerProcess object. It looks like this:
Public Sub AddUser(ByVal [alias] As String, ByVal client As ITalkClient) _
  Implements TalkComponent.ITalkServer.AddUser

    SyncLock Me
        Trace.Write("Added user '" & [alias] & "'")
        ActiveUsers([alias]) = client
    End SyncLock

End Sub
When a thread hits this patch of code, the SyncLock statement is used to lock the entire ServerProcess object. That means that no other thread will be able to use ServerProcess until the first thread completes its task. This is true even if the other thread is calling an innocent, unrelated method that wouldn't pose any threat. Clearly, this coarse lock can create frequent bottlenecks.
A more fine-tuned option is shown in the next example. In this case, only the ActiveUsers collection itself is locked. Other threads can continue working with ServerProcess until they hit a line of code that requires the ActiveUsers collection, at which point they'll be stalled:
Public Sub AddUser(ByVal [alias] As String, ByVal client As ITalkClient) _
  Implements TalkComponent.ITalkServer.AddUser

    Trace.Write("Added user '" & [alias] & "'")
    SyncLock ActiveUsers
        ActiveUsers([alias]) = client
    End SyncLock

End Sub
SyncLock can only be used with objects, not simple value types such as integers. Because ActiveUsers is a Hashtable object, this technique works perfectly. If an unhandled error occurs inside the SyncLock block, the lock is automatically released.
Note that the lock is only used around the single statement that interacts with the collection. The Trace.Write() method call is not included in the block. This ensures that the lock is held for the shortest possible time, and helps to wring every possible degree of concurrency out of this solution.
Finally, you can accomplish exactly the same thing by using the synchronized wrapper provided by the Hashtable collection as shown here:
Public Sub AddUser(ByVal [alias] As String, ByVal client As ITalkClient) _
  Implements TalkComponent.ITalkServer.AddUser

    Trace.Write("Added user '" & [alias] & "'")
    Dim SynchronizedCollection As Hashtable
    SynchronizedCollection = Hashtable.Synchronized(ActiveUsers)
    SynchronizedCollection([alias]) = client

End Sub
The synchronized wrapper returned by the Hashtable.Synchronized() method is identical to the original Hashtable in every respect except for the fact that it wraps all its methods with locking statements to prevent concurrency problems. Thus, the previous code sample is equivalent to manually locking the collection.
In some cases, it's better to manually lock the collection yourself rather than use the synchronized wrapper. This is primarily the case if you need to perform several tasks with the collection, one after the other. In this case, it will be better to lock the object once and perform the work, rather than use the synchronized wrapper, which will obtain and release the lock with each individual method call.
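As a minimal sketch of that trade-off, the following console program performs a check-then-add sequence under a single SyncLock. The UserRegistry module and the TryAddUser() function are hypothetical names introduced for illustration only; they are not part of Talk .NET, and a plain Object stands in for the ITalkClient reference.

```vbnet
Imports System
Imports System.Collections

Module UserRegistry
    ' Stand-in for the ActiveUsers collection in ServerProcess.
    Private ActiveUsers As New Hashtable()

    ' Hypothetical helper: registers the alias only if it is not already taken.
    ' The check and the insert run under one lock, so no other thread
    ' can add the same alias between the two steps.
    Public Function TryAddUser(ByVal [alias] As String, ByVal client As Object) As Boolean
        SyncLock ActiveUsers
            If ActiveUsers.ContainsKey([alias]) Then
                Return False
            End If
            ActiveUsers.Add([alias], client)
            Return True
        End SyncLock
    End Function

    Sub Main()
        ' The first registration succeeds; the duplicate alias is rejected.
        Console.WriteLine(TryAddUser("ann", New Object()))
        Console.WriteLine(TryAddUser("ann", New Object()))
    End Sub
End Module
```

With the synchronized wrapper, ContainsKey() and Add() would each acquire and release the lock separately, so two threads could both pass the check before either one performs the insert.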
Either one of these approaches provides a good solution for the AddUser() and RemoveUser() methods, because they typically execute quite quickly and hold the lock for mere fractions of a second. However, it's still possible to coax a little more performance from your locking code by using the System.Threading.ReaderWriterLock class. This class allows you to create a lock that permits only one user to write to the collection at a time, but allows multiple users to read from it. By implementing this design, you could protect the AddUser() and RemoveUser() methods without locking out the harmless GetUsers() method.
To implement reader and writer locking, you must first create a member variable in the ServerProcess class that represents the lock:
Private ActiveUsersLock As New ReaderWriterLock()
In the GetUsers() method, you would acquire a reader lock by using the AcquireReaderLock() method. This method accepts a timeout, either as a TimeSpan object or as an integer number of milliseconds, that represents the interval of time to wait while attempting to acquire the lock before giving up. You can use -1 to specify an infinite wait, meaning the code won't time out (although the network connection eventually will, if the lock is never obtained). In this case, we specify a more reasonable maximum of one minute. If the lock is not acquired within this time period, an ApplicationException will be thrown. Note that the lock is released in a Finally block, so that it's released even though the method returns from inside the Try block.
Public Function GetUsers() As System.Collections.ICollection _
  Implements TalkComponent.ITalkServer.GetUsers

    ActiveUsersLock.AcquireReaderLock(TimeSpan.FromMinutes(1))
    Try
        Return ActiveUsers.Keys
    Finally
        ' Runs after the Return statement, so the lock is always released.
        ActiveUsersLock.ReleaseReaderLock()
    End Try

End Function

Public Sub AddUser(ByVal [alias] As String, ByVal client As ITalkClient) _
  Implements TalkComponent.ITalkServer.AddUser

    Trace.Write("Added user '" & [alias] & "'")
    ActiveUsersLock.AcquireWriterLock(TimeSpan.FromMinutes(1))
    Try
        ActiveUsers([alias]) = client
    Finally
        ActiveUsersLock.ReleaseWriterLock()
    End Try

End Sub
Now multiple users can call the GetUsers() method and read from the collection at the same time without causing an error. However, if the AddUser() or RemoveUser() method is executed, an exclusive lock will be required, which will temporarily prevent any other read or write operation.
Remember, when using the ReaderWriterLock class, you should make sure to explicitly release the lock if an exception occurs after you acquire it.
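One way to meet that requirement is a Try/Finally block. The sketch below applies it to a RemoveUser() method. The signature shown here (removal by alias) is an assumption, and the module is a stand-alone stand-in for ServerProcess rather than the actual Talk .NET code.

```vbnet
Imports System
Imports System.Collections
Imports System.Threading

Module LockReleaseSketch
    Private ActiveUsers As New Hashtable()
    Private ActiveUsersLock As New ReaderWriterLock()

    ' Assumed signature: the user is removed by alias.
    Public Sub RemoveUser(ByVal [alias] As String)
        ' If the timeout expires, AcquireWriterLock throws and the lock
        ' was never taken, so the call stays outside the Try block.
        ActiveUsersLock.AcquireWriterLock(TimeSpan.FromMinutes(1))
        Try
            ActiveUsers.Remove([alias])
        Finally
            ' Runs whether or not the body threw an exception.
            ActiveUsersLock.ReleaseWriterLock()
        End Try
    End Sub

    Sub Main()
        ActiveUsers("ann") = New Object()
        RemoveUser("ann")
        ' Show the remaining user count and whether the lock is still held.
        Console.WriteLine(ActiveUsers.Count)
        Console.WriteLine(ActiveUsersLock.IsWriterLockHeld)
    End Sub
End Module
```

The same pattern applies to reader locks, with AcquireReaderLock() and ReleaseReaderLock() in place of the writer calls.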
Synchronizing the collection access with AddUser() and RemoveUser() is straightforward, once you understand a few threading concepts. Doing the same with the message delivery isn't quite as easy. In an average system, the number of messages will be quite high. It's not practical to lock the user collection each time you need to search for a message recipient, because the entire system could shudder to a standstill.
Another option is to use a dedicated delivery service that runs on a separate thread, routing messages as needed. This delivery service wouldn't use the ActiveUsers collection but rather a recent copy of the collection. This reduces thread contention, which occurs when multiple clients try to grab the same resource, and some are left waiting. Best of all, the delivery service will operate on a different thread from the pool of threads used to handle incoming requests. This ensures that the server won't become a bottleneck, even if there's a measurable delay required in order to contact a remote client and transmit a message.
The delivery service should have the following basic skeleton:
Public Class DeliveryService

    ' Stores a copy of the ActiveUsers collection.
    Private RegisteredUsers As New Hashtable()

    ' Stores messages that haven't been delivered yet.
    Private Messages As New Queue()

    ' Adds a message to the queue.
    Public Sub RegisterMessage(ByVal message As Message)
        ' (Code omitted.)
    End Sub

    ' Updates the user list.
    Public Sub UpdateUsers(ByVal users As Hashtable)
        ' (Code omitted.)
    End Sub

    ' Keep the thread active as long as there are messages.
    ' After that, suspend it.
    Public Sub Deliver()
        ' (Code omitted.)
    End Sub

    ' Look up the remote client and send the message.
    Private Sub DeliverMessages()
        ' (Code omitted.)
    End Sub

End Class
In this example, messages are stored in a Queue object. Queues are first-in-first-out (FIFO) collections. Using a queue ensures that messages are dealt with in the order that they're received, and none are delayed unreasonably.
The RegisterMessage() and UpdateUsers() methods are quite straightforward and need simple locking code to ensure that no concurrency errors will occur as messages are registered or the user list is updated:
Public Sub RegisterMessage(ByVal message As Message)
    SyncLock Messages
        Messages.Enqueue(message)
    End SyncLock
End Sub

Public Sub UpdateUsers(ByVal users As Hashtable)
    SyncLock (RegisteredUsers)
        RegisteredUsers = users
    End SyncLock
End Sub

Public Class Message

    Private _SenderAlias As String
    Private _RecipientAlias As String
    Private _MessageBody As String

    Public Property SenderAlias() As String
        Get
            Return _SenderAlias
        End Get
        Set(ByVal Value As String)
            _SenderAlias = Value
        End Set
    End Property

    Public Property RecipientAlias() As String
        Get
            Return _RecipientAlias
        End Get
        Set(ByVal Value As String)
            _RecipientAlias = Value
        End Set
    End Property

    Public Property MessageBody() As String
        Get
            Return _MessageBody
        End Get
        Set(ByVal Value As String)
            _MessageBody = Value
        End Set
    End Property

    Public Sub New(ByVal sender As String, ByVal recipient As String, _
      ByVal body As String)
        Me.SenderAlias = sender
        Me.RecipientAlias = recipient
        Me.MessageBody = body
    End Sub

End Class
The message delivery is performed in the DeliverMessages() method, while the Deliver() method keeps the thread alive, looping continuously, and calling DeliverMessages() if there are items in the Messages queue. Remember, once a thread completes, it cannot be resurrected. The only way to keep the message delivery thread alive is to use a loop in the Deliver() method and explicitly suspend the thread when there's no work to do.
Public Sub Deliver()

    Do
        Trace.Write("Starting message delivery")
        DeliverMessages()

        ' Processing is complete. The thread can be put on hold.
        Trace.Write("Suspending thread")
        Thread.CurrentThread.Suspend()
    Loop

End Sub
Another option would be to use some sort of timer to periodically scan the Messages queue and deliver messages. However, this could lead to latency when delivering messages. If your timer fires every five seconds, for example, messages may take over five seconds to be transmitted to their destination. Also, you would need to manually disable the timer while a message delivery was in progress, and re-enable it afterwards. Similar logic can be accomplished more efficiently using threads.
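A third possibility, not used in Talk .NET, is to park the delivery thread on a wait handle instead of suspending it. The sketch below is a stand-alone illustration under that assumption: the SignalSketch module and the WorkAvailable field are hypothetical names, and plain strings stand in for Message objects. It may be worth considering because Thread.Suspend() and Thread.Resume() are marked obsolete in later versions of the .NET Framework.

```vbnet
Imports System
Imports System.Collections
Imports System.Threading

Module SignalSketch
    Private Messages As New Queue()

    ' Signaled whenever a message is queued. It resets automatically
    ' once the waiting thread has been released.
    Private WorkAvailable As New AutoResetEvent(False)

    Public Sub RegisterMessage(ByVal text As String)
        SyncLock Messages
            Messages.Enqueue(text)
        End SyncLock
        WorkAvailable.Set()
    End Sub

    Private Sub Deliver()
        Do
            ' Block until at least one message has been queued.
            WorkAvailable.WaitOne()

            ' Drain the queue, holding the lock only while dequeuing.
            Do
                Dim NextMessage As String = Nothing
                SyncLock Messages
                    If Messages.Count > 0 Then
                        NextMessage = CStr(Messages.Dequeue())
                    End If
                End SyncLock
                If NextMessage Is Nothing Then Exit Do
                Console.WriteLine("Delivering: " & NextMessage)
            Loop
        Loop
    End Sub

    Sub Main()
        Dim DeliveryThread As New Thread(AddressOf Deliver)
        DeliveryThread.IsBackground = True
        DeliveryThread.Start()

        RegisterMessage("hello")
        RegisterMessage("world")

        ' Give the background thread time to drain the queue.
        Thread.Sleep(500)
    End Sub
End Module
```

If a message arrives while the thread is still delivering, the event simply stays signaled, so the next WaitOne() call returns immediately. The caller never needs to inspect the thread's state.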
The majority of the work takes place in the DeliverMessages() method. The Messages collection is locked only to retrieve the next message object, by calling Dequeue(). Calling this method retrieves the Message object and removes it from the queue. The RegisteredUsers collection is locked during the lookup operation.
Private Sub DeliverMessages()

    Do While Messages.Count > 0
        Trace.Write("Retrieving next message")
        Dim NextMessage As Message
        SyncLock Messages
            NextMessage = CType(Messages.Dequeue(), Message)
        End SyncLock

        ' Initialize explicitly: a variable declared inside a loop otherwise
        ' keeps its value from the previous iteration.
        Dim Recipient As ITalkClient = Nothing
        Dim MessageBody As String = Nothing
        Dim Sender As String = Nothing

        ' Look up the recipient.
        SyncLock RegisteredUsers
            If RegisteredUsers.ContainsKey(NextMessage.RecipientAlias) Then
                Recipient = CType(RegisteredUsers(NextMessage.RecipientAlias), _
                  ITalkClient)
                MessageBody = NextMessage.MessageBody
                Sender = NextMessage.SenderAlias
            Else
                ' User wasn't found. Try to find the sender.
                If RegisteredUsers.ContainsKey(NextMessage.SenderAlias) Then
                    Recipient = CType(RegisteredUsers(NextMessage.SenderAlias), _
                      ITalkClient)
                    MessageBody = "'" & NextMessage.MessageBody & _
                      "' could not be delivered."
                    Sender = "Talk .NET"
                Else
                    ' Both sender and recipient were not found.
                    ' Ignore this message.
                End If
            End If
        End SyncLock

        ' Deliver the message.
        If Not Recipient Is Nothing Then
            Trace.Write("Performing message delivery callback")
            Dim callback As New ReceiveMessageCallback(AddressOf _
              Recipient.ReceiveMessage)
            Try
                callback.BeginInvoke(MessageBody, Sender, Nothing, Nothing)
            Catch Err As Exception
                Trace.Write("Message delivery failed")
            End Try
        End If
    Loop

End Sub
Error handling is mandatory in the DeliverMessages() method. Because this method isn't directly called by the client, exceptions will not propagate to the user-interface level. Any problems will simply derail the delivery thread, halting all message delivery.
The threading used here is quite efficient. Because the RegisteredUsers collection is only updated periodically, and because there's only ever one delivery operation running at a time on this thread, there's little likelihood of thread contention (when one thread needs to wait for another one to finish using a resource and release its lock). The same is true of the Messages collection, which is only locked briefly to retrieve or add a message.
To start using the new delivery service, you'll need to modify the server code. The first step is to create two additional member variables in the ServerProcess class: MessageDelivery and DeliveryThread. MessageDelivery stores a reference to an instance of the DeliveryService class, and DeliveryThread references the System.Threading.Thread object where it executes.
Public Class ServerProcess
    Inherits MarshalByRefObject
    Implements ITalkServer

    ' The object used for delivering messages.
    Private MessageDelivery As New DeliveryService()

    ' The thread where the message delivery takes place.
    Private DeliveryThread As New Thread(AddressOf MessageDelivery.Deliver)

    Public Sub New()
        MyBase.New()
        DeliveryThread.IsBackground = True
    End Sub

    ' (Other code omitted.)

End Class
When the ServerProcess is first created, the delivery thread is configured to run in the background. This means that it won't keep the server application alive on its own: it will automatically be stopped when the last foreground thread in the server application ends. You could also use the ServerProcess constructor to configure the priority of the delivery thread.
ServerProcess also needs to update the DeliveryService.RegisteredUsers collection periodically. One possibility is to update this copy of the collection only when a user is added to the collection. At this point the server clones a new copy of the user collection, and submits it to the delivery service. This ensures that the delivery service can always locate message recipients. It also doesn't use much additional memory, because the duplicate collection actually references the same set of ITalkClient objects. It's only the memory references that are actually duplicated.
Public Sub AddUser(ByVal [alias] As String, ByVal client As ITalkClient) _
  Implements TalkComponent.ITalkServer.AddUser

    Trace.Write("Added user '" & [alias] & "'")
    SyncLock ActiveUsers
        ActiveUsers([alias]) = client
        ' Clone() returns Object, so cast the copy back to Hashtable.
        MessageDelivery.UpdateUsers(CType(ActiveUsers.Clone(), Hashtable))
    End SyncLock

End Sub
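The claim that cloning copies only references can be checked with a small stand-alone sketch. The CloneSketch module is a hypothetical name, and a plain Object stands in for the ITalkClient reference.

```vbnet
Imports System
Imports System.Collections

Module CloneSketch
    Sub Main()
        Dim ActiveUsers As New Hashtable()
        Dim Client As New Object()   ' stand-in for an ITalkClient reference
        ActiveUsers("ann") = Client

        ' Clone() creates a new Hashtable, but copies only the references.
        Dim Copy As Hashtable = CType(ActiveUsers.Clone(), Hashtable)

        ' The two tables are distinct objects...
        Console.WriteLine(Object.ReferenceEquals(ActiveUsers, Copy))

        ' ...but both entries point at the same client object.
        Console.WriteLine(Object.ReferenceEquals(ActiveUsers("ann"), Copy("ann")))

        ' Changing the original afterwards does not affect the copy.
        ActiveUsers.Remove("ann")
        Console.WriteLine(Copy.ContainsKey("ann"))
    End Sub
End Module
```

Because the copy is independent of the original table, the delivery service can search it without competing with the registration methods for the ActiveUsers lock.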
There's not much point in refreshing the collection when users are removed, because this won't help the delivery service, and it will increase the potential for thread contention. Note that it's not necessary to lock the DeliveryService.RegisteredUsers collection because the DeliveryService.UpdateUsers() method performs this step on its own.
The ServerProcess.SendMessage() method also needs to change. It will no longer send the message directly. Instead, it will just submit the message to the delivery service.
Public Sub SendMessage(ByVal senderAlias As String, _
  ByVal recipientAlias As String, ByVal message As String) _
  Implements TalkComponent.ITalkServer.SendMessage

    ' Register the message.
    Trace.Write("Queuing message to '" & recipientAlias & _
      "' from '" & senderAlias & "'")
    Dim NewMessage As New Message(senderAlias, recipientAlias, message)
    MessageDelivery.RegisterMessage(NewMessage)

    ' Resume the thread if needed.
    If (DeliveryThread.ThreadState And ThreadState.Unstarted) = _
      ThreadState.Unstarted Then
        Trace.Write("Start delivery thread")
        DeliveryThread.Start()
    ElseIf (DeliveryThread.ThreadState And ThreadState.Suspended) = _
      ThreadState.Suspended Then
        Trace.Write("Resuming delivery thread")
        DeliveryThread.Resume()
    End If

End Sub
Once the message is queued, the status of the thread is checked. It's then started for the first time, if needed, or unsuspended. If the thread is already actively delivering messages, it will not suspend itself. Instead, it will pick up the new message as soon as it finishes delivering all the others.
Figure 5-1 and Figure 5-2 show two different views of this process. Figure 5-1 shows the interaction of the DeliveryService and the ServerProcess objects. Figure 5-2 shows the threading picture (where the code is executed). As you can see, when ServerProcess calls DeliveryService, the code executes on the calling Remoting thread, not on the delivery thread. This is why synchronization code is needed: to prevent the Remoting threads from conflicting with the delivery process.
Finally, Figure 5-3 shows the typical trace output after sending messages between clients.
We haven't discussed some of the other potential hurdles of multithreaded programming, including deadlocking and thread starvation. That's because these problems are unlikely to occur in Talk .NET. Deadlocks only appear when you're attempting to acquire locks on multiple objects at the same time or when objects are trying to obtain locks on each other. The end result is a stand-off where multiple segments of code wait for each other to surrender the lock they desire. Contrary to what some programmers may have told you, deadlocks aren't always that difficult to avoid. The best advice is to never hold more than one lock at a time, and to use fine-grained locks instead of coarse-grained critical sections. If you really must obtain multiple locks at once, always make sure that you obtain them in the same order. Finally, if you're writing some really intricate threading code, you would do well to master some of the more advanced classes in the System.Threading namespace. For example, using the Monitor class, you can write intelligent threading code that prevents deadlocks by releasing all locks if it can't complete its task.
A more realistic danger is thread starvation, the condition that occurs when you have too many threads competing for the CPU, and some threads never have the processor's attention for long enough to complete some reasonable work. This problem most often occurs when you create too many threads, so that the operating system wastes a large amount of time tracking, scheduling, and switching from one thread to another. In the current delivery service, this isn't a problem because only one additional thread is created and this thread is reused for all message delivery operations. In the next section, however, you'll learn about an alternate design in which thread starvation is a real possibility and you'll see how the ThreadPool class can reduce the risk dramatically.
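The Monitor technique can be sketched as follows. This is an illustration only: LockA, LockB, and TryDoWork() are hypothetical names, and the one-second timeouts are arbitrary. The idea is that a thread that can't obtain the second lock gives up the first one instead of waiting forever.

```vbnet
Imports System
Imports System.Threading

Module DeadlockSketch
    Private LockA As New Object()
    Private LockB As New Object()

    ' Hypothetical helper: tries to take both locks, and backs out completely
    ' if either one is not available within the timeout (in milliseconds).
    Public Function TryDoWork() As Boolean
        If Not Monitor.TryEnter(LockA, 1000) Then Return False
        Try
            If Not Monitor.TryEnter(LockB, 1000) Then Return False
            Try
                Console.WriteLine("Both locks held; doing the work")
                Return True
            Finally
                Monitor.Exit(LockB)
            End Try
        Finally
            ' Always releases LockA, including when LockB could not be taken.
            Monitor.Exit(LockA)
        End Try
    End Function

    Sub Main()
        Console.WriteLine(TryDoWork())
    End Sub
End Module
```

A caller that receives False can wait briefly and try again, which gives the competing thread a chance to finish and release its locks.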
The delivery service design presented here will typically work very well, but it isn't the only option. Another solution is to create multiple threads to handle the message delivery. This design is possible because each message delivery is a separate operation. Using multiple threads allows the delivery of multiple messages to be performed asynchronously, potentially reducing delivery times if the system is large and networking delays are significant. But it also requires more memory, because each thread will have its own copy of the collection of registered users. In a real-world application, you would probably test both approaches with a scaled-down, automated version of the application before you begin coding the full solution.
The basic operation of the system is shown in Figure 5-4. The idea is that a thread is created every time a message needs to be delivered.
If you implemented this design by using the System.Threading.Thread class, you would quickly run into a few terrible headaches. The overhead of creating and destroying threads would waste valuable server resources, and the system would perform poorly under heavy user loads because it would create far too many threads for the CPU to track and manage effectively. Instead, most of the computer's resources would be dedicated to tracking and scheduling threads, and the system would collapse under its own weight.
Luckily, there's a better approach: using a thread pool. A thread pool is a dedicated group of threads that are reused indefinitely (in much the same way the Remoting infrastructure uses threads to handle user requests). The advantages of thread pools include the following:
Threads are only created once, so the overhead of creating and destroying threads is negligible.
Several operations can complete at the same time. With Remoting, this means that other messages won't be stalled while the delivery service attempts to contact a disconnected client.
Thread pools multiplex a large number of requests to a small number of threads (typically about 25). This ensures that the system never creates more threads than it can handle.
You can create a thread pool system on your own, but you'll need significant code to monitor the threads and distribute the work appropriately. Thankfully, .NET provides a simple thread pool through the System.Threading.ThreadPool class. Using the ThreadPool class is easy, but it has three disadvantages: you lack any way to configure how many threads it uses; you can't set relative priorities; and you can't cancel tasks after they have been submitted. By default, the ThreadPool allocates about 25 threads per CPU.
To perform a task asynchronously with the ThreadPool, simply use the static QueueUserWorkItem() method with a delegate that points to the method that should be executed.
This schedules the task. When there is a free thread, the CLR will use it to execute the specified code.
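A minimal sketch of such a call is shown below. The PoolSketch module and the DoWork() method are hypothetical names used for illustration. The only requirement is that the target method matches the WaitCallback delegate, which takes a single Object parameter.

```vbnet
Imports System
Imports System.Threading

Module PoolSketch
    ' Matches the signature required by the WaitCallback delegate.
    Private Sub DoWork(ByVal state As Object)
        Console.WriteLine("Processing: " & CStr(state))
    End Sub

    Sub Main()
        ' Queue the task. The second argument is passed to DoWork
        ' as its state parameter.
        ThreadPool.QueueUserWorkItem(New WaitCallback(AddressOf DoWork), "message 1")

        ' Pool threads are background threads, so keep the process alive
        ' long enough for the work item to run.
        Thread.Sleep(500)
    End Sub
End Module
```

The state argument is optional. An overload that accepts only the delegate is also available, in which case the method receives Nothing.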
To use the ThreadPool class with Talk .NET, you would first simplify the DeliveryService class:
Public Class DeliveryService

    Private RegisteredUsers As Hashtable
    Private NextMessage As Message

    Public Sub New(ByVal users As Hashtable, ByVal nextMessage As Message)
        RegisteredUsers = users
        ' Visual Basic is not case-sensitive, so Me is required here to
        ' distinguish the member variable from the parameter.
        Me.NextMessage = nextMessage
    End Sub

    Public Sub DeliverMessage(ByVal state As Object)

        ' Deliver the message.
        Dim Recipient As ITalkClient
        Dim MessageBody As String
        Dim Sender As String

        ' There's no need to lock anything, because no other part of the
        ' application will communicate with this class once it is started.
        If RegisteredUsers.ContainsKey(NextMessage.RecipientAlias) Then
            Recipient = CType(RegisteredUsers(NextMessage.RecipientAlias), _
              ITalkClient)
            MessageBody = NextMessage.MessageBody
            Sender = NextMessage.SenderAlias
        Else
            ' User wasn't found. Try to find the sender.
            If RegisteredUsers.ContainsKey(NextMessage.SenderAlias) Then
                Recipient = CType(RegisteredUsers(NextMessage.SenderAlias), _
                  ITalkClient)
                MessageBody = "'" & NextMessage.MessageBody & _
                  "' could not be delivered."
                Sender = "Talk .NET"
            Else
                ' Both sender and recipient were not found.
                ' Ignore this message.
            End If
        End If

        ' Deliver the message.
        If Not Recipient Is Nothing Then
            Trace.Write("Performing message delivery callback")
            Dim callback As New ReceiveMessageCallback(AddressOf _
              Recipient.ReceiveMessage)
            Try
                SyncLock Recipient
                    callback.BeginInvoke(MessageBody, Sender, Nothing, Nothing)
                End SyncLock
            Catch Err As Exception
                Trace.Write("Message delivery failed")
            End Try
        End If

    End Sub

End Class
There's no longer any need to provide member variables for the delivery object and thread in the ServerProcess class (although you could store this information in a collection, if needed). The ServerProcess.SendMessage() method creates a new DeliveryService object and queues it with the thread pool.
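Public Sub SendMessage(ByVal senderAlias As String, _
  ByVal recipientAlias As String, ByVal message As String) _
  Implements TalkComponent.ITalkServer.SendMessage

    Dim NewMessage As New Message(senderAlias, recipientAlias, message)

    ' Clone() returns Object, so cast the copy back to Hashtable. The lock
    ' keeps other threads from modifying the collection while it's copied.
    Dim UsersCopy As Hashtable
    SyncLock ActiveUsers
        UsersCopy = CType(ActiveUsers.Clone(), Hashtable)
    End SyncLock
    Dim NewDelivery As New DeliveryService(UsersCopy, NewMessage)

    Trace.Write("Queuing message to '" & recipientAlias & _
      "' from '" & senderAlias & "'")
    ThreadPool.QueueUserWorkItem( _
      New WaitCallback(AddressOf NewDelivery.DeliverMessage))

End Sub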
In this example, each thread is given a separate copy of the user collection. However, you must still lock the ITalkClient object before you attempt to send a message, to prevent a problem that could occur if more than one delivery thread tries to send a message to the same user at the same time. Remember, when you clone a collection, it still contains the same objects.
There's only one such ThreadPool per process, so if you use it in more than one part of your application, all work items will be constrained to the set number of threads.
Currently, neither of these examples goes the extra length to remove a client when message delivery fails. In these multithreaded examples, this step isn't as easy as it was in the nonthreaded version of Talk .NET. The problem is that it's not enough to remove the user from the DeliveryService copy of the collection. If you do, the user will simply reappear the next time the collection is copied over, and the change won't affect the contact list downloaded by the clients. Instead, the DeliveryService class needs to call the ServerProcess.RemoveUser() method to make sure the central collection is modified.
In order to add this functionality, you need to create a DeliveryService class that stores a reference to the ServerProcess.
You could set this reference in the DeliveryService constructor. Then, you can use this reference to call RemoveUser() as needed:
Try
    callback.BeginInvoke(MessageBody, Sender, Nothing, Nothing)
Catch Err As Exception
    Trace.Write("Message delivery failed")
    Server.RemoveUser(Recipient)
End Try