(* Copyright 1999, 2000 Andrew W. Appel. *) structure TextIO=TextIO (* Need this here to work around a CML bug *) signature COUNTER = sig type counter val counter : int -> counter val inc : counter * int -> unit val get : counter -> int end structure Counter : COUNTER = struct open CML type counter = int chan * int chan fun inc ((inch, outch),i) = send(inch,i) fun get (inch,outch) = recv outch fun counter init = let val inch = channel() val outch = channel() fun loop (n: int) = select[wrap(recvEvt inch, fn i => loop(n+i)), wrap(sendEvt(outch, n), fn()=> loop n)] in spawnc loop init; (inch, outch) end end (* First interface for concurrent-read, exclusive write. Not used by the test application below. *) signature CRXW = sig type crxw_lock val crxwLock : unit -> crxw_lock val beginRead: crxw_lock -> unit val endRead: crxw_lock -> unit val beginWrite: crxw_lock -> unit val endWrite: crxw_lock -> unit end (* First implementation for concurrent-read, exclusive write. *) structure CRXW : CRXW = struct open CML datatype crxw_lock = LOCK of {bRead: unit chan, eRead: unit chan, bWrite: unit chan, eWrite: unit chan, writersWaiting: Counter.counter} fun crxwLock() = let val bRead=channel() val eRead=channel() val bWrite=channel() val eWrite=channel() val writersWaiting = Counter.counter 0 fun idle () = select [wrap (recvEvt bRead, fn()=> reading 1), wrap (recvEvt bWrite, fn()=> writing())] and reading 0 = idle() | reading n = if Counter.get writersWaiting > 0 then readingEnd n else select[wrap(recvEvt bRead, fn()=> reading (n+1)), wrap(recvEvt eRead, fn()=> reading(n-1))] and readingEnd 0 = idle() | readingEnd n = (recv eRead; readingEnd(n-1)) and writing() = (recv eWrite; idle()) in LOCK{bRead=bRead, eRead=eRead, bWrite=bWrite, eWrite=eWrite, writersWaiting=writersWaiting} end fun beginRead (LOCK{bRead,...}) = send(bRead,()) fun endRead (LOCK{eRead,...}) = send(eRead,()) fun beginWrite (LOCK{bWrite,writersWaiting,...}) = (Counter.inc(writersWaiting,1); send(bWrite,()); Counter.inc(writersWaiting, ~1)) fun endWrite (LOCK{eWrite,...}) = send(eWrite,()) end (* Alternate interface for concurrent-read, exclusive write *) signature CRXW2 = sig type crxw_lock val crxwLock : unit -> crxw_lock val beginRead: crxw_lock -> unit CML.event val endRead: crxw_lock -> unit CML.event val beginWrite: crxw_lock -> unit CML.event val endWrite: crxw_lock -> unit CML.event end structure CRXW2 : CRXW2 = struct open CML datatype crxw_lock = LOCK of {bRead: unit chan, eRead: unit chan, bWrite: unit chan, eWrite: unit chan, writersWaiting: Counter.counter} fun crxwLock() = let val bRead=channel() val eRead=channel() val bWrite=channel() val eWrite=channel() val writersWaiting = Counter.counter 0 fun when x y = if x then y else never fun loop readers = (print ("Status: " ^ Int.toString readers ^ " readers, " ^ Int.toString(Counter.get writersWaiting) ^ " writers waiting\n"); select[ when (Counter.get writersWaiting = 0) (wrap(recvEvt bRead, fn()=> loop (readers+1))), wrap(recvEvt eRead, fn()=> loop(readers-1)), when (readers = 0) (wrap (recvEvt bWrite, fn()=> (recv eWrite; loop 0)))] ) in spawnc loop 0; LOCK{bRead=bRead, eRead=eRead, bWrite=bWrite, eWrite=eWrite, writersWaiting=writersWaiting} end fun beginRead (LOCK{bRead,...}) = sendEvt(bRead,()) fun endRead (LOCK{eRead,...}) = sendEvt(eRead,()) fun beginWrite (LOCK{bWrite,writersWaiting,...}) = withNack(fn nack => (Counter.inc(writersWaiting,1); spawn(fn()=>(sync nack; Counter.inc(writersWaiting, ~1))); wrap (sendEvt(bWrite,()), fn () => Counter.inc(writersWaiting, ~1)))) fun endWrite (LOCK{eWrite,...}) = sendEvt(eWrite,()) end structure Test = struct fun main() = let open CML local val randCh : int chan = channel() fun loop s = (send(randCh, s mod 10); loop((s+37) mod 61)) val _ = spawnc loop 0 in fun random() = recv randCh end fun repeat 0 f x = () | repeat n f x = (f x; repeat (n-1) f x) fun delay() = repeat (10 * random()) yield () val lock = CRXW2.crxwLock() fun reader name = (delay(); print (name ^ ": Ready to read\n"); sync (CRXW2.beginRead lock); print (name ^ ": Reading\n"); delay(); sync (CRXW2.endRead lock); print (name ^ ": Done reading\n"); reader name) fun writer name = while true do (delay(); delay(); print (name ^ ": Ready to write\n"); sync (CRXW2.beginWrite lock); print (name ^ ": Writing\n"); delay(); sync (CRXW2.endWrite lock); print (name ^ ": Done writing\n")) val threads = map (spawnc reader) ["A","B","C","D","E"] @ map (spawnc writer) ["X","Y"] in app (sync o joinEvt) threads end fun go() = RunCML.doit(main,NONE) end