Thread structure

Earlier versions of Poly/ML have provided a form of concurrent execution through the Process structure. Version 5.1 introduces new thread primitives in the Thread structure. This structure is modelled on the Posix thread (pthread) package but simplified and modified for ML. The aim is to provide an efficient implementation of parallelism particularly to enable ML programs to make use of multi-core processors while minimising the changes needed to existing code. The Process structure will continue to be available as a library written on top of these primitives but new programs should use the Thread structure directly.

The thread package differs from pthreads in a number of ways. There is no join function to wait for the completion of a thread. This can be written using mutexes and condition variables. Cancellation and signal handling are combined into the interrupt functions. (The Poly/ML Signal structure handles signals for all the threads together). The effect of explicit cancellation is achieved using the interrupt function. This causes an interrupt to be generated in a specific thread. Alternatively an interrupt can be broadcast to all threads. This is most likely to be used interactively to kill threads that appear to have gone out of control. The normal top-level handler for a console interrupt will generate this. Threads can choose how or whether they respond to these interrupts. A thread that is doing processor-intensive work probably needs to be able to be interrupted asynchronously whereas if it is communicating with other threads the presence of asynchronous interrupts makes correct programming difficult.

signature THREAD =
sig
    exception Thread of string
    
    structure Thread:
    sig
        type thread;
        
        datatype threadAttribute =
            EnableBroadcastInterrupt of bool
        |   InterruptState of interruptState
        |   MaximumMLStack of int option (* Added in 5.5.3 *) 
        
        and interruptState =
            InterruptDefer
        |   InterruptSynch
        |   InterruptAsynch
        |   InterruptAsynchOnce
        
        val fork: (unit->unit) * threadAttribute list -> thread
        val exit: unit -> unit
        val isActive: thread -> bool
        
        val equal: thread * thread -> bool
        val self: unit -> thread
        
        exception Interrupt
        val interrupt: thread -> unit
        val broadcastInterrupt: unit -> unit
        val testInterrupt: unit -> unit
        val kill: thread -> unit
       
        val getLocal: 'a Universal.tags -> 'a option
        val setLocal: 'a Universal.tag * 'a -> unit
        
        val setAttributes: threadAttribute list -> unit
        val getAttributes: unit -> threadAttribute list

        val numProcessors: unit -> int
    end

    structure Mutex:
    sig
        type mutex
        val mutex: unit -> mutex
        val lock: mutex -> unit
        val unlock: mutex -> unit
        val trylock: mutex -> bool
    end

    structure ConditionVar:
    sig
        type conditionVar
        val conditionVar: unit -> conditionVar
        val wait: conditionVar * Mutex.mutex -> unit
        val waitUntil: conditionVar * Mutex.mutex * Time.time -> bool
        val signal: conditionVar -> unit
        val broadcast: conditionVar -> unit
    end

end;

The Thread substructure

exception Thread of string
The Thread exception can be raised by various of the functions in the structure if they detect an error.
type thread
The type of a thread identifier.
datatype threadAttribute =
        EnableBroadcastInterrupt of bool
    |   InterruptState of interruptState
    |   MaximumMLStack of int option
and interruptState =
        InterruptDefer
   |    InterruptSynch
   |    InterruptAsynch
   |    InterruptAsynchOnce
The type of a thread attribute. Thread attributes are properties of the thread that are set initially when the thread is created but can subsequently be modified by the thread itself. The thread attribute type may be extended in the future to include things like scheduling priority. The current thread attributes control the way interrupt exceptions are delivered to the thread.
EnableBroadcastInterrupt controls whether the thread will receive an interrupt sent using broadcastInterrupt or as a result of pressing the console interrupt key. If this is false the thread will not receive them. The default for a new thread if this is not specified is false.

InterruptState controls when and whether interrupts are delivered to the thread. This includes broadcast interrupts and also interrupts directed at a specific thread with the interrupt call. InterruptDefer means the thread will not receive any interrupts. However, if the thread has previously been interrupted the interrupt may be delivered when the thread calls setAttributes to change its interrupt state. InterruptSynch means interrupts are delivered synchronously. An interrupt will be delayed until an interruption point. An interruption point is one of: testInterrupt, ConditionVar.wait, ConditionVar.waitUntil and various library calls that may block, such as IO calls, pause etc. N.B. Mutex.lock is not an interruption point even though it can result in a thread blocking for an indefinite period. InterruptAsynch means interrupts are delivered asynchronously i.e. at a suitable point soon after they are triggered. InterruptAsynchOnce means that only a single interrupt is delivered asynchronously after which the interrupt state is changed to InterruptSynch. It allows a thread to tidy up and if necessary indicate that it has been interrupted without the risk of a second asynchronous interrupt occurring in the handler for the first interrupt. If this attribute is not specified when a thread is created the default is InterruptSynch.

MaximumMLStack was added in version 5.5.3. It controls the maximum size the ML stack may grow to. It is an option type where NONE allows the stack to grow to the limit of the available memory whereas SOME n limits the stack to n words. This is approximate since there is some rounding involved. When the limit is reached the thread is sent an Interrupt exception.

      
val fork: (unit->unit) * threadAttribute list -> thread
Fork a thread. Starts a new thread running the function argument. The attribute list gives initial values for thread attributes which can be modified by the thread itself. Any unspecified attributes take default values. The thread is terminated when the thread function returns, if it raises an uncaught exception or if it calls "exit".
val exit: unit -> unit
Terminate this thread.
val isActive: thread -> bool
Test if a thread is still running or has terminated.
val equal: thread * thread -> bool
Test whether thread values denote the same thread.
val self: unit -> thread
Return the thread identifier for the current thread.
exception Interrupt = SML90.Interrupt
The Interrupt exception can be generated in another thread either by a broadcast to all threads or directed to a single thread.
val interrupt: thread -> unit
Send an Interrupt exception to a specific thread. When and indeed whether the exception is actually delivered will depend on the interrupt state of the target thread.
val broadcastInterrupt: unit -> unit
Send an interrupt exception to every thread which is set to accept it.
val testInterrupt: unit -> unit
If this thread is handling interrupts synchronously, test to see if it has been interrupted. If so it raises the Interrupt exception.
val kill: thread -> unit
Terminate a thread. This should be used as a last resort. Normally a thread should be allowed to clean up and terminate by using the interrupt function. Raises Thread if the thread is no longer running, so an exception handler should be used unless the thread is known to be blocked.
val getLocal: 'a Universal.tas -> 'a option
val setLocal: 'a Universal.tag * 'a -> unit
Get and set thread-local store for the calling thread. The store is a tagged associative memory which is initially empty for a new thread. A thread can call setLocal to add or replace items in its store and call getLocal to return values if they exist. The Universal structure contains functions to make new tags as well as injection, projection and test functions.
val setAttributes: threadAttribute list -> unit
Thread attributes are initially set when the thread is forked but can be changed by thread itself using this call. Unspecified attributes remain unchanged.
val getAttributes:  unit -> threadAttribute list
Get the values of attributes for the current thread.
val numProcessors: unit -> int
Return the number of processors configured on the machine.

The Mutex substructure

A mutex provides simple mutual exclusion. A thread can lock a mutex and until it unlocks it no other thread will be able to lock it. Locking and unlocking are intended to be fast in the situation when there is no other process attempting to lock the mutex. Mutexes are non-recursive: if a thread tries to lock a mutex that it has already locked it will deadlock. Note: a thread should never attempt to lock or unlock a mutex if it may receive an asynchronous interrupt. It should always set its interrupt state to either InterruptDefer or InterruptSynch before calling these functions. An asynchronous interrupt may leave the mutex in an indeterminate state.

type mutex
The type of a mutex
val mutex: unit -> mutex
Create a new mutex.
val lock: mutex -> unit
Lock a mutex. If the mutex is currently locked the thread is blocked until it is unlocked. If a thread tries to lock a mutex that it has previously locked the thread will deadlock. N.B. "lock" is not an interruption point (a point where synchronous interrupts are delivered) even though a thread can be blocked indefinitely. If the thread attempting to lock the mutex is handling interrupts asynchronously an asynchronous interrupt may be delivered before or after the lock is taken.
val unlock: mutex -> unit
Unlock a mutex and allow any waiting threads to run. The behaviour if the mutex was not previously locked by the calling thread is undefined.
val trylock: mutex -> bool
Attempt to lock the mutex without blocking. Returns true if the mutex was not previously locked and has now been locked by the calling thread. Returns false if the mutex was previously locked, including by the calling thread.

The ConditionVar substructure

Condition variables. Condition variables are used to provide communication between threads. A condition variable is used in conjunction with a mutex and usually a reference to establish and test changes in state. The normal use is for one thread to lock a mutex, test the reference and then wait on the condition variable, releasing the lock on the mutex while it does so. Another thread may then lock the mutex, update the reference, unlock the mutex, and signal the condition variable. This wakes up the first thread and reacquires the lock allowing the thread to test the updated reference with the lock held. More complex communication mechanisms, such as blocking channels, can be written in terms of condition variables.

type conditionVar
The type of a condition variable
val conditionVar: unit -> conditionVar
Make a new condition variable.
val wait: conditionVar * Mutex.mutex -> unit

Release the mutex and block until the condition variable is signalled. When wait returns the mutex will have been re-acquired.

If the thread is handling interrupts synchronously this function can be interrupted using the "Thread.interrupt" function or, if the thread is set to accept broadcast interrupts, "Thread.broadcastInterrupt". The thread will re-acquire the mutex before the exception is delivered. An exception will only be delivered in this case if the interrupt is sent before the condition variable is signalled. If the interrupt is sent after the condition variable is signalled the function will return normally even if it has not yet re-acquired the mutex. The interrupt state will be delivered on the next call to "wait", "Thread.testInterrupt" or other blocking call.

A thread should never call this function if it may receive an asynchronous interrupt. It should always set its interrupt state to either InterruptSynch or InterruptDefer beforehand. An asynchronous interrupt may leave the condition variable and the mutex in an indeterminate state and could lead to deadlock.

A condition variable should only be associated with one mutex at a time. All the threads waiting on a condition variable should pass the same mutex as argument.

val waitUntil: conditionVar * Mutex.mutex * Time.time -> bool
As wait except that it blocks until either the condition variable is signalled or the time (absolute) is reached. Either way the mutex is reacquired so there may be a further delay if it is held by another thread. Returns true if the condition variable had been signalled and false if the time had expired before the variable was signalled.
val signal: conditionVar -> unit
Wake up one thread if any are waiting on the condition variable. If there are several threads waiting for the condition variable one will be selected to run and will run as soon as it has re-acquired the lock.
val broadcast: conditionVar -> unit
Wake up all threads waiting on the condition variable.