commit bffe95971313b47d43c95c9852b3754395d6a4c4 Author: h Date: Sat Jan 25 00:01:36 2025 -0500 first BFT impl diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..28bd2ac --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.o +.ccls-cache/ +pbft diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3cdb5b4 --- /dev/null +++ b/Makefile @@ -0,0 +1,17 @@ +CC=gcc +CFLAGS=-Wall -Wextra -ggdb -std=gnu99 +SRC=client.c hosts.c parse.c online.c msg.c +OBJ=$(SRC:.c=.o) +LIBS=-lcrypto + +all: client + +.SUFFIXES: .c .o +.c.o: + $(CC) -c $(CFLAGS) $*.c + +client: $(OBJ) + $(CC) -o pbft $(OBJ) $(LIBS) + +clean: + rm -f *.o pbft diff --git a/byz/hcfg1 b/byz/hcfg1 new file mode 100644 index 0000000..afb72c0 --- /dev/null +++ b/byz/hcfg1 @@ -0,0 +1,4 @@ +0 +1 8000 127.0.0.1 8010 +2 8001 127.0.0.1 8020 +3 8002 127.0.0.1 8030 diff --git a/byz/hcfg2 b/byz/hcfg2 new file mode 100644 index 0000000..dbc819c --- /dev/null +++ b/byz/hcfg2 @@ -0,0 +1,4 @@ +1 +0 8010 127.0.0.1 8000 +2 8011 127.0.0.1 8021 +3 8012 127.0.0.1 8023 diff --git a/byz/hcfg3 b/byz/hcfg3 new file mode 100644 index 0000000..a67a24f --- /dev/null +++ b/byz/hcfg3 @@ -0,0 +1,4 @@ +2 +0 8020 127.0.0.1 8001 +1 8021 127.0.0.1 8011 +3 8022 127.0.0.1 8100 diff --git a/byz/hcfg4 b/byz/hcfg4 new file mode 100644 index 0000000..bdd4e43 --- /dev/null +++ b/byz/hcfg4 @@ -0,0 +1,4 @@ +3 +0 8030 127.0.0.1 8002 +1 8023 127.0.0.1 8012 +2 8100 127.0.0.1 8022 diff --git a/client.c b/client.c new file mode 100644 index 0000000..273a842 --- /dev/null +++ b/client.c @@ -0,0 +1,437 @@ +#include "hosts.h" +#include "msg.h" +#include "online.h" +#include "parse.h" +#include +#include +#include +#include +#include +#include + +extern char *f_host; // parsing data for hosts. + +extern struct host *conns[1024]; // connection data for each peer. + +extern int num_conns; // number of connected peers. + +extern int f; // maximum number of potentially faulty nodes. + +extern char *glob_name; // my name (not necessary here). + +extern uint32_t glob_replica_index; // my replica index. + +extern struct msg_log *mlog; + +struct msg_log *msg_reqs = + NULL; // message requests received from a pre-prepare. + +extern struct msg_log *outstanding; // outstanding requests. +extern struct msg_log *out_responses; // outstanding responses. + +uint32_t state_machine_state = 0; // initialize state machine state to 0. + +uint32_t view_next_seq_num = 1; + +uint32_t low_watermark = 0; + +uint32_t high_watermark = 300; + +uint32_t curr_view = 0; + +uint32_t accepted_req_num = 0; + +// Initialize host connections. +void initialize() { + init_hosts(f_host); + + printf("\n\n\n --- CONNECTIONS --- \n\n"); + for (int i = 0; i < num_conns; i++) { + printf("Connected with %s:%d at %d\n", conns[i]->name, conns[i]->c_port, + conns[i]->h_port); + } +} + +// Start up peer. +int main(int argc, char **argv) { + srand(time(NULL)); + parse(argc, argv); + + printf("Got host file: %s\n", f_host); + + initialize(); + + uint32_t request_num = 1; + + // Take requests from stdin: we want to poll on stdin and incoming messages. + + struct pollfd *pollfds = malloc(sizeof(struct pollfd) * ((num_conns) + 1)); + + int infd = fileno(stdin); + + pollfds[0].fd = infd; + pollfds[0].events = POLLIN | POLLPRI; + + for (int i = 0; i < num_conns; i++) { + pollfds[i + 1].fd = conns[i]->sockfd; + pollfds[i + 1].events = POLLIN | POLLPRI; + } + + printf("%d > ", request_num); + fflush(stdout); + + struct timespec start_time; + clock_gettime(CLOCK_REALTIME, &start_time); + + int timeout = 30; // this should be fixed for a view change. + + while (1) { + + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + if (poll(pollfds, num_conns + 1, timeout) != 0) { + if (pollfds[0].revents > 0) { + // update timeouts. + + // handle stdin request for state change. + char *str = NULL; + size_t len; + if (getline(&str, &len, stdin) != -1) { + bool proper_request = false; + struct msg *req = init_msg(); + req->msg_type = 0; // request + if (strncmp("ADD", str, strlen("ADD")) == 0) { + char *argument = str + 4; + uint32_t x = atoi(argument); + req->req_message = 0; + req->option_value = x; + proper_request = true; + } + if (strncmp("SUB", str, strlen("SUB")) == 0) { + char *argument = str + 4; + uint32_t x = atoi(argument); + req->req_message = 1; + req->option_value = x; + proper_request = true; + } + if (strncmp("MUL", str, strlen("MUL")) == 0) { + char *argument = str + 4; + uint32_t x = atoi(argument); + req->req_message = 2; + req->option_value = x; + proper_request = true; + } + if (strncmp("GET", str, strlen("GET")) == 0) { + req->req_message = 3; + proper_request = true; + } + + if (proper_request) { + struct timespec creq_timestamp; + clock_gettime(CLOCK_MONOTONIC, &creq_timestamp); + req->timestamp = creq_timestamp.tv_sec; + req->client_index = glob_replica_index; + packet *p = parse_msg_as_packet(req); + if (curr_view % (num_conns + 1) == glob_replica_index) { + // case: we are the primary. + handle_primary_recv_request(req, todo_key()); + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = req; + out_msg->m->seq = view_next_seq_num; + out_msg->next = msg_reqs; + msg_reqs = out_msg; + view_next_seq_num += 1; + } else { + // case: we are a client sending to primary. + send_to_replica(conns, num_conns, curr_view % (num_conns + 1), p); + } + clean_packet(p); + struct msg_log *ml = malloc(sizeof(struct msg_log)); + struct msg *standing = malloc(sizeof(struct msg)); + memcpy(standing, req, sizeof(struct msg)); + standing->client_index = request_num; + ml->m = standing; + ml->m->client_index = request_num; + ml->next = outstanding; + outstanding = ml; + request_num += 1; + } + } + } + + for (int i = 0; i < num_conns; i++) { + if (pollfds[i + 1].revents > 0) { + // handle message from peer at this point. + uint8_t msglen; + socklen_t addr_len = sizeof(conns[i]->conn); + recvfrom(conns[i]->sockfd, &msglen, sizeof(msglen), MSG_PEEK, + (struct sockaddr *)&(conns[i]->conn), &addr_len); + uint8_t *msg_buf = malloc(msglen + sizeof(msglen)); + recvfrom(conns[i]->sockfd, msg_buf, msglen + sizeof(msglen), 0, + (struct sockaddr *)&(conns[i]->conn), &addr_len); + + uint8_t *new_data = malloc(msglen); + memcpy(new_data, msg_buf + 1, msglen); + free(msg_buf); + packet *p = malloc(sizeof(packet)); + p->is_timed_out = false; + p->len = msglen; + p->data = new_data; + struct msg *message = parse_packet_as_msg(p); + clean_packet(p); + + if (message->msg_type == 0) { + // printf("\nRequest\n"); + // request for message coming from a client + // check that we are the primary + if (curr_view % (num_conns + 1) == glob_replica_index) { + handle_primary_recv_request(message, todo_key()); + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = message; + out_msg->m->seq = view_next_seq_num; + out_msg->next = msg_reqs; + msg_reqs = out_msg; + view_next_seq_num += 1; + } + } else if (message->msg_type == 1) { + // printf("\nPreprepare\n"); + // pre-prepare message sent out + // + // 1. parse the pre-prepare message. + // + // 2. receive the piggybacked message. + // + // validate and accept pre-prepare message. + + // receive the piggybacked message. + uint8_t msglen; + socklen_t addr_len = sizeof(conns[i]->conn); + recvfrom(conns[i]->sockfd, &msglen, sizeof(msglen), MSG_PEEK, + (struct sockaddr *)&(conns[i]->conn), &addr_len); + uint8_t *msg_buf = malloc(msglen + sizeof(msglen)); + recvfrom(conns[i]->sockfd, msg_buf, msglen + sizeof(msglen), 0, + (struct sockaddr *)&(conns[i]->conn), &addr_len); + + uint8_t *new_data = malloc(msglen); + memcpy(new_data, msg_buf + 1, msglen); + free(msg_buf); + packet *p = malloc(sizeof(packet)); + p->is_timed_out = false; + p->len = msglen; + p->data = new_data; + struct msg *piggyback = parse_packet_as_msg(p); + + uint32_t d = small_digest(piggyback); + + bool check1 = d == message->digest; + bool check2 = validate_signature(message, todo_key()); + bool check3 = message->view == curr_view; + bool check4 = true; + struct msg_log *m; + for (m = mlog; m != NULL; m = m->next) { + if (m->m->msg_type == 1 && m->m->view == message->view && + m->m->seq == message->seq && + m->m->digest != message->digest) { + check4 = false; + } + } + bool check5 = (low_watermark <= message->seq && + message->seq <= high_watermark); + if (check1 && check2 && check3 && check4 && check5) { + struct msg_log *ml = malloc(sizeof(struct msg_log)); + ml->m = message; + ml->next = mlog; + struct msg_log *ml2 = malloc(sizeof(struct msg_log)); + ml2->m = piggyback; + ml2->next = ml; + mlog = ml2; + + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + struct msg *cloned_msg = malloc(sizeof(struct msg)); + memcpy(cloned_msg, piggyback, sizeof(struct msg)); + cloned_msg->seq = message->seq; + out_msg->m = cloned_msg; + out_msg->next = msg_reqs; + msg_reqs = out_msg; + + // now that the message log has been updated, we should broadcast + // prepare messages. + handle_send_prepare(message, todo_key()); + + struct msg_log *out; + + for (out = msg_reqs; out != NULL; out = out->next) { + if (prepared(mlog, f, out->m, message->view, message->seq)) { + handle_send_commit(out->m, message, todo_key()); + } + } + } + } else if (message->msg_type == 2) { + // printf("\nPrepare\n"); + // receive a prepare message. + // 1. validate the prepare message. + // 2. check if this prepare message completes the `prepared` for any + // currently waiting message. + // if it does, send out a commit! + if (message->view == curr_view && low_watermark <= message->seq && + message->seq <= high_watermark && + validate_signature(message, todo_key())) { + + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = message; + out_msg->next = mlog; + mlog = out_msg; + + struct msg_log *out; + + for (out = msg_reqs; out != NULL; out = out->next) { + if (prepared(mlog, f, out->m, message->view, message->seq)) { + handle_send_commit(out->m, message, todo_key()); + } + } + } + } else if (message->msg_type == 3) { + // printf("\nCommit\n"); + // someone committed. + // validate committed message. + + // struct msg_log *loop; + // for (loop = mlog; loop != NULL; loop = loop->next) { + // display_msg(loop->m); + // fflush(stdout); + // } + + if (message->view == curr_view && low_watermark <= message->seq && + message->seq <= high_watermark && + validate_signature(message, todo_key())) { + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = message; + out_msg->next = mlog; + mlog = out_msg; + + struct msg_log *out; + + uint32_t lowest_seq = UINT32_MAX; + + struct msg_log *prev_to_curr; + struct msg_log *curr; + struct msg *curr_handle; + + bool valid = false; + + for (out = msg_reqs; out != NULL; out = out->next) { + if (out->next != NULL && + committed_local(mlog, f, out->next->m, message->view, + message->seq)) { + + if (out->next->m->seq < lowest_seq) { + lowest_seq = out->next->m->seq; + curr_handle = out->next->m; + prev_to_curr = out; + curr = out->next; + valid = true; + } + // [HERE] check if all reqs w lower sequence numbers have been + // established. + } + } + + if (msg_reqs != NULL && + committed_local(mlog, f, msg_reqs->m, message->view, + message->seq)) { + if (msg_reqs->m->seq < lowest_seq) { + lowest_seq = msg_reqs->m->seq; + curr_handle = msg_reqs->m; + prev_to_curr = NULL; + curr = msg_reqs; + valid = true; + } + } + + if (valid == true && prev_to_curr != NULL && curr != NULL) { + printf("\nTest1\n"); + prev_to_curr->next = curr->next; + printf("\nTest2\n"); + } else { + if (valid == true && msg_reqs != NULL) { + msg_reqs = msg_reqs->next; + } + } + + if (valid) { + handle_operation(curr_handle, todo_key()); + } + } + } else if (message->msg_type == 5) { + // printf("\nResponse\n"); + + if (validate_signature(message, todo_key())) { + + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = message; + out_msg->next = out_responses; + out_responses = out_msg; + } + struct msg_log *out; + + uint32_t resp = message->option_value; + + uint8_t count = 1; + + for (out = out_responses; out != NULL; out = out->next) { + if (out->m->option_value == resp && + out->m->timestamp == message->timestamp) { + count += 1; + } + } + + uint32_t req_num = 0; + + if (count > f) { + struct msg_log *out; + struct msg_log *curr; + + for (out = outstanding; out != NULL; out = out->next) { + curr = out->next; + + if (curr != NULL && curr->m->timestamp == message->timestamp) { + out->next = curr->next; + req_num = curr->m->client_index; + clean_msg(curr->m); + free(curr); + } + } + + if (outstanding != NULL && + outstanding->m->timestamp == message->timestamp) { + req_num = outstanding->m->client_index; + outstanding = outstanding->next; + } + + if (req_num > accepted_req_num) { + + struct msg_log *loop; + // for (loop = mlog; loop != NULL; loop = loop->next) { + // display_msg(loop->m); + // fflush(stdout); + // } + + printf("\nRESP [%d] > %d \n", req_num, resp); + fflush(stdout); + printf("%d > ", request_num); + fflush(stdout); + accepted_req_num = req_num; + } + } + } + } + } + + } else { + // handle view change. + } + } + + // stdin msg -> forward to primary (check if it is us), start online phase + // other messages: keep message log, and validate as we proceed +} diff --git a/hosts.c b/hosts.c new file mode 100644 index 0000000..e37f830 --- /dev/null +++ b/hosts.c @@ -0,0 +1,94 @@ +#include "hosts.h" +#include +#include +#include +#include +#include +#include + +struct host *conns[1024]; +int num_conns = 0; +int f = 0; +char *glob_name; +uint32_t glob_replica_index = 0; + +int init_conn(int h_port) { + + int sock_fd = socket(AF_INET, SOCK_DGRAM, 0); + + if (sock_fd < 0) { + printf("Socket initialization failed."); + exit(1); + } + + struct sockaddr_in serv_addr; + + memset(&serv_addr, 0, sizeof(serv_addr)); + + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(h_port); + serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); + + if (bind(sock_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) { + printf("Failed to bind socket.\n"); + exit(1); + } + + return sock_fd; +} + +void init_hosts(char *file) { + + char *line = NULL; + size_t len = 0; + ssize_t read; + + FILE *fp = fopen(file, "r"); + if (fp == NULL) { + printf("Host file not found.\n"); + exit(1); + } + + int name = 1; + + while ((read = getline(&line, &len, fp)) != -1) { + if (name == 1) { + glob_name = malloc(sizeof(char) * (strlen(line) + 1)); + strcpy(glob_name, line); + glob_name[strcspn(glob_name, "\n")] = 0; + glob_replica_index = atoi(glob_name); + printf("Name: %s", glob_name); + name = 0; + } else { + char *cname = malloc(200 * sizeof(char)); + char *addr = malloc(32 * sizeof(char)); + int c_port; + int h_port; + int ret = sscanf(line, "%200s %d %32s %d", cname, &h_port, addr, &c_port); + if (ret != 4) { + printf("Could not parse host file.\n"); + exit(1); + } + struct host *h = malloc(sizeof(struct host)); + h->name = cname; + h->replica_index = atoi(cname); + h->h_port = h_port; + h->c_port = c_port; + h->sockfd = init_conn(h_port); + + struct sockaddr_in conn_addr; + + memset(&conn_addr, 0, sizeof(conn_addr)); + + conn_addr.sin_family = AF_INET; + conn_addr.sin_port = htons(c_port); + conn_addr.sin_addr.s_addr = inet_addr(addr); + free(addr); + h->conn = conn_addr; + conns[num_conns] = h; + num_conns += 1; + } + } + + f = (num_conns) / 3; +} diff --git a/hosts.h b/hosts.h new file mode 100644 index 0000000..2585905 --- /dev/null +++ b/hosts.h @@ -0,0 +1,17 @@ +#ifndef H_HOST +#define H_HOST + +#include + +struct host { + char *name; + int replica_index; + int sockfd; + struct sockaddr_in conn; + int h_port; + int c_port; +}; + +void init_hosts(char *file); + +#endif diff --git a/msg.c b/msg.c new file mode 100644 index 0000000..27a734c --- /dev/null +++ b/msg.c @@ -0,0 +1,177 @@ +#include "msg.h" +#include "hosts.h" +#include +#include +#include +#include +#include + +struct msg_log *mlog = NULL; + +struct msg_log *outstanding = NULL; +struct msg_log *out_responses = NULL; + +void broadcast(struct host **conns, int num_conns, packet *data) { + + char *transmit = malloc(sizeof(char) * data->len + 1); + transmit[0] = (char)data->len; + memcpy((transmit + 1), data->data, data->len); + for (int i = 0; i < num_conns; i++) { + sendto(conns[i]->sockfd, transmit, data->len + 1, 0, + (const struct sockaddr *)&conns[i]->conn, sizeof(conns[i]->conn)); + } + free(transmit); +} + +void send_to(struct host *conn, packet *data) { + + char *transmit = malloc(sizeof(char) * data->len + 1); + transmit[0] = (char)data->len; + memcpy((transmit + 1), data->data, data->len); + sendto(conn->sockfd, transmit, data->len + 1, 0, + (const struct sockaddr *)&conn->conn, sizeof(conn->conn)); + free(transmit); +} + +void send_to_replica(struct host **conns, int num_conns, int replica, + packet *data) { + for (int i = 0; i < num_conns; i++) { + if (replica == conns[i]->replica_index) { + char *transmit = malloc(sizeof(char) * data->len + 1); + transmit[0] = (char)data->len; + memcpy((transmit + 1), data->data, data->len); + sendto(conns[i]->sockfd, transmit, data->len + 1, 0, + (const struct sockaddr *)&conns[i]->conn, sizeof(conns[i]->conn)); + free(transmit); + } + } +} + +void append_msg(struct msg *m) { + struct msg_log *cur = malloc(sizeof(struct msg_log)); + cur->m = m; + cur->next = mlog; +} + +void discard_before_seq(uint32_t seq) { + struct msg_log *old; + struct msg_log *cur; + for (old = mlog; old != NULL; old = old->next) { + cur = old->next; + if (cur != NULL && cur->m->seq < seq) { + old->next = cur->next; + free(cur->m); + free(cur); + } + } + + if (mlog != NULL && mlog->m->seq < seq) { + mlog = mlog->next; + } +} + +struct msg *parse_packet_as_msg(packet *p) { + uint8_t msg_type = p->data[0]; + uint8_t req_message = p->data[1]; + uint32_t option_value = ntohl(((uint32_t *)p->data)[1]); + uint64_t timestamp = be64toh(((uint64_t *)p->data)[1]); + uint32_t client_index = ntohl(((uint32_t *)p->data)[4]); + uint32_t view = ntohl(((uint32_t *)p->data)[5]); + uint32_t seq = ntohl(((uint32_t *)p->data)[6]); + uint32_t digest = ntohl(((uint32_t *)p->data)[7]); + uint32_t server_index = ntohl(((uint32_t *)p->data)[8]); + uint32_t signature = ntohl(((uint32_t *)p->data)[9]); + + struct msg *m = malloc(sizeof(struct msg)); + + m->msg_type = msg_type; + m->option_value = option_value; + m->timestamp = timestamp; + m->client_index = client_index; + m->req_message = req_message; + m->view = view; + m->seq = seq; + m->digest = digest; + m->server_index = server_index; + m->signature = signature; + return m; +} + +packet *parse_msg_as_packet(struct msg *m) { + uint32_t *data = malloc(sizeof(uint32_t) * 10); + data[0] = 0; + ((uint8_t *)data)[0] = m->msg_type; + ((uint8_t *)data)[1] = m->req_message; + + data[1] = htonl(m->option_value); + ((uint64_t *)data)[1] = htobe64(m->timestamp); + data[4] = htonl(m->client_index); + data[5] = htonl(m->view); + data[6] = htonl(m->seq); + data[7] = htonl(m->digest); + data[8] = htonl(m->server_index); + data[9] = htonl(m->signature); + + uint8_t len = sizeof(uint32_t) * 10; + + packet *p = malloc(sizeof(packet)); + + p->is_timed_out = false; + p->data = (uint8_t *)data; + p->len = len; + + return p; +} + +void clean_packet(packet *p) { + free(p->data); + free(p); +} + +void clean_msg(struct msg *m) { free(m); } + +struct msg *init_msg() { + struct msg *m = malloc(sizeof(struct msg)); + m->msg_type = 0; + + m->req_message = 0; + m->option_value = 0; + m->timestamp = 0; + m->client_index = 0; + + m->view = 0; + m->seq = 0; + m->digest = 0; + m->server_index = 0; + m->signature = 0; + + return m; +} + +void display_msg(struct msg *m) { + if (m->msg_type == 0) { + printf("Request: "); + if (m->req_message == 0) { + printf(" ADD %d [", m->option_value); + } else if (m->req_message == 1) { + printf(" SUB %d [", m->option_value); + } else if (m->req_message == 2) { + printf(" MUL %d [", m->option_value); + } else if (m->req_message == 3) { + printf(" GET ["); + } + + printf(" at: %ld, from: %d ]\n", m->timestamp, m->client_index); + } else if (m->msg_type == 1) { + printf("Pre-prepare: [ view %d, seq %d ] (D: %d) \n", m->view, m->seq, + m->digest); + } else if (m->msg_type == 2) { + printf("Prepare: [ view %d, seq %d, from: %d ] (D: %d) \n", m->view, m->seq, + m->server_index, m->digest); + } else if (m->msg_type == 3) { + printf("Commit: [ view %d, seq %d, from: %d ] (D: %d) \n", m->view, m->seq, + m->server_index, m->digest); + } else if (m->msg_type == 5) { + printf("Response: [ value %d ] \n", m->option_value); + } +} diff --git a/msg.h b/msg.h new file mode 100644 index 0000000..d39b8bd --- /dev/null +++ b/msg.h @@ -0,0 +1,61 @@ +#ifndef H_MSG +#define H_MSG + +#include "hosts.h" +#include +#include + +typedef struct { + bool is_timed_out; + uint8_t *data; // always in network byte order + uint8_t len; +} packet; + +// client msg requests: (ADD(x), SUB(x), MUL(x), GET) +// state += x, -= x, *= x, returns state + +struct msg { + uint8_t msg_type; // 0 = request, 1 = preprepare, 2 = prepare, 3 = commit, 4 = + // response + + // request fields + uint8_t req_message; // 0 = add, 1 = sub, 2 = mul, 3 = get + uint32_t option_value; // value for message. + uint64_t timestamp; + uint32_t client_index; + + // preprepare, prepare, commit, checkpoint fields + uint32_t view; + uint32_t seq; + uint32_t digest; + uint32_t server_index; + uint32_t signature; +}; + +struct msg_log { + struct msg_log *next; + struct msg *m; +}; + +void append_msg(struct msg *m); + +void discard_before_seq(uint32_t seq); + +void clean_packet(packet *p); + +struct msg *init_msg(); + +void clean_msg(struct msg *m); + +void display_msg(struct msg *m); + +struct msg *parse_packet_as_msg(packet *p); +packet *parse_msg_as_packet(struct msg *m); + +void broadcast(struct host **conns, int num_conns, packet *data); +void send_to(struct host *conn, packet *data); + +void send_to_replica(struct host **conns, int num_conns, int replica, + packet *data); + +#endif diff --git a/online.c b/online.c new file mode 100644 index 0000000..0f56066 --- /dev/null +++ b/online.c @@ -0,0 +1,250 @@ +#include "online.h" +#include "hosts.h" +#include "msg.h" +#include +#include +#include +#include +#include + +extern struct host *conns[1024]; +extern int num_conns; + +extern uint32_t curr_view; +extern uint32_t view_next_seq_num; + +extern uint32_t state_machine_state; + +extern struct msg_log *mlog; +extern struct msg_log *out_responses; + +extern uint32_t glob_replica_index; + +uint32_t small_digest(struct msg *request) { + char data[20] = {0}; + char *out = malloc(32 * sizeof(char)); + for (int i = 0; i < 32; i++) { + out[i] = 0; + } + + data[0] = request->req_message; + ((uint32_t *)data)[1] = request->option_value; + ((uint64_t *)data)[1] = request->timestamp; + + ((uint32_t *)data)[4] = request->client_index; + SHA256((unsigned char *)data, 20, (unsigned char *)out); + uint32_t x = ((uint32_t *)out)[0]; + + free(out); + + return x; +} + +void handle_primary_recv_request(struct msg *request, struct sigpriv *privkey) { + struct msg *pre_prepare = init_msg(); + uint32_t d = small_digest(request); + pre_prepare->msg_type = 1; + pre_prepare->seq = view_next_seq_num; + pre_prepare->view = curr_view; + pre_prepare->digest = d; + + pre_prepare->signature = sign_msg(pre_prepare, privkey); + broadcast(conns, num_conns, parse_msg_as_packet(pre_prepare)); + broadcast(conns, num_conns, parse_msg_as_packet(request)); + + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + struct msg *o = malloc(sizeof(struct msg)); + memcpy(o, request, sizeof(struct msg)); + out_msg->m = o; + out_msg->next = mlog; + mlog = out_msg; + + struct msg_log *out2 = malloc(sizeof(struct msg_log)); + out2->m = pre_prepare; + out2->next = mlog; + mlog = out2; +} + +void handle_send_prepare(struct msg *preprepare, struct sigpriv *privkey) { + + struct msg *prepare = init_msg(); + prepare->msg_type = 2; + prepare->seq = preprepare->seq; + prepare->view = preprepare->view; + prepare->server_index = glob_replica_index; + prepare->digest = preprepare->digest; + prepare->signature = sign_msg(prepare, privkey); + broadcast(conns, num_conns, parse_msg_as_packet(prepare)); + + struct msg_log *out2 = malloc(sizeof(struct msg_log)); + out2->m = prepare; + out2->next = mlog; + mlog = out2; +} + +void handle_send_commit(struct msg *msg, struct msg *prepare, + struct sigpriv *privkey) { + + struct msg *commit = init_msg(); + commit->msg_type = 3; + commit->seq = prepare->seq; + commit->view = prepare->view; + commit->server_index = glob_replica_index; + commit->digest = small_digest(msg); + commit->signature = sign_msg(commit, privkey); + broadcast(conns, num_conns, parse_msg_as_packet(commit)); + + struct msg_log *out2 = malloc(sizeof(struct msg_log)); + out2->m = commit; + out2->next = mlog; + mlog = out2; +} + +void handle_operation(struct msg *msg, struct sigpriv *privkey) { + + struct msg *m = init_msg(); + m->msg_type = 5; + if (msg->req_message == 0) { + state_machine_state += msg->option_value; + + m->option_value = 1; + m->timestamp = msg->timestamp; + m->signature = sign_msg(m, privkey); + for (int i = 0; i < num_conns; i++) { + if ((uint32_t)conns[i]->replica_index == msg->client_index) { + send_to(conns[i], parse_msg_as_packet(m)); + } + } + if (msg->client_index == glob_replica_index) { + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = m; + out_msg->next = out_responses; + out_responses = out_msg; + } + } else if (msg->req_message == 1) { + state_machine_state -= msg->option_value; + m->option_value = 1; + m->timestamp = msg->timestamp; + m->signature = sign_msg(m, privkey); + for (int i = 0; i < num_conns; i++) { + if ((uint32_t)conns[i]->replica_index == msg->client_index) { + send_to(conns[i], parse_msg_as_packet(m)); + } + } + if (msg->client_index == glob_replica_index) { + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = m; + out_msg->next = out_responses; + out_responses = out_msg; + } + } else if (msg->req_message == 2) { + state_machine_state *= msg->option_value; + m->option_value = 1; + m->timestamp = msg->timestamp; + m->signature = sign_msg(m, privkey); + for (int i = 0; i < num_conns; i++) { + if ((uint32_t)conns[i]->replica_index == msg->client_index) { + send_to(conns[i], parse_msg_as_packet(m)); + } + } + if (msg->client_index == glob_replica_index) { + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = m; + out_msg->next = out_responses; + out_responses = out_msg; + } + } else if (msg->req_message == 3) { + m->option_value = state_machine_state; + m->timestamp = msg->timestamp; + m->signature = sign_msg(m, privkey); + for (int i = 0; i < num_conns; i++) { + if ((uint32_t)conns[i]->replica_index == msg->client_index) { + send_to(conns[i], parse_msg_as_packet(m)); + } + } + if (msg->client_index == glob_replica_index) { + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = m; + out_msg->next = out_responses; + out_responses = out_msg; + } + } + clean_msg(m); +} + +bool prepared(struct msg_log *mlog, int f, struct msg *request, uint32_t view, + uint32_t seq) { + bool is_request_in = false; + bool is_preprepare_in = false; + uint8_t num_prepares = 0; + + uint32_t digest_req = small_digest(request); + + struct msg_log *m; + + for (m = mlog; m != NULL; m = m->next) { + if (m->m->msg_type == 0 && request->msg_type == 0 && + m->m->req_message == request->req_message && + m->m->option_value == request->option_value && + m->m->timestamp == request->timestamp && + m->m->client_index == request->client_index) { + is_request_in = true; + } + + if (m->m->msg_type == 1 && m->m->view == view && m->m->seq == seq && + m->m->digest == digest_req) { + is_preprepare_in = true; + } + + if (m->m->msg_type == 2 && m->m->view == view && m->m->seq == seq && + m->m->digest == digest_req) { + num_prepares += 1; + } + } + + return (is_request_in && is_preprepare_in && (num_prepares >= 2 * f)); +} + +bool committed_local(struct msg_log *mlog, int f, struct msg *request, + uint32_t view, uint32_t seq) { + + bool is_prepared = prepared(mlog, f, request, view, seq); + + uint32_t cor_seq = request->seq; + request->seq = 0; + uint32_t dg = small_digest(request); + + request->seq = cor_seq; + + struct msg_log *m; + + uint8_t commits = 0; + + for (m = mlog; m != NULL; m = m->next) { + if (m->m->msg_type == 3 && m->m->view == view && m->m->digest == dg && + m->m->seq == request->seq) { + struct msg_log *k; + uint32_t factor = 1; + for (k = mlog; k != m; k = k->next) { + if (k->m->msg_type == 3 && k->m->view == view && k->m->digest == dg && + k->m->seq == request->seq && + k->m->server_index == m->m->server_index) { + factor = 0; + } + } + commits += 1 * factor; + } + } + + // printf("committed? %d %d %d \n", is_prepared, commits, 2 * f + 1); + + return (is_prepared && (commits >= 2 * f + 1)); +} + +uint32_t sign_msg(struct msg *_message, struct sigpriv *_key) { return 0; } + +bool validate_signature(struct msg *_message, struct sigkey *_privkey) { + return true; +} + +void *todo_key() { return NULL; } diff --git a/online.h b/online.h new file mode 100644 index 0000000..0c5f2d4 --- /dev/null +++ b/online.h @@ -0,0 +1,39 @@ + +#ifndef H_ONL +#define H_ONL + +#include "hosts.h" +#include "msg.h" + +struct sigkey { + uint32_t key; +}; + +struct sigpriv { + uint32_t key; +}; + +uint32_t small_digest(struct msg *request); + +void handle_primary_recv_request(struct msg *request, struct sigpriv *privkey); + +void handle_send_prepare(struct msg *preprepare, struct sigpriv *privkey); + +void handle_send_commit(struct msg *msg, struct msg *prepare, + struct sigpriv *privkey); + +void handle_operation(struct msg *msg, struct sigpriv *privkey); + +bool prepared(struct msg_log *mlog, int f, struct msg *request, uint32_t view, + uint32_t seq); + +bool committed_local(struct msg_log *mlog, int f, struct msg *request, + uint32_t view, uint32_t seq); + +uint32_t sign_msg(struct msg *message, struct sigpriv *privkey); + +bool validate_signature(struct msg *message, struct sigkey *key); + +void *todo_key(); + +#endif diff --git a/parse.c b/parse.c new file mode 100644 index 0000000..4fa0071 --- /dev/null +++ b/parse.c @@ -0,0 +1,28 @@ +#include +#include +#include +#include + +extern char *optarg; +char *f_host; + +int parse(int argc, char **argv) { + char opt; + bool h; + + while ((opt = getopt(argc, argv, "h:")) != EOF) { + switch (opt) { + case 'h': + f_host = optarg; + h = true; + break; + } + } + + if (!h) { + printf("Please provide all arguments.\n"); + exit(1); + } + + return optind; +} diff --git a/parse.h b/parse.h new file mode 100644 index 0000000..b1c757d --- /dev/null +++ b/parse.h @@ -0,0 +1,6 @@ +#ifndef H_PARSE +#define H_PARSE + +int parse(int argc, char **argv); + +#endif diff --git a/test/hcfg1 b/test/hcfg1 new file mode 100644 index 0000000..9832f13 --- /dev/null +++ b/test/hcfg1 @@ -0,0 +1,3 @@ +0 +1 8000 127.0.0.1 8010 +2 8001 127.0.0.1 8020 diff --git a/test/hcfg2 b/test/hcfg2 new file mode 100644 index 0000000..3cffc58 --- /dev/null +++ b/test/hcfg2 @@ -0,0 +1,3 @@ +1 +0 8010 127.0.0.1 8000 +2 8011 127.0.0.1 8021 diff --git a/test/hcfg3 b/test/hcfg3 new file mode 100644 index 0000000..1dad058 --- /dev/null +++ b/test/hcfg3 @@ -0,0 +1,3 @@ +2 +0 8020 127.0.0.1 8001 +1 8021 127.0.0.1 8011 diff --git a/test2/hcfg1 b/test2/hcfg1 new file mode 100644 index 0000000..a32e876 --- /dev/null +++ b/test2/hcfg1 @@ -0,0 +1,2 @@ +0 +1 8001 127.0.0.1 8010 diff --git a/test2/hcfg2 b/test2/hcfg2 new file mode 100644 index 0000000..ac816e3 --- /dev/null +++ b/test2/hcfg2 @@ -0,0 +1,2 @@ +1 +0 8010 127.0.0.1 8001