#include "msg.h"
#include "hosts.h"

/*
 * NOTE(review): the system header names were not recoverable from the copy of
 * this file that was reviewed; the list below is the set required by the
 * identifiers used in this file -- confirm against the original include list.
 */
#include <arpa/inet.h>
#include <endian.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>

/* Head of the message log; append_msg() pushes new entries at the front. */
struct msg_log *mlog = NULL;
struct msg_log *outstanding = NULL;
struct msg_log *out_responses = NULL;

/* Byte offsets of each field inside the serialized message. */
enum {
    MSGWIRE_OFF_MSG_TYPE = 0,
    MSGWIRE_OFF_REQ_MESSAGE = 1,
    /* bytes 2..3 are always written as zero */
    MSGWIRE_OFF_OPTION_VALUE = 4,
    MSGWIRE_OFF_TIMESTAMP = 8,
    MSGWIRE_OFF_CLIENT_INDEX = 16,
    MSGWIRE_OFF_VIEW = 20,
    MSGWIRE_OFF_SEQ = 24,
    MSGWIRE_OFF_DIGEST = 28,
    MSGWIRE_OFF_SERVER_INDEX = 32,
    MSGWIRE_OFF_SIGNATURE = 36,
    MSGWIRE_LEN = 40
};

/* Read a big-endian 32-bit value from a possibly unaligned address. */
static uint32_t read_be32(const uint8_t *src)
{
    uint32_t raw;

    memcpy(&raw, src, sizeof raw);
    return ntohl(raw);
}

/* Read a big-endian 64-bit value from a possibly unaligned address. */
static uint64_t read_be64(const uint8_t *src)
{
    uint64_t raw;

    memcpy(&raw, src, sizeof raw);
    return be64toh(raw);
}

/* Store a 32-bit value in big-endian order at a possibly unaligned address. */
static void write_be32(uint8_t *dst, uint32_t value)
{
    uint32_t raw = htonl(value);

    memcpy(dst, &raw, sizeof raw);
}

/* Store a 64-bit value in big-endian order at a possibly unaligned address. */
static void write_be64(uint8_t *dst, uint64_t value)
{
    uint64_t raw = htobe64(value);

    memcpy(dst, &raw, sizeof raw);
}

/*
 * Build the datagram for a packet: one length byte followed by the payload.
 *
 * On success returns a heap buffer the caller must free() and stores its size
 * in *frame_len. Returns NULL (after reporting on stderr) if allocation fails.
 */
static char *frame_packet(const packet *data, size_t *frame_len)
{
    size_t total = (size_t)data->len + 1;
    char *frame = malloc(total);

    if (frame == NULL) {
        fprintf(stderr, "frame_packet: out of memory\n");
        return NULL;
    }

    frame[0] = (char)data->len;
    memcpy(frame + 1, data->data, data->len);
    *frame_len = total;
    return frame;
}

/* Send an already framed datagram to one host, reporting failures on stderr. */
static void send_frame(const struct host *conn, const char *frame,
                       size_t frame_len)
{
    if (sendto(conn->sockfd, frame, frame_len, 0,
               (const struct sockaddr *)&conn->conn, sizeof(conn->conn)) < 0) {
        perror("sendto");
    }
}

/*
 * Send `data` to every host in `conns[0..num_conns)`.
 *
 * The datagram is framed once and reused for all destinations. Sending is
 * best effort: a failure for one host is reported and the loop continues.
 */
void broadcast(struct host **conns, int num_conns, packet *data)
{
    size_t frame_len = 0;
    char *frame = frame_packet(data, &frame_len);

    if (frame == NULL) {
        return;
    }

    for (int i = 0; i < num_conns; i++) {
        send_frame(conns[i], frame, frame_len);
    }

    free(frame);
}

/* Send `data` to the single host `conn` (length byte followed by payload). */
void send_to(struct host *conn, packet *data)
{
    size_t frame_len = 0;
    char *frame = frame_packet(data, &frame_len);

    if (frame == NULL) {
        return;
    }

    send_frame(conn, frame, frame_len);
    free(frame);
}

/*
 * Send `data` to every host in `conns[0..num_conns)` whose replica_index
 * equals `replica`.
 */
void send_to_replica(struct host **conns, int num_conns, int replica,
                     packet *data)
{
    for (int i = 0; i < num_conns; i++) {
        if (conns[i]->replica_index == replica) {
            send_to(conns[i], data);
        }
    }
}

/*
 * Push `m` onto the front of the global message log.
 *
 * The log stores the pointer itself (no copy); discard_before_seq() later
 * frees it. If the log node cannot be allocated the message is not recorded
 * and `m` is left untouched.
 */
void append_msg(struct msg *m)
{
    struct msg_log *node = malloc(sizeof *node);

    if (node == NULL) {
        fprintf(stderr, "append_msg: out of memory\n");
        return;
    }

    node->m = m;
    node->next = mlog;
    /* Publish the node as the new head; without this the entry is lost. */
    mlog = node;
}

/*
 * Remove and free every log entry (node and its message) whose sequence
 * number is strictly less than `seq`.
 */
void discard_before_seq(uint32_t seq)
{
    /*
     * `link` always points at the pointer that references the current node
     * (either `mlog` or some node's `next`), so the head and interior nodes
     * are unlinked the same way and runs of stale entries are all visited.
     */
    struct msg_log **link = &mlog;

    while (*link != NULL) {
        struct msg_log *node = *link;

        if (node->m->seq < seq) {
            *link = node->next;
            free(node->m);
            free(node);
        } else {
            link = &node->next;
        }
    }
}

/*
 * Decode the serialized form produced by parse_msg_as_packet() into a newly
 * allocated struct msg.
 *
 * Multi-byte fields are read as big-endian. The caller owns the result and
 * releases it with clean_msg(). Returns NULL if allocation fails.
 *
 * NOTE(review): MSGWIRE_LEN bytes are read from p->data without consulting
 * p->len -- confirm that the receive path guarantees a full-sized buffer.
 */
struct msg *parse_packet_as_msg(packet *p)
{
    const uint8_t *wire = (const uint8_t *)p->data;
    struct msg *m = malloc(sizeof *m);

    if (m == NULL) {
        fprintf(stderr, "parse_packet_as_msg: out of memory\n");
        return NULL;
    }

    m->msg_type = wire[MSGWIRE_OFF_MSG_TYPE];
    m->req_message = wire[MSGWIRE_OFF_REQ_MESSAGE];
    m->option_value = read_be32(wire + MSGWIRE_OFF_OPTION_VALUE);
    m->timestamp = read_be64(wire + MSGWIRE_OFF_TIMESTAMP);
    m->client_index = read_be32(wire + MSGWIRE_OFF_CLIENT_INDEX);
    m->view = read_be32(wire + MSGWIRE_OFF_VIEW);
    m->seq = read_be32(wire + MSGWIRE_OFF_SEQ);
    m->digest = read_be32(wire + MSGWIRE_OFF_DIGEST);
    m->server_index = read_be32(wire + MSGWIRE_OFF_SERVER_INDEX);
    m->signature = read_be32(wire + MSGWIRE_OFF_SIGNATURE);
    return m;
}

/*
 * Serialize `m` into a newly allocated packet of MSGWIRE_LEN bytes.
 *
 * Multi-byte fields are written big-endian; is_timed_out starts as false.
 * The caller owns the result and releases it with clean_packet(). Returns
 * NULL if allocation fails.
 */
packet *parse_msg_as_packet(struct msg *m)
{
    /* calloc keeps the two unused bytes after req_message at zero. */
    uint8_t *wire = calloc(1, MSGWIRE_LEN);
    packet *p = malloc(sizeof *p);

    if (wire == NULL || p == NULL) {
        fprintf(stderr, "parse_msg_as_packet: out of memory\n");
        free(wire);
        free(p);
        return NULL;
    }

    wire[MSGWIRE_OFF_MSG_TYPE] = m->msg_type;
    wire[MSGWIRE_OFF_REQ_MESSAGE] = m->req_message;
    write_be32(wire + MSGWIRE_OFF_OPTION_VALUE, m->option_value);
    write_be64(wire + MSGWIRE_OFF_TIMESTAMP, m->timestamp);
    write_be32(wire + MSGWIRE_OFF_CLIENT_INDEX, m->client_index);
    write_be32(wire + MSGWIRE_OFF_VIEW, m->view);
    write_be32(wire + MSGWIRE_OFF_SEQ, m->seq);
    write_be32(wire + MSGWIRE_OFF_DIGEST, m->digest);
    write_be32(wire + MSGWIRE_OFF_SERVER_INDEX, m->server_index);
    write_be32(wire + MSGWIRE_OFF_SIGNATURE, m->signature);

    p->is_timed_out = false;
    p->data = wire;
    p->len = MSGWIRE_LEN;
    return p;
}

/* Free a packet together with its payload buffer. NULL is accepted. */
void clean_packet(packet *p)
{
    if (p == NULL) {
        return;
    }

    free(p->data);
    free(p);
}

/* Free a message obtained from init_msg() or parse_packet_as_msg(). */
void clean_msg(struct msg *m)
{
    free(m);
}

/*
 * Allocate a message with every field set to zero.
 *
 * The caller owns the result and releases it with clean_msg(). Returns NULL
 * if allocation fails.
 */
struct msg *init_msg()
{
    struct msg *m = malloc(sizeof *m);

    if (m == NULL) {
        fprintf(stderr, "init_msg: out of memory\n");
        return NULL;
    }

    *m = (struct msg){0};
    return m;
}

/*
 * Print a one-line, human-readable description of `m` on stdout.
 *
 * msg_type selects the layout: 0 request, 1 pre-prepare, 2 prepare,
 * 3 commit, 5 response. Any other type prints nothing.
 */
void display_msg(struct msg *m)
{
    switch (m->msg_type) {
    case 0:
        printf("Request: ");
        /* req_message selects the operation; unknown values print no name. */
        switch (m->req_message) {
        case 0:
            printf(" ADD %d [", (int)m->option_value);
            break;
        case 1:
            printf(" SUB %d [", (int)m->option_value);
            break;
        case 2:
            printf(" MUL %d [", (int)m->option_value);
            break;
        case 3:
            printf(" GET [");
            break;
        default:
            break;
        }
        printf(" at: %lld, from: %d ]\n", (long long)m->timestamp,
               (int)m->client_index);
        break;
    case 1:
        printf("Pre-prepare: [ view %d, seq %d ] (D: %d) \n", (int)m->view,
               (int)m->seq, (int)m->digest);
        break;
    case 2:
        printf("Prepare: [ view %d, seq %d, from: %d ] (D: %d) \n",
               (int)m->view, (int)m->seq, (int)m->server_index,
               (int)m->digest);
        break;
    case 3:
        printf("Commit: [ view %d, seq %d, from: %d ] (D: %d) \n",
               (int)m->view, (int)m->seq, (int)m->server_index,
               (int)m->digest);
        break;
    case 5:
        printf("Response: [ value %d ] \n", (int)m->option_value);
        break;
    default:
        break;
    }
}