Skip to content

256 Broadcast messaging and logical clocks

Videos

Logical clock

Each process participating in an algorithm has a logical clock lc and sends its current time with each message; it updates its current time with the timestamp of the message it receives using the following rules:

send some_channel(msg, lc); lc++;

receive my_channel(msg, ts); lc=max(lc, ts+1); lc++;

Distributed semaphores

With the help of broadcast messages and logical clocks we are able to build distributed semaphores.

Here a possible implementation in pseudo code

//Define a semaphore type – ACK used for helpers only
enum kind = {V, P, ACK }

//We create N channels so that each client can communicate with all others
//  these channels will take the sender-ID, the kind and the timestamp
channel semop[1:N](int sender, int kind, int timestamp)

//...and we need also channels to do the communication between user and helper process
channel go[1:N](int timestamp)


// Code using a distributed semaphore
process User((int u=0; u < N; u++)) {
   int ts, lc = 0;  // timestamp and logical clock, respectively

   // do some stuff
   ...
   // execute a P operation, try to enter critical section
   broadcast semop(u, P, lc); lc++;
   receive go[u](ts); lc = max(lc, ts + 1); lc++;  // get helper process active

   // inside CS ....

   // execute a V operation, leave CS
   broadcast semop(u, V, lc); lc++;
   ...
   ...
}

And here the process that maintain the value of the semaphore locally

process Helper((int h=0; h < N; h++)) {
   queue mq = new queue of (int, kind, int);      // timestamp order
   int lc = 0;                                    // logical clock
   int sem = initial value;                       // semaphore value

   int sender, ts;                                // init in receive
   kind mkind;                                    // init in receive

   while (true) { 
      receive semop[h](sender, mkind, ts);        // listening for a msg on any (N) semop channels
      lc = max(lc,ts+1); lc++;
      if (mkind == P || mkind == V) {
         mq.insert(sender, mkind, ts);            // ordered by ts
         broadcast semop(h, ACK, lc); lc++;
      else {                                      // mkind == ACK
         // record that another ACK has been seen;
         for_all (fully acknowledged V messages) {
            mq.remove(); sem++;
         }
         for_all (fully acknowledged P messages) {
            if (sem > 0) {
               sender=mq.remove();
               sem--;
               if (sender == h) {
                  send go[h](lc); lc++; // this message releases on waiting (P()) process 
               }
            }
         }
      }
   }
}