438 lines
15 KiB
C
438 lines
15 KiB
C
#include "hosts.h"
|
|
#include "msg.h"
|
|
#include "online.h"
|
|
#include "parse.h"
|
|
#include <poll.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <time.h>
|
|
#include <unistd.h>
|
|
|
|
extern char *f_host; // parsing data for hosts.
|
|
|
|
extern struct host *conns[1024]; // connection data for each peer.
|
|
|
|
extern int num_conns; // number of connected peers.
|
|
|
|
extern int f; // maximum number of potentially faulty nodes.
|
|
|
|
extern char *glob_name; // my name (not necessary here).
|
|
|
|
extern uint32_t glob_replica_index; // my replica index.
|
|
|
|
extern struct msg_log *mlog;
|
|
|
|
struct msg_log *msg_reqs =
|
|
NULL; // message requests received from a pre-prepare.
|
|
|
|
extern struct msg_log *outstanding; // outstanding requests.
|
|
extern struct msg_log *out_responses; // outstanding responses.
|
|
|
|
uint32_t state_machine_state = 0; // initialize state machine state to 0.
|
|
|
|
uint32_t view_next_seq_num = 1;
|
|
|
|
uint32_t low_watermark = 0;
|
|
|
|
uint32_t high_watermark = 300;
|
|
|
|
uint32_t curr_view = 0;
|
|
|
|
uint32_t accepted_req_num = 0;
|
|
|
|
// Initialize host connections.
|
|
void initialize() {
|
|
init_hosts(f_host);
|
|
|
|
printf("\n\n\n --- CONNECTIONS --- \n\n");
|
|
for (int i = 0; i < num_conns; i++) {
|
|
printf("Connected with %s:%d at %d\n", conns[i]->name, conns[i]->c_port,
|
|
conns[i]->h_port);
|
|
}
|
|
}
|
|
|
|
// Start up peer.
|
|
int main(int argc, char **argv) {
|
|
srand(time(NULL));
|
|
parse(argc, argv);
|
|
|
|
printf("Got host file: %s\n", f_host);
|
|
|
|
initialize();
|
|
|
|
uint32_t request_num = 1;
|
|
|
|
// Take requests from stdin: we want to poll on stdin and incoming messages.
|
|
|
|
struct pollfd *pollfds = malloc(sizeof(struct pollfd) * ((num_conns) + 1));
|
|
|
|
int infd = fileno(stdin);
|
|
|
|
pollfds[0].fd = infd;
|
|
pollfds[0].events = POLLIN | POLLPRI;
|
|
|
|
for (int i = 0; i < num_conns; i++) {
|
|
pollfds[i + 1].fd = conns[i]->sockfd;
|
|
pollfds[i + 1].events = POLLIN | POLLPRI;
|
|
}
|
|
|
|
printf("%d > ", request_num);
|
|
fflush(stdout);
|
|
|
|
struct timespec start_time;
|
|
clock_gettime(CLOCK_REALTIME, &start_time);
|
|
|
|
int timeout = 30; // this should be fixed for a view change.
|
|
|
|
while (1) {
|
|
|
|
struct timespec now;
|
|
clock_gettime(CLOCK_MONOTONIC, &now);
|
|
if (poll(pollfds, num_conns + 1, timeout) != 0) {
|
|
if (pollfds[0].revents > 0) {
|
|
// update timeouts.
|
|
|
|
// handle stdin request for state change.
|
|
char *str = NULL;
|
|
size_t len;
|
|
if (getline(&str, &len, stdin) != -1) {
|
|
bool proper_request = false;
|
|
struct msg *req = init_msg();
|
|
req->msg_type = 0; // request
|
|
if (strncmp("ADD", str, strlen("ADD")) == 0) {
|
|
char *argument = str + 4;
|
|
uint32_t x = atoi(argument);
|
|
req->req_message = 0;
|
|
req->option_value = x;
|
|
proper_request = true;
|
|
}
|
|
if (strncmp("SUB", str, strlen("SUB")) == 0) {
|
|
char *argument = str + 4;
|
|
uint32_t x = atoi(argument);
|
|
req->req_message = 1;
|
|
req->option_value = x;
|
|
proper_request = true;
|
|
}
|
|
if (strncmp("MUL", str, strlen("MUL")) == 0) {
|
|
char *argument = str + 4;
|
|
uint32_t x = atoi(argument);
|
|
req->req_message = 2;
|
|
req->option_value = x;
|
|
proper_request = true;
|
|
}
|
|
if (strncmp("GET", str, strlen("GET")) == 0) {
|
|
req->req_message = 3;
|
|
proper_request = true;
|
|
}
|
|
|
|
if (proper_request) {
|
|
struct timespec creq_timestamp;
|
|
clock_gettime(CLOCK_MONOTONIC, &creq_timestamp);
|
|
req->timestamp = creq_timestamp.tv_sec;
|
|
req->client_index = glob_replica_index;
|
|
packet *p = parse_msg_as_packet(req);
|
|
if (curr_view % (num_conns + 1) == glob_replica_index) {
|
|
// case: we are the primary.
|
|
handle_primary_recv_request(req, todo_key());
|
|
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
|
|
out_msg->m = req;
|
|
out_msg->m->seq = view_next_seq_num;
|
|
out_msg->next = msg_reqs;
|
|
msg_reqs = out_msg;
|
|
view_next_seq_num += 1;
|
|
} else {
|
|
// case: we are a client sending to primary.
|
|
send_to_replica(conns, num_conns, curr_view % (num_conns + 1), p);
|
|
}
|
|
clean_packet(p);
|
|
struct msg_log *ml = malloc(sizeof(struct msg_log));
|
|
struct msg *standing = malloc(sizeof(struct msg));
|
|
memcpy(standing, req, sizeof(struct msg));
|
|
standing->client_index = request_num;
|
|
ml->m = standing;
|
|
ml->m->client_index = request_num;
|
|
ml->next = outstanding;
|
|
outstanding = ml;
|
|
request_num += 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
for (int i = 0; i < num_conns; i++) {
|
|
if (pollfds[i + 1].revents > 0) {
|
|
// handle message from peer at this point.
|
|
uint8_t msglen;
|
|
socklen_t addr_len = sizeof(conns[i]->conn);
|
|
recvfrom(conns[i]->sockfd, &msglen, sizeof(msglen), MSG_PEEK,
|
|
(struct sockaddr *)&(conns[i]->conn), &addr_len);
|
|
uint8_t *msg_buf = malloc(msglen + sizeof(msglen));
|
|
recvfrom(conns[i]->sockfd, msg_buf, msglen + sizeof(msglen), 0,
|
|
(struct sockaddr *)&(conns[i]->conn), &addr_len);
|
|
|
|
uint8_t *new_data = malloc(msglen);
|
|
memcpy(new_data, msg_buf + 1, msglen);
|
|
free(msg_buf);
|
|
packet *p = malloc(sizeof(packet));
|
|
p->is_timed_out = false;
|
|
p->len = msglen;
|
|
p->data = new_data;
|
|
struct msg *message = parse_packet_as_msg(p);
|
|
clean_packet(p);
|
|
|
|
if (message->msg_type == 0) {
|
|
// printf("\nRequest\n");
|
|
// request for message coming from a client
|
|
// check that we are the primary
|
|
if (curr_view % (num_conns + 1) == glob_replica_index) {
|
|
handle_primary_recv_request(message, todo_key());
|
|
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
|
|
out_msg->m = message;
|
|
out_msg->m->seq = view_next_seq_num;
|
|
out_msg->next = msg_reqs;
|
|
msg_reqs = out_msg;
|
|
view_next_seq_num += 1;
|
|
}
|
|
} else if (message->msg_type == 1) {
|
|
// printf("\nPreprepare\n");
|
|
// pre-prepare message sent out
|
|
//
|
|
// 1. parse the pre-prepare message.
|
|
//
|
|
// 2. receive the piggybacked message.
|
|
//
|
|
// validate and accept pre-prepare message.
|
|
|
|
// receive the piggybacked message.
|
|
uint8_t msglen;
|
|
socklen_t addr_len = sizeof(conns[i]->conn);
|
|
recvfrom(conns[i]->sockfd, &msglen, sizeof(msglen), MSG_PEEK,
|
|
(struct sockaddr *)&(conns[i]->conn), &addr_len);
|
|
uint8_t *msg_buf = malloc(msglen + sizeof(msglen));
|
|
recvfrom(conns[i]->sockfd, msg_buf, msglen + sizeof(msglen), 0,
|
|
(struct sockaddr *)&(conns[i]->conn), &addr_len);
|
|
|
|
uint8_t *new_data = malloc(msglen);
|
|
memcpy(new_data, msg_buf + 1, msglen);
|
|
free(msg_buf);
|
|
packet *p = malloc(sizeof(packet));
|
|
p->is_timed_out = false;
|
|
p->len = msglen;
|
|
p->data = new_data;
|
|
struct msg *piggyback = parse_packet_as_msg(p);
|
|
|
|
uint32_t d = small_digest(piggyback);
|
|
|
|
bool check1 = d == message->digest;
|
|
bool check2 = validate_signature(message, todo_key());
|
|
bool check3 = message->view == curr_view;
|
|
bool check4 = true;
|
|
struct msg_log *m;
|
|
for (m = mlog; m != NULL; m = m->next) {
|
|
if (m->m->msg_type == 1 && m->m->view == message->view &&
|
|
m->m->seq == message->seq &&
|
|
m->m->digest != message->digest) {
|
|
check4 = false;
|
|
}
|
|
}
|
|
bool check5 = (low_watermark <= message->seq &&
|
|
message->seq <= high_watermark);
|
|
if (check1 && check2 && check3 && check4 && check5) {
|
|
struct msg_log *ml = malloc(sizeof(struct msg_log));
|
|
ml->m = message;
|
|
ml->next = mlog;
|
|
struct msg_log *ml2 = malloc(sizeof(struct msg_log));
|
|
ml2->m = piggyback;
|
|
ml2->next = ml;
|
|
mlog = ml2;
|
|
|
|
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
|
|
struct msg *cloned_msg = malloc(sizeof(struct msg));
|
|
memcpy(cloned_msg, piggyback, sizeof(struct msg));
|
|
cloned_msg->seq = message->seq;
|
|
out_msg->m = cloned_msg;
|
|
out_msg->next = msg_reqs;
|
|
msg_reqs = out_msg;
|
|
|
|
// now that the message log has been updated, we should broadcast
|
|
// prepare messages.
|
|
handle_send_prepare(message, todo_key());
|
|
|
|
struct msg_log *out;
|
|
|
|
for (out = msg_reqs; out != NULL; out = out->next) {
|
|
if (prepared(mlog, f, out->m, message->view, message->seq)) {
|
|
handle_send_commit(out->m, message, todo_key());
|
|
}
|
|
}
|
|
}
|
|
} else if (message->msg_type == 2) {
|
|
// printf("\nPrepare\n");
|
|
// receive a prepare message.
|
|
// 1. validate the prepare message.
|
|
// 2. check if this prepare message completes the `prepared` for any
|
|
// currently waiting message.
|
|
// if it does, send out a commit!
|
|
if (message->view == curr_view && low_watermark <= message->seq &&
|
|
message->seq <= high_watermark &&
|
|
validate_signature(message, todo_key())) {
|
|
|
|
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
|
|
out_msg->m = message;
|
|
out_msg->next = mlog;
|
|
mlog = out_msg;
|
|
|
|
struct msg_log *out;
|
|
|
|
for (out = msg_reqs; out != NULL; out = out->next) {
|
|
if (prepared(mlog, f, out->m, message->view, message->seq)) {
|
|
handle_send_commit(out->m, message, todo_key());
|
|
}
|
|
}
|
|
}
|
|
} else if (message->msg_type == 3) {
|
|
// printf("\nCommit\n");
|
|
// someone committed.
|
|
// validate committed message.
|
|
|
|
// struct msg_log *loop;
|
|
// for (loop = mlog; loop != NULL; loop = loop->next) {
|
|
// display_msg(loop->m);
|
|
// fflush(stdout);
|
|
// }
|
|
|
|
if (message->view == curr_view && low_watermark <= message->seq &&
|
|
message->seq <= high_watermark &&
|
|
validate_signature(message, todo_key())) {
|
|
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
|
|
out_msg->m = message;
|
|
out_msg->next = mlog;
|
|
mlog = out_msg;
|
|
|
|
struct msg_log *out;
|
|
|
|
uint32_t lowest_seq = UINT32_MAX;
|
|
|
|
struct msg_log *prev_to_curr;
|
|
struct msg_log *curr;
|
|
struct msg *curr_handle;
|
|
|
|
bool valid = false;
|
|
|
|
for (out = msg_reqs; out != NULL; out = out->next) {
|
|
if (out->next != NULL &&
|
|
committed_local(mlog, f, out->next->m, message->view,
|
|
message->seq)) {
|
|
|
|
if (out->next->m->seq < lowest_seq) {
|
|
lowest_seq = out->next->m->seq;
|
|
curr_handle = out->next->m;
|
|
prev_to_curr = out;
|
|
curr = out->next;
|
|
valid = true;
|
|
}
|
|
// [HERE] check if all reqs w lower sequence numbers have been
|
|
// established.
|
|
}
|
|
}
|
|
|
|
if (msg_reqs != NULL &&
|
|
committed_local(mlog, f, msg_reqs->m, message->view,
|
|
message->seq)) {
|
|
if (msg_reqs->m->seq < lowest_seq) {
|
|
lowest_seq = msg_reqs->m->seq;
|
|
curr_handle = msg_reqs->m;
|
|
prev_to_curr = NULL;
|
|
curr = msg_reqs;
|
|
valid = true;
|
|
}
|
|
}
|
|
|
|
if (valid == true && prev_to_curr != NULL && curr != NULL) {
|
|
printf("\nTest1\n");
|
|
prev_to_curr->next = curr->next;
|
|
printf("\nTest2\n");
|
|
} else {
|
|
if (valid == true && msg_reqs != NULL) {
|
|
msg_reqs = msg_reqs->next;
|
|
}
|
|
}
|
|
|
|
if (valid) {
|
|
handle_operation(curr_handle, todo_key());
|
|
}
|
|
}
|
|
} else if (message->msg_type == 5) {
|
|
// printf("\nResponse\n");
|
|
|
|
if (validate_signature(message, todo_key())) {
|
|
|
|
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
|
|
out_msg->m = message;
|
|
out_msg->next = out_responses;
|
|
out_responses = out_msg;
|
|
}
|
|
struct msg_log *out;
|
|
|
|
uint32_t resp = message->option_value;
|
|
|
|
uint8_t count = 1;
|
|
|
|
for (out = out_responses; out != NULL; out = out->next) {
|
|
if (out->m->option_value == resp &&
|
|
out->m->timestamp == message->timestamp) {
|
|
count += 1;
|
|
}
|
|
}
|
|
|
|
uint32_t req_num = 0;
|
|
|
|
if (count > f) {
|
|
struct msg_log *out;
|
|
struct msg_log *curr;
|
|
|
|
for (out = outstanding; out != NULL; out = out->next) {
|
|
curr = out->next;
|
|
|
|
if (curr != NULL && curr->m->timestamp == message->timestamp) {
|
|
out->next = curr->next;
|
|
req_num = curr->m->client_index;
|
|
clean_msg(curr->m);
|
|
free(curr);
|
|
}
|
|
}
|
|
|
|
if (outstanding != NULL &&
|
|
outstanding->m->timestamp == message->timestamp) {
|
|
req_num = outstanding->m->client_index;
|
|
outstanding = outstanding->next;
|
|
}
|
|
|
|
if (req_num > accepted_req_num) {
|
|
|
|
struct msg_log *loop;
|
|
// for (loop = mlog; loop != NULL; loop = loop->next) {
|
|
// display_msg(loop->m);
|
|
// fflush(stdout);
|
|
// }
|
|
|
|
printf("\nRESP [%d] > %d \n", req_num, resp);
|
|
fflush(stdout);
|
|
printf("%d > ", request_num);
|
|
fflush(stdout);
|
|
accepted_req_num = req_num;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
} else {
|
|
// handle view change.
|
|
}
|
|
}
|
|
|
|
// stdin msg -> forward to primary (check if it is us), start online phase
|
|
// other messages: keep message log, and validate as we proceed
|
|
}
|