first BFT impl

This commit is contained in:
h 2025-01-25 00:01:36 -05:00
commit bffe959713
20 changed files with 1158 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
*.o
.ccls-cache/
pbft

17
Makefile Normal file
View File

@ -0,0 +1,17 @@
CC=gcc
CFLAGS=-Wall -Wextra -ggdb -std=gnu99
SRC=client.c hosts.c parse.c online.c msg.c
OBJ=$(SRC:.c=.o)
LIBS=-lcrypto
all: client
.SUFFIXES: .c .o
.c.o:
$(CC) -c $(CFLAGS) $*.c
client: $(OBJ)
$(CC) -o pbft $(OBJ) $(LIBS)
clean:
rm -f *.o pbft

4
byz/hcfg1 Normal file
View File

@ -0,0 +1,4 @@
0
1 8000 127.0.0.1 8010
2 8001 127.0.0.1 8020
3 8002 127.0.0.1 8030

4
byz/hcfg2 Normal file
View File

@ -0,0 +1,4 @@
1
0 8010 127.0.0.1 8000
2 8011 127.0.0.1 8021
3 8012 127.0.0.1 8023

4
byz/hcfg3 Normal file
View File

@ -0,0 +1,4 @@
2
0 8020 127.0.0.1 8001
1 8021 127.0.0.1 8011
3 8022 127.0.0.1 8100

4
byz/hcfg4 Normal file
View File

@ -0,0 +1,4 @@
3
0 8030 127.0.0.1 8002
1 8023 127.0.0.1 8012
2 8100 127.0.0.1 8022

437
client.c Normal file
View File

@ -0,0 +1,437 @@
#include "hosts.h"
#include "msg.h"
#include "online.h"
#include "parse.h"
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
extern char *f_host; // parsing data for hosts.
extern struct host *conns[1024]; // connection data for each peer.
extern int num_conns; // number of connected peers.
extern int f; // maximum number of potentially faulty nodes.
extern char *glob_name; // my name (not necessary here).
extern uint32_t glob_replica_index; // my replica index.
extern struct msg_log *mlog;
struct msg_log *msg_reqs =
NULL; // message requests received from a pre-prepare.
extern struct msg_log *outstanding; // outstanding requests.
extern struct msg_log *out_responses; // outstanding responses.
uint32_t state_machine_state = 0; // initialize state machine state to 0.
uint32_t view_next_seq_num = 1;
uint32_t low_watermark = 0;
uint32_t high_watermark = 300;
uint32_t curr_view = 0;
uint32_t accepted_req_num = 0;
// Initialize host connections.
void initialize() {
init_hosts(f_host);
printf("\n\n\n --- CONNECTIONS --- \n\n");
for (int i = 0; i < num_conns; i++) {
printf("Connected with %s:%d at %d\n", conns[i]->name, conns[i]->c_port,
conns[i]->h_port);
}
}
// Start up peer.
int main(int argc, char **argv) {
srand(time(NULL));
parse(argc, argv);
printf("Got host file: %s\n", f_host);
initialize();
uint32_t request_num = 1;
// Take requests from stdin: we want to poll on stdin and incoming messages.
struct pollfd *pollfds = malloc(sizeof(struct pollfd) * ((num_conns) + 1));
int infd = fileno(stdin);
pollfds[0].fd = infd;
pollfds[0].events = POLLIN | POLLPRI;
for (int i = 0; i < num_conns; i++) {
pollfds[i + 1].fd = conns[i]->sockfd;
pollfds[i + 1].events = POLLIN | POLLPRI;
}
printf("%d > ", request_num);
fflush(stdout);
struct timespec start_time;
clock_gettime(CLOCK_REALTIME, &start_time);
int timeout = 30; // this should be fixed for a view change.
while (1) {
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
if (poll(pollfds, num_conns + 1, timeout) != 0) {
if (pollfds[0].revents > 0) {
// update timeouts.
// handle stdin request for state change.
char *str = NULL;
size_t len;
if (getline(&str, &len, stdin) != -1) {
bool proper_request = false;
struct msg *req = init_msg();
req->msg_type = 0; // request
if (strncmp("ADD", str, strlen("ADD")) == 0) {
char *argument = str + 4;
uint32_t x = atoi(argument);
req->req_message = 0;
req->option_value = x;
proper_request = true;
}
if (strncmp("SUB", str, strlen("SUB")) == 0) {
char *argument = str + 4;
uint32_t x = atoi(argument);
req->req_message = 1;
req->option_value = x;
proper_request = true;
}
if (strncmp("MUL", str, strlen("MUL")) == 0) {
char *argument = str + 4;
uint32_t x = atoi(argument);
req->req_message = 2;
req->option_value = x;
proper_request = true;
}
if (strncmp("GET", str, strlen("GET")) == 0) {
req->req_message = 3;
proper_request = true;
}
if (proper_request) {
struct timespec creq_timestamp;
clock_gettime(CLOCK_MONOTONIC, &creq_timestamp);
req->timestamp = creq_timestamp.tv_sec;
req->client_index = glob_replica_index;
packet *p = parse_msg_as_packet(req);
if (curr_view % (num_conns + 1) == glob_replica_index) {
// case: we are the primary.
handle_primary_recv_request(req, todo_key());
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
out_msg->m = req;
out_msg->m->seq = view_next_seq_num;
out_msg->next = msg_reqs;
msg_reqs = out_msg;
view_next_seq_num += 1;
} else {
// case: we are a client sending to primary.
send_to_replica(conns, num_conns, curr_view % (num_conns + 1), p);
}
clean_packet(p);
struct msg_log *ml = malloc(sizeof(struct msg_log));
struct msg *standing = malloc(sizeof(struct msg));
memcpy(standing, req, sizeof(struct msg));
standing->client_index = request_num;
ml->m = standing;
ml->m->client_index = request_num;
ml->next = outstanding;
outstanding = ml;
request_num += 1;
}
}
}
for (int i = 0; i < num_conns; i++) {
if (pollfds[i + 1].revents > 0) {
// handle message from peer at this point.
uint8_t msglen;
socklen_t addr_len = sizeof(conns[i]->conn);
recvfrom(conns[i]->sockfd, &msglen, sizeof(msglen), MSG_PEEK,
(struct sockaddr *)&(conns[i]->conn), &addr_len);
uint8_t *msg_buf = malloc(msglen + sizeof(msglen));
recvfrom(conns[i]->sockfd, msg_buf, msglen + sizeof(msglen), 0,
(struct sockaddr *)&(conns[i]->conn), &addr_len);
uint8_t *new_data = malloc(msglen);
memcpy(new_data, msg_buf + 1, msglen);
free(msg_buf);
packet *p = malloc(sizeof(packet));
p->is_timed_out = false;
p->len = msglen;
p->data = new_data;
struct msg *message = parse_packet_as_msg(p);
clean_packet(p);
if (message->msg_type == 0) {
// printf("\nRequest\n");
// request for message coming from a client
// check that we are the primary
if (curr_view % (num_conns + 1) == glob_replica_index) {
handle_primary_recv_request(message, todo_key());
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
out_msg->m = message;
out_msg->m->seq = view_next_seq_num;
out_msg->next = msg_reqs;
msg_reqs = out_msg;
view_next_seq_num += 1;
}
} else if (message->msg_type == 1) {
// printf("\nPreprepare\n");
// pre-prepare message sent out
//
// 1. parse the pre-prepare message.
//
// 2. receive the piggybacked message.
//
// validate and accept pre-prepare message.
// receive the piggybacked message.
uint8_t msglen;
socklen_t addr_len = sizeof(conns[i]->conn);
recvfrom(conns[i]->sockfd, &msglen, sizeof(msglen), MSG_PEEK,
(struct sockaddr *)&(conns[i]->conn), &addr_len);
uint8_t *msg_buf = malloc(msglen + sizeof(msglen));
recvfrom(conns[i]->sockfd, msg_buf, msglen + sizeof(msglen), 0,
(struct sockaddr *)&(conns[i]->conn), &addr_len);
uint8_t *new_data = malloc(msglen);
memcpy(new_data, msg_buf + 1, msglen);
free(msg_buf);
packet *p = malloc(sizeof(packet));
p->is_timed_out = false;
p->len = msglen;
p->data = new_data;
struct msg *piggyback = parse_packet_as_msg(p);
uint32_t d = small_digest(piggyback);
bool check1 = d == message->digest;
bool check2 = validate_signature(message, todo_key());
bool check3 = message->view == curr_view;
bool check4 = true;
struct msg_log *m;
for (m = mlog; m != NULL; m = m->next) {
if (m->m->msg_type == 1 && m->m->view == message->view &&
m->m->seq == message->seq &&
m->m->digest != message->digest) {
check4 = false;
}
}
bool check5 = (low_watermark <= message->seq &&
message->seq <= high_watermark);
if (check1 && check2 && check3 && check4 && check5) {
struct msg_log *ml = malloc(sizeof(struct msg_log));
ml->m = message;
ml->next = mlog;
struct msg_log *ml2 = malloc(sizeof(struct msg_log));
ml2->m = piggyback;
ml2->next = ml;
mlog = ml2;
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
struct msg *cloned_msg = malloc(sizeof(struct msg));
memcpy(cloned_msg, piggyback, sizeof(struct msg));
cloned_msg->seq = message->seq;
out_msg->m = cloned_msg;
out_msg->next = msg_reqs;
msg_reqs = out_msg;
// now that the message log has been updated, we should broadcast
// prepare messages.
handle_send_prepare(message, todo_key());
struct msg_log *out;
for (out = msg_reqs; out != NULL; out = out->next) {
if (prepared(mlog, f, out->m, message->view, message->seq)) {
handle_send_commit(out->m, message, todo_key());
}
}
}
} else if (message->msg_type == 2) {
// printf("\nPrepare\n");
// receive a prepare message.
// 1. validate the prepare message.
// 2. check if this prepare message completes the `prepared` for any
// currently waiting message.
// if it does, send out a commit!
if (message->view == curr_view && low_watermark <= message->seq &&
message->seq <= high_watermark &&
validate_signature(message, todo_key())) {
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
out_msg->m = message;
out_msg->next = mlog;
mlog = out_msg;
struct msg_log *out;
for (out = msg_reqs; out != NULL; out = out->next) {
if (prepared(mlog, f, out->m, message->view, message->seq)) {
handle_send_commit(out->m, message, todo_key());
}
}
}
} else if (message->msg_type == 3) {
// printf("\nCommit\n");
// someone committed.
// validate committed message.
// struct msg_log *loop;
// for (loop = mlog; loop != NULL; loop = loop->next) {
// display_msg(loop->m);
// fflush(stdout);
// }
if (message->view == curr_view && low_watermark <= message->seq &&
message->seq <= high_watermark &&
validate_signature(message, todo_key())) {
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
out_msg->m = message;
out_msg->next = mlog;
mlog = out_msg;
struct msg_log *out;
uint32_t lowest_seq = UINT32_MAX;
struct msg_log *prev_to_curr;
struct msg_log *curr;
struct msg *curr_handle;
bool valid = false;
for (out = msg_reqs; out != NULL; out = out->next) {
if (out->next != NULL &&
committed_local(mlog, f, out->next->m, message->view,
message->seq)) {
if (out->next->m->seq < lowest_seq) {
lowest_seq = out->next->m->seq;
curr_handle = out->next->m;
prev_to_curr = out;
curr = out->next;
valid = true;
}
// [HERE] check if all reqs w lower sequence numbers have been
// established.
}
}
if (msg_reqs != NULL &&
committed_local(mlog, f, msg_reqs->m, message->view,
message->seq)) {
if (msg_reqs->m->seq < lowest_seq) {
lowest_seq = msg_reqs->m->seq;
curr_handle = msg_reqs->m;
prev_to_curr = NULL;
curr = msg_reqs;
valid = true;
}
}
if (valid == true && prev_to_curr != NULL && curr != NULL) {
printf("\nTest1\n");
prev_to_curr->next = curr->next;
printf("\nTest2\n");
} else {
if (valid == true && msg_reqs != NULL) {
msg_reqs = msg_reqs->next;
}
}
if (valid) {
handle_operation(curr_handle, todo_key());
}
}
} else if (message->msg_type == 5) {
// printf("\nResponse\n");
if (validate_signature(message, todo_key())) {
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
out_msg->m = message;
out_msg->next = out_responses;
out_responses = out_msg;
}
struct msg_log *out;
uint32_t resp = message->option_value;
uint8_t count = 1;
for (out = out_responses; out != NULL; out = out->next) {
if (out->m->option_value == resp &&
out->m->timestamp == message->timestamp) {
count += 1;
}
}
uint32_t req_num = 0;
if (count > f) {
struct msg_log *out;
struct msg_log *curr;
for (out = outstanding; out != NULL; out = out->next) {
curr = out->next;
if (curr != NULL && curr->m->timestamp == message->timestamp) {
out->next = curr->next;
req_num = curr->m->client_index;
clean_msg(curr->m);
free(curr);
}
}
if (outstanding != NULL &&
outstanding->m->timestamp == message->timestamp) {
req_num = outstanding->m->client_index;
outstanding = outstanding->next;
}
if (req_num > accepted_req_num) {
struct msg_log *loop;
// for (loop = mlog; loop != NULL; loop = loop->next) {
// display_msg(loop->m);
// fflush(stdout);
// }
printf("\nRESP [%d] > %d \n", req_num, resp);
fflush(stdout);
printf("%d > ", request_num);
fflush(stdout);
accepted_req_num = req_num;
}
}
}
}
}
} else {
// handle view change.
}
}
// stdin msg -> forward to primary (check if it is us), start online phase
// other messages: keep message log, and validate as we proceed
}

94
hosts.c Normal file
View File

@ -0,0 +1,94 @@
#include "hosts.h"
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
struct host *conns[1024];
int num_conns = 0;
int f = 0;
char *glob_name;
uint32_t glob_replica_index = 0;
int init_conn(int h_port) {
int sock_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (sock_fd < 0) {
printf("Socket initialization failed.");
exit(1);
}
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(h_port);
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(sock_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
printf("Failed to bind socket.\n");
exit(1);
}
return sock_fd;
}
void init_hosts(char *file) {
char *line = NULL;
size_t len = 0;
ssize_t read;
FILE *fp = fopen(file, "r");
if (fp == NULL) {
printf("Host file not found.\n");
exit(1);
}
int name = 1;
while ((read = getline(&line, &len, fp)) != -1) {
if (name == 1) {
glob_name = malloc(sizeof(char) * (strlen(line) + 1));
strcpy(glob_name, line);
glob_name[strcspn(glob_name, "\n")] = 0;
glob_replica_index = atoi(glob_name);
printf("Name: %s", glob_name);
name = 0;
} else {
char *cname = malloc(200 * sizeof(char));
char *addr = malloc(32 * sizeof(char));
int c_port;
int h_port;
int ret = sscanf(line, "%200s %d %32s %d", cname, &h_port, addr, &c_port);
if (ret != 4) {
printf("Could not parse host file.\n");
exit(1);
}
struct host *h = malloc(sizeof(struct host));
h->name = cname;
h->replica_index = atoi(cname);
h->h_port = h_port;
h->c_port = c_port;
h->sockfd = init_conn(h_port);
struct sockaddr_in conn_addr;
memset(&conn_addr, 0, sizeof(conn_addr));
conn_addr.sin_family = AF_INET;
conn_addr.sin_port = htons(c_port);
conn_addr.sin_addr.s_addr = inet_addr(addr);
free(addr);
h->conn = conn_addr;
conns[num_conns] = h;
num_conns += 1;
}
}
f = (num_conns) / 3;
}

17
hosts.h Normal file
View File

@ -0,0 +1,17 @@
#ifndef H_HOST
#define H_HOST
#include <netinet/in.h>
struct host {
char *name;
int replica_index;
int sockfd;
struct sockaddr_in conn;
int h_port;
int c_port;
};
void init_hosts(char *file);
#endif

177
msg.c Normal file
View File

@ -0,0 +1,177 @@
#include "msg.h"
#include "hosts.h"
#include <poll.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
struct msg_log *mlog = NULL;
struct msg_log *outstanding = NULL;
struct msg_log *out_responses = NULL;
void broadcast(struct host **conns, int num_conns, packet *data) {
char *transmit = malloc(sizeof(char) * data->len + 1);
transmit[0] = (char)data->len;
memcpy((transmit + 1), data->data, data->len);
for (int i = 0; i < num_conns; i++) {
sendto(conns[i]->sockfd, transmit, data->len + 1, 0,
(const struct sockaddr *)&conns[i]->conn, sizeof(conns[i]->conn));
}
free(transmit);
}
void send_to(struct host *conn, packet *data) {
char *transmit = malloc(sizeof(char) * data->len + 1);
transmit[0] = (char)data->len;
memcpy((transmit + 1), data->data, data->len);
sendto(conn->sockfd, transmit, data->len + 1, 0,
(const struct sockaddr *)&conn->conn, sizeof(conn->conn));
free(transmit);
}
void send_to_replica(struct host **conns, int num_conns, int replica,
packet *data) {
for (int i = 0; i < num_conns; i++) {
if (replica == conns[i]->replica_index) {
char *transmit = malloc(sizeof(char) * data->len + 1);
transmit[0] = (char)data->len;
memcpy((transmit + 1), data->data, data->len);
sendto(conns[i]->sockfd, transmit, data->len + 1, 0,
(const struct sockaddr *)&conns[i]->conn, sizeof(conns[i]->conn));
free(transmit);
}
}
}
void append_msg(struct msg *m) {
struct msg_log *cur = malloc(sizeof(struct msg_log));
cur->m = m;
cur->next = mlog;
}
void discard_before_seq(uint32_t seq) {
struct msg_log *old;
struct msg_log *cur;
for (old = mlog; old != NULL; old = old->next) {
cur = old->next;
if (cur != NULL && cur->m->seq < seq) {
old->next = cur->next;
free(cur->m);
free(cur);
}
}
if (mlog != NULL && mlog->m->seq < seq) {
mlog = mlog->next;
}
}
struct msg *parse_packet_as_msg(packet *p) {
uint8_t msg_type = p->data[0];
uint8_t req_message = p->data[1];
uint32_t option_value = ntohl(((uint32_t *)p->data)[1]);
uint64_t timestamp = be64toh(((uint64_t *)p->data)[1]);
uint32_t client_index = ntohl(((uint32_t *)p->data)[4]);
uint32_t view = ntohl(((uint32_t *)p->data)[5]);
uint32_t seq = ntohl(((uint32_t *)p->data)[6]);
uint32_t digest = ntohl(((uint32_t *)p->data)[7]);
uint32_t server_index = ntohl(((uint32_t *)p->data)[8]);
uint32_t signature = ntohl(((uint32_t *)p->data)[9]);
struct msg *m = malloc(sizeof(struct msg));
m->msg_type = msg_type;
m->option_value = option_value;
m->timestamp = timestamp;
m->client_index = client_index;
m->req_message = req_message;
m->view = view;
m->seq = seq;
m->digest = digest;
m->server_index = server_index;
m->signature = signature;
return m;
}
packet *parse_msg_as_packet(struct msg *m) {
uint32_t *data = malloc(sizeof(uint32_t) * 10);
data[0] = 0;
((uint8_t *)data)[0] = m->msg_type;
((uint8_t *)data)[1] = m->req_message;
data[1] = htonl(m->option_value);
((uint64_t *)data)[1] = htobe64(m->timestamp);
data[4] = htonl(m->client_index);
data[5] = htonl(m->view);
data[6] = htonl(m->seq);
data[7] = htonl(m->digest);
data[8] = htonl(m->server_index);
data[9] = htonl(m->signature);
uint8_t len = sizeof(uint32_t) * 10;
packet *p = malloc(sizeof(packet));
p->is_timed_out = false;
p->data = (uint8_t *)data;
p->len = len;
return p;
}
void clean_packet(packet *p) {
free(p->data);
free(p);
}
void clean_msg(struct msg *m) { free(m); }
struct msg *init_msg() {
struct msg *m = malloc(sizeof(struct msg));
m->msg_type = 0;
m->req_message = 0;
m->option_value = 0;
m->timestamp = 0;
m->client_index = 0;
m->view = 0;
m->seq = 0;
m->digest = 0;
m->server_index = 0;
m->signature = 0;
return m;
}
void display_msg(struct msg *m) {
if (m->msg_type == 0) {
printf("Request: ");
if (m->req_message == 0) {
printf(" ADD %d [", m->option_value);
} else if (m->req_message == 1) {
printf(" SUB %d [", m->option_value);
} else if (m->req_message == 2) {
printf(" MUL %d [", m->option_value);
} else if (m->req_message == 3) {
printf(" GET [");
}
printf(" at: %ld, from: %d ]\n", m->timestamp, m->client_index);
} else if (m->msg_type == 1) {
printf("Pre-prepare: [ view %d, seq %d ] (D: %d) \n", m->view, m->seq,
m->digest);
} else if (m->msg_type == 2) {
printf("Prepare: [ view %d, seq %d, from: %d ] (D: %d) \n", m->view, m->seq,
m->server_index, m->digest);
} else if (m->msg_type == 3) {
printf("Commit: [ view %d, seq %d, from: %d ] (D: %d) \n", m->view, m->seq,
m->server_index, m->digest);
} else if (m->msg_type == 5) {
printf("Response: [ value %d ] \n", m->option_value);
}
}

61
msg.h Normal file
View File

@ -0,0 +1,61 @@
#ifndef H_MSG
#define H_MSG
#include "hosts.h"
#include <stdbool.h>
#include <stdint.h>
typedef struct {
bool is_timed_out;
uint8_t *data; // always in network byte order
uint8_t len;
} packet;
// client msg requests: (ADD(x), SUB(x), MUL(x), GET)
// state += x, -= x, *= x, returns state
struct msg {
uint8_t msg_type; // 0 = request, 1 = preprepare, 2 = prepare, 3 = commit, 4 =
// response
// request fields
uint8_t req_message; // 0 = add, 1 = sub, 2 = mul, 3 = get
uint32_t option_value; // value for message.
uint64_t timestamp;
uint32_t client_index;
// preprepare, prepare, commit, checkpoint fields
uint32_t view;
uint32_t seq;
uint32_t digest;
uint32_t server_index;
uint32_t signature;
};
struct msg_log {
struct msg_log *next;
struct msg *m;
};
void append_msg(struct msg *m);
void discard_before_seq(uint32_t seq);
void clean_packet(packet *p);
struct msg *init_msg();
void clean_msg(struct msg *m);
void display_msg(struct msg *m);
struct msg *parse_packet_as_msg(packet *p);
packet *parse_msg_as_packet(struct msg *m);
void broadcast(struct host **conns, int num_conns, packet *data);
void send_to(struct host *conn, packet *data);
void send_to_replica(struct host **conns, int num_conns, int replica,
packet *data);
#endif

250
online.c Normal file
View File

@ -0,0 +1,250 @@
#include "online.h"
#include "hosts.h"
#include "msg.h"
#include <assert.h>
#include <openssl/sha.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
extern struct host *conns[1024];
extern int num_conns;
extern uint32_t curr_view;
extern uint32_t view_next_seq_num;
extern uint32_t state_machine_state;
extern struct msg_log *mlog;
extern struct msg_log *out_responses;
extern uint32_t glob_replica_index;
uint32_t small_digest(struct msg *request) {
char data[20] = {0};
char *out = malloc(32 * sizeof(char));
for (int i = 0; i < 32; i++) {
out[i] = 0;
}
data[0] = request->req_message;
((uint32_t *)data)[1] = request->option_value;
((uint64_t *)data)[1] = request->timestamp;
((uint32_t *)data)[4] = request->client_index;
SHA256((unsigned char *)data, 20, (unsigned char *)out);
uint32_t x = ((uint32_t *)out)[0];
free(out);
return x;
}
void handle_primary_recv_request(struct msg *request, struct sigpriv *privkey) {
struct msg *pre_prepare = init_msg();
uint32_t d = small_digest(request);
pre_prepare->msg_type = 1;
pre_prepare->seq = view_next_seq_num;
pre_prepare->view = curr_view;
pre_prepare->digest = d;
pre_prepare->signature = sign_msg(pre_prepare, privkey);
broadcast(conns, num_conns, parse_msg_as_packet(pre_prepare));
broadcast(conns, num_conns, parse_msg_as_packet(request));
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
struct msg *o = malloc(sizeof(struct msg));
memcpy(o, request, sizeof(struct msg));
out_msg->m = o;
out_msg->next = mlog;
mlog = out_msg;
struct msg_log *out2 = malloc(sizeof(struct msg_log));
out2->m = pre_prepare;
out2->next = mlog;
mlog = out2;
}
void handle_send_prepare(struct msg *preprepare, struct sigpriv *privkey) {
struct msg *prepare = init_msg();
prepare->msg_type = 2;
prepare->seq = preprepare->seq;
prepare->view = preprepare->view;
prepare->server_index = glob_replica_index;
prepare->digest = preprepare->digest;
prepare->signature = sign_msg(prepare, privkey);
broadcast(conns, num_conns, parse_msg_as_packet(prepare));
struct msg_log *out2 = malloc(sizeof(struct msg_log));
out2->m = prepare;
out2->next = mlog;
mlog = out2;
}
void handle_send_commit(struct msg *msg, struct msg *prepare,
struct sigpriv *privkey) {
struct msg *commit = init_msg();
commit->msg_type = 3;
commit->seq = prepare->seq;
commit->view = prepare->view;
commit->server_index = glob_replica_index;
commit->digest = small_digest(msg);
commit->signature = sign_msg(commit, privkey);
broadcast(conns, num_conns, parse_msg_as_packet(commit));
struct msg_log *out2 = malloc(sizeof(struct msg_log));
out2->m = commit;
out2->next = mlog;
mlog = out2;
}
void handle_operation(struct msg *msg, struct sigpriv *privkey) {
struct msg *m = init_msg();
m->msg_type = 5;
if (msg->req_message == 0) {
state_machine_state += msg->option_value;
m->option_value = 1;
m->timestamp = msg->timestamp;
m->signature = sign_msg(m, privkey);
for (int i = 0; i < num_conns; i++) {
if ((uint32_t)conns[i]->replica_index == msg->client_index) {
send_to(conns[i], parse_msg_as_packet(m));
}
}
if (msg->client_index == glob_replica_index) {
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
out_msg->m = m;
out_msg->next = out_responses;
out_responses = out_msg;
}
} else if (msg->req_message == 1) {
state_machine_state -= msg->option_value;
m->option_value = 1;
m->timestamp = msg->timestamp;
m->signature = sign_msg(m, privkey);
for (int i = 0; i < num_conns; i++) {
if ((uint32_t)conns[i]->replica_index == msg->client_index) {
send_to(conns[i], parse_msg_as_packet(m));
}
}
if (msg->client_index == glob_replica_index) {
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
out_msg->m = m;
out_msg->next = out_responses;
out_responses = out_msg;
}
} else if (msg->req_message == 2) {
state_machine_state *= msg->option_value;
m->option_value = 1;
m->timestamp = msg->timestamp;
m->signature = sign_msg(m, privkey);
for (int i = 0; i < num_conns; i++) {
if ((uint32_t)conns[i]->replica_index == msg->client_index) {
send_to(conns[i], parse_msg_as_packet(m));
}
}
if (msg->client_index == glob_replica_index) {
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
out_msg->m = m;
out_msg->next = out_responses;
out_responses = out_msg;
}
} else if (msg->req_message == 3) {
m->option_value = state_machine_state;
m->timestamp = msg->timestamp;
m->signature = sign_msg(m, privkey);
for (int i = 0; i < num_conns; i++) {
if ((uint32_t)conns[i]->replica_index == msg->client_index) {
send_to(conns[i], parse_msg_as_packet(m));
}
}
if (msg->client_index == glob_replica_index) {
struct msg_log *out_msg = malloc(sizeof(struct msg_log));
out_msg->m = m;
out_msg->next = out_responses;
out_responses = out_msg;
}
}
clean_msg(m);
}
bool prepared(struct msg_log *mlog, int f, struct msg *request, uint32_t view,
uint32_t seq) {
bool is_request_in = false;
bool is_preprepare_in = false;
uint8_t num_prepares = 0;
uint32_t digest_req = small_digest(request);
struct msg_log *m;
for (m = mlog; m != NULL; m = m->next) {
if (m->m->msg_type == 0 && request->msg_type == 0 &&
m->m->req_message == request->req_message &&
m->m->option_value == request->option_value &&
m->m->timestamp == request->timestamp &&
m->m->client_index == request->client_index) {
is_request_in = true;
}
if (m->m->msg_type == 1 && m->m->view == view && m->m->seq == seq &&
m->m->digest == digest_req) {
is_preprepare_in = true;
}
if (m->m->msg_type == 2 && m->m->view == view && m->m->seq == seq &&
m->m->digest == digest_req) {
num_prepares += 1;
}
}
return (is_request_in && is_preprepare_in && (num_prepares >= 2 * f));
}
bool committed_local(struct msg_log *mlog, int f, struct msg *request,
uint32_t view, uint32_t seq) {
bool is_prepared = prepared(mlog, f, request, view, seq);
uint32_t cor_seq = request->seq;
request->seq = 0;
uint32_t dg = small_digest(request);
request->seq = cor_seq;
struct msg_log *m;
uint8_t commits = 0;
for (m = mlog; m != NULL; m = m->next) {
if (m->m->msg_type == 3 && m->m->view == view && m->m->digest == dg &&
m->m->seq == request->seq) {
struct msg_log *k;
uint32_t factor = 1;
for (k = mlog; k != m; k = k->next) {
if (k->m->msg_type == 3 && k->m->view == view && k->m->digest == dg &&
k->m->seq == request->seq &&
k->m->server_index == m->m->server_index) {
factor = 0;
}
}
commits += 1 * factor;
}
}
// printf("committed? %d %d %d \n", is_prepared, commits, 2 * f + 1);
return (is_prepared && (commits >= 2 * f + 1));
}
uint32_t sign_msg(struct msg *_message, struct sigpriv *_key) { return 0; }
bool validate_signature(struct msg *_message, struct sigkey *_privkey) {
return true;
}
void *todo_key() { return NULL; }

39
online.h Normal file
View File

@ -0,0 +1,39 @@
#ifndef H_ONL
#define H_ONL
#include "hosts.h"
#include "msg.h"
struct sigkey {
uint32_t key;
};
struct sigpriv {
uint32_t key;
};
uint32_t small_digest(struct msg *request);
void handle_primary_recv_request(struct msg *request, struct sigpriv *privkey);
void handle_send_prepare(struct msg *preprepare, struct sigpriv *privkey);
void handle_send_commit(struct msg *msg, struct msg *prepare,
struct sigpriv *privkey);
void handle_operation(struct msg *msg, struct sigpriv *privkey);
bool prepared(struct msg_log *mlog, int f, struct msg *request, uint32_t view,
uint32_t seq);
bool committed_local(struct msg_log *mlog, int f, struct msg *request,
uint32_t view, uint32_t seq);
uint32_t sign_msg(struct msg *message, struct sigpriv *privkey);
bool validate_signature(struct msg *message, struct sigkey *key);
void *todo_key();
#endif

28
parse.c Normal file
View File

@ -0,0 +1,28 @@
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
extern char *optarg;
char *f_host;
int parse(int argc, char **argv) {
char opt;
bool h;
while ((opt = getopt(argc, argv, "h:")) != EOF) {
switch (opt) {
case 'h':
f_host = optarg;
h = true;
break;
}
}
if (!h) {
printf("Please provide all arguments.\n");
exit(1);
}
return optind;
}

6
parse.h Normal file
View File

@ -0,0 +1,6 @@
#ifndef H_PARSE
#define H_PARSE
int parse(int argc, char **argv);
#endif

3
test/hcfg1 Normal file
View File

@ -0,0 +1,3 @@
0
1 8000 127.0.0.1 8010
2 8001 127.0.0.1 8020

3
test/hcfg2 Normal file
View File

@ -0,0 +1,3 @@
1
0 8010 127.0.0.1 8000
2 8011 127.0.0.1 8021

3
test/hcfg3 Normal file
View File

@ -0,0 +1,3 @@
2
0 8020 127.0.0.1 8001
1 8021 127.0.0.1 8011

2
test2/hcfg1 Normal file
View File

@ -0,0 +1,2 @@
0
1 8001 127.0.0.1 8010

2
test2/hcfg2 Normal file
View File

@ -0,0 +1,2 @@
1
0 8010 127.0.0.1 8001