iu9-ca-web-chat/src/http_server/engine_engine_number_9/running_mainloop.cpp

300 lines
12 KiB
C++
Raw Normal View History

2024-07-27 11:27:52 +00:00
#include "running_mainloop.h"
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <poll.h>
#include <assert.h>
#include <map>
#include <queue>
#include <utility>
#include "thread_synchronization.h"
#include "os_utils.h"
#include "http_structures/client_request_parse.h"
#include "http_structures/response_gen.h"
#include "baza_inter.h"
#include "admin_control.h"
2024-07-27 11:27:52 +00:00
namespace een9 {
struct QElementHttpConnections {
SlaveTask task;
QElementHttpConnections* nxt = NULL;
explicit QElementHttpConnections(SlaveTask task): task(std::move(task)) {}
};
struct WorkersTaskQueue {
QElementHttpConnections* first = NULL;
QElementHttpConnections** afterLastPtr;
size_t sz = 0;
WorkersTaskQueue() {
afterLastPtr = &first;
}
bool empty() const {
return sz == 0;
}
size_t size() const {
return sz;
}
void push_back(SlaveTask task) {
/* Throws a goddamn execption. Because why not. Ofcourse everything has to throw an exception */
/* CLion says. Allocated memory is leaking. YOUR MOTHER IS LEAKING YOU FOOL!! MY CODE IS FINE!! */
QElementHttpConnections* el = new QElementHttpConnections(std::move(task));
/* Exception does not leave queue in incorrect state */
*afterLastPtr = el;
afterLastPtr = &(el->nxt);
sz++;
}
void pop_first(SlaveTask& ret_task) {
assert(!empty());
ret_task = std::move(first->task);
if (sz == 1) {
delete first;
first = NULL;
afterLastPtr = &first;
sz = 0;
} else {
/* Before I popped the first, this element was second, but now it took place of the first */
QElementHttpConnections* old_deut = first->nxt;
delete first;
first = old_deut;
sz--;
}
}
};
struct WorkersEnvCommon {
2024-07-27 11:27:52 +00:00
/* This alarm notifies about new tasks and termination signal. Because we are polite people, we don't cancel threads */
CondVarBedObj corvee_bed;
WorkersTaskQueue queue;
bool& termination;
guest_core_t guest_core;
guest_core_admin_control_t guest_core_admin_control;
2024-07-27 11:27:52 +00:00
/* Parser programs */
ClientRequestParser_CommonPrograms parser_programs;
WorkersEnvCommon(bool& term, const MainloopParameters& params): termination(term),
guest_core(params.guest_core), guest_core_admin_control(params.guest_core_admin_control){}
2024-07-27 11:27:52 +00:00
};
struct WorkersEnv {
WorkersEnvCommon& wtec;
worker_id_t id;
ClientRequestParser_WorkerBuffers personal_parser_buffer;
explicit WorkersEnv(WorkersEnvCommon& wtec, worker_id_t id): wtec(wtec), id(id), personal_parser_buffer(wtec.parser_programs){}
};
// todo: add timeout for multiple bytes, add more settings
ClientRequest process_http_connection_input(int fd, const EEN9_ServerTips& s_tips, WorkersEnv& wte) {
ClientRequest res;
ClientHttpRequestParser_Ctx parser(res, wte.personal_parser_buffer, wte.wtec.parser_programs);
int ret;
char buf[2048];
assert(parser.status == 0);
while ((ret = (int)recv(fd, buf, 2048, 0)) > 0) {
for (size_t i = 0; i < ret; i++) {
if (parser.feedCharacter(buf[i]) != 0)
break;
}
if (parser.status != 0)
break;
}
ASSERT(parser.status == 1, "Incorrect request"); // todo: do the same thing everywhere else
ASSERT_on_iret(ret, "recv");
return res;
}
void process_connection_output(int fd, const std::string& server_response) {
size_t N = server_response.size(), i = 0;
while (i < N) {
/* MSG_NOSIGNAL set to prevent SIGPIPE */
int written = (int)send(fd, &server_response[i], std::min(2048lu, N - i), MSG_NOSIGNAL);
ASSERT_on_iret(written, "sending");
ASSERT_pl(written > 0);
i += written;
}
printf("Log: worker: succesfully asnwered with response\n");
}
std::string process_admin_control_connection_input(int fd, const EEN9_ServerTips& s_tips, WorkersEnv& wte) {
AdminControlRequestRCtx pctx;
int ret;
char buf[2048];
assert(pctx.status == 0);
while ((ret = (int)recv(fd, buf, 2048, 0)) > 0) {
for (size_t i = 0; i < ret; i++) {
if (pctx.feedCharacter(buf[i]) != 0)
break;
}
if (pctx.status != 0)
break;
}
ASSERT(pctx.status == 1, "Incorrect request");
ASSERT_on_iret(ret, "recv");
return pctx.body;
}
void process_connection(const SlaveTask& task, WorkersEnv& wte) {
if (task.conn_info.type == 0) {
printf("%d::Got http reuest\n", wte.id);
ClientRequest client_request = process_http_connection_input(task.fd(), task.s_tips, wte);
printf("%d::Http request has been read\n", wte.id);
std::string server_response = wte.wtec.guest_core(task, client_request, wte.id);
process_connection_output(task.fd(), server_response);
printf("%d::Http response has been sent\n", wte.id);
} else if (task.conn_info.type == 1) {
printf("%d::Got admin-cmd request\n", wte.id);
std::string admin_request = process_admin_control_connection_input(task.fd(), task.s_tips, wte);
printf("%d::Admin-cmd request has been read\n", wte.id);
std::string server_response_content = wte.wtec.guest_core_admin_control(task, admin_request, wte.id);
std::string server_response = generate_admin_control_response(server_response_content);
process_connection_output(task.fd(), server_response);
printf("%d::Admin-cmd response has been sent\n", wte.id);
}
}
2024-07-27 11:27:52 +00:00
void* worker_func(void* wte_ptr) {
WorkersEnv& wte = *((WorkersEnv*)wte_ptr);
WorkersEnvCommon& wtec = wte.wtec;
2024-07-27 11:27:52 +00:00
printf("Worker started\n");
while (true) {
try {
MutexLockGuard cb_lg(wtec.corvee_bed, __func__);
2024-07-27 11:27:52 +00:00
woke:
if (wtec.termination)
2024-07-27 11:27:52 +00:00
break;
if (wtec.queue.empty()) {
wtec.corvee_bed.sleep(__func__);
2024-07-27 11:27:52 +00:00
goto woke;
}
SlaveTask task;
wtec.queue.pop_first(task);
process_connection(task, wte);
2024-07-27 11:27:52 +00:00
} catch (const std::exception& e) {
printf("Client request procession failure in worker\n");
printf("%s\n", e.what());
/* Under mysterious some circumstances, in this place destructor of string in SystemError causes segfault. I can't fix that */
2024-07-27 11:27:52 +00:00
}
}
printf("Worker finished\n");
return NULL;
}
void electric_boogaloo(const MainloopParameters& params, bool& termination_trigger) {
WorkersEnvCommon wtec(termination_trigger, params);
2024-07-27 11:27:52 +00:00
ASSERT(params.slave_number > 0, "No workers spawned");
size_t CRL_Nip = params.client_regular_listened.size();
ASSERT(CRL_Nip > 0, "No open listeting addresses (http)");
size_t ACL_Nip = params.admin_control_listened.size();
size_t Nip = CRL_Nip + ACL_Nip;
2024-07-27 11:27:52 +00:00
std::vector<pthread_t> workers(params.slave_number);
std::vector<uptr<WorkersEnv>> wtes(params.slave_number);
2024-07-27 11:27:52 +00:00
for (size_t i = 0; i < params.slave_number; i++) {
wtes[i] = std::make_unique<WorkersEnv>(wtec, (worker_id_t)i);
}
for (size_t i = 0; i < params.slave_number; i++) {
pthread_create(&workers[i], NULL, worker_func, wtes[i].get());
2024-07-27 11:27:52 +00:00
}
// todo: move this try block inside the loop
2024-07-27 11:27:52 +00:00
try {
int ret;
struct Ear {
/* A copy from params */
SocketAddress my_addr;
UniqueFdWrapper listening_sock;
/* type 0 is for http protocol
* type 1 is for admin-cmd protocol */
int type;
};
std::vector<Ear> ears(Nip);
for (size_t i = 0; i < CRL_Nip; i++) {
ears[i].my_addr = params.client_regular_listened[i];
ears[i].type = 0;
}
for (size_t i = 0; i < ACL_Nip; i++) {
ears[i + CRL_Nip].my_addr = params.admin_control_listened[i];
ears[i + CRL_Nip].type = 1;
}
2024-07-27 11:27:52 +00:00
for (size_t i = 0; i < Nip; i++) {
int listening_socket_fd = socket(AF_INET, SOCK_STREAM, 0);
ASSERT_on_iret(listening_socket_fd, "'Listening socket' creation");
2024-07-27 11:27:52 +00:00
UniqueFdWrapper listening_socket(listening_socket_fd);
int reuseaddr_nozero_option_value = 1;
ret = setsockopt(listening_socket_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_nozero_option_value, sizeof(int));
ASSERT_on_iret(ret, "Can't set SO_REUSEADDR");
bind_to_socket_address(listening_socket_fd, ears[i].my_addr);
2024-07-27 11:27:52 +00:00
ret = listen(listening_socket(), 128);
ASSERT_on_iret(ret, "listen() listening for connections");
ears[i].listening_sock = std::move(listening_socket);
2024-07-27 11:27:52 +00:00
}
std::vector<pollfd> pollfds(Nip);
for (size_t i = 0; i < Nip; i++) {
pollfds[i].fd = ears[i].listening_sock();
2024-07-27 11:27:52 +00:00
pollfds[i].events = POLLRDNORM;
}
ASSERT(params.mainloop_recheck_interval_us > 0, "Incorrect poll timeout");
while (true) {
// MutexLockGuard lg1(wtec.corvee_bed, "poller termination check");
if (wtec.termination)
2024-07-27 11:27:52 +00:00
break;
// lg1.unlock();
2024-07-27 11:27:52 +00:00
for (size_t i = 0; i < Nip; i++) {
pollfds[i].revents = 0;
}
errno = 0;
ret = poll(pollfds.data(), Nip, params.mainloop_recheck_interval_us);
if (errno == EINTR)
break;
// todo: do not end program here (in the loop. Nothing in the loop should be able to end program)
ASSERT_on_iret(ret, "poll()");
2024-07-27 11:27:52 +00:00
for (size_t i = 0; i < Nip; i++) {
if ((pollfds[i].revents & POLLRDNORM)) {
try {
int session_sock = accept(pollfds[i].fd, NULL, NULL);
2024-07-27 11:27:52 +00:00
ASSERT_on_iret(session_sock, "Failed to accept incoming connection");
UniqueFdWrapper session_sock_fdw(session_sock);
configure_socket_rcvsndtimeo(session_sock_fdw(), params.s_conf.request_timeout);
SocketAddress peer_addr;
get_peer_name_as_socket_address(session_sock, peer_addr);
{ MutexLockGuard lg2(wtec.corvee_bed, "poller adds connection");
SlaveTask task{
ConnectionInfo{ears[i].my_addr, peer_addr, ears[i].type},
std::move(session_sock_fdw),
EEN9_ServerTips{wtec.queue.size(),
params.s_conf.critical_load_1, params.s_conf.request_timeout}
};
if (wtec.queue.size() < params.s_conf.critical_load_2)
wtec.queue.push_back(std::move(task));
2024-07-27 11:27:52 +00:00
}
wtec.corvee_bed.din_don();
2024-07-27 11:27:52 +00:00
} catch (const std::exception& e) {
printf("Error aceepting connection\n");
printf("%s\n", e.what());
}
}
}
}
} catch (const std::exception& e) {
printf("System failure 2\n");
printf("%s\n", e.what());
/* There is no need to tiptoe around this multi-access field. It is write-onle-and-for-good-kind */
wtec.termination = true;
wtec.corvee_bed.wake_them_all();
2024-07-27 11:27:52 +00:00
}
wtec.termination = true;
wtec.corvee_bed.wake_them_all();
2024-07-27 11:27:52 +00:00
for (size_t i = 0; i < params.slave_number; i++) {
pthread_join(workers[i], NULL);
}
}
}