(* Mini-CML. Andrew Appel, based on John Reppy *) signature CONCUR_ML = sig (** threads **) val spawn : (unit -> unit) -> unit val yield : unit -> unit val exit: unit -> 'a (** channels **) type 'a chan val channel : unit -> 'a chan (** events **) type 'a event val sync : 'a event -> 'a val choose : 'a event list -> 'a event val wrap : ('a event * ('a -> 'b)) -> 'b event val alwaysEvt : unit event val sendEvt : ('a chan * 'a) -> unit event val recvEvt : 'a chan -> 'a event end structure CML : CONCUR_ML = struct type 'a cont = 'a SMLofNJ.Cont.cont val callcc: ('a cont -> 'a) -> 'a = SMLofNJ.Cont.callcc val throw : 'a cont -> 'a -> 'b = SMLofNJ.Cont.throw datatype 'a queue = Q of {front: 'a list ref, rear: 'a list ref} fun queueNew () = Q{front = ref [], rear = ref []} fun queueIns (Q{rear, ...}) x = (rear := x :: !rear) exception EmptyQ fun queueRem (Q{front=ref[], rear=ref[]}) = raise EmptyQ | queueRem (Q{front as (ref []), rear as (ref l)}) = let val (x::r) = rev l in front := r; rear := []; x end | queueRem (Q{front as (ref(x::r)), ...}) = (front := r; x) (* the thread ready queue *) val rdyQ : unit cont queue = queueNew() val enqueue = queueIns rdyQ fun dispatch () = throw (queueRem rdyQ) () fun yield() = callcc(fn k => (enqueue k; dispatch())) (** Channels **) type 'a chanq = (bool ref * 'a) queue datatype 'a chan = CHAN of {inq : 'a cont chanq, outq : ('a * unit cont) chanq} (** Channel queue routines **) fun insert (q : 'a chanq, flg, item) = queueIns q (flg, item) fun remove (q : 'a chanq) = let val (flg, item) = queueRem q in flg := true; item end (* Clean a channel of satisfied transactions. We do this incrementally to give an amortized constant cost. Return true if the resulting queue is non-empty. *) fun clean(q as Q{front as ref((ref true,_)::rest),...})= (front := rest; clean q) | clean(q as Q{front as ref nil, rear=ref nil}) = false | clean(q as Q{front as ref nil, rear as ref r}) = (front := rev r; rear := nil; clean q) | clean _ = true fun channel() = CHAN{inq= queueNew(), outq= queueNew()} fun spawn f = callcc (fn parent_k => ( enqueue parent_k; f (); dispatch())) fun exit() = dispatch() (** Events **) type 'a base_evt = {pollfn : unit -> bool, dofn : unit -> 'a, blockfn : bool ref -> 'a} type 'a event = 'a base_evt list fun sublist p nil = nil | sublist p (x::r) = if p x then x :: sublist p r else sublist p r (* Generate index numbers for "non-deterministic" selection. We use a round-robin style policy. *) val cnt = ref 0 fun random i = let val j = !cnt in cnt := j+1; (j mod i) end exception Escape fun sync el = case sublist (fn{pollfn,...}=>pollfn()) el of [] => let val evtflg = ref false fun log[] = dispatch() | log({blockfn,pollfn,dofn}::r) = (blockfn evtflg) handle Escape => log r in log el end | [{dofn,...}] => dofn() | l => #dofn(List.nth(l, random(length l))) () fun wrap (el, f) = map (fn {pollfn, dofn, blockfn} => {pollfn=pollfn, dofn=(f o dofn), blockfn=(f o blockfn)}) el fun choose [] = [] | choose (evts :: el) = evts @ choose el (** Base events **) val alwaysEvt = [{pollfn = (fn () => true), dofn = (fn () => ()), blockfn = (fn _ => raise Escape)}] fun sendEvt (CHAN{inq, outq}, msg) = [{pollfn= fn()=>clean inq, dofn= fn()=>let val rkont = remove inq in callcc (fn k => ( enqueue k; throw rkont msg)) end, blockfn= fn flg => callcc (fn k => ( clean outq; insert(outq, flg, (msg, k)); raise Escape))}] fun recvEvt (CHAN{inq, outq}) = [{pollfn= fn () => clean outq, dofn= fn()=>let val (msg,skont) = remove outq in enqueue skont; msg end, blockfn= fn flg => callcc (fn k => ( clean inq; insert (inq, flg, k); raise Escape))}] end (* structure CML *)