Creating the Work Manager

The work manager follows a similar pattern to the coordination server developed for the Talk .NET system over the last two chapters. Perhaps the most important detail is the information that the work manager stores in its memory, which includes the collections shown here:

Private Workers As New Hashtable()
Private Tasks As New Hashtable()

The Workers collection tracks information about the registered peers and how to reach them using WorkerRecord objects. These objects are indexed by the WorkerID. The Tasks collection holds a collection of Task objects, which represent the ongoing, currently scheduled tasks. Objects in the Tasks collection are indexed by TaskID. To write more error-proof code, you could replace the worker and task hashtables with custom dictionary collections that can only hold WorkerRecord and Task objects respectively. These custom dictionary collections would derive from System.Collections.DictionaryBase.
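As a rough illustration of that suggestion, a type-safe worker collection might look something like the following sketch. The WorkerCollection name and its members are assumptions made for this example, and the WorkerRecord class here is only a minimal stand-in for the full class shown later in this section.

```vbnet
Imports System
Imports System.Collections

' Minimal stand-in for the WorkerRecord class (illustration only).
Public Class WorkerRecord
    Public ReadOnly WorkerID As Guid = Guid.NewGuid()
End Class

' Hypothetical typed dictionary that accepts only WorkerRecord values.
Public Class WorkerCollection
    Inherits DictionaryBase

    Default Public Property Item(ByVal workerID As Guid) As WorkerRecord
        Get
            Return CType(Me.Dictionary(workerID), WorkerRecord)
        End Get
        Set(ByVal Value As WorkerRecord)
            Me.Dictionary(workerID) = Value
        End Set
    End Property

    Public Sub Add(ByVal worker As WorkerRecord)
        Me.Dictionary.Add(worker.WorkerID, worker)
    End Sub

    Public Sub Remove(ByVal workerID As Guid)
        Me.Dictionary.Remove(workerID)
    End Sub

    Public Function Contains(ByVal workerID As Guid) As Boolean
        Return Me.Dictionary.Contains(workerID)
    End Function

    ' Rejects values of the wrong type, even when they are added
    ' through the untyped IDictionary interface.
    Protected Overrides Sub OnValidate(ByVal key As Object, _
      ByVal value As Object)
        If Not TypeOf value Is WorkerRecord Then
            Throw New ArgumentException("Value must be a WorkerRecord.")
        End If
    End Sub

End Class
```

With a class like this in place, code that reads workers(workerID) no longer needs a CType() cast, and a mistaken attempt to store some other object fails immediately rather than surfacing later as an invalid cast.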

The work manager can also use private variables to store global preferences. In this fairly simple example, we'll only use one custom setting: an integer that sets the maximum number of workers that can be assigned to a task. This helps to ensure that other workers will be free to serve new requests. It also prevents a task from being broken into so many separate pieces that the communication time begins to become a factor.

Private MaxWorkers As Integer

The MaxWorkers setting is read from a configuration file when the server object is created:

Public Sub New()

    ' Retrieve configuration settings.
    MaxWorkers = Int32.Parse(ConfigurationSettings.AppSettings("MaxWorkers"))
End Sub

For our test, we'll allow a maximum of three workers:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <appSettings>
    <add key="MaxWorkers" value="3" />
  </appSettings>

  <!-- Remoting settings omitted. -->
</configuration>

The work manager also uses the trace component used in the messenger application, which shows a window with trace messages that indicate what actions the server has performed.

Tracking Workers

The server provides an AddWorker() method that allows peers to register themselves in the Workers collection, and a RemoveWorker() method that allows peers to remove themselves. The following is the code for these methods:

Public Function AddWorker(ByVal callback As TaskComponent.ITaskWorker) _
  As System.Guid Implements TaskComponent.ITaskServer.AddWorker

    Dim Worker As New WorkerRecord(callback)
    SyncLock Workers
        Workers(Worker.WorkerID) = Worker
    End SyncLock
    Trace.Write("Added worker " & Worker.WorkerID.ToString())
    Return Worker.WorkerID

End Function

Public Sub RemoveWorker(ByVal workerID As System.Guid) _
  Implements TaskComponent.ITaskServer.RemoveWorker

    SyncLock Workers
        Workers.Remove(workerID)
    End SyncLock
    Trace.Write("Removed worker " & workerID.ToString())

End Sub

Note that the RemoveWorker() method assumes that the worker has finished all its tasks before exiting. Clearly, it would make sense to add a check to this code that looks for outstanding TaskSegments registered to this worker and tries to reassign them.

Workers are stored as WorkerRecord objects, as shown in the following example. Each worker has a globally unique identifier (GUID), which is generated automatically when the WorkerRecord class is instantiated. This allows workers to be identified uniquely on a network, without needing to assign them preexisting names (like a user alias). It's a technique you'll use again in later peer-to-peer examples in this book.

Public Class WorkerRecord

    Private _WorkerID As Guid = Guid.NewGuid()
    Private _WorkerReference As ITaskWorker
    Private _TaskAssigned As Boolean = False

    Public ReadOnly Property WorkerID() As Guid
        Get
            Return _WorkerID
        End Get
    End Property

    Public ReadOnly Property ITaskWorker() As ITaskWorker
        Get
            Return _WorkerReference
        End Get
    End Property

    Public Property TaskAssigned() As Boolean
        Get
            Return _TaskAssigned
        End Get
        Set(ByVal Value As Boolean)
            _TaskAssigned = Value
        End Set
    End Property

    Public Sub New(ByVal worker As ITaskWorker)
        _WorkerReference = worker
    End Sub

End Class

The WorkerRecord also provides a TaskAssigned property, which is initially set to False. In our simple example, a worker can be assigned at most one task. A more sophisticated worker might be able to hold a queue of task requests and deal with them one by one. In this case, you would replace the TaskAssigned Boolean variable with a TasksAssigned integer count. When assigning a task, the server would look for peers that have the lowest number of assigned tasks first.
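A minimal sketch of that lowest-count search follows. It assumes a hypothetical QueuedWorkerRecord class with a TasksAssigned counter and a hypothetical FindLeastBusyWorker() helper; neither is part of the sample application.

```vbnet
Imports System
Imports System.Collections

' Hypothetical variant of WorkerRecord that counts queued tasks.
Public Class QueuedWorkerRecord
    Public ReadOnly WorkerID As Guid = Guid.NewGuid()
    Public TasksAssigned As Integer = 0
End Class

Public Module WorkerSelection

    ' Returns the worker with the fewest assigned tasks, or Nothing
    ' if the collection is empty.
    Public Function FindLeastBusyWorker(ByVal workers As Hashtable) _
      As QueuedWorkerRecord

        Dim Best As QueuedWorkerRecord = Nothing

        ' Lock the collection so that it can't change during the search.
        SyncLock workers
            Dim Item As DictionaryEntry
            For Each Item In workers
                Dim Candidate As QueuedWorkerRecord = _
                  CType(Item.Value, QueuedWorkerRecord)
                If Best Is Nothing OrElse _
                  Candidate.TasksAssigned < Best.TasksAssigned Then
                    Best = Candidate
                End If
            Next
        End SyncLock

        Return Best

    End Function

End Module
```

In a design like this, the server would increment TasksAssigned when it hands a segment to the chosen worker and decrement it when the result comes back.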


When the server receives a TaskRequest, it creates a new Task object. The Task object stores the original task request, along with additional information, including:

  • The GUID, which the Task class generates automatically.

  • A collection that contains WorkerRecords for the workers that are processing the segments of this task.

  • A hashtable with an entry for each TaskSegment result. These entries are indexed by sequence number.

The Task class code is shown here:

Public Class Task

    Private _TaskID As Guid = Guid.NewGuid()

    ' The original task information.
    Private _Request As TaskRequest

    ' Holds WorkerRecord objects.
    Private _WorkersInProgress As New ArrayList()

    ' Holds partial prime lists, indexed by sequence number.
    Private _TaskResults As New Hashtable()

    Public ReadOnly Property TaskID() As Guid
        Get
            Return _TaskID
        End Get
    End Property

    Public ReadOnly Property Request() As TaskRequest
        Get
            Return _Request
        End Get
    End Property

    Public Property Workers() As ArrayList
        Get
            Return _WorkersInProgress
        End Get
        Set(ByVal Value As ArrayList)
            _WorkersInProgress = Value
        End Set
    End Property

    Public Property Results() As Hashtable
        Get
            Return _TaskResults
        End Get
        Set(ByVal Value As Hashtable)
            _TaskResults = Value
        End Set
    End Property

    Public Function GetJoinedResults() As Integer()
        ' (Code omitted.)

    End Function

    Public Sub New(ByVal taskRequest As TaskRequest)
        _Request = taskRequest
    End Sub

End Class

The Task class also contains a GetJoinedResults() method that steps through the hashtable of results and combines all values into one large array, which can then be returned to the client. Each entry in the hashtable is an array of primes that represents the solution for part of the original requested range. The code relies on the fact that the entries in the results hashtable are indexed by their sequence number. Thus, as long as all the segments are present, they can be reassembled in order by starting with sequence number 0, regardless of the actual order in which the results were received.

Public Function GetJoinedResults() As Integer()

    ' Count the number of primes.
    Dim NumberOfPrimes As Integer
    Dim SegmentResults() As Integer
    Dim i As Integer
    For i = 0 To _TaskResults.Count - 1
        SegmentResults = CType(_TaskResults(i), Integer())
        NumberOfPrimes += SegmentResults.Length
    Next

    ' Create the whole array.
    Dim Results(NumberOfPrimes - 1) As Integer

    ' Combine the partial results, in order.
    Dim Pos As Integer
    For i = 0 To _TaskResults.Count - 1
        SegmentResults = CType(_TaskResults(i), Integer())
        SegmentResults.CopyTo(Results, Pos)
        Pos += SegmentResults.Length
    Next

    Return Results
End Function

Dispatching Tasks

The bulk of the work manager logic takes place in the SubmitTask() method, which receives a task request, breaks it into segments, and assigns it. The first step is to examine the request information and verify that it's valid.

' Validate task request.
If taskRequest.FromNumber > taskRequest.ToNumber Then
    Throw New ArgumentException("First number must be smaller than the second.")
End If

Note that the error condition leads to an exception. That means that SubmitTask() shouldn't be implemented as a one-way method, or the client will not receive this information.

Next, the code judges the range of numbers. If the range is very small, it decides to only send the request to one worker. Otherwise, it uses the full number of maximum workers allowed by MaxWorkers.

' Calculate if the task can benefit from parallelism.
Dim TotalRange As Integer = taskRequest.ToNumber - taskRequest.FromNumber
Dim MaxWorkersForTask As Integer
If TotalRange < 10000 Then
    MaxWorkersForTask = 1
Else
    MaxWorkersForTask = MaxWorkers
End If

Depending on your design, it might make most sense to encapsulate the logic for validating a task and evaluating the Task range with dedicated methods in the Task class. This would be particularly useful if you wanted the work manager to manage more than one type of task. In this case, you would create a generic interface (possibly named ITask) that you would implement in all your Task classes.
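One way such an interface might look is sketched here. The ITask member names (Validate and GetWorkerLimit) and the PrimeRangeTask class are hypothetical and chosen only for illustration; the validation rule and the 10,000 threshold are carried over from the code shown earlier.

```vbnet
Imports System

' Hypothetical interface; the member names are illustrative only.
Public Interface ITask
    ' Throws an ArgumentException if the request is invalid.
    Sub Validate()

    ' Returns how many workers the task can usefully occupy.
    Function GetWorkerLimit(ByVal maxWorkers As Integer) As Integer
End Interface

' Hypothetical task type that wraps a range of numbers to search.
Public Class PrimeRangeTask
    Implements ITask

    Private _FromNumber As Integer
    Private _ToNumber As Integer

    Public Sub New(ByVal fromNumber As Integer, ByVal toNumber As Integer)
        _FromNumber = fromNumber
        _ToNumber = toNumber
    End Sub

    Public Sub Validate() Implements ITask.Validate
        If _FromNumber > _ToNumber Then
            Throw New ArgumentException( _
              "First number must be smaller than the second.")
        End If
    End Sub

    Public Function GetWorkerLimit(ByVal maxWorkers As Integer) As Integer _
      Implements ITask.GetWorkerLimit
        ' Small ranges don't benefit from being split up.
        If _ToNumber - _FromNumber < 10000 Then Return 1
        Return maxWorkers
    End Function

End Class
```

With this arrangement, SubmitTask() could call Validate() and GetWorkerLimit() without knowing what kind of work the task represents.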

Assuming these two steps succeed, a new Task object is created.

' Create the task.
Dim Task As New Task(taskRequest)

Next, the code searches for free workers. It attempts to use as many workers as are available (up to the specified maximum), and it takes the first available workers it finds. This may include the worker making the request, which is perfectly reasonable. The workers are added to the Task.Workers collection and immediately marked as assigned.

Dim Worker As WorkerRecord

' This lock ensures that the server won't try to allocate two different
' tasks to the same worker if the requests arrive simultaneously.
SyncLock Workers
    ' Try to find workers for this task.
    Dim Item As DictionaryEntry
    For Each Item In Workers
        Worker = CType(Item.Value, WorkerRecord)
        If Not Worker.TaskAssigned Then
            Worker.TaskAssigned = True
            Task.Workers.Add(Worker)
        End If
        If Task.Workers.Count >= MaxWorkersForTask Then Exit For
    Next
End SyncLock

Next, a quick check ensures that at least one worker was found. If not, an exception is thrown.

If Task.Workers.Count = 0 Then
    Throw New ApplicationException("No free workers. Try again later.")
End If

The work of dividing the task into segments begins next. First, a calculation is made to determine an average range for each segment. For example, if there's a total range of 100,000 and three workers to handle it, the average range is 33,333. The first two workers each receive a segment that spans this range, while the last receives everything that remains, up to the final number in the request. Once a segment is constructed, it's sent asynchronously to the worker by calling the worker's ReceiveTask() method.

Trace.Write("Trying to assign " & Task.Workers.Count.ToString() & _
  " worker(s) for task " & Task.TaskID.ToString())

' Calculate segment sizes.
Dim Segment As TaskSegment
Dim LowerBound As Integer = taskRequest.FromNumber
Dim AverageRange As Integer = Math.Floor(TotalRange / Task.Workers.Count)
Dim i As Integer

' Divide the task into segments, and dispatch each segment.
' This loop will be skipped if there's only one segment because
' (Task.Workers.Count - 2) will equal -1.
Dim ReceiveTask As ReceiveTaskDelegate
For i = 0 To Task.Workers.Count - 2
    Segment = New TaskSegment(Task.TaskID, LowerBound, _
                                LowerBound + AverageRange, i)
    LowerBound += AverageRange + 1
    Worker = CType(Task.Workers(i), WorkerRecord)
    Segment.WorkerID = Worker.WorkerID
    ReceiveTask = New ReceiveTaskDelegate(AddressOf _
      Worker.ITaskWorker.ReceiveTask)
    ReceiveTask.BeginInvoke(Segment, Nothing, Nothing)
Next

' Create the last segment to get the remaining numbers.
Segment = New TaskSegment(Task.TaskID, LowerBound, taskRequest.ToNumber, i)
Worker = CType(Task.Workers(Task.Workers.Count - 1), WorkerRecord)
Segment.WorkerID = Worker.WorkerID

ReceiveTask = New ReceiveTaskDelegate(AddressOf Worker.ITaskWorker.ReceiveTask)
ReceiveTask.BeginInvoke(Segment, Nothing, Nothing)
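To trace the segment arithmetic in isolation, the following sketch computes only the lower and upper bound of each segment, using the same stepping as the dispatch code. The SegmentMath module and GetSegmentBounds() function are hypothetical helpers written for this illustration, and integer division stands in for Math.Floor(), which gives the same result for non-negative ranges.

```vbnet
Imports System
Imports System.Collections

Public Module SegmentMath

    ' Returns an ArrayList of two-element Integer arrays {lower, upper},
    ' one per worker, using the same stepping as the dispatch loop.
    Public Function GetSegmentBounds(ByVal fromNumber As Integer, _
      ByVal toNumber As Integer, ByVal workerCount As Integer) As ArrayList

        Dim Bounds As New ArrayList()
        Dim LowerBound As Integer = fromNumber

        ' Integer division; same as Math.Floor for non-negative ranges.
        Dim AverageRange As Integer = (toNumber - fromNumber) \ workerCount

        ' Skipped entirely when there is only one worker.
        Dim i As Integer
        For i = 0 To workerCount - 2
            Bounds.Add(New Integer() {LowerBound, LowerBound + AverageRange})
            LowerBound += AverageRange + 1
        Next

        ' The last segment takes whatever remains.
        Bounds.Add(New Integer() {LowerBound, toNumber})
        Return Bounds

    End Function

End Module
```

Calling GetSegmentBounds(0, 100000, 3) produces the bounds 0 to 33333, 33334 to 66667, and 66668 to 100000. If both bounds of a segment are treated as inclusive (an assumption about how the worker interprets them), the segments never overlap and every number in the request is covered exactly once.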

Finally, the Task object is stored in the Tasks collection.

' Store the Task object.
SyncLock Tasks
    Tasks.Add(Task.TaskID, Task)
End SyncLock

Trace.Write("Created and assigned task " & Task.TaskID.ToString() & ".")

Completing Tasks

The work manager's ReceiveTaskComplete() method is the last part of the ITaskServer interface. It receives completed TaskSegment objects, adds them to the corresponding Task (from the in-memory Tasks collection), and then marks the worker as available. If the number of received results equals the number of task segments, the task is declared complete. A notification message is sent to the original task requester with the list of primes, and the task is removed from memory.

Public Sub ReceiveTaskComplete(ByVal taskSegment As TaskSegment, _
  ByVal workerID As System.Guid) _
  Implements TaskComponent.ITaskServer.ReceiveTaskComplete

    Trace.Write("Received result sequence #" & _
      taskSegment.SequenceNumber.ToString() & " for task " & _
      taskSegment.TaskID.ToString() & ".")

    Dim Task As Task = CType(Tasks(taskSegment.TaskID), Task)
    Task.Results.Add(taskSegment.SequenceNumber, taskSegment.Primes)

    ' Free up worker.
    Dim Worker As WorkerRecord = CType(Workers(taskSegment.WorkerID), _
      WorkerRecord)
    Worker.TaskAssigned = False

    ' Check if this is the final submission.
    If Task.Results.Count = Task.Workers.Count Then

        SyncLock Tasks
            Trace.Write("Task " & Task.TaskID.ToString() & " completed.")
            Dim Primes() As Integer = Task.GetJoinedResults()
            Dim Results As New TaskResults(Task.Request.FromNumber, _
              Task.Request.ToNumber, Primes)
            Dim ReceiveResults As New ReceiveResultsDelegate( _
              AddressOf Task.Request.Client.ReceiveResults)
            ReceiveResults.BeginInvoke(Results, Nothing, Nothing)

            ' Remove task.
            Tasks.Remove(Task.TaskID)
        End SyncLock

    End If

End Sub

You might choose to implement the ReceiveTaskComplete() method as a one-way method for maximum performance because the worker doesn't need to receive any information or exceptions that might be raised on the server.
