#include "running_mainloop.h" #include #include #include #include #include #include #include #include #include "thread_synchronization.h" #include "os_utils.h" #include "http_structures/client_request_parse.h" #include "http_structures/response_gen.h" #include "baza_inter.h" namespace een9 { // todo: add timeout for multiple bytes, add more settings ClientRequest process_connection_input(int fd, const EEN9_ServerTips& s_tips) { ClientRequest res; ClientRequestParser parser(res); int ret; char buf[2048]; ASSERT_pl(!parser.completed()) while ((ret = (int)recv(fd, buf, 2048, 0)) > 0) { for (size_t i = 0; i < ret; i++) parser.feedByte(buf[i]); if (parser.completed()) break; } ASSERT_on_iret(ret, "recv"); ASSERT_pl(parser.completed()); // printf("Log: worker received clients request\n%s\n", client_request.toString().c_str()); return res; } void process_connection_output(int fd, const std::string& server_response) { size_t N = server_response.size(), i = 0; while (i < N) { int written = (int)send(fd, &server_response[i], std::min(2048lu, N - i), 0); ASSERT_on_iret(written, "sending"); ASSERT_pl(written > 0); i += written; } printf("Log: worker: succesfully asnwered with response\n"); } void process_connection(const SlaveTask& task, const guest_core_t& guest_core) { ClientRequest client_request = process_connection_input(task.fd(), task.s_tips); std::string server_response = guest_core(task, client_request); process_connection_output(task.fd(), server_response); } struct QElementHttpConnections { SlaveTask task; QElementHttpConnections* nxt = NULL; explicit QElementHttpConnections(SlaveTask task): task(std::move(task)) {} }; struct WorkersTaskQueue { QElementHttpConnections* first = NULL; QElementHttpConnections** afterLastPtr; size_t sz = 0; WorkersTaskQueue() { afterLastPtr = &first; } bool empty() const { return sz == 0; } size_t size() const { return sz; } void push_back(SlaveTask task) { /* Throws a goddamn execption. Because why not. Ofcourse everything has to throw an exception */ /* CLion says. Allocated memory is leaking. YOUR MOTHER IS LEAKING YOU FOOL!! MY CODE IS FINE!! */ QElementHttpConnections* el = new QElementHttpConnections(std::move(task)); /* Exception does not leave queue in incorrect state */ *afterLastPtr = el; afterLastPtr = &(el->nxt); sz++; } void pop_first(SlaveTask& ret_task) { assert(!empty()); ret_task = std::move(first->task); if (sz == 1) { delete first; first = NULL; afterLastPtr = &first; sz = 0; } else { /* Before I popped the first, this element was second, but now it took place of the first */ QElementHttpConnections* old_deut = first->nxt; delete first; first = old_deut; sz--; } } }; struct WorkersTaskEnv { /* This alarm notifies about new tasks and termination signal. 
           Because we are polite people, we don't cancel threads */
        CondVarBedObj corvee_bed;
        WorkersTaskQueue queue;
        bool& termination;
        guest_core_t guest_core;

        WorkersTaskEnv(bool& term, guest_core_t g_c): termination(term), guest_core(std::move(g_c)) {}
    };

    /* Worker thread: sleeps on the condition variable until a task is queued or termination is requested */
    void* worker_func(void* wte_ptr) {
        WorkersTaskEnv& wte = *((WorkersTaskEnv*)wte_ptr);
        printf("Worker started\n");
        while (true) {
            try {
                MutexLockGuard cb_lg(wte.corvee_bed, __func__);
                woke:
                if (wte.termination)
                    break;
                if (wte.queue.empty()) {
                    wte.corvee_bed.sleep(__func__);
                    goto woke;
                }
                SlaveTask task;
                wte.queue.pop_first(task);
                process_connection(task, wte.guest_core);
            } catch (const std::exception& e) {
                printf("Client request processing failure in worker\n");
                printf("%s\n", e.what());
            }
        }
        printf("Worker finished\n");
        return NULL;
    }

    void configure_socket_rcvsndtimeo(int fd, timeval tv) {
        int ret;
        ret = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(timeval));
        ASSERT_on_iret_pl(ret);
        ret = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(timeval));
        ASSERT_on_iret_pl(ret);
    }

    // todo: retrieve address of connected client
    /* Main accept loop: spawns workers, opens listening sockets on 127.0.0.1,
       and hands accepted connections to the worker queue */
    void electric_boogaloo(const MainloopParameters& params, bool& termination_trigger) {
        WorkersTaskEnv wte(termination_trigger, params.guest_core);
        ASSERT(params.slave_number > 0, "No workers spawned");
        size_t Nip = params.ports_to_listen.size();
        ASSERT(Nip > 0, "No open listening addresses");
        std::vector<pthread_t> workers(params.slave_number);
        for (size_t i = 0; i < params.slave_number; i++) {
            pthread_create(&workers[i], NULL, worker_func, &wte);
        }
        try {
            int ret;
            std::vector<UniqueFdWrapper> listening_socks(Nip);
            for (size_t i = 0; i < Nip; i++) {
                printf("Creating listening socket\n");
                uint16_t port = params.ports_to_listen[i];
                int listening_socket_fd = socket(AF_INET, SOCK_STREAM, 0);
                ASSERT_on_iret(listening_socket_fd, "Listening socket creation");
                UniqueFdWrapper listening_socket(listening_socket_fd);
                printf("Listening socket created\n");
                sockaddr_in listening_address;
                listening_address.sin_family = AF_INET;
                listening_address.sin_port = htons(port);
                /* 127.0.0.1, the loopback address */
                uint32_t lca = (127u << 24) | 1;
                listening_address.sin_addr.s_addr = htonl(lca);
                int reuseaddr_nozero_option_value = 1;
                ret = setsockopt(listening_socket_fd, SOL_SOCKET, SO_REUSEADDR,
                                 &reuseaddr_nozero_option_value, sizeof(int));
                ASSERT_on_iret(ret, "setting SO_REUSEADDR before binding to address");
                ret = bind(listening_socket(), (const sockaddr*)&listening_address, sizeof(listening_address));
                ASSERT_on_iret(ret, "binding to 127.0.0.1:" + std::to_string(port));
                printf("Bound socket to address\n");
                ret = listen(listening_socket(), 128);
                ASSERT_on_iret(ret, "listening for connections");
                printf("Listening socket successfully started listening\n");
                listening_socks[i] = std::move(listening_socket);
            }
            std::vector<pollfd> pollfds(Nip);
            for (size_t i = 0; i < Nip; i++) {
                pollfds[i].fd = listening_socks[i]();
                pollfds[i].events = POLLRDNORM;
            }
            printf("Entering mainloop\n");
            ASSERT(params.mainloop_recheck_interval_us > 0, "Incorrect poll timeout");
            while (true) {
                MutexLockGuard lg1(wte.corvee_bed, "poller termination check");
                if (wte.termination)
                    break;
                lg1.unlock();
                for (size_t i = 0; i < Nip; i++) {
                    pollfds[i].revents = 0;
                }
                errno = 0;
                ret = poll(pollfds.data(), Nip, params.mainloop_recheck_interval_us);
                /* Interrupted by a signal: leave the mainloop */
                if (errno == EINTR)
                    break;
                ASSERT_on_iret(ret, "polling");
                for (size_t i = 0; i < Nip; i++) {
                    if ((pollfds[i].revents & POLLRDNORM)) {
                        try {
                            sockaddr client_address;
                            socklen_t client_addr_len = sizeof(client_address);
                            int session_sock = accept(pollfds[i].fd, &client_address, &client_addr_len);
                            ASSERT_on_iret(session_sock, "Failed to accept incoming connection");
                            printf("Log: successful connection\n");
                            UniqueFdWrapper session_sock_fdw(session_sock);
                            configure_socket_rcvsndtimeo(session_sock_fdw(), params.s_conf.request_timeout);
                            {
                                MutexLockGuard lg2(wte.corvee_bed, "poller adds connection");
                                SlaveTask task{ConnectionInfo{}, std::move(session_sock_fdw),
                                               EEN9_ServerTips{wte.queue.size(), params.s_conf.critical_load_1,
                                                               params.s_conf.request_timeout}};
                                /* Drop the connection if the queue is already overloaded */
                                if (wte.queue.size() < params.s_conf.critical_load_2)
                                    wte.queue.push_back(std::move(task));
                            }
                            wte.corvee_bed.din_don();
                        } catch (const std::exception& e) {
                            printf("Error accepting connection\n");
                            printf("%s\n", e.what());
                        }
                    }
                }
            }
        } catch (const std::exception& e) {
            printf("System failure 2\n");
            printf("%s\n", e.what());
            /* There is no need to tiptoe around this multi-access field.
               It is of the write-once-and-for-good kind */
            wte.termination = true;
            wte.corvee_bed.wake_them_all();
        }
        wte.termination = true;
        wte.corvee_bed.wake_them_all();
        for (size_t i = 0; i < params.slave_number; i++) {
            pthread_join(workers[i], NULL);
        }
    }

    void safe_electric_boogaloo(const MainloopParameters& params, bool& termination_trigger) {
        try {
            electric_boogaloo(params, termination_trigger);
        } catch (const std::exception& e) {
            printf("System failure\n");
            printf("%s\n", e.what());
        }
    }
}