#include "hosts.h"
#include "msg.h"
#include "online.h"
#include "parse.h"

// System headers required by the code in this file.
#include <errno.h>
#include <inttypes.h>
#include <poll.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <time.h>

extern char *f_host;                // parsing data for hosts.
extern struct host *conns[1024];    // connection data for each peer.
extern int num_conns;               // number of connected peers.
extern int f;                       // maximum number of potentially faulty nodes.
extern char *glob_name;             // my name (not necessary here).
extern uint32_t glob_replica_index; // my replica index.
extern struct msg_log *mlog;
struct msg_log *msg_reqs = NULL;    // message requests received from a pre-prepare.
extern struct msg_log *outstanding;   // outstanding requests.
extern struct msg_log *out_responses; // outstanding responses.

uint32_t state_machine_state = 0; // initialize state machine state to 0.
uint32_t view_next_seq_num = 1;
uint32_t low_watermark = 0;
uint32_t high_watermark = 300;
uint32_t curr_view = 0;
uint32_t accepted_req_num = 0;

// poll() timeout in milliseconds. TODO: this should be fixed for a view change.
enum { POLL_TIMEOUT_MS = 30 };

// Allocates `size` bytes or terminates the process with a diagnostic, so
// callers never have to deal with a NULL allocation.
static void *alloc_or_die(size_t size) {
  void *mem = malloc(size);
  if (mem == NULL && size != 0) {
    perror("malloc");
    exit(EXIT_FAILURE);
  }
  return mem;
}

// Prepends `m` to the list starting at `head` and returns the new head.
static struct msg_log *push_front(struct msg_log *head, struct msg *m) {
  struct msg_log *node = alloc_or_die(sizeof *node);
  node->m = m;
  node->next = head;
  return node;
}

// True when this node's replica index is the primary for the current view.
static bool this_node_is_primary(void) {
  return curr_view % (uint32_t)(num_conns + 1) == glob_replica_index;
}

// True when the message is for the current view, its sequence number lies
// within the watermarks, and its signature validates. The checks run in that
// order and stop at the first failure.
static bool passes_basic_checks(struct msg *message) {
  return message->view == curr_view && low_watermark <= message->seq &&
         message->seq <= high_watermark &&
         validate_signature(message, todo_key());
}

// Primary-side handling of a client request: hand it to the online phase,
// stamp it with the next sequence number and remember it in msg_reqs.
// The request stays referenced by msg_reqs afterwards.
static void accept_as_primary(struct msg *request) {
  handle_primary_recv_request(request, todo_key());
  request->seq = view_next_seq_num;
  msg_reqs = push_front(msg_reqs, request);
  view_next_seq_num += 1;
}

// Sends a commit for every pending request that `prepared` reports as ready
// for the view/sequence carried by `trigger`.
static void send_commits_if_prepared(struct msg *trigger) {
  for (struct msg_log *out = msg_reqs; out != NULL; out = out->next) {
    if (prepared(mlog, f, out->m, trigger->view, trigger->seq)) {
      handle_send_commit(out->m, trigger, todo_key());
    }
  }
}

// Reads one length-prefixed message from `peer`: a one-byte payload length
// followed by that many payload bytes.
// Returns the parsed message, or NULL when nothing usable could be read
// (receive error, short read, or empty payload).
// NOTE(review): assumes each message arrives in a single recvfrom() call
// (datagram-style socket) - confirm against init_hosts.
static struct msg *recv_peer_msg(struct host *peer) {
  uint8_t msglen = 0;
  socklen_t addr_len = sizeof(peer->conn);

  // Peek at the length byte without consuming the message.
  ssize_t got = recvfrom(peer->sockfd, &msglen, sizeof msglen, MSG_PEEK,
                         (struct sockaddr *)&(peer->conn), &addr_len);
  if (got < 0) {
    return NULL;
  }

  // Consume the whole frame even if it turns out to be unusable, so that a
  // malformed message does not keep the socket readable forever.
  size_t frame_len = (size_t)msglen + sizeof msglen;
  uint8_t *frame = alloc_or_die(frame_len);
  addr_len = sizeof(peer->conn); // value-result argument: reset before reuse.
  got = recvfrom(peer->sockfd, frame, frame_len, 0,
                 (struct sockaddr *)&(peer->conn), &addr_len);
  if (got < 0 || (size_t)got != frame_len || msglen == 0) {
    free(frame);
    return NULL;
  }

  uint8_t *payload = alloc_or_die(msglen);
  memcpy(payload, frame + sizeof msglen, msglen);
  free(frame);

  packet *p = alloc_or_die(sizeof *p);
  p->is_timed_out = false;
  p->len = msglen;
  p->data = payload;

  struct msg *message = parse_packet_as_msg(p);
  // Same parse-then-clean_packet sequence the file uses for outgoing packets.
  clean_packet(p);
  return message;
}

// Recognises "ADD <n>", "SUB <n>", "MUL <n>" and "GET" at the start of `line`.
// On success stores the operation code (0..3) in *op, whether the command
// carries an operand in *has_value, and the operand (0 when absent) in *value.
// Returns false for any other input.
static bool parse_stdin_request(const char *line, uint32_t *op,
                                uint32_t *value, bool *has_value) {
  static const struct {
    const char *name;
    uint32_t code;
    bool takes_value;
  } commands[] = {
      {"ADD", 0, true},
      {"SUB", 1, true},
      {"MUL", 2, true},
      {"GET", 3, false},
  };

  for (size_t i = 0; i < sizeof commands / sizeof commands[0]; i++) {
    size_t name_len = strlen(commands[i].name);
    if (strncmp(commands[i].name, line, name_len) != 0) {
      continue;
    }
    *op = commands[i].code;
    *has_value = commands[i].takes_value;
    *value = 0;
    // The operand starts one character after the command name. Only look
    // there when the line actually continues past the name.
    if (commands[i].takes_value && line[name_len] != '\0') {
      *value = (uint32_t)strtoul(line + name_len + 1, NULL, 10);
    }
    return true;
  }
  return false;
}

// Handles one line typed on stdin: builds a request, either processes it as
// the primary or forwards it to the primary, and records it as outstanding
// under the local request number *request_num (incremented on success).
// On EOF or read error stdin is removed from the poll set.
static void on_stdin_ready(uint32_t *request_num, struct pollfd *stdin_pfd) {
  char *line = NULL;
  size_t cap = 0;
  if (getline(&line, &cap, stdin) == -1) {
    free(line);
    stdin_pfd->fd = -1; // poll() ignores negative descriptors.
    return;
  }

  uint32_t op = 0;
  uint32_t value = 0;
  bool has_value = false;
  bool proper_request = parse_stdin_request(line, &op, &value, &has_value);
  free(line);
  if (!proper_request) {
    return;
  }

  struct msg *req = init_msg();
  req->msg_type = 0; // request
  req->req_message = op;
  if (has_value) {
    req->option_value = value;
  }

  struct timespec creq_timestamp;
  clock_gettime(CLOCK_MONOTONIC, &creq_timestamp);
  req->timestamp = creq_timestamp.tv_sec;
  req->client_index = glob_replica_index;

  packet *p = parse_msg_as_packet(req);
  if (this_node_is_primary()) {
    // case: we are the primary.
    accept_as_primary(req);
  } else {
    // case: we are a client sending to primary.
    send_to_replica(conns, num_conns, curr_view % (num_conns + 1), p);
    // NOTE(review): `req` is no longer referenced after the copy below and
    // is not released here; freeing it needs knowledge of struct msg
    // ownership that this file does not show.
  }
  clean_packet(p);

  // Keep a copy keyed by the local request number so the matching responses
  // can be recognised later.
  struct msg *standing = alloc_or_die(sizeof *standing);
  memcpy(standing, req, sizeof *standing);
  standing->client_index = *request_num;
  outstanding = push_front(outstanding, standing);
  *request_num += 1;
}

// Message type 0: a client request. Only the primary acts on it.
static void on_request(struct msg *message) {
  if (this_node_is_primary()) {
    accept_as_primary(message);
  } else {
    clean_msg(message); // not ours to order; drop it.
  }
}

// Message type 1: a pre-prepare. The request it orders is read from the same
// peer as a second, piggybacked message. When the pre-prepare validates, both
// are logged, the request is queued in msg_reqs, a prepare is broadcast and
// any request that is now prepared gets a commit.
static void on_preprepare(struct host *peer, struct msg *message) {
  struct msg *piggyback = recv_peer_msg(peer);
  if (piggyback == NULL) {
    clean_msg(message);
    return;
  }

  bool digest_ok = small_digest(piggyback) == message->digest;
  bool signature_ok = validate_signature(message, todo_key());
  bool view_ok = message->view == curr_view;

  // Reject a second pre-prepare for the same view/sequence with a different
  // digest.
  bool no_conflict = true;
  for (struct msg_log *m = mlog; m != NULL; m = m->next) {
    if (m->m->msg_type == 1 && m->m->view == message->view &&
        m->m->seq == message->seq && m->m->digest != message->digest) {
      no_conflict = false;
    }
  }

  bool in_window =
      low_watermark <= message->seq && message->seq <= high_watermark;

  if (!(digest_ok && signature_ok && view_ok && no_conflict && in_window)) {
    clean_msg(piggyback);
    clean_msg(message);
    return;
  }

  // Log order matches the original: piggyback ends up in front of the
  // pre-prepare.
  mlog = push_front(mlog, message);
  mlog = push_front(mlog, piggyback);

  // Queue a copy of the request carrying the assigned sequence number.
  struct msg *cloned_msg = alloc_or_die(sizeof *cloned_msg);
  memcpy(cloned_msg, piggyback, sizeof *cloned_msg);
  cloned_msg->seq = message->seq;
  msg_reqs = push_front(msg_reqs, cloned_msg);

  // now that the message log has been updated, we should broadcast prepare
  // messages.
  handle_send_prepare(message, todo_key());
  send_commits_if_prepared(message);
}

// Message type 2: a prepare. Log it when valid and send a commit for every
// pending request it completes.
static void on_prepare(struct msg *message) {
  if (!passes_basic_checks(message)) {
    clean_msg(message);
    return;
  }
  mlog = push_front(mlog, message);
  send_commits_if_prepared(message);
}

// Message type 3: a commit. Log it when valid, then pick the pending request
// with the lowest sequence number that `committed_local` accepts, remove it
// from msg_reqs and execute it.
static void on_commit(struct msg *message) {
  if (!passes_basic_checks(message)) {
    clean_msg(message);
    return;
  }
  mlog = push_front(mlog, message);

  if (msg_reqs == NULL) {
    return;
  }

  // `best_link` points at the pointer that references the chosen node, so
  // unlinking works the same for the head and for interior nodes.
  struct msg_log **best_link = NULL;
  uint32_t lowest_seq = UINT32_MAX;

  // Interior nodes first, head last: this keeps the original tie-breaking
  // (the head only wins with a strictly lower sequence number).
  for (struct msg_log **link = &msg_reqs->next; *link != NULL;
       link = &(*link)->next) {
    struct msg *candidate = (*link)->m;
    if (committed_local(mlog, f, candidate, message->view, message->seq) &&
        candidate->seq < lowest_seq) {
      lowest_seq = candidate->seq;
      best_link = link;
    }
    // TODO: check if all reqs w lower sequence numbers have been established.
  }
  if (committed_local(mlog, f, msg_reqs->m, message->view, message->seq) &&
      msg_reqs->m->seq < lowest_seq) {
    best_link = &msg_reqs;
  }

  if (best_link == NULL) {
    return;
  }

  struct msg_log *node = *best_link;
  struct msg *ready = node->m;
  *best_link = node->next;
  free(node); // only the list node; the message is handed on below.
  handle_operation(ready, todo_key());
}

// Message type 5: a response to one of our requests. Valid responses are
// logged; once enough of them agree on the same result and timestamp, the
// matching outstanding request is retired and the result is printed.
// `request_num` is the number of the next local request, used for the prompt.
static void on_response(struct msg *message, uint32_t request_num) {
  if (!validate_signature(message, todo_key())) {
    clean_msg(message); // unauthenticated responses must not be counted.
    return;
  }
  out_responses = push_front(out_responses, message);

  uint32_t resp = message->option_value;

  // NOTE(review): the tally starts at 1 and the loop below also sees the
  // message just logged, so `f` agreeing responses are enough to pass the
  // `> f` test. Kept as in the original; confirm whether the extra vote is
  // intentional.
  unsigned int count = 1;
  for (struct msg_log *out = out_responses; out != NULL; out = out->next) {
    if (out->m->option_value == resp &&
        out->m->timestamp == message->timestamp) {
      count += 1;
    }
  }
  if (f >= 0 && count <= (unsigned int)f) {
    return;
  }

  // Retire the outstanding request(s) with this timestamp; interior nodes
  // first, then the head, as in the original traversal.
  uint32_t req_num = 0;
  for (struct msg_log *out = outstanding; out != NULL; out = out->next) {
    struct msg_log *victim = out->next;
    if (victim != NULL && victim->m->timestamp == message->timestamp) {
      out->next = victim->next;
      req_num = victim->m->client_index;
      clean_msg(victim->m);
      free(victim);
    }
  }
  if (outstanding != NULL &&
      outstanding->m->timestamp == message->timestamp) {
    struct msg_log *head = outstanding;
    req_num = head->m->client_index;
    outstanding = head->next;
    clean_msg(head->m);
    free(head);
  }

  if (req_num > accepted_req_num) {
    // The result is shown as a signed value, matching the original "%d".
    printf("\nRESP [%" PRIu32 "] > %" PRId32 " \n", req_num, (int32_t)resp);
    fflush(stdout);
    printf("%" PRIu32 " > ", request_num);
    fflush(stdout);
    accepted_req_num = req_num;
  }
}

// Reads one message from `peer` and dispatches it on its msg_type.
static void on_peer_ready(struct host *peer, uint32_t request_num) {
  struct msg *message = recv_peer_msg(peer);
  if (message == NULL) {
    return;
  }

  switch (message->msg_type) {
  case 0: // request
    on_request(message);
    break;
  case 1: // pre-prepare
    on_preprepare(peer, message);
    break;
  case 2: // prepare
    on_prepare(message);
    break;
  case 3: // commit
    on_commit(message);
    break;
  case 5: // response
    on_response(message, request_num);
    break;
  default:
    clean_msg(message); // unknown type: nothing keeps a reference to it.
    break;
  }
}

// Initialize host connections.
void initialize(void) {
  init_hosts(f_host);
  printf("\n\n\n --- CONNECTIONS --- \n\n");
  for (int i = 0; i < num_conns; i++) {
    printf("Connected with %s:%d at %d\n", conns[i]->name, conns[i]->c_port,
           conns[i]->h_port);
  }
}

// Start up peer.
//
// Event loop overview:
//   stdin msg      -> forward to primary (check if it is us), start online
//                     phase
//   other messages -> keep message log, and validate as we proceed
//
// Only returns (with EXIT_FAILURE) when poll() fails unrecoverably.
int main(int argc, char **argv) {
  srand((unsigned int)time(NULL));
  parse(argc, argv);
  printf("Got host file: %s\n", f_host);
  initialize();

  uint32_t request_num = 1;

  // Take requests from stdin: we want to poll on stdin and incoming messages.
  // Slot 0 is stdin, slot i + 1 is peer i.
  size_t nfds = (size_t)num_conns + 1;
  struct pollfd *pollfds = calloc(nfds, sizeof *pollfds);
  if (pollfds == NULL) {
    perror("calloc");
    return EXIT_FAILURE;
  }
  pollfds[0].fd = fileno(stdin);
  pollfds[0].events = POLLIN | POLLPRI;
  for (int i = 0; i < num_conns; i++) {
    pollfds[i + 1].fd = conns[i]->sockfd;
    pollfds[i + 1].events = POLLIN | POLLPRI;
  }

  printf("%" PRIu32 " > ", request_num);
  fflush(stdout);

  for (;;) {
    int ready = poll(pollfds, (nfds_t)nfds, POLL_TIMEOUT_MS);
    if (ready < 0) {
      if (errno == EINTR) {
        continue; // interrupted by a signal; revents are not valid.
      }
      perror("poll");
      break;
    }
    if (ready == 0) {
      // Timed out. TODO: handle view change.
      continue;
    }

    if (pollfds[0].revents > 0) {
      // handle stdin request for state change.
      on_stdin_ready(&request_num, &pollfds[0]);
    }
    for (int i = 0; i < num_conns; i++) {
      if (pollfds[i + 1].revents > 0) {
        // handle message from peer at this point.
        on_peer_ready(conns[i], request_num);
      }
    }
  }

  free(pollfds);
  return EXIT_FAILURE;
}