pbft/client.c

438 lines
15 KiB
C
Raw Permalink Normal View History

2025-01-25 00:01:36 -05:00
#include "hosts.h"
#include "msg.h"
#include "online.h"
#include "parse.h"
#include <errno.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
// ---------------------------------------------------------------------------
// Shared state. Everything declared `extern` is defined in another
// translation unit; the remaining variables are defined here.
// ---------------------------------------------------------------------------

// Host data: handed to init_hosts() and printed at startup as the host file.
extern char *f_host; // parsing data for hosts.
// One record per peer; this file reads name, c_port, h_port, sockfd and conn.
extern struct host *conns[1024]; // connection data for each peer.
// Number of valid entries in conns. The primary is picked modulo
// (num_conns + 1), i.e. the peers plus this node.
extern int num_conns; // number of connected peers.
// Passed to prepared()/committed_local() and used as the threshold when
// counting matching responses.
extern int f; // maximum number of potentially faulty nodes.
// Not referenced by the code in this file.
extern char *glob_name; // my name (not necessary here).
// This node is the primary when curr_view % (num_conns + 1) equals this value;
// it is also stamped into outgoing requests as client_index.
extern uint32_t glob_replica_index; // my replica index.
// Log of accepted protocol messages: pre-prepares (together with their
// piggybacked request), prepares and commits are pushed onto its front.
extern struct msg_log *mlog;
// Requests that have been assigned a sequence number (by this node as primary,
// or by an accepted pre-prepare) and are waiting to be executed. An entry is
// unlinked once a commit makes it committed-local.
struct msg_log *msg_reqs =
NULL; // message requests received from a pre-prepare.
// Requests typed on this node's stdin that have not yet been answered; an
// entry's client_index holds the local request number shown in the prompt.
extern struct msg_log *outstanding; // outstanding requests.
// Responses received from peers whose signature validated.
extern struct msg_log *out_responses; // outstanding responses.
// Not read or written by the code in this file.
// NOTE(review): presumably updated by handle_operation() -- confirm.
uint32_t state_machine_state = 0; // initialize state machine state to 0.
// Sequence number given to the next request this node accepts as primary.
uint32_t view_next_seq_num = 1;
// Inclusive bounds on the sequence numbers accepted from pre-prepare, prepare
// and commit messages. Nothing in this file advances them.
uint32_t low_watermark = 0;
uint32_t high_watermark = 300;
// Current view number; compared against the view field of incoming messages.
uint32_t curr_view = 0;
// Highest local request number whose response has already been printed.
uint32_t accepted_req_num = 0;
// Initialize host connections.
void initialize() {
init_hosts(f_host);
printf("\n\n\n --- CONNECTIONS --- \n\n");
for (int i = 0; i < num_conns; i++) {
printf("Connected with %s:%d at %d\n", conns[i]->name, conns[i]->c_port,
conns[i]->h_port);
}
}
// ---------------------------------------------------------------------------
// File-local helpers used by main().
// ---------------------------------------------------------------------------

// Values of msg_type that this file handles.
enum {
  CLIENT_MSG_REQUEST = 0,
  CLIENT_MSG_PREPREPARE = 1,
  CLIENT_MSG_PREPARE = 2,
  CLIENT_MSG_COMMIT = 3,
  CLIENT_MSG_RESPONSE = 5
};

// Values of req_message produced from the stdin commands.
enum {
  CLIENT_OP_ADD = 0,
  CLIENT_OP_SUB = 1,
  CLIENT_OP_MUL = 2,
  CLIENT_OP_GET = 3
};

// poll() timeout in milliseconds. A timeout is where a view change would be
// triggered; that is not implemented yet.
enum { CLIENT_POLL_TIMEOUT_MS = 30 };

// Allocate `size` bytes or terminate: none of the bookkeeping below can
// continue without its allocation, so out-of-memory is treated as fatal.
static void *client_xmalloc(size_t size) {
  void *mem = malloc(size);
  if (mem == NULL) {
    perror("malloc");
    exit(EXIT_FAILURE);
  }
  return mem;
}

// Push `m` onto the front of the singly linked list rooted at `*head`.
static void client_log_push(struct msg_log **head, struct msg *m) {
  struct msg_log *node = client_xmalloc(sizeof(*node));
  node->m = m;
  node->next = *head;
  *head = node;
}

// True when this node is the primary of the current view.
static bool client_is_primary(void) {
  return curr_view % (num_conns + 1) == glob_replica_index;
}

// Primary-side handling of a client request: hand it to
// handle_primary_recv_request(), stamp it with the next sequence number and
// queue it in msg_reqs. Ownership of `request` moves to msg_reqs.
static void client_accept_as_primary(struct msg *request) {
  handle_primary_recv_request(request, todo_key());
  request->seq = view_next_seq_num;
  client_log_push(&msg_reqs, request);
  view_next_seq_num += 1;
}

// Parse the integer operand that follows a command mnemonic.
// Leading whitespace is skipped and only whitespace may follow the number.
// Negative values are accepted and wrap modulo 2^32, which is what the
// previous atoi()-based code produced for them. Returns false when the operand
// is missing, malformed or out of range.
static bool client_parse_operand(const char *text, uint32_t *value) {
  char *end = NULL;
  errno = 0;
  long long parsed = strtoll(text, &end, 10);
  if (end == text || errno == ERANGE) {
    return false;
  }
  if (parsed < (long long)INT32_MIN || parsed > (long long)UINT32_MAX) {
    return false;
  }
  if (end[strspn(end, " \t\r\n")] != '\0') {
    return false;
  }
  *value = (uint32_t)parsed;
  return true;
}

// Fill in req_message (and option_value for ADD/SUB/MUL) from one input line.
// Commands are recognised by their three-letter prefix. Returns false when
// the line is not a usable command; unknown commands are ignored silently,
// a bad operand is reported on stderr.
static bool client_parse_command(const char *line, struct msg *req) {
  static const struct {
    const char *name;
    int code;
  } arithmetic[] = {
      {"ADD", CLIENT_OP_ADD},
      {"SUB", CLIENT_OP_SUB},
      {"MUL", CLIENT_OP_MUL},
  };

  for (size_t k = 0; k < sizeof(arithmetic) / sizeof(arithmetic[0]); k++) {
    size_t name_len = strlen(arithmetic[k].name);
    if (strncmp(arithmetic[k].name, line, name_len) != 0) {
      continue;
    }
    uint32_t operand = 0;
    if (!client_parse_operand(line + name_len, &operand)) {
      fprintf(stderr, "%s needs one integer operand\n", arithmetic[k].name);
      return false;
    }
    req->req_message = arithmetic[k].code;
    req->option_value = operand;
    return true;
  }

  if (strncmp("GET", line, strlen("GET")) == 0) {
    req->req_message = CLIENT_OP_GET;
    return true;
  }
  return false;
}

// Read one line from stdin and, if it is a valid command, issue it as a
// request: handled locally when this node is the primary, otherwise sent to
// the primary. The request is then recorded in `outstanding` under the local
// request number `*request_num`, which is incremented.
// Returns false once stdin can no longer be read (EOF or error).
static bool client_handle_stdin(uint32_t *request_num) {
  char *line = NULL;
  size_t cap = 0;
  if (getline(&line, &cap, stdin) == -1) {
    free(line);
    return false;
  }

  struct msg *req = init_msg();
  req->msg_type = CLIENT_MSG_REQUEST;
  bool understood = client_parse_command(line, req);
  free(line);
  if (!understood) {
    clean_msg(req);
    return true;
  }

  struct timespec issued_at;
  clock_gettime(CLOCK_MONOTONIC, &issued_at);
  req->timestamp = issued_at.tv_sec;
  req->client_index = glob_replica_index;
  // Serialise before the bookkeeping below rewrites client_index.
  packet *wire = parse_msg_as_packet(req);

  struct msg *pending;
  if (client_is_primary()) {
    client_accept_as_primary(req);
    // req now belongs to msg_reqs, so `outstanding` tracks a separate copy
    // (taken after the sequence number has been assigned).
    pending = client_xmalloc(sizeof(*pending));
    memcpy(pending, req, sizeof(*pending));
  } else {
    send_to_replica(conns, num_conns, curr_view % (num_conns + 1), wire);
    // Nothing else keeps req on this path, so it becomes the tracked entry.
    pending = req;
  }
  clean_packet(wire);

  // In `outstanding`, client_index carries the local request number.
  pending->client_index = *request_num;
  client_log_push(&outstanding, pending);
  *request_num += 1;
  return true;
}

// Receive one length-prefixed frame from `peer`: the first byte is the payload
// length, followed by that many payload bytes. The sender address reported by
// recvfrom() is written into peer->conn. Returns a newly allocated packet, or
// NULL when nothing usable could be read (the bad input is discarded).
static packet *client_recv_packet(struct host *peer) {
  uint8_t msglen = 0;
  socklen_t addr_len = sizeof(peer->conn);
  ssize_t peeked = recvfrom(peer->sockfd, &msglen, sizeof(msglen), MSG_PEEK,
                            (struct sockaddr *)&(peer->conn), &addr_len);
  if (peeked == 0) {
    // Empty read: consume it so poll() does not keep reporting it.
    uint8_t scratch;
    (void)recvfrom(peer->sockfd, &scratch, sizeof(scratch), 0, NULL, NULL);
  }
  if (peeked != (ssize_t)sizeof(msglen)) {
    return NULL;
  }

  size_t frame_len = (size_t)msglen + sizeof(msglen);
  uint8_t *frame = client_xmalloc(frame_len);
  addr_len = sizeof(peer->conn);
  ssize_t got = recvfrom(peer->sockfd, frame, frame_len, 0,
                         (struct sockaddr *)&(peer->conn), &addr_len);
  if (got < 0 || (size_t)got != frame_len || msglen == 0) {
    // Truncated or empty frame: there is no complete message to parse.
    free(frame);
    return NULL;
  }

  uint8_t *payload = client_xmalloc(msglen);
  memcpy(payload, frame + sizeof(msglen), msglen);
  free(frame);

  packet *p = client_xmalloc(sizeof(*p));
  p->is_timed_out = false;
  p->len = msglen;
  p->data = payload;
  return p;
}

// Receive one frame from `peer` and decode it. Returns NULL when no complete
// frame was available or it could not be decoded.
static struct msg *client_recv_msg(struct host *peer) {
  packet *p = client_recv_packet(peer);
  if (p == NULL) {
    return NULL;
  }
  struct msg *decoded = parse_packet_as_msg(p);
  clean_packet(p);
  return decoded;
}

// Checks shared by prepare and commit messages: current view, sequence number
// inside the watermark window, and a valid signature.
static bool client_valid_for_view(struct msg *message) {
  return message->view == curr_view && low_watermark <= message->seq &&
         message->seq <= high_watermark &&
         validate_signature(message, todo_key());
}

// Send a commit for every queued request that prepared() reports as prepared
// for the view and sequence number of `trigger`.
static void client_commit_prepared(struct msg *trigger) {
  for (struct msg_log *node = msg_reqs; node != NULL; node = node->next) {
    if (prepared(mlog, f, node->m, trigger->view, trigger->seq)) {
      handle_send_commit(node->m, trigger, todo_key());
    }
  }
}

// Handle a pre-prepare. The request it refers to arrives as the next frame on
// the same socket. When the pre-prepare is acceptable, both messages are
// logged, a copy of the request (carrying the assigned sequence number) is
// queued in msg_reqs, and a prepare is broadcast.
static void client_handle_preprepare(struct host *peer, struct msg *message) {
  struct msg *piggyback = client_recv_msg(peer);
  if (piggyback == NULL) {
    clean_msg(message);
    return;
  }

  bool digest_ok = small_digest(piggyback) == message->digest;
  bool signature_ok = validate_signature(message, todo_key());
  bool view_ok = message->view == curr_view;
  // Reject a pre-prepare that reuses a (view, seq) pair already logged with a
  // different digest.
  bool no_conflict = true;
  for (struct msg_log *node = mlog; node != NULL; node = node->next) {
    if (node->m->msg_type == CLIENT_MSG_PREPREPARE &&
        node->m->view == message->view && node->m->seq == message->seq &&
        node->m->digest != message->digest) {
      no_conflict = false;
    }
  }
  bool in_window =
      low_watermark <= message->seq && message->seq <= high_watermark;

  if (!(digest_ok && signature_ok && view_ok && no_conflict && in_window)) {
    clean_msg(message);
    clean_msg(piggyback);
    return;
  }

  client_log_push(&mlog, message);
  client_log_push(&mlog, piggyback);

  struct msg *queued = client_xmalloc(sizeof(*queued));
  memcpy(queued, piggyback, sizeof(*queued));
  queued->seq = message->seq;
  client_log_push(&msg_reqs, queued);

  // The log is up to date, so the prepare can go out.
  handle_send_prepare(message, todo_key());
  client_commit_prepared(message);
}

// Handle a prepare: log it if valid and send commits for any request that has
// now become prepared.
static void client_handle_prepare(struct msg *message) {
  if (!client_valid_for_view(message)) {
    clean_msg(message);
    return;
  }
  client_log_push(&mlog, message);
  client_commit_prepared(message);
}

// Handle a commit: log it if valid, then execute the committed-local request
// with the lowest sequence number (if any) and remove it from msg_reqs.
// NOTE: this does not yet check that every request with a lower sequence
// number has already been executed.
static void client_handle_commit(struct msg *message) {
  if (!client_valid_for_view(message)) {
    clean_msg(message);
    return;
  }
  client_log_push(&mlog, message);
  if (msg_reqs == NULL) {
    return;
  }

  // `chosen` points at the link that holds the selected node, so unlinking
  // works the same for the head and for interior nodes. The head is examined
  // last, which keeps the original tie-breaking between equal sequence
  // numbers.
  struct msg_log **chosen = NULL;
  uint32_t lowest_seq = UINT32_MAX;
  for (struct msg_log **link = &msg_reqs->next; *link != NULL;
       link = &(*link)->next) {
    struct msg *candidate = (*link)->m;
    if (committed_local(mlog, f, candidate, message->view, message->seq) &&
        candidate->seq < lowest_seq) {
      lowest_seq = candidate->seq;
      chosen = link;
    }
  }
  if (committed_local(mlog, f, msg_reqs->m, message->view, message->seq) &&
      msg_reqs->m->seq < lowest_seq) {
    chosen = &msg_reqs;
  }
  if (chosen == NULL) {
    return;
  }

  struct msg_log *done = *chosen;
  *chosen = done->next;
  handle_operation(done->m, todo_key());
  // Only the list node is released; the message itself may still be
  // referenced elsewhere.
  free(done);
}

// Handle a response to one of our requests. Responses with a bad signature
// are dropped. Once more than f logged responses agree on both the value and
// the request timestamp, the matching entries are removed from `outstanding`
// and the result is printed, followed by a fresh prompt showing `request_num`.
// NOTE: responses are matched on timestamp and value only; they are not
// de-duplicated per sending replica, and requests issued within the same
// second share a timestamp.
static void client_handle_response(struct msg *message, uint32_t request_num) {
  if (!validate_signature(message, todo_key())) {
    clean_msg(message);
    return;
  }
  client_log_push(&out_responses, message);

  uint32_t resp = message->option_value;
  // The message was just logged, so this loop already counts it.
  int matching = 0;
  for (struct msg_log *node = out_responses; node != NULL; node = node->next) {
    if (node->m->option_value == resp &&
        node->m->timestamp == message->timestamp) {
      matching += 1;
    }
  }
  if (matching <= f) {
    return;
  }

  // Drop every outstanding request answered by this result and remember the
  // highest local request number among them.
  uint32_t req_num = 0;
  struct msg_log **link = &outstanding;
  while (*link != NULL) {
    struct msg_log *node = *link;
    if (node->m->timestamp != message->timestamp) {
      link = &node->next;
      continue;
    }
    if (node->m->client_index > req_num) {
      req_num = node->m->client_index;
    }
    *link = node->next;
    clean_msg(node->m);
    free(node);
  }

  if (req_num > accepted_req_num) {
    // The value is shown as a signed number, as before, so a state that
    // wrapped below zero prints as a negative value.
    printf("\nRESP [%u] > %d \n", (unsigned)req_num, (int)(int32_t)resp);
    fflush(stdout);
    printf("%u > ", (unsigned)request_num);
    fflush(stdout);
    accepted_req_num = req_num;
  }
}

// Read one message from `peer` and dispatch on its type. Messages that are
// not kept in one of the logs are released here or in the handler.
static void client_handle_peer(struct host *peer, uint32_t request_num) {
  struct msg *message = client_recv_msg(peer);
  if (message == NULL) {
    return;
  }

  switch (message->msg_type) {
  case CLIENT_MSG_REQUEST:
    // Client requests are only acted on by the primary.
    if (client_is_primary()) {
      client_accept_as_primary(message);
    } else {
      clean_msg(message);
    }
    break;
  case CLIENT_MSG_PREPREPARE:
    client_handle_preprepare(peer, message);
    break;
  case CLIENT_MSG_PREPARE:
    client_handle_prepare(message);
    break;
  case CLIENT_MSG_COMMIT:
    client_handle_commit(message);
    break;
  case CLIENT_MSG_RESPONSE:
    client_handle_response(message, request_num);
    break;
  default:
    clean_msg(message);
    break;
  }
}

// Start up peer.
//
// Parses the command line, sets up the peer connections, then loops forever
// multiplexing two inputs with poll():
//   - stdin: ADD <n>, SUB <n>, MUL <n> and GET commands become requests that
//     are forwarded to the primary (or handled directly when this node is the
//     primary);
//   - peer sockets: incoming protocol messages are validated, logged and
//     answered as the protocol proceeds.
// Returns EXIT_FAILURE if poll() fails; otherwise it does not return.
int main(int argc, char **argv) {
  // stdin is read with getline() but watched with poll(). With stdio
  // buffering, several lines could be pulled into the FILE buffer at once and
  // poll() would never report the ones left behind, so buffering is disabled.
  setvbuf(stdin, NULL, _IONBF, 0);

  srand((unsigned)time(NULL));
  parse(argc, argv);
  printf("Got host file: %s\n", f_host);
  initialize();

  uint32_t request_num = 1;

  // Slot 0 watches stdin; slot i + 1 watches the socket of conns[i].
  nfds_t nfds = (nfds_t)num_conns + 1;
  struct pollfd *pollfds = client_xmalloc(sizeof(*pollfds) * nfds);
  pollfds[0].fd = fileno(stdin);
  pollfds[0].events = POLLIN | POLLPRI;
  pollfds[0].revents = 0;
  for (int i = 0; i < num_conns; i++) {
    pollfds[i + 1].fd = conns[i]->sockfd;
    pollfds[i + 1].events = POLLIN | POLLPRI;
    pollfds[i + 1].revents = 0;
  }

  printf("%u > ", (unsigned)request_num);
  fflush(stdout);

  while (1) {
    int ready = poll(pollfds, nfds, CLIENT_POLL_TIMEOUT_MS);
    if (ready < 0) {
      if (errno == EINTR) {
        continue; // interrupted by a signal; revents are not valid
      }
      perror("poll");
      free(pollfds);
      return EXIT_FAILURE;
    }
    if (ready == 0) {
      // Timed out: handle view change (not implemented).
      continue;
    }

    if (pollfds[0].revents != 0 && !client_handle_stdin(&request_num)) {
      // stdin is finished. poll() ignores entries with a negative fd, so this
      // keeps serving peers without spinning on a permanently readable EOF.
      pollfds[0].fd = -1;
    }

    for (int i = 0; i < num_conns; i++) {
      if (pollfds[i + 1].revents != 0) {
        client_handle_peer(conns[i], request_num);
      }
    }
  }
}