256 Broadcast messaging and logical clocks
Logical clock
Each process participating in an algorithm keeps a logical clock lc. It attaches its current clock value to every message it sends, and on receipt it advances its own clock past the timestamp carried by the message, using the following rules:
send some_channel(msg, lc); lc++;
receive my_channel(msg, ts); lc=max(lc, ts+1); lc++;
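The two rules above can be tried out in a few lines of Python. This is only a minimal sketch: the class name LogicalClock and its method names are made up for illustration, and the actual message transport is left out.

```python
class LogicalClock:
    """Logical clock that follows the send/receive rules above."""

    def __init__(self):
        self.lc = 0

    def send(self):
        """Return the timestamp to attach to an outgoing message, then tick."""
        ts = self.lc
        self.lc += 1
        return ts

    def receive(self, ts):
        """Move past the sender's timestamp, then tick."""
        self.lc = max(self.lc, ts + 1)
        self.lc += 1
        return self.lc


a, b = LogicalClock(), LogicalClock()
ts = a.send()        # message carries 0, a.lc becomes 1
b.receive(ts)        # b.lc = max(0, 0 + 1) + 1 = 2
reply = b.send()     # reply carries 2, b.lc becomes 3
a.receive(reply)     # a.lc = max(1, 2 + 1) + 1 = 4
print(a.lc, b.lc)    # 4 3
```

Note that a receiver always ends up with a clock value strictly greater than the timestamp it received, which is what lets timestamps order causally related messages.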
Distributed semaphores
With the help of broadcast messages and logical clocks we are able to build distributed semaphores.
Here is a possible implementation in pseudocode:
// Message kinds: P and V are semaphore operations; ACK is used by the helpers only
enum kind = {V, P, ACK }
//We create N channels so that each client can communicate with all others
// these channels will take the sender-ID, the kind and the timestamp
channel semop[1:N](int sender, int kind, int timestamp)
// ...and we also need channels for the communication between each user and its helper process
channel go[1:N](int timestamp)
// Code using a distributed semaphore
process User((int u=0; u < N; u++)) {
int ts, lc = 0; // timestamp and logical clock, respectively
// do some stuff
...
// execute a P operation, try to enter critical section
broadcast semop(u, P, lc); lc++;
receive go[u](ts); lc = max(lc, ts + 1); lc++; // block until our helper grants the P operation
// inside CS ....
// execute a V operation, leave CS
broadcast semop(u, V, lc); lc++;
...
...
}
And here is the process that maintains the value of the semaphore locally:
process Helper((int h=0; h < N; h++)) {
queue mq = new queue of (int, kind, int); // timestamp order
int lc = 0; // logical clock
int sem = initial value; // semaphore value
int sender, ts; // init in receive
kind mkind; // init in receive
while (true) {
receive semop[h](sender, mkind, ts); // wait for a message from any of the N processes on our semop channel
lc = max(lc,ts+1); lc++;
if (mkind == P || mkind == V) {
mq.insert(sender, mkind, ts); // ordered by ts
broadcast semop(h, ACK, lc); lc++;
} else { // mkind == ACK
// record that another ACK has been seen;
for_all (fully acknowledged V messages) {
mq.remove(); sem++;
}
for_all (fully acknowledged P messages) {
if (sem > 0) {
sender=mq.remove();
sem--;
if (sender == h) {
send go[h](lc); lc++; // this message releases one waiting (P()) process
}
}
}
}
}
}
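The helper above leaves open how "fully acknowledged" is decided. One common reading, assumed here, is that a queued message with timestamp ts is fully acknowledged once the helper has seen an ACK with a larger timestamp from every helper, so no message with a smaller timestamp can still arrive. The Python sketch below illustrates only that bookkeeping; the class AckTracker and its methods are hypothetical names, and ties between equal timestamps are broken by sender ID, which is also an assumption.

```python
import heapq


class AckTracker:
    """Bookkeeping for one helper: queued P/V messages plus ACKs seen so far."""

    def __init__(self, n):
        self.latest_ack = [-1] * n   # highest ACK timestamp seen from each helper
        self.mq = []                 # heap of (ts, sender, kind), timestamp order

    def insert(self, sender, kind, ts):
        """Queue a P or V message."""
        heapq.heappush(self.mq, (ts, sender, kind))

    def record_ack(self, helper, ts):
        """Record that an ACK with timestamp ts arrived from the given helper."""
        self.latest_ack[helper] = max(self.latest_ack[helper], ts)

    def fully_acknowledged(self):
        """Queued messages older than the smallest 'latest ACK' of all helpers."""
        horizon = min(self.latest_ack)
        return sorted(m for m in self.mq if m[0] < horizon)


tracker = AckTracker(3)
tracker.insert(0, "P", 0)
tracker.insert(1, "V", 2)
tracker.record_ack(0, 3)
tracker.record_ack(1, 4)
before = tracker.fully_acknowledged()   # helper 2 has not acknowledged anything yet
tracker.record_ack(2, 2)
first = tracker.fully_acknowledged()    # only the message with ts 0 is safe
tracker.record_ack(2, 5)
both = tracker.fully_acknowledged()     # now the message with ts 2 is safe too
print(before, first, both)
```

Because the fully acknowledged messages always form a prefix of the timestamp-ordered queue, every helper processes the same V and P operations in the same order, which keeps the local copies of sem consistent.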