/**
 * Fixed-size sliding-window buffer for packets that have been sent but not
 * yet acknowledged.
 *
 * Sequence numbers are assigned consecutively starting at 0. A packet with
 * sequence number {@code s} is stored in slot {@code s % capacity}; a slot
 * holding {@code null} is treated as "acknowledged / free". Because of that,
 * a {@code null} payload cannot be distinguished from an acknowledged packet.
 *
 * All public methods are {@code synchronized} on this instance: a thread
 * entering one of them holds the object's monitor, and {@code wait()}
 * releases it until another thread calls {@code notifyAll()}.
 */
public class SendBuffer {
    /** Slot storage; {@code null} marks a free or acknowledged slot. */
    private final Object[] slots;

    /** Number of slots, i.e. the maximum number of unacknowledged packets. */
    private final int capacity;

    /** Lowest sequence number that has not been acknowledged yet. */
    private int nextAckExpected = 0;

    /** Sequence number that will be assigned to the next packet sent. */
    private int nextSeq = 0;

    /**
     * Creates a buffer with room for {@code size} unacknowledged packets.
     *
     * @param size number of slots; must be positive
     * @throws IllegalArgumentException if {@code size} is zero or negative
     *         (a zero-sized window could never hand out a sequence number)
     */
    public SendBuffer(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive, got " + size);
        }
        slots = new Object[size];
        capacity = size;
    }

    /** Returns the slot index used for sequence number {@code s}. */
    private int idx(int s) {
        return s % capacity;
    }

    /** Re-asserts the interrupted status if a wait loop swallowed an interrupt. */
    private static void restoreInterrupt(boolean interrupted) {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stores {@code d} as the next outgoing packet and returns its sequence
     * number. Blocks while the window is full, i.e. while {@code capacity}
     * packets are still unacknowledged.
     *
     * An interrupt received while blocked does not abort the call; the
     * method keeps waiting for a free slot and restores the thread's
     * interrupted status before returning so the caller can observe it.
     *
     * @param d the packet to remember until it is acknowledged
     * @return the sequence number assigned to {@code d}
     */
    public synchronized int getnextsequencenumber(Object d) {
        boolean interrupted = false;
        while (nextSeq >= nextAckExpected + capacity) {
            // No send slot available: release the lock and sleep until
            // ackpacket() signals that the window has moved forward.
            try {
                wait();
            } catch (InterruptedException e) {
                // wait() cleared the flag; remember it and keep waiting.
                interrupted = true;
            }
        }
        restoreInterrupt(interrupted);

        slots[idx(nextSeq)] = d; // sent, no ACK received yet
        return nextSeq++;
    }

    /**
     * Blocks until every packet handed out so far has been acknowledged.
     *
     * As in {@link #getnextsequencenumber(Object)}, an interrupt does not
     * end the wait early, but the interrupted status is restored on return.
     */
    public synchronized void waituntilempty() {
        boolean interrupted = false;
        while (nextSeq != nextAckExpected) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        restoreInterrupt(interrupted);
    }

    /**
     * Records an acknowledgement for sequence number {@code s}.
     *
     * Acknowledgements outside the current window
     * {@code [nextAckExpected, nextAckExpected + capacity)} are ignored.
     * Otherwise the slot is cleared and the window start is advanced past
     * every leading cleared slot. Waiting threads are notified only when
     * the window start actually moved.
     *
     * @param s sequence number carried by the received ACK
     */
    public synchronized void ackpacket(int s) {
        if (s < nextAckExpected || s >= nextAckExpected + capacity) {
            return; // out-of-range ACKs are of no interest
        }

        int before = nextAckExpected;
        slots[idx(s)] = null; // the other side received this packet

        while (nextAckExpected < nextSeq && slots[idx(nextAckExpected)] == null) {
            nextAckExpected++;
        }

        if (nextAckExpected > before) {
            // New sequence numbers became available (and the buffer may
            // now be empty): wake up every waiting thread.
            notifyAll();
        }
    }

    /**
     * Returns the packet stored for sequence number {@code s}.
     *
     * @param s sequence number to look up
     * @return the stored packet, or {@code null} if {@code s} lies outside
     *         the current window or its slot has been cleared
     */
    public synchronized Object getpacket(int s) {
        if (s < nextAckExpected || s >= nextAckExpected + capacity) {
            return null;
        }
        return slots[idx(s)];
    }

    /**
     * Returns a one-line textual dump of the buffer, one entry per slot.
     *
     * Each entry starts with a marker: {@code |} if the slot is both the
     * window start and the next slot to be filled, {@code >} if it is only
     * the window start, {@code <} if it is only the next slot to be filled,
     * and a space otherwise. The marker is followed by the slot index, a
     * colon, and the sequence number that maps to the slot within the
     * current window, wrapped in brackets when the slot is occupied.
     *
     * @return the buffer state as a string
     */
    public synchronized String show() {
        StringBuilder status = new StringBuilder();
        int ackSlot = idx(nextAckExpected);
        int sendSlot = idx(nextSeq);

        for (int i = 0; i < capacity; i++) {
            String marker;
            if (i == ackSlot && i == sendSlot) {
                marker = "|";
            } else if (i == ackSlot) {
                marker = ">";
            } else if (i == sendSlot) {
                marker = "<";
            } else {
                marker = " ";
            }
            status.append(marker).append(i).append(":");

            // Distance of this slot from the window start gives its sequence number.
            int offset = (i - ackSlot + capacity) % capacity;
            int s = nextAckExpected + offset;
            if (slots[i] != null) {
                status.append("[").append(s).append("],");
            } else {
                status.append(" ").append(s).append(", ");
            }
        }
        return status.toString();
    }
}