From bffe95971313b47d43c95c9852b3754395d6a4c4 Mon Sep 17 00:00:00 2001 From: h Date: Sat, 25 Jan 2025 00:01:36 -0500 Subject: [PATCH] first BFT impl --- .gitignore | 3 + Makefile | 17 ++ byz/hcfg1 | 4 + byz/hcfg2 | 4 + byz/hcfg3 | 4 + byz/hcfg4 | 4 + client.c | 437 ++++++++++++++++++++++++++++++++++++++++++++++++++++ hosts.c | 94 +++++++++++ hosts.h | 17 ++ msg.c | 177 +++++++++++++++++++++ msg.h | 61 ++++++++ online.c | 250 ++++++++++++++++++++++++++++++ online.h | 39 +++++ parse.c | 28 ++++ parse.h | 6 + test/hcfg1 | 3 + test/hcfg2 | 3 + test/hcfg3 | 3 + test2/hcfg1 | 2 + test2/hcfg2 | 2 + 20 files changed, 1158 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 byz/hcfg1 create mode 100644 byz/hcfg2 create mode 100644 byz/hcfg3 create mode 100644 byz/hcfg4 create mode 100644 client.c create mode 100644 hosts.c create mode 100644 hosts.h create mode 100644 msg.c create mode 100644 msg.h create mode 100644 online.c create mode 100644 online.h create mode 100644 parse.c create mode 100644 parse.h create mode 100644 test/hcfg1 create mode 100644 test/hcfg2 create mode 100644 test/hcfg3 create mode 100644 test2/hcfg1 create mode 100644 test2/hcfg2 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..28bd2ac --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.o +.ccls-cache/ +pbft diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3cdb5b4 --- /dev/null +++ b/Makefile @@ -0,0 +1,17 @@ +CC=gcc +CFLAGS=-Wall -Wextra -ggdb -std=gnu99 +SRC=client.c hosts.c parse.c online.c msg.c +OBJ=$(SRC:.c=.o) +LIBS=-lcrypto + +all: client + +.SUFFIXES: .c .o +.c.o: + $(CC) -c $(CFLAGS) $*.c + +client: $(OBJ) + $(CC) -o pbft $(OBJ) $(LIBS) + +clean: + rm -f *.o pbft diff --git a/byz/hcfg1 b/byz/hcfg1 new file mode 100644 index 0000000..afb72c0 --- /dev/null +++ b/byz/hcfg1 @@ -0,0 +1,4 @@ +0 +1 8000 127.0.0.1 8010 +2 8001 127.0.0.1 8020 +3 8002 127.0.0.1 8030 diff --git a/byz/hcfg2 b/byz/hcfg2 new file mode 100644 index 0000000..dbc819c --- /dev/null +++ b/byz/hcfg2 @@ -0,0 +1,4 @@ +1 +0 8010 127.0.0.1 8000 +2 8011 127.0.0.1 8021 +3 8012 127.0.0.1 8023 diff --git a/byz/hcfg3 b/byz/hcfg3 new file mode 100644 index 0000000..a67a24f --- /dev/null +++ b/byz/hcfg3 @@ -0,0 +1,4 @@ +2 +0 8020 127.0.0.1 8001 +1 8021 127.0.0.1 8011 +3 8022 127.0.0.1 8100 diff --git a/byz/hcfg4 b/byz/hcfg4 new file mode 100644 index 0000000..bdd4e43 --- /dev/null +++ b/byz/hcfg4 @@ -0,0 +1,4 @@ +3 +0 8030 127.0.0.1 8002 +1 8023 127.0.0.1 8012 +2 8100 127.0.0.1 8022 diff --git a/client.c b/client.c new file mode 100644 index 0000000..273a842 --- /dev/null +++ b/client.c @@ -0,0 +1,437 @@ +#include "hosts.h" +#include "msg.h" +#include "online.h" +#include "parse.h" +#include +#include +#include +#include +#include +#include + +extern char *f_host; // parsing data for hosts. + +extern struct host *conns[1024]; // connection data for each peer. + +extern int num_conns; // number of connected peers. + +extern int f; // maximum number of potentially faulty nodes. + +extern char *glob_name; // my name (not necessary here). + +extern uint32_t glob_replica_index; // my replica index. + +extern struct msg_log *mlog; + +struct msg_log *msg_reqs = + NULL; // message requests received from a pre-prepare. + +extern struct msg_log *outstanding; // outstanding requests. +extern struct msg_log *out_responses; // outstanding responses. + +uint32_t state_machine_state = 0; // initialize state machine state to 0. + +uint32_t view_next_seq_num = 1; + +uint32_t low_watermark = 0; + +uint32_t high_watermark = 300; + +uint32_t curr_view = 0; + +uint32_t accepted_req_num = 0; + +// Initialize host connections. +void initialize() { + init_hosts(f_host); + + printf("\n\n\n --- CONNECTIONS --- \n\n"); + for (int i = 0; i < num_conns; i++) { + printf("Connected with %s:%d at %d\n", conns[i]->name, conns[i]->c_port, + conns[i]->h_port); + } +} + +// Start up peer. +int main(int argc, char **argv) { + srand(time(NULL)); + parse(argc, argv); + + printf("Got host file: %s\n", f_host); + + initialize(); + + uint32_t request_num = 1; + + // Take requests from stdin: we want to poll on stdin and incoming messages. + + struct pollfd *pollfds = malloc(sizeof(struct pollfd) * ((num_conns) + 1)); + + int infd = fileno(stdin); + + pollfds[0].fd = infd; + pollfds[0].events = POLLIN | POLLPRI; + + for (int i = 0; i < num_conns; i++) { + pollfds[i + 1].fd = conns[i]->sockfd; + pollfds[i + 1].events = POLLIN | POLLPRI; + } + + printf("%d > ", request_num); + fflush(stdout); + + struct timespec start_time; + clock_gettime(CLOCK_REALTIME, &start_time); + + int timeout = 30; // this should be fixed for a view change. + + while (1) { + + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + if (poll(pollfds, num_conns + 1, timeout) != 0) { + if (pollfds[0].revents > 0) { + // update timeouts. + + // handle stdin request for state change. + char *str = NULL; + size_t len; + if (getline(&str, &len, stdin) != -1) { + bool proper_request = false; + struct msg *req = init_msg(); + req->msg_type = 0; // request + if (strncmp("ADD", str, strlen("ADD")) == 0) { + char *argument = str + 4; + uint32_t x = atoi(argument); + req->req_message = 0; + req->option_value = x; + proper_request = true; + } + if (strncmp("SUB", str, strlen("SUB")) == 0) { + char *argument = str + 4; + uint32_t x = atoi(argument); + req->req_message = 1; + req->option_value = x; + proper_request = true; + } + if (strncmp("MUL", str, strlen("MUL")) == 0) { + char *argument = str + 4; + uint32_t x = atoi(argument); + req->req_message = 2; + req->option_value = x; + proper_request = true; + } + if (strncmp("GET", str, strlen("GET")) == 0) { + req->req_message = 3; + proper_request = true; + } + + if (proper_request) { + struct timespec creq_timestamp; + clock_gettime(CLOCK_MONOTONIC, &creq_timestamp); + req->timestamp = creq_timestamp.tv_sec; + req->client_index = glob_replica_index; + packet *p = parse_msg_as_packet(req); + if (curr_view % (num_conns + 1) == glob_replica_index) { + // case: we are the primary. + handle_primary_recv_request(req, todo_key()); + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = req; + out_msg->m->seq = view_next_seq_num; + out_msg->next = msg_reqs; + msg_reqs = out_msg; + view_next_seq_num += 1; + } else { + // case: we are a client sending to primary. + send_to_replica(conns, num_conns, curr_view % (num_conns + 1), p); + } + clean_packet(p); + struct msg_log *ml = malloc(sizeof(struct msg_log)); + struct msg *standing = malloc(sizeof(struct msg)); + memcpy(standing, req, sizeof(struct msg)); + standing->client_index = request_num; + ml->m = standing; + ml->m->client_index = request_num; + ml->next = outstanding; + outstanding = ml; + request_num += 1; + } + } + } + + for (int i = 0; i < num_conns; i++) { + if (pollfds[i + 1].revents > 0) { + // handle message from peer at this point. + uint8_t msglen; + socklen_t addr_len = sizeof(conns[i]->conn); + recvfrom(conns[i]->sockfd, &msglen, sizeof(msglen), MSG_PEEK, + (struct sockaddr *)&(conns[i]->conn), &addr_len); + uint8_t *msg_buf = malloc(msglen + sizeof(msglen)); + recvfrom(conns[i]->sockfd, msg_buf, msglen + sizeof(msglen), 0, + (struct sockaddr *)&(conns[i]->conn), &addr_len); + + uint8_t *new_data = malloc(msglen); + memcpy(new_data, msg_buf + 1, msglen); + free(msg_buf); + packet *p = malloc(sizeof(packet)); + p->is_timed_out = false; + p->len = msglen; + p->data = new_data; + struct msg *message = parse_packet_as_msg(p); + clean_packet(p); + + if (message->msg_type == 0) { + // printf("\nRequest\n"); + // request for message coming from a client + // check that we are the primary + if (curr_view % (num_conns + 1) == glob_replica_index) { + handle_primary_recv_request(message, todo_key()); + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = message; + out_msg->m->seq = view_next_seq_num; + out_msg->next = msg_reqs; + msg_reqs = out_msg; + view_next_seq_num += 1; + } + } else if (message->msg_type == 1) { + // printf("\nPreprepare\n"); + // pre-prepare message sent out + // + // 1. parse the pre-prepare message. + // + // 2. receive the piggybacked message. + // + // validate and accept pre-prepare message. + + // receive the piggybacked message. + uint8_t msglen; + socklen_t addr_len = sizeof(conns[i]->conn); + recvfrom(conns[i]->sockfd, &msglen, sizeof(msglen), MSG_PEEK, + (struct sockaddr *)&(conns[i]->conn), &addr_len); + uint8_t *msg_buf = malloc(msglen + sizeof(msglen)); + recvfrom(conns[i]->sockfd, msg_buf, msglen + sizeof(msglen), 0, + (struct sockaddr *)&(conns[i]->conn), &addr_len); + + uint8_t *new_data = malloc(msglen); + memcpy(new_data, msg_buf + 1, msglen); + free(msg_buf); + packet *p = malloc(sizeof(packet)); + p->is_timed_out = false; + p->len = msglen; + p->data = new_data; + struct msg *piggyback = parse_packet_as_msg(p); + + uint32_t d = small_digest(piggyback); + + bool check1 = d == message->digest; + bool check2 = validate_signature(message, todo_key()); + bool check3 = message->view == curr_view; + bool check4 = true; + struct msg_log *m; + for (m = mlog; m != NULL; m = m->next) { + if (m->m->msg_type == 1 && m->m->view == message->view && + m->m->seq == message->seq && + m->m->digest != message->digest) { + check4 = false; + } + } + bool check5 = (low_watermark <= message->seq && + message->seq <= high_watermark); + if (check1 && check2 && check3 && check4 && check5) { + struct msg_log *ml = malloc(sizeof(struct msg_log)); + ml->m = message; + ml->next = mlog; + struct msg_log *ml2 = malloc(sizeof(struct msg_log)); + ml2->m = piggyback; + ml2->next = ml; + mlog = ml2; + + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + struct msg *cloned_msg = malloc(sizeof(struct msg)); + memcpy(cloned_msg, piggyback, sizeof(struct msg)); + cloned_msg->seq = message->seq; + out_msg->m = cloned_msg; + out_msg->next = msg_reqs; + msg_reqs = out_msg; + + // now that the message log has been updated, we should broadcast + // prepare messages. + handle_send_prepare(message, todo_key()); + + struct msg_log *out; + + for (out = msg_reqs; out != NULL; out = out->next) { + if (prepared(mlog, f, out->m, message->view, message->seq)) { + handle_send_commit(out->m, message, todo_key()); + } + } + } + } else if (message->msg_type == 2) { + // printf("\nPrepare\n"); + // receive a prepare message. + // 1. validate the prepare message. + // 2. check if this prepare message completes the `prepared` for any + // currently waiting message. + // if it does, send out a commit! + if (message->view == curr_view && low_watermark <= message->seq && + message->seq <= high_watermark && + validate_signature(message, todo_key())) { + + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = message; + out_msg->next = mlog; + mlog = out_msg; + + struct msg_log *out; + + for (out = msg_reqs; out != NULL; out = out->next) { + if (prepared(mlog, f, out->m, message->view, message->seq)) { + handle_send_commit(out->m, message, todo_key()); + } + } + } + } else if (message->msg_type == 3) { + // printf("\nCommit\n"); + // someone committed. + // validate committed message. + + // struct msg_log *loop; + // for (loop = mlog; loop != NULL; loop = loop->next) { + // display_msg(loop->m); + // fflush(stdout); + // } + + if (message->view == curr_view && low_watermark <= message->seq && + message->seq <= high_watermark && + validate_signature(message, todo_key())) { + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = message; + out_msg->next = mlog; + mlog = out_msg; + + struct msg_log *out; + + uint32_t lowest_seq = UINT32_MAX; + + struct msg_log *prev_to_curr; + struct msg_log *curr; + struct msg *curr_handle; + + bool valid = false; + + for (out = msg_reqs; out != NULL; out = out->next) { + if (out->next != NULL && + committed_local(mlog, f, out->next->m, message->view, + message->seq)) { + + if (out->next->m->seq < lowest_seq) { + lowest_seq = out->next->m->seq; + curr_handle = out->next->m; + prev_to_curr = out; + curr = out->next; + valid = true; + } + // [HERE] check if all reqs w lower sequence numbers have been + // established. + } + } + + if (msg_reqs != NULL && + committed_local(mlog, f, msg_reqs->m, message->view, + message->seq)) { + if (msg_reqs->m->seq < lowest_seq) { + lowest_seq = msg_reqs->m->seq; + curr_handle = msg_reqs->m; + prev_to_curr = NULL; + curr = msg_reqs; + valid = true; + } + } + + if (valid == true && prev_to_curr != NULL && curr != NULL) { + printf("\nTest1\n"); + prev_to_curr->next = curr->next; + printf("\nTest2\n"); + } else { + if (valid == true && msg_reqs != NULL) { + msg_reqs = msg_reqs->next; + } + } + + if (valid) { + handle_operation(curr_handle, todo_key()); + } + } + } else if (message->msg_type == 5) { + // printf("\nResponse\n"); + + if (validate_signature(message, todo_key())) { + + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = message; + out_msg->next = out_responses; + out_responses = out_msg; + } + struct msg_log *out; + + uint32_t resp = message->option_value; + + uint8_t count = 1; + + for (out = out_responses; out != NULL; out = out->next) { + if (out->m->option_value == resp && + out->m->timestamp == message->timestamp) { + count += 1; + } + } + + uint32_t req_num = 0; + + if (count > f) { + struct msg_log *out; + struct msg_log *curr; + + for (out = outstanding; out != NULL; out = out->next) { + curr = out->next; + + if (curr != NULL && curr->m->timestamp == message->timestamp) { + out->next = curr->next; + req_num = curr->m->client_index; + clean_msg(curr->m); + free(curr); + } + } + + if (outstanding != NULL && + outstanding->m->timestamp == message->timestamp) { + req_num = outstanding->m->client_index; + outstanding = outstanding->next; + } + + if (req_num > accepted_req_num) { + + struct msg_log *loop; + // for (loop = mlog; loop != NULL; loop = loop->next) { + // display_msg(loop->m); + // fflush(stdout); + // } + + printf("\nRESP [%d] > %d \n", req_num, resp); + fflush(stdout); + printf("%d > ", request_num); + fflush(stdout); + accepted_req_num = req_num; + } + } + } + } + } + + } else { + // handle view change. + } + } + + // stdin msg -> forward to primary (check if it is us), start online phase + // other messages: keep message log, and validate as we proceed +} diff --git a/hosts.c b/hosts.c new file mode 100644 index 0000000..e37f830 --- /dev/null +++ b/hosts.c @@ -0,0 +1,94 @@ +#include "hosts.h" +#include +#include +#include +#include +#include +#include + +struct host *conns[1024]; +int num_conns = 0; +int f = 0; +char *glob_name; +uint32_t glob_replica_index = 0; + +int init_conn(int h_port) { + + int sock_fd = socket(AF_INET, SOCK_DGRAM, 0); + + if (sock_fd < 0) { + printf("Socket initialization failed."); + exit(1); + } + + struct sockaddr_in serv_addr; + + memset(&serv_addr, 0, sizeof(serv_addr)); + + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(h_port); + serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); + + if (bind(sock_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) { + printf("Failed to bind socket.\n"); + exit(1); + } + + return sock_fd; +} + +void init_hosts(char *file) { + + char *line = NULL; + size_t len = 0; + ssize_t read; + + FILE *fp = fopen(file, "r"); + if (fp == NULL) { + printf("Host file not found.\n"); + exit(1); + } + + int name = 1; + + while ((read = getline(&line, &len, fp)) != -1) { + if (name == 1) { + glob_name = malloc(sizeof(char) * (strlen(line) + 1)); + strcpy(glob_name, line); + glob_name[strcspn(glob_name, "\n")] = 0; + glob_replica_index = atoi(glob_name); + printf("Name: %s", glob_name); + name = 0; + } else { + char *cname = malloc(200 * sizeof(char)); + char *addr = malloc(32 * sizeof(char)); + int c_port; + int h_port; + int ret = sscanf(line, "%200s %d %32s %d", cname, &h_port, addr, &c_port); + if (ret != 4) { + printf("Could not parse host file.\n"); + exit(1); + } + struct host *h = malloc(sizeof(struct host)); + h->name = cname; + h->replica_index = atoi(cname); + h->h_port = h_port; + h->c_port = c_port; + h->sockfd = init_conn(h_port); + + struct sockaddr_in conn_addr; + + memset(&conn_addr, 0, sizeof(conn_addr)); + + conn_addr.sin_family = AF_INET; + conn_addr.sin_port = htons(c_port); + conn_addr.sin_addr.s_addr = inet_addr(addr); + free(addr); + h->conn = conn_addr; + conns[num_conns] = h; + num_conns += 1; + } + } + + f = (num_conns) / 3; +} diff --git a/hosts.h b/hosts.h new file mode 100644 index 0000000..2585905 --- /dev/null +++ b/hosts.h @@ -0,0 +1,17 @@ +#ifndef H_HOST +#define H_HOST + +#include + +struct host { + char *name; + int replica_index; + int sockfd; + struct sockaddr_in conn; + int h_port; + int c_port; +}; + +void init_hosts(char *file); + +#endif diff --git a/msg.c b/msg.c new file mode 100644 index 0000000..27a734c --- /dev/null +++ b/msg.c @@ -0,0 +1,177 @@ +#include "msg.h" +#include "hosts.h" +#include +#include +#include +#include +#include + +struct msg_log *mlog = NULL; + +struct msg_log *outstanding = NULL; +struct msg_log *out_responses = NULL; + +void broadcast(struct host **conns, int num_conns, packet *data) { + + char *transmit = malloc(sizeof(char) * data->len + 1); + transmit[0] = (char)data->len; + memcpy((transmit + 1), data->data, data->len); + for (int i = 0; i < num_conns; i++) { + sendto(conns[i]->sockfd, transmit, data->len + 1, 0, + (const struct sockaddr *)&conns[i]->conn, sizeof(conns[i]->conn)); + } + free(transmit); +} + +void send_to(struct host *conn, packet *data) { + + char *transmit = malloc(sizeof(char) * data->len + 1); + transmit[0] = (char)data->len; + memcpy((transmit + 1), data->data, data->len); + sendto(conn->sockfd, transmit, data->len + 1, 0, + (const struct sockaddr *)&conn->conn, sizeof(conn->conn)); + free(transmit); +} + +void send_to_replica(struct host **conns, int num_conns, int replica, + packet *data) { + for (int i = 0; i < num_conns; i++) { + if (replica == conns[i]->replica_index) { + char *transmit = malloc(sizeof(char) * data->len + 1); + transmit[0] = (char)data->len; + memcpy((transmit + 1), data->data, data->len); + sendto(conns[i]->sockfd, transmit, data->len + 1, 0, + (const struct sockaddr *)&conns[i]->conn, sizeof(conns[i]->conn)); + free(transmit); + } + } +} + +void append_msg(struct msg *m) { + struct msg_log *cur = malloc(sizeof(struct msg_log)); + cur->m = m; + cur->next = mlog; +} + +void discard_before_seq(uint32_t seq) { + struct msg_log *old; + struct msg_log *cur; + for (old = mlog; old != NULL; old = old->next) { + cur = old->next; + if (cur != NULL && cur->m->seq < seq) { + old->next = cur->next; + free(cur->m); + free(cur); + } + } + + if (mlog != NULL && mlog->m->seq < seq) { + mlog = mlog->next; + } +} + +struct msg *parse_packet_as_msg(packet *p) { + uint8_t msg_type = p->data[0]; + uint8_t req_message = p->data[1]; + uint32_t option_value = ntohl(((uint32_t *)p->data)[1]); + uint64_t timestamp = be64toh(((uint64_t *)p->data)[1]); + uint32_t client_index = ntohl(((uint32_t *)p->data)[4]); + uint32_t view = ntohl(((uint32_t *)p->data)[5]); + uint32_t seq = ntohl(((uint32_t *)p->data)[6]); + uint32_t digest = ntohl(((uint32_t *)p->data)[7]); + uint32_t server_index = ntohl(((uint32_t *)p->data)[8]); + uint32_t signature = ntohl(((uint32_t *)p->data)[9]); + + struct msg *m = malloc(sizeof(struct msg)); + + m->msg_type = msg_type; + m->option_value = option_value; + m->timestamp = timestamp; + m->client_index = client_index; + m->req_message = req_message; + m->view = view; + m->seq = seq; + m->digest = digest; + m->server_index = server_index; + m->signature = signature; + return m; +} + +packet *parse_msg_as_packet(struct msg *m) { + uint32_t *data = malloc(sizeof(uint32_t) * 10); + data[0] = 0; + ((uint8_t *)data)[0] = m->msg_type; + ((uint8_t *)data)[1] = m->req_message; + + data[1] = htonl(m->option_value); + ((uint64_t *)data)[1] = htobe64(m->timestamp); + data[4] = htonl(m->client_index); + data[5] = htonl(m->view); + data[6] = htonl(m->seq); + data[7] = htonl(m->digest); + data[8] = htonl(m->server_index); + data[9] = htonl(m->signature); + + uint8_t len = sizeof(uint32_t) * 10; + + packet *p = malloc(sizeof(packet)); + + p->is_timed_out = false; + p->data = (uint8_t *)data; + p->len = len; + + return p; +} + +void clean_packet(packet *p) { + free(p->data); + free(p); +} + +void clean_msg(struct msg *m) { free(m); } + +struct msg *init_msg() { + struct msg *m = malloc(sizeof(struct msg)); + m->msg_type = 0; + + m->req_message = 0; + m->option_value = 0; + m->timestamp = 0; + m->client_index = 0; + + m->view = 0; + m->seq = 0; + m->digest = 0; + m->server_index = 0; + m->signature = 0; + + return m; +} + +void display_msg(struct msg *m) { + if (m->msg_type == 0) { + printf("Request: "); + if (m->req_message == 0) { + printf(" ADD %d [", m->option_value); + } else if (m->req_message == 1) { + printf(" SUB %d [", m->option_value); + } else if (m->req_message == 2) { + printf(" MUL %d [", m->option_value); + } else if (m->req_message == 3) { + printf(" GET ["); + } + + printf(" at: %ld, from: %d ]\n", m->timestamp, m->client_index); + } else if (m->msg_type == 1) { + printf("Pre-prepare: [ view %d, seq %d ] (D: %d) \n", m->view, m->seq, + m->digest); + } else if (m->msg_type == 2) { + printf("Prepare: [ view %d, seq %d, from: %d ] (D: %d) \n", m->view, m->seq, + m->server_index, m->digest); + } else if (m->msg_type == 3) { + printf("Commit: [ view %d, seq %d, from: %d ] (D: %d) \n", m->view, m->seq, + m->server_index, m->digest); + } else if (m->msg_type == 5) { + printf("Response: [ value %d ] \n", m->option_value); + } +} diff --git a/msg.h b/msg.h new file mode 100644 index 0000000..d39b8bd --- /dev/null +++ b/msg.h @@ -0,0 +1,61 @@ +#ifndef H_MSG +#define H_MSG + +#include "hosts.h" +#include +#include + +typedef struct { + bool is_timed_out; + uint8_t *data; // always in network byte order + uint8_t len; +} packet; + +// client msg requests: (ADD(x), SUB(x), MUL(x), GET) +// state += x, -= x, *= x, returns state + +struct msg { + uint8_t msg_type; // 0 = request, 1 = preprepare, 2 = prepare, 3 = commit, 4 = + // response + + // request fields + uint8_t req_message; // 0 = add, 1 = sub, 2 = mul, 3 = get + uint32_t option_value; // value for message. + uint64_t timestamp; + uint32_t client_index; + + // preprepare, prepare, commit, checkpoint fields + uint32_t view; + uint32_t seq; + uint32_t digest; + uint32_t server_index; + uint32_t signature; +}; + +struct msg_log { + struct msg_log *next; + struct msg *m; +}; + +void append_msg(struct msg *m); + +void discard_before_seq(uint32_t seq); + +void clean_packet(packet *p); + +struct msg *init_msg(); + +void clean_msg(struct msg *m); + +void display_msg(struct msg *m); + +struct msg *parse_packet_as_msg(packet *p); +packet *parse_msg_as_packet(struct msg *m); + +void broadcast(struct host **conns, int num_conns, packet *data); +void send_to(struct host *conn, packet *data); + +void send_to_replica(struct host **conns, int num_conns, int replica, + packet *data); + +#endif diff --git a/online.c b/online.c new file mode 100644 index 0000000..0f56066 --- /dev/null +++ b/online.c @@ -0,0 +1,250 @@ +#include "online.h" +#include "hosts.h" +#include "msg.h" +#include +#include +#include +#include +#include + +extern struct host *conns[1024]; +extern int num_conns; + +extern uint32_t curr_view; +extern uint32_t view_next_seq_num; + +extern uint32_t state_machine_state; + +extern struct msg_log *mlog; +extern struct msg_log *out_responses; + +extern uint32_t glob_replica_index; + +uint32_t small_digest(struct msg *request) { + char data[20] = {0}; + char *out = malloc(32 * sizeof(char)); + for (int i = 0; i < 32; i++) { + out[i] = 0; + } + + data[0] = request->req_message; + ((uint32_t *)data)[1] = request->option_value; + ((uint64_t *)data)[1] = request->timestamp; + + ((uint32_t *)data)[4] = request->client_index; + SHA256((unsigned char *)data, 20, (unsigned char *)out); + uint32_t x = ((uint32_t *)out)[0]; + + free(out); + + return x; +} + +void handle_primary_recv_request(struct msg *request, struct sigpriv *privkey) { + struct msg *pre_prepare = init_msg(); + uint32_t d = small_digest(request); + pre_prepare->msg_type = 1; + pre_prepare->seq = view_next_seq_num; + pre_prepare->view = curr_view; + pre_prepare->digest = d; + + pre_prepare->signature = sign_msg(pre_prepare, privkey); + broadcast(conns, num_conns, parse_msg_as_packet(pre_prepare)); + broadcast(conns, num_conns, parse_msg_as_packet(request)); + + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + struct msg *o = malloc(sizeof(struct msg)); + memcpy(o, request, sizeof(struct msg)); + out_msg->m = o; + out_msg->next = mlog; + mlog = out_msg; + + struct msg_log *out2 = malloc(sizeof(struct msg_log)); + out2->m = pre_prepare; + out2->next = mlog; + mlog = out2; +} + +void handle_send_prepare(struct msg *preprepare, struct sigpriv *privkey) { + + struct msg *prepare = init_msg(); + prepare->msg_type = 2; + prepare->seq = preprepare->seq; + prepare->view = preprepare->view; + prepare->server_index = glob_replica_index; + prepare->digest = preprepare->digest; + prepare->signature = sign_msg(prepare, privkey); + broadcast(conns, num_conns, parse_msg_as_packet(prepare)); + + struct msg_log *out2 = malloc(sizeof(struct msg_log)); + out2->m = prepare; + out2->next = mlog; + mlog = out2; +} + +void handle_send_commit(struct msg *msg, struct msg *prepare, + struct sigpriv *privkey) { + + struct msg *commit = init_msg(); + commit->msg_type = 3; + commit->seq = prepare->seq; + commit->view = prepare->view; + commit->server_index = glob_replica_index; + commit->digest = small_digest(msg); + commit->signature = sign_msg(commit, privkey); + broadcast(conns, num_conns, parse_msg_as_packet(commit)); + + struct msg_log *out2 = malloc(sizeof(struct msg_log)); + out2->m = commit; + out2->next = mlog; + mlog = out2; +} + +void handle_operation(struct msg *msg, struct sigpriv *privkey) { + + struct msg *m = init_msg(); + m->msg_type = 5; + if (msg->req_message == 0) { + state_machine_state += msg->option_value; + + m->option_value = 1; + m->timestamp = msg->timestamp; + m->signature = sign_msg(m, privkey); + for (int i = 0; i < num_conns; i++) { + if ((uint32_t)conns[i]->replica_index == msg->client_index) { + send_to(conns[i], parse_msg_as_packet(m)); + } + } + if (msg->client_index == glob_replica_index) { + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = m; + out_msg->next = out_responses; + out_responses = out_msg; + } + } else if (msg->req_message == 1) { + state_machine_state -= msg->option_value; + m->option_value = 1; + m->timestamp = msg->timestamp; + m->signature = sign_msg(m, privkey); + for (int i = 0; i < num_conns; i++) { + if ((uint32_t)conns[i]->replica_index == msg->client_index) { + send_to(conns[i], parse_msg_as_packet(m)); + } + } + if (msg->client_index == glob_replica_index) { + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = m; + out_msg->next = out_responses; + out_responses = out_msg; + } + } else if (msg->req_message == 2) { + state_machine_state *= msg->option_value; + m->option_value = 1; + m->timestamp = msg->timestamp; + m->signature = sign_msg(m, privkey); + for (int i = 0; i < num_conns; i++) { + if ((uint32_t)conns[i]->replica_index == msg->client_index) { + send_to(conns[i], parse_msg_as_packet(m)); + } + } + if (msg->client_index == glob_replica_index) { + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = m; + out_msg->next = out_responses; + out_responses = out_msg; + } + } else if (msg->req_message == 3) { + m->option_value = state_machine_state; + m->timestamp = msg->timestamp; + m->signature = sign_msg(m, privkey); + for (int i = 0; i < num_conns; i++) { + if ((uint32_t)conns[i]->replica_index == msg->client_index) { + send_to(conns[i], parse_msg_as_packet(m)); + } + } + if (msg->client_index == glob_replica_index) { + struct msg_log *out_msg = malloc(sizeof(struct msg_log)); + out_msg->m = m; + out_msg->next = out_responses; + out_responses = out_msg; + } + } + clean_msg(m); +} + +bool prepared(struct msg_log *mlog, int f, struct msg *request, uint32_t view, + uint32_t seq) { + bool is_request_in = false; + bool is_preprepare_in = false; + uint8_t num_prepares = 0; + + uint32_t digest_req = small_digest(request); + + struct msg_log *m; + + for (m = mlog; m != NULL; m = m->next) { + if (m->m->msg_type == 0 && request->msg_type == 0 && + m->m->req_message == request->req_message && + m->m->option_value == request->option_value && + m->m->timestamp == request->timestamp && + m->m->client_index == request->client_index) { + is_request_in = true; + } + + if (m->m->msg_type == 1 && m->m->view == view && m->m->seq == seq && + m->m->digest == digest_req) { + is_preprepare_in = true; + } + + if (m->m->msg_type == 2 && m->m->view == view && m->m->seq == seq && + m->m->digest == digest_req) { + num_prepares += 1; + } + } + + return (is_request_in && is_preprepare_in && (num_prepares >= 2 * f)); +} + +bool committed_local(struct msg_log *mlog, int f, struct msg *request, + uint32_t view, uint32_t seq) { + + bool is_prepared = prepared(mlog, f, request, view, seq); + + uint32_t cor_seq = request->seq; + request->seq = 0; + uint32_t dg = small_digest(request); + + request->seq = cor_seq; + + struct msg_log *m; + + uint8_t commits = 0; + + for (m = mlog; m != NULL; m = m->next) { + if (m->m->msg_type == 3 && m->m->view == view && m->m->digest == dg && + m->m->seq == request->seq) { + struct msg_log *k; + uint32_t factor = 1; + for (k = mlog; k != m; k = k->next) { + if (k->m->msg_type == 3 && k->m->view == view && k->m->digest == dg && + k->m->seq == request->seq && + k->m->server_index == m->m->server_index) { + factor = 0; + } + } + commits += 1 * factor; + } + } + + // printf("committed? %d %d %d \n", is_prepared, commits, 2 * f + 1); + + return (is_prepared && (commits >= 2 * f + 1)); +} + +uint32_t sign_msg(struct msg *_message, struct sigpriv *_key) { return 0; } + +bool validate_signature(struct msg *_message, struct sigkey *_privkey) { + return true; +} + +void *todo_key() { return NULL; } diff --git a/online.h b/online.h new file mode 100644 index 0000000..0c5f2d4 --- /dev/null +++ b/online.h @@ -0,0 +1,39 @@ + +#ifndef H_ONL +#define H_ONL + +#include "hosts.h" +#include "msg.h" + +struct sigkey { + uint32_t key; +}; + +struct sigpriv { + uint32_t key; +}; + +uint32_t small_digest(struct msg *request); + +void handle_primary_recv_request(struct msg *request, struct sigpriv *privkey); + +void handle_send_prepare(struct msg *preprepare, struct sigpriv *privkey); + +void handle_send_commit(struct msg *msg, struct msg *prepare, + struct sigpriv *privkey); + +void handle_operation(struct msg *msg, struct sigpriv *privkey); + +bool prepared(struct msg_log *mlog, int f, struct msg *request, uint32_t view, + uint32_t seq); + +bool committed_local(struct msg_log *mlog, int f, struct msg *request, + uint32_t view, uint32_t seq); + +uint32_t sign_msg(struct msg *message, struct sigpriv *privkey); + +bool validate_signature(struct msg *message, struct sigkey *key); + +void *todo_key(); + +#endif diff --git a/parse.c b/parse.c new file mode 100644 index 0000000..4fa0071 --- /dev/null +++ b/parse.c @@ -0,0 +1,28 @@ +#include +#include +#include +#include + +extern char *optarg; +char *f_host; + +int parse(int argc, char **argv) { + char opt; + bool h; + + while ((opt = getopt(argc, argv, "h:")) != EOF) { + switch (opt) { + case 'h': + f_host = optarg; + h = true; + break; + } + } + + if (!h) { + printf("Please provide all arguments.\n"); + exit(1); + } + + return optind; +} diff --git a/parse.h b/parse.h new file mode 100644 index 0000000..b1c757d --- /dev/null +++ b/parse.h @@ -0,0 +1,6 @@ +#ifndef H_PARSE +#define H_PARSE + +int parse(int argc, char **argv); + +#endif diff --git a/test/hcfg1 b/test/hcfg1 new file mode 100644 index 0000000..9832f13 --- /dev/null +++ b/test/hcfg1 @@ -0,0 +1,3 @@ +0 +1 8000 127.0.0.1 8010 +2 8001 127.0.0.1 8020 diff --git a/test/hcfg2 b/test/hcfg2 new file mode 100644 index 0000000..3cffc58 --- /dev/null +++ b/test/hcfg2 @@ -0,0 +1,3 @@ +1 +0 8010 127.0.0.1 8000 +2 8011 127.0.0.1 8021 diff --git a/test/hcfg3 b/test/hcfg3 new file mode 100644 index 0000000..1dad058 --- /dev/null +++ b/test/hcfg3 @@ -0,0 +1,3 @@ +2 +0 8020 127.0.0.1 8001 +1 8021 127.0.0.1 8011 diff --git a/test2/hcfg1 b/test2/hcfg1 new file mode 100644 index 0000000..a32e876 --- /dev/null +++ b/test2/hcfg1 @@ -0,0 +1,2 @@ +0 +1 8001 127.0.0.1 8010 diff --git a/test2/hcfg2 b/test2/hcfg2 new file mode 100644 index 0000000..ac816e3 --- /dev/null +++ b/test2/hcfg2 @@ -0,0 +1,2 @@ +1 +0 8010 127.0.0.1 8001