178 lines
4.8 KiB
C
178 lines
4.8 KiB
C
|
#include "msg.h"
|
||
|
#include "hosts.h"
|
||
|
#include <poll.h>
|
||
|
#include <stdint.h>
|
||
|
#include <stdio.h>
|
||
|
#include <stdlib.h>
|
||
|
#include <string.h>
|
||
|
|
||
|
/* Head of the singly linked message log. append_msg() links new nodes in
 * front of it and discard_before_seq() walks and prunes the list from it. */
struct msg_log *mlog = NULL;
|
||
|
|
||
|
/* Second msg_log list head, initially empty. It is not referenced by the
 * functions in this part of the file.
 * NOTE(review): presumably holds messages still awaiting completion --
 * confirm against the code that uses it. */
struct msg_log *outstanding = NULL;
|
||
|
/* Third msg_log list head, initially empty. It is not referenced by the
 * functions in this part of the file.
 * NOTE(review): presumably holds pending responses -- confirm against the
 * code that uses it. */
struct msg_log *out_responses = NULL;
|
||
|
|
||
|
/*
 * Send one packet to every host in conns.
 *
 * The datagram is framed as a single length byte followed by data->len
 * payload bytes, and is handed to sendto() on each host's own socket.
 *
 * conns      array of num_conns host pointers
 * num_conns  number of entries in conns
 * data       packet whose data/len fields supply the payload
 *
 * Delivery is best effort: a failed sendto() is reported with perror() and
 * the remaining hosts are still attempted. If the frame buffer cannot be
 * allocated, the failure is reported and nothing is sent.
 */
void broadcast(struct host **conns, int num_conns, packet *data) {
  size_t frame_len = (size_t)data->len + 1;
  char *transmit = malloc(frame_len);
  if (transmit == NULL) {
    perror("broadcast: malloc");
    return;
  }

  /* NOTE(review): the length prefix is a single byte, so this framing
   * assumes data->len fits in 8 bits -- confirm against the packet type. */
  transmit[0] = (char)data->len;
  memcpy(transmit + 1, data->data, data->len);

  for (int i = 0; i < num_conns; i++) {
    if (sendto(conns[i]->sockfd, transmit, frame_len, 0,
               (const struct sockaddr *)&conns[i]->conn,
               sizeof(conns[i]->conn)) < 0) {
      perror("broadcast: sendto");
    }
  }

  free(transmit);
}
|
||
|
|
||
|
/*
 * Send one packet to a single host.
 *
 * The datagram is framed as a single length byte followed by data->len
 * payload bytes, and is handed to sendto() on conn's socket.
 *
 * conn  destination host (its sockfd and conn address are used)
 * data  packet whose data/len fields supply the payload
 *
 * Failures are reported with perror(); the function returns nothing, so the
 * caller is not told whether the datagram was sent.
 */
void send_to(struct host *conn, packet *data) {
  size_t frame_len = (size_t)data->len + 1;
  char *transmit = malloc(frame_len);
  if (transmit == NULL) {
    perror("send_to: malloc");
    return;
  }

  /* NOTE(review): the length prefix is a single byte, so this framing
   * assumes data->len fits in 8 bits -- confirm against the packet type. */
  transmit[0] = (char)data->len;
  memcpy(transmit + 1, data->data, data->len);

  if (sendto(conn->sockfd, transmit, frame_len, 0,
             (const struct sockaddr *)&conn->conn, sizeof(conn->conn)) < 0) {
    perror("send_to: sendto");
  }

  free(transmit);
}
|
||
|
|
||
|
void send_to_replica(struct host **conns, int num_conns, int replica,
|
||
|
packet *data) {
|
||
|
for (int i = 0; i < num_conns; i++) {
|
||
|
if (replica == conns[i]->replica_index) {
|
||
|
char *transmit = malloc(sizeof(char) * data->len + 1);
|
||
|
transmit[0] = (char)data->len;
|
||
|
memcpy((transmit + 1), data->data, data->len);
|
||
|
sendto(conns[i]->sockfd, transmit, data->len + 1, 0,
|
||
|
(const struct sockaddr *)&conns[i]->conn, sizeof(conns[i]->conn));
|
||
|
free(transmit);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
void append_msg(struct msg *m) {
|
||
|
struct msg_log *cur = malloc(sizeof(struct msg_log));
|
||
|
cur->m = m;
|
||
|
cur->next = mlog;
|
||
|
}
|
||
|
|
||
|
void discard_before_seq(uint32_t seq) {
|
||
|
struct msg_log *old;
|
||
|
struct msg_log *cur;
|
||
|
for (old = mlog; old != NULL; old = old->next) {
|
||
|
cur = old->next;
|
||
|
if (cur != NULL && cur->m->seq < seq) {
|
||
|
old->next = cur->next;
|
||
|
free(cur->m);
|
||
|
free(cur);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (mlog != NULL && mlog->m->seq < seq) {
|
||
|
mlog = mlog->next;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/*
 * Decode a 40-byte wire packet into a newly allocated struct msg.
 *
 * Wire layout (multi-byte fields are big-endian / network order):
 *   byte  0       msg_type
 *   byte  1       req_message
 *   bytes 4..7    option_value
 *   bytes 8..15   timestamp
 *   bytes 16..19  client_index
 *   bytes 20..23  view
 *   bytes 24..27  seq
 *   bytes 28..31  digest
 *   bytes 32..35  server_index
 *   bytes 36..39  signature
 *
 * p  packet to decode; it is only read, not freed or modified
 *
 * Returns a malloc'd message that the caller must free, or NULL if p or its
 * data is NULL, the packet is shorter than 40 bytes, or allocation fails.
 */
struct msg *parse_packet_as_msg(packet *p) {
  enum { WIRE_WORDS = 10, WIRE_BYTES = WIRE_WORDS * 4 };

  /* Refuse short packets instead of reading past the end of the buffer. */
  if (p == NULL || p->data == NULL || p->len < WIRE_BYTES) {
    return NULL;
  }

  const uint8_t *raw = (const uint8_t *)p->data;

  /* Copy into properly typed locals rather than casting the byte buffer to
   * wider pointer types, which could be misaligned. */
  uint32_t words[WIRE_WORDS];
  uint64_t timestamp_be;
  memcpy(words, raw, sizeof words);
  memcpy(&timestamp_be, raw + 8, sizeof timestamp_be);

  struct msg *m = malloc(sizeof *m);
  if (m == NULL) {
    return NULL;
  }

  m->msg_type = raw[0];
  m->req_message = raw[1];
  m->option_value = ntohl(words[1]);
  m->timestamp = be64toh(timestamp_be);
  m->client_index = ntohl(words[4]);
  m->view = ntohl(words[5]);
  m->seq = ntohl(words[6]);
  m->digest = ntohl(words[7]);
  m->server_index = ntohl(words[8]);
  m->signature = ntohl(words[9]);

  return m;
}
|
||
|
|
||
|
packet *parse_msg_as_packet(struct msg *m) {
|
||
|
uint32_t *data = malloc(sizeof(uint32_t) * 10);
|
||
|
data[0] = 0;
|
||
|
((uint8_t *)data)[0] = m->msg_type;
|
||
|
((uint8_t *)data)[1] = m->req_message;
|
||
|
|
||
|
data[1] = htonl(m->option_value);
|
||
|
((uint64_t *)data)[1] = htobe64(m->timestamp);
|
||
|
data[4] = htonl(m->client_index);
|
||
|
data[5] = htonl(m->view);
|
||
|
data[6] = htonl(m->seq);
|
||
|
data[7] = htonl(m->digest);
|
||
|
data[8] = htonl(m->server_index);
|
||
|
data[9] = htonl(m->signature);
|
||
|
|
||
|
uint8_t len = sizeof(uint32_t) * 10;
|
||
|
|
||
|
packet *p = malloc(sizeof(packet));
|
||
|
|
||
|
p->is_timed_out = false;
|
||
|
p->data = (uint8_t *)data;
|
||
|
p->len = len;
|
||
|
|
||
|
return p;
|
||
|
}
|
||
|
|
||
|
/*
 * Release a packet and the payload buffer it points to.
 *
 * p  packet to free; NULL is accepted and ignored, matching free()
 *
 * p must not be used after this call.
 */
void clean_packet(packet *p) {
  if (p == NULL) {
    return;
  }
  free(p->data);
  free(p);
}
|
||
|
|
||
|
/*
 * Release a message.
 *
 * m  message to free; passed straight to free(), so NULL is harmless
 */
void clean_msg(struct msg *m) {
  free(m);
}
|
||
|
|
||
|
struct msg *init_msg() {
|
||
|
struct msg *m = malloc(sizeof(struct msg));
|
||
|
m->msg_type = 0;
|
||
|
|
||
|
m->req_message = 0;
|
||
|
m->option_value = 0;
|
||
|
m->timestamp = 0;
|
||
|
m->client_index = 0;
|
||
|
|
||
|
m->view = 0;
|
||
|
m->seq = 0;
|
||
|
m->digest = 0;
|
||
|
m->server_index = 0;
|
||
|
m->signature = 0;
|
||
|
|
||
|
return m;
|
||
|
}
|
||
|
|
||
|
void display_msg(struct msg *m) {
|
||
|
if (m->msg_type == 0) {
|
||
|
printf("Request: ");
|
||
|
if (m->req_message == 0) {
|
||
|
printf(" ADD %d [", m->option_value);
|
||
|
} else if (m->req_message == 1) {
|
||
|
printf(" SUB %d [", m->option_value);
|
||
|
} else if (m->req_message == 2) {
|
||
|
printf(" MUL %d [", m->option_value);
|
||
|
} else if (m->req_message == 3) {
|
||
|
printf(" GET [");
|
||
|
}
|
||
|
|
||
|
printf(" at: %ld, from: %d ]\n", m->timestamp, m->client_index);
|
||
|
} else if (m->msg_type == 1) {
|
||
|
printf("Pre-prepare: [ view %d, seq %d ] (D: %d) \n", m->view, m->seq,
|
||
|
m->digest);
|
||
|
} else if (m->msg_type == 2) {
|
||
|
printf("Prepare: [ view %d, seq %d, from: %d ] (D: %d) \n", m->view, m->seq,
|
||
|
m->server_index, m->digest);
|
||
|
} else if (m->msg_type == 3) {
|
||
|
printf("Commit: [ view %d, seq %d, from: %d ] (D: %d) \n", m->view, m->seq,
|
||
|
m->server_index, m->digest);
|
||
|
} else if (m->msg_type == 5) {
|
||
|
printf("Response: [ value %d ] \n", m->option_value);
|
||
|
}
|
||
|
}
|