eXept Software AG Logo

Smalltalk/X Webserver

Documentation of class 'SharedQueue':

Home

Documentation
www.exept.de
Everywhere
for:
[back]

Class: SharedQueue


Inheritance:

   Object
   |
   +--Collection
      |
      +--Queue
         |
         +--SharedQueue
            |
            +--UnlimitedSharedQueue

Package:
stx:libbasic2
Category:
Kernel-Processes
Version:
rev: 1.50 date: 2018/05/24 19:17:36
user: cg
file: SharedQueue.st directory: libbasic2
module: stx stc-classLibrary: libbasic2
Author:
Claus Gittinger

Description:


SharedQueues provide a safe mechanism for processes to communicate.
They are basically Queues, with added secure access to the internals,
allowing use from multiple processes (i.e. the access methods use
critical regions to protect against confusion due to a process
switch within a modification).

Also, sharedQueues can be used for synchronization, since a reading
process will be blocked when attempting to read an empty queue, while
a writer will be blocked when attempting to write into a full queue.
For nonBlocking read, use #isEmpty; for nonBlocking write, use #isFull.

Be warned:
    if the reader process wants to add elements to the sharedqueue in its
    read-loop, the reader may block, if the queue is full.
    The reason is that the sharedQueues size is fixed, and any writer is blocked
    if the queue is full.
    For this situations, please use an UnlimitedSharedQueue, which grows in this
    particular situation.
    
See samples in doc/coding.


Related information:

    SharedCollection
    UnlimitedSharedQueue
    Queue
    Semaphore
    Process
    CodingExamples::SharedQueueExamples

Instance protocol:

accessing
o  remove: anElement ifAbsent: exceptionalValue
(comment from inherited method)
remove and return a particular element from the queue;
Return the value from exceptionalValue if the element is not in the queue

o  removeAll
remove all elements in the queue; do not wait, but
synchronize access to the queue.
If the queue was full before, signal space-availability to writers.
This can be used to flush queues in multi-process applications,
when cleanup is required.

o  removeIdentical: anElement ifAbsent: exceptionalValue
(comment from inherited method)
remove and return a particular element from the queue;
Return the value from exceptionalValue if the element is not in the queue

o  removeLast
return the last value in the queue; if it its empty, wait until
something is put into the receiver.
When the datum has been removed, signal space-availability to
writers

accessing-internals
o  accessLock
return the critical access-semaphore which is used internally to synchronize access

o  readSemaphore
return the semaphore which is signalled when data is available
for reading.

o  readWaitWithTimeoutMs: ms
Return true if a timeout occurred (i.e. false, if data is available).

o  superNextPut: anObject
private; to allow subclasses to call the basic nextPut (w.o. synchronization)

o  superNextPutFirst: anObject
private; to allow subclasses to call the basic nextPutFirst (w.o. synchronization)

o  withAccessLockedDo: aBlock
evaluate aBlock while access via next/nextPut are blocked.

o  writeSemaphore
return the semaphore which is signalled when the queue has space
for writing.

accessing-reading
o  next
return the next value in the queue; if it its empty, wait 'til
something is put into the receiver.
When the datum has been removed, signal space-availability to
writers

o  nextIfEmpty: exceptionBlock
return the next value in the queue;
if it is empty do not wait, but return the value of exceptionBlock.
When a datum has been removed, signal space-availability to writers

o  nextOrNil
(comment from inherited method)
return the next value in the queue;
Return nil, if the queue is empty

o  nextWithTimeout: seconds
return the next value in the queue; if it its empty, wait until
something is put into the receiver.
When the datum has been removed, signal space-availability to
writers.
Timeout after secondsIn seconds - answer nil if a timeout occurs.

o  peek

adding
o  nextPut: anObject
enter anObject to the end of the queue;
Wait for available space, if the queue is full.
After the put, signal availablity of a datum to readers.

o  nextPutFirst: anObject
insert anObject at the beginning of the queue;
Wait for available space, if the queue is full.
After the put, signal availablity of a datum to readers.
Insertion at the beginning may be useful to add hi-prio elements (for example, in a job-scheduler)

enumerating
o  do: anObject
evaluate the argument, aBlock for each element in the queue

o  reverseDo: anObject
evaluate the argument, aBlock for each element in the queue

initialization
o  init: size
initialize the receiver for size entries

private
o  commonWriteWith: aBlock
common code for nextPut / nextPutFirst;
wait for available space, if the queue is full.
After the put, signal availablity of a datum to readers.


Examples:


|queues readers writers seqNumber accessLock accessLock2 numbersStillToReceive| seqNumber := 1. accessLock := Semaphore forMutualExclusion. accessLock2 := Semaphore forMutualExclusion. numbersStillToReceive := BooleanArray new:100000 withAll:true. queues := (1 to:10) collect:[:i | SharedQueue new]. readers := (1 to:10) collect:[:i | [ |num| 10000 timesRepeat:[ num := (queues at:i) next. accessLock2 critical:[ (numbersStillToReceive at:num) ifFalse:[ self halt:(num printString , ' received twice') ] ifTrue:[ numbersStillToReceive at:num put:false. ]. ]. 'num printCR.'. ]. ] fork ]. writers := (1 to:10) collect:[:i | [ |num| 10000 timesRepeat:[ accessLock critical:[ num := seqNumber. seqNumber := seqNumber + 1. ]. (queues at:i) nextPut:num. ] ] fork ]. readers do:[:aReader | aReader waitUntilTerminated]. ' any left ? '. (numbersStillToReceive includes:true) ifTrue:[ self halt:'oops - not all numbers received' ]

ST/X 7.1.0.0; WebServer 1.663 at exept.de:8081; Mon, 04 Aug 2025 14:21:35 GMT