The work manager follows a similar pattern to the coordination server developed for the Talk .NET system over the last two chapters. Perhaps the most important detail is the information that the work manager stores in its memory, which includes the collections shown here:
Private Workers As New Hashtable() Private Tasks As New Hashtable()
The Workers collection tracks information about the registered peers and how to reach them using WorkerRecord objects. These objects are indexed by the WorkerID. The Tasks collection holds a collection of Task objects, which represent the ongoing, currently scheduled tasks. Objects in the Tasks collection are indexed by TaskID. To write more error-proof code, you could replace the worker and task hashtables with custom dictionary collections that can only hold WorkerRecord and Task objects respectively. These custom dictionary collections would derive from System.Collections.DictionaryBase.
The work manager can also use private variables to store global preferences. In this fairly simple example, we'll only use one custom setting: an integer that sets the maximum number of workers that can be assigned to a task. This helps to ensure that other workers will be free to serve new requests. It also prevents a task from being broken into so many separate pieces that the communication time begins to become a factor.
Private MaxWorkers As Integer
The MaxWorkers settings is read from a configuration file when the server object is created:
Public Sub New() MyBase.New() ' Retrieve configuration settings. MaxWorkers = Int32.Parse(ConfigurationSettings.AppSettings("MaxWorkers")) End Sub
For our test, we'll allow three maximum workers:
<?xml version="1.0" encoding="utf-8" ?> <configuration> <appSettings> <add key="MaxWorkers" value="3" /> </appSettings> <system.runtime.remoting> <!-- Remoting settings omitted. --> </system.runtime.remoting> </configuration>
The work manager also uses the trace component used in the messenger application, which shows a window with trace messages that indicate what actions the server has performed.
The server provides an AddWorker() method that allows peers to register themselves in the Workers collection, and a RemoveWorker() method that allows peers to remove themselves. The following is the code for these methods:
Public Function AddWorker(ByVal callback As TaskComponent.ITaskWorker) _ As System.Guid Implements TaskComponent.ITaskServer.AddWorker Dim Worker As New WorkerRecord(callback) SyncLock Workers Workers(Worker.WorkerID) = Worker End SyncLock Trace.Write("Added worker " & Worker.WorkerID.ToString()) Return Worker.WorkerID End Function Public Sub RemoveWorker(ByVal workerID As System.Guid) _ Implements TaskComponent.ITaskServer.RemoveWorker SyncLock Workers Workers.Remove(workerID) End SyncLock Trace.Write("Removed worker " & workerID.ToString()) End Sub
Note that the RemoveWorker() method assumes that the worker has finished all its tasks before exiting. Clearly, it would make sense to add a check to this code that looks for outstanding TaskSegments registered to this worker and tries to reassign them.
Workers are stored as WorkerRecord objects, as shown in the following example. Each worker has a globally unique identifier (GUID), which is generated automatically when the WorkerRecord class is instantiated. This allows workers to be identified uniquely on a network, without needing to assign them preexisting names (like a user alias). It's a technique you'll use again in later peer-to-peer examples in this book.
Public Class WorkerRecord Private _WorkerID As Guid = Guid.NewGuid() Private _WorkerReference As ITaskWorker Private _TaskAssigned As Boolean = False Public ReadOnly Property WorkerID() As Guid Get Return _WorkerID End Get End Property Public ReadOnly Property ITaskWorker() As ITaskWorker Get Return _WorkerReference End Get End Property Public Property TaskAssigned() As Boolean Get Return _TaskAssigned End Get Set(ByVal Value As Boolean) _TaskAssigned = Value End Set End Property Public Sub New(ByVal worker As ITaskWorker) _WorkerReference = worker End Sub End Class
The WorkerRecord also provides a TaskAssigned property, which is initially set to False. In our simple example, a worker can be assigned at most one task. A more sophisticated worker might be able to hold a queue of task requests and deal with them one by one. In this case, you would replace the TaskAssigned Boolean variable with a TasksAssigned integer count. When assigning a task, the server would look for peers that have the lowest number of assigned tasks first.
When the server receives a TaskRequest, it creates a new Task object. The Task object stores the original Task data, along with additional information, including
The GUID, which the Task class generates automatically.
A collection that contains WorkerRecords for the workers that are processing the segments of this task.
A hashtable with an entry for each TaskSegment result. These entries are indexed by sequence number.
The Task class code is shown here:
Public Class Task Private _TaskID As Guid = Guid.NewGuid() ' The original task information. Private _Request As TaskRequest ' Holds WorkerRecord objects. Private _WorkersInProgress As New ArrayList() ' Holds partial prime lists, indexed by sequence number. Private _TaskResults As New Hashtable() Public ReadOnly Property TaskID() As Guid Get Return _TaskID End Get End Property Public ReadOnly Property Request() As TaskRequest Get Return _Request End Get End Property Public Property Workers() As ArrayList Get Return _WorkersInProgress End Get Set(ByVal Value As ArrayList) _WorkersInProgress = Value End Set End Property Public Property Results() As Hashtable Get Return _TaskResults End Get Set(ByVal Value As Hashtable) _TaskResults = Value End Set End Property Public Function GetJoinedResults() As Integer() ' (Code omitted.) End Function Public Sub New(ByVal taskRequest As TaskRequest) _Request = taskRequest End Sub End Class
The Task class also contains a GetJoinedResults() method that steps through the hashtable or results and combines all values into a large array, which can then be returned to the client. Each entry in the hashtable is an array of primes that represents the solution for part of the original requested range. The code uses the fact that the entries in the results hashtable are indexed by their sequence number. Thus, as long as all the segments are present, they can be reassembled in order by starting with sequence number 0, regardless of the actual order in which the results were received.
Public Function GetJoinedResults() As Integer() ' Count the number of primes. Dim NumberOfPrimes As Integer Dim SegmentResults() As Integer Dim i As Integer For i = 0 To _TaskResults.Count - 1 SegmentResults = CType(_TaskResults(i), Integer()) NumberOfPrimes += SegmentResults.Length Next ' Create the whole array. Dim Results(NumberOfPrimes - 1) As Integer ' Combine the partial results, in order. Dim Pos As Integer For i = 0 To _TaskResults.Count - 1 SegmentResults = CType(_TaskResults(i), Integer()) SegmentResults.CopyTo(Results, Pos) Pos += SegmentResults.Length Next Return Results End Function
The bulk of the work manager logic takes place in the SubmitTask() method, which receives a task request, breaks it into segments, and assigns it. The first step is to examine the request information and verify that it's valid.
' Validate task request. If taskRequest.FromNumber > taskRequest.ToNumber Then Throw New ArgumentException("First number must be smaller than the second.") End If
Note that the error condition leads to an exception. That means that SubmitTask() shouldn't be implemented as a one-way method, or the client will not receive this information.
Next, the code judges the range of numbers. If the range is very small, it decides to only send the request to one worker. Otherwise, it uses the full number of maximum workers allowed by MaxWorkers.
' Calculate if the task can benefit from parallelism. Dim TotalRange As Integer = taskRequest.ToNumber - taskRequest.FromNumber Dim MaxWorkersForTask As Integer If TotalRange < 10000 Then MaxWorkersForTask = 1 Else MaxWorkersForTask = MaxWorkers End If
Depending on your design, it might make most sense to encapsulate the logic for validating a task and evaluating the Task range with dedicated methods in the Task class. This would be particularly useful if you wanted the work manager to manage more than one type of task. In this case, you would create a generic interface (possibly named ITask) that you would implement in all your Task classes.
Assuming these two steps succeed, a new Task object is created.
' Create the task. Dim Task As New Task(taskRequest)
Next, the code searches for free workers. It attempts to use as many workers as there are available (up to the specified maximum), and it takes the first available workers it finds. This may include the worker making the request, which is perfectly reasonable. The workers are added to the Tasks.Workers collections and immediately marked as assigned.
Dim Worker As WorkerRecord ' This lock ensures that the server won't try to allocate two different ' tasks to the same worker if the requests arrive simultaneously. SyncLock Workers ' Try to find workers for this task. Dim Item As DictionaryEntry For Each Item In Workers Worker = CType(Item.Value, WorkerRecord) If Not Worker.TaskAssigned Then Worker.TaskAssigned = True Task.Workers.Add(Worker) End If If Task.Workers.Count >= MaxWorkersForTask Then Exit For Next End SyncLock
Next, a quick check is made to ensure that there's at least one worker, or an exception will be thrown.
If Task.Workers.Count = 0 Then Throw New ApplicationException("No free workers. Try again later.") End If
The work of dividing the task into segments begins next. First, a calculation is made to determine an average range for numbers. For example, if there's a total range of 100,000 and three workers to handle it, the average range is 33,333. The first two workers will receive this range of numbers, while the last will receive everything that remains (in this case 33,334 items). Once the segment is constructed, it's sent asynchronously to the worker by calling the worker's ReceiveTask() method.
Trace.Write("Trying to assign " & Task.Workers.Count.ToString() & _ " worker(s) for task " & Task.TaskID.ToString()) ' Calculate segment sizes. Dim Segment As TaskSegment Dim LowerBound As Integer = taskRequest.FromNumber Dim AverageRange As Integer = Math.Floor(TotalRange / Task.Workers.Count) Dim i As Integer ' Divide the task into segments, and dispatch each segment. ' This code will be skipped if there's only one segment because ' (WorkersToUse.Count - 2) will equal 0. Dim ReceiveTask As ReceiveTaskDelegate For i = 0 To Task.Workers.Count - 2 Segment = New TaskSegment(Task.TaskID, LowerBound, _ LowerBound + AverageRange, i) LowerBound += AverageRange + 1 Worker = CType(Task.Workers(i), WorkerRecord) Segment.WorkerID = Worker.WorkerID ReceiveTask = New ReceiveTaskDelegate(AddressOf _ Worker.ITaskWorker.ReceiveTask) ReceiveTask.BeginInvoke(Segment, Nothing, Nothing) Next ' Create the last segment to get the remaining numbers. Segment = New TaskSegment(Task.TaskID, LowerBound, taskRequest.ToNumber, i) Worker = CType(Task.Workers(Task.Workers.Count - 1), WorkerRecord) Segment.WorkerID = Worker.WorkerID ReceiveTask = New ReceiveTaskDelegate(AddressOf Worker.ITaskWorker.ReceiveTask) ReceiveTask.BeginInvoke(Segment, Nothing, Nothing)
Finally, the Task object is stored in the Tasks collection.
' Store the Task object. SyncLock Tasks Tasks.Add(Task.TaskID, Task) End SyncLock Trace.Write("Created and assigned task " & Task.TaskID.ToString() & ".")
The work manager's ReceiveTaskComplete() method is the last part of the ITaskServer interface. It receives completed TaskSegment objects, adds them to the corresponding Task (from the in-memory Tasks collection), and then marks the worker as available. If the number of received results equals the number of task segments, the task is declared complete. A notification message is sent to the original task requester with the list of primes, and the task is removed from memory.
Public Sub ReceiveTaskComplete(ByVal taskSegment As TaskSegment, _ ByVal workerID As System.Guid) _ Implements TaskComponent.ITaskServer.ReceiveTaskComplete Trace.Write("Received result sequence #" & _ taskSegment.SequenceNumber.ToString() & " for task " & _ taskSegment.TaskID.ToString() & ".") Dim Task As Task = CType(Tasks(taskSegment.TaskID), Task) Task.Results.Add(taskSegment.SequenceNumber, taskSegment.Primes) ' Free up worker. Dim Worker As WorkerRecord = CType(Workers(taskSegment.WorkerID), _ WorkerRecord) Worker.TaskAssigned = False ' Check if this is the final submission. If Task.Results.Count = Task.Workers.Count Then SyncLock Tasks Trace.Write("Task " & Task.TaskID.ToString() & " completed.") Dim Primes() As Integer = Task.GetJoinedResults() Dim Results As New TaskResults(Task.Request.FromNumber, _ Task.Request.ToNumber, Primes) Dim ReceiveResults As New ReceiveResultsDelegate( _ AddressOf Task.Request.Client.ReceiveResults) ReceiveResults.BeginInvoke(Results, Nothing, Nothing) ' Remove task. Tasks.Remove(Task.TaskID) End SyncLock End If End Sub
You might choose to implement the ReceiveTaskComplete() method as a one-way method for maximum performance because the worker doesn't need to receive any information or exceptions that might be raised on the server.