/*
 * Implementation exercise 5: Flow control
 * An Engineering Approach to Computer Networking
 * S. Keshav
 */

/*
 * This template is a superset of the template for ecn_error.c.
 * The portions missing in ecn_error.c are also deleted here.
 * So, they must be solved in the proper order.
 */

#include "../kernel/real.h"
/*
 * NOTE(review): the names of the four system headers below were lost in the
 * copy of this file that was reviewed; they are reconstructed from the calls
 * used in this file (printf/perror, stat, open/O_RDONLY) -- confirm against
 * the original distribution.
 */
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#define MWS 100             /* Max error control window size */
#define ALPHA 0.75          /* for round trip time estimation */
#define PKT_DATA_SIZE 250

extern int check_hamming();         /* implemented in ecn_error.c */
extern unsigned char hamming[];     /* defined in ecn_error.c */

/*
 * ecn_flow - exercise template for a node doing error control plus
 * window-based flow control.
 *
 * Node 1 opens "../sources/ecn_flow.c" for reading, cuts it into DATA
 * packets of up to PKT_DATA_SIZE input bytes and queues them for node 2.
 * Every other node opens "/tmp/tnew" for writing, writes in-sequence data to
 * it, and answers each DATA packet with a cumulative ACK addressed to node 1.
 * Received packets are dispatched on pkt->type (ACK, DATA, NO_MORE_DATA, INT,
 * TIMEOUT).
 *
 * Places marked XXX are the parts left blank for the exercise; the
 * placeholder conditions there are the constant 1.  Several locals
 * (save_pkt, diff, tao, rtt_est, save[], ...) are declared but not used by
 * the template itself.
 *
 * Control flow: the first "goto test" skips the receive loop once so the
 * sender can queue its first packet; afterwards "goto recv" jumps back into
 * the loop body.  The function never returns normally; the sender leaves via
 * exit() when the NO_MORE_DATA packet comes back to it.
 */
int
ecn_flow()
{
    PKT_PTR pkt, p;
    PKT_PTR save_pkt, timeout_pkt;
    int node, num_pkts_sent = 0, ack = 0, line_busy = 0;
    ident destn, sender, sink;
    long key;
    timev now, diff;
    int read_file, write_file;
    int seq_no = 0;
    int last_ack = -1;              /* largest cumulative ack seen so far */
    char seen[MWS];                 /* bit map of pkts in seq. */
    PKT_PTR save[MWS];              /* saves transmitted packets */
    unsigned char *r_save[MWS];     /* saves received packets */
    int r_save_size[MWS];           /* size of saved packets */
    int last_in_seq = -1;           /* last in sequence packet at rcv */
    int last_sent = -1;             /* largest sequence number sent */
    int final_ack = -1;             /* ack for the last data pkt sent */
    char no_more_pkts = 0;          /* true if no more data to send */
    int num_dup_acks = 0;           /* number of duplicate acks seen so far */
    int cur_window = 10;            /* current flow control window size */
    int win_count = 0;              /* counts acks for linear increase */
    int ssthresh = 5;               /* slow start threshold */
    int transmission_id = 0;        /* unique id per packet */
    char corrupted;
    unsigned char buffer[PKT_DATA_SIZE];    /* temporary storage */
    unsigned char data, *r_buf;
    float tao, rtt_est = 1.0;
    float timeout = 1.0;
    int i,j, num, size;
    struct stat buf;
    char nothing_sent;

    node = get_node_id();               /* find out local id */
    now = runtime();
    source_node_type[node] = TEMPLATE;
    sink = assigned_sink[node];         /* find out sink (from .l file ) */
    abs_advance(node_start_time[node]); /* advance time to start time */

    printf("Error and flow controled node: %d ", get_node_id());
    printf("--> %d, start (%d, %d)\n", sink,
           node_start_time[node].tv_sec, node_start_time[node].tv_usec);

    /* XXX Initialization of an array used by the receiver */
    /*
     * NOTE(review): seen[] is read in the DATA case below; until the blank
     * above is filled in, its contents are indeterminate.
     */

    /* open files for reading or writing */
    if (node is 1) {
        if ((read_file = open("../sources/ecn_flow.c", O_RDONLY)) is -1) {
            perror("open for read");
            pr_error("error opening file for read");
        }
    } else {
        /* if output file exists, first delete it */
        if (stat("/tmp/tnew", &buf) isnt -1)
            unlink("/tmp/tnew");
        /*
         * The mode must be an octal constant: decimal 777 is octal 01411 and
         * yields meaningless permission bits on the created file.
         */
        if ((write_file = open("/tmp/tnew", O_RDWR | O_CREAT, 0777)) is -1) {
            perror("open for write");
            pr_error("error opening file for write");
        }
    }

    /* give the sender a chance to queue and send before first receive */
    goto test;

    for (ever) {
recv:
        sender = recvm(&destn, &key, &pkt);
        now = runtime();

        switch (pkt->type) {
        case ACK:
            /* on an ack, update notion of last_ack, and free saved packets */
            printf("%f: Node %d got an ack, seq %d last ack %d\n",
                   make_float(now), node, pkt->seq_no, last_ack);

            /* if not a duplicate ack, increase window size */
            if(pkt->seq_no > last_ack) {
                printf(" cur_window %d, win_count %d, ssthresh %d\n",
                       cur_window, win_count, ssthresh);
                num_dup_acks = 0;
                /* XXX Implement the JK window increase algorithm here */
            } else {
                /* duplicate ack */
                num_dup_acks ++;
                if(/* XXX condition for retransmission */1) {
                    printf("enqueueing pkt %d on fast retransmission\n",
                           last_ack + 1);
                    /* XXX Actions on fast retransmission */
                }
            }

            /* this creates a trace in the test subdirectory called win1 */
            make_plot("win", cur_window);

            /* free the saved copies */
            /* XXX How are copies saved? see the code after "test:" */

            last_ack = pkt->seq_no;     /* cumulative ack */

            /*
             * if something in retx. queue can be removed, do so:
             * cycle once through the queue, keeping only packets that
             * the cumulative ack does not cover
             */
            num = num_in_q(node);
            for(i = 0; i < num; i++) {
                p = deq(node);
                if(p->seq_no > last_ack)
                    enq(node, p);
                else
                    free(p);
            }

            /* compute round trip time */
            /* XXX Recall that pkt->gen_time has the generation time in a
               timeval structure */

            /* update RTT and timeout estimate */
            /* XXX Use exponential averaging, ALPHA is already defined */

            /* if this is the last ack, send NO_MORE_DATA */
            if(no_more_pkts and last_ack is final_ack) {
                /* the ack packet itself is reused as the NO_MORE_DATA pkt */
                pkt->type = NO_MORE_DATA;
                pkt->dest = 2;
                pkt->source = 1;
                enq(node, pkt);
                goto test;
            } else
                free(pkt);
            goto test;

        case DATA:
            /* receiver got data */
            printf("%f: Node %d got data, seq %d\n",
                   make_float(now), node, pkt->seq_no);

            /* only accept packets not already received */
            /* What is the condition that a packet is not a duplicate? */
            if(/*XXX*/ 1) {
                /* check for corrupted data */
                corrupted = 0;
                r_buf = (unsigned char*) malloc(PKT_DATA_SIZE);
                /* two encoded bytes in the packet yield one decoded byte */
                for (i = 0; i < pkt->size; i += 2) {
                    /* Call check_hamming here and test for an error return */
                    if (/* XXX */ 1)
                        r_buf[i/2] = (data << 4);
                    else {
                        corrupted = 1;
                        break;
                    }
                    /* Call check_hamming here */
                    if (/*XXX */ 1)
                        r_buf[i/2] |= data;
                    else {
                        corrupted = 1;
                        break;
                    }
                }

                if (not corrupted) {
                    int ack;

                    /* save the data in a buffer */
                    r_save[pkt->seq_no % MWS] = r_buf;
                    r_save_size[pkt->seq_no % MWS] = pkt->size/2;
                    seen[pkt->seq_no % MWS] = 1;

                    /*
                     * write out every buffered packet that is now in
                     * sequence, releasing its slot as we go
                     */
                    ack = last_in_seq;
                    while (seen[(ack + 1) % MWS]) {
                        last_in_seq = ack;
                        ack++;
                        write(write_file, r_save[ack % MWS],
                              r_save_size[ack%MWS]);
                        free(r_save[ack%MWS]);
                        seen[ack % MWS] = 0;
                    }
                    last_in_seq = ack;
                } else {
                    printf(" >>packet corrupted, and cant be fixed\n");
                    free(r_buf);
                }
            }

            /* always send an ack by changing the packet's params */
            pkt->seq_no = last_in_seq;
            pkt->type = ACK;
            pkt->size = 0;
            pkt->source = node;
            pkt->dest = 1;
            printf(" sent ack, last_in_seq %d\n", pkt->seq_no);

            /* assume line to source is always free */
            sendm(route[node][pkt->dest], 0, pkt);
            line_busy = 1;
            goto recv;

        case NO_MORE_DATA:
            /* receiver returns the packet */
            if(node is 2){
                close(write_file);
                printf("Receiver: file transferred successfully\n");
                pkt->source = 2;
                pkt->dest = 1;
                sendm(route[node][pkt->dest],0,pkt);
            } else {
                /* sender terminates */
                close(read_file);
                printf("Sender: file transferred successfully\n");
                fflush(stdout);
                /* exit() requires a status argument; report success */
                exit(0);
            }
            goto recv;

        case INT:
            line_busy = 0;
            free(pkt);
            goto test;

        case TIMEOUT:
            /*
             * Unlike ecn_error.c, we emulate a single timeout per
             * window: only the timer of the most recent transmission
             * is acted upon.
             */
            if(pkt->id is transmission_id -1){
                /* XXX enqueue the entire outstanding window in
                 * high priority portion of retransmission queue */
                timeout = 2*timeout;
                ssthresh = cur_window/2;
                cur_window = 1;
                win_count = 0;
            }
            /*
             * NOTE(review): the timeout packet allocated after "test:" is
             * not freed on this path -- looks like a leak unless the timer
             * code owns it; confirm against set_timer().
             */
            goto test;

        default:
            free(pkt);
            pr_error("source received a pkt of unknown type");
        }
    }

test:
    if (node is 1) {
        num = num_in_q(node);

        /* if can send data, read it into a packet */
        if (num is 0 and not no_more_pkts) {
            size = read(read_file, buffer, PKT_DATA_SIZE);
            if (size > 0) {
                pkt = (PKT_PTR) malloc(sizeof(PKT));
                pkt->type = DATA;
                pkt->dest = 2;
                pkt->source = 1;
                pkt->seq_no = seq_no ++;

                /* encode with hamming code */
                for (i = 0; i < size; i++) {
                    /* XXX Convert the bytes in buffer to encoded bytes in
                       packet. Remember that hamming[] operates in 4 bit
                       quantities */
                }
                pkt->size = 2 * size;

                /* if last piece of data, set flag */
                if(size < PKT_DATA_SIZE){
                    final_ack = seq_no - 1;
                    no_more_pkts = 1;
                }
                enq(node, pkt);
            }
            if (size < 0)
                pr_error("error on read");

            /*
             * previous packet was last.  Test the read count, not
             * pkt->size: no packet is allocated when read() returns 0, so
             * pkt would be uninitialized or point at a packet that has
             * already been freed or handed off.
             */
            if (size is 0){
                final_ack = seq_no - 1;
                no_more_pkts = 1;
            }
        }

        num = num_in_q(node);

        /* send packet if line not busy, and within error
         * control window. Read each pkt in queue, and put it
         * back if it cant be sent */
        if (num and not line_busy)
            nothing_sent = 1;
        for(j = 0; j < num; j++) {
            pkt = deq(node);
            if (/* XX condition to be able to send packets */1){
                nothing_sent = 0;

                /* save a copy before transmission */
                /* XXX Before you send the packet, save a copy in save[].
                 * Remember, just copying the pointer isnt enough: you have
                 * to copy the packet.
                 * Use structure copy to make this easy */

                pkt->gen_time = now;
                pkt->id = transmission_id ++;

                /* update last_sent */
                if (pkt->seq_no > last_sent)
                    last_sent = pkt->seq_no;
                line_busy = 1;

                /* set a timeout for this packet */
                timeout_pkt = (PKT_PTR) malloc(sizeof(PKT));
                timeout_pkt->seq_no = pkt->seq_no;
                timeout_pkt->type = TIMEOUT;
                timeout_pkt->id = pkt->id;
                timeout_pkt->source = timeout_pkt->dest = 1;
                set_timer(timeout, timeout_pkt);

                printf("%f: Node %d sent pkt, seq_no %d \n",
                       make_float(now), node, pkt->seq_no);
                sendm(route[node][pkt->dest],0,pkt);
            } else
                enq(node, pkt);
        }
    }
    goto recv;
}