pbft/msg.c

178 lines
4.8 KiB
C
Raw Permalink Normal View History

2025-01-25 00:01:36 -05:00
#include "msg.h"
#include "hosts.h"
#include <poll.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
struct msg_log *mlog = NULL;
struct msg_log *outstanding = NULL;
struct msg_log *out_responses = NULL;
void broadcast(struct host **conns, int num_conns, packet *data) {
char *transmit = malloc(sizeof(char) * data->len + 1);
transmit[0] = (char)data->len;
memcpy((transmit + 1), data->data, data->len);
for (int i = 0; i < num_conns; i++) {
sendto(conns[i]->sockfd, transmit, data->len + 1, 0,
(const struct sockaddr *)&conns[i]->conn, sizeof(conns[i]->conn));
}
free(transmit);
}
void send_to(struct host *conn, packet *data) {
char *transmit = malloc(sizeof(char) * data->len + 1);
transmit[0] = (char)data->len;
memcpy((transmit + 1), data->data, data->len);
sendto(conn->sockfd, transmit, data->len + 1, 0,
(const struct sockaddr *)&conn->conn, sizeof(conn->conn));
free(transmit);
}
void send_to_replica(struct host **conns, int num_conns, int replica,
packet *data) {
for (int i = 0; i < num_conns; i++) {
if (replica == conns[i]->replica_index) {
char *transmit = malloc(sizeof(char) * data->len + 1);
transmit[0] = (char)data->len;
memcpy((transmit + 1), data->data, data->len);
sendto(conns[i]->sockfd, transmit, data->len + 1, 0,
(const struct sockaddr *)&conns[i]->conn, sizeof(conns[i]->conn));
free(transmit);
}
}
}
void append_msg(struct msg *m) {
struct msg_log *cur = malloc(sizeof(struct msg_log));
cur->m = m;
cur->next = mlog;
}
void discard_before_seq(uint32_t seq) {
struct msg_log *old;
struct msg_log *cur;
for (old = mlog; old != NULL; old = old->next) {
cur = old->next;
if (cur != NULL && cur->m->seq < seq) {
old->next = cur->next;
free(cur->m);
free(cur);
}
}
if (mlog != NULL && mlog->m->seq < seq) {
mlog = mlog->next;
}
}
struct msg *parse_packet_as_msg(packet *p) {
uint8_t msg_type = p->data[0];
uint8_t req_message = p->data[1];
uint32_t option_value = ntohl(((uint32_t *)p->data)[1]);
uint64_t timestamp = be64toh(((uint64_t *)p->data)[1]);
uint32_t client_index = ntohl(((uint32_t *)p->data)[4]);
uint32_t view = ntohl(((uint32_t *)p->data)[5]);
uint32_t seq = ntohl(((uint32_t *)p->data)[6]);
uint32_t digest = ntohl(((uint32_t *)p->data)[7]);
uint32_t server_index = ntohl(((uint32_t *)p->data)[8]);
uint32_t signature = ntohl(((uint32_t *)p->data)[9]);
struct msg *m = malloc(sizeof(struct msg));
m->msg_type = msg_type;
m->option_value = option_value;
m->timestamp = timestamp;
m->client_index = client_index;
m->req_message = req_message;
m->view = view;
m->seq = seq;
m->digest = digest;
m->server_index = server_index;
m->signature = signature;
return m;
}
packet *parse_msg_as_packet(struct msg *m) {
uint32_t *data = malloc(sizeof(uint32_t) * 10);
data[0] = 0;
((uint8_t *)data)[0] = m->msg_type;
((uint8_t *)data)[1] = m->req_message;
data[1] = htonl(m->option_value);
((uint64_t *)data)[1] = htobe64(m->timestamp);
data[4] = htonl(m->client_index);
data[5] = htonl(m->view);
data[6] = htonl(m->seq);
data[7] = htonl(m->digest);
data[8] = htonl(m->server_index);
data[9] = htonl(m->signature);
uint8_t len = sizeof(uint32_t) * 10;
packet *p = malloc(sizeof(packet));
p->is_timed_out = false;
p->data = (uint8_t *)data;
p->len = len;
return p;
}
void clean_packet(packet *p) {
free(p->data);
free(p);
}
void clean_msg(struct msg *m) { free(m); }
struct msg *init_msg() {
struct msg *m = malloc(sizeof(struct msg));
m->msg_type = 0;
m->req_message = 0;
m->option_value = 0;
m->timestamp = 0;
m->client_index = 0;
m->view = 0;
m->seq = 0;
m->digest = 0;
m->server_index = 0;
m->signature = 0;
return m;
}
void display_msg(struct msg *m) {
if (m->msg_type == 0) {
printf("Request: ");
if (m->req_message == 0) {
printf(" ADD %d [", m->option_value);
} else if (m->req_message == 1) {
printf(" SUB %d [", m->option_value);
} else if (m->req_message == 2) {
printf(" MUL %d [", m->option_value);
} else if (m->req_message == 3) {
printf(" GET [");
}
printf(" at: %ld, from: %d ]\n", m->timestamp, m->client_index);
} else if (m->msg_type == 1) {
printf("Pre-prepare: [ view %d, seq %d ] (D: %d) \n", m->view, m->seq,
m->digest);
} else if (m->msg_type == 2) {
printf("Prepare: [ view %d, seq %d, from: %d ] (D: %d) \n", m->view, m->seq,
m->server_index, m->digest);
} else if (m->msg_type == 3) {
printf("Commit: [ view %d, seq %d, from: %d ] (D: %d) \n", m->view, m->seq,
m->server_index, m->digest);
} else if (m->msg_type == 5) {
printf("Response: [ value %d ] \n", m->option_value);
}
}