Example of select-based in accepted Sockets to allow rapid termination.
#include <sys/socket.h>
#include <poll.h>
#include <atomic>
#include <memory>
#include <string>
#include <vector>
#include <thread>
#include <iostream>
class CommandHandler {
public:
CommandHandler(
Socket::Ptr theSocket, std::atomic_bool& theTerminateRequested,
const std::string& greeting)
: socket(theSocket), terminateRequested(theTerminateRequested)
{
try {
send(greeting);
send("Any line beginning with \"quit\" will terminate this echo client.\n");
send("Any line beginning with \"terminate\" will terminate the echo server.\n");
} catch (...) {
}
}
void operator() () {
try {
while(!terminateRequested.load()) {
if(readOne()) {
break;
}
}
} catch(...) {
}
}
private:
bool readOne() {
const SelectResult result = socket->select(timeout_ms, true, false, true);
if(terminateRequested.load()) {
return true;
}
if(result.readyWithError) {
return true;
}
if(result.readyToRead) {
return readCurrent();
}
return false;
}
bool readCurrent() {
const int bufferLen = 1024;
char buffer[bufferLen];
const ssize_t readLength = socket->blockingReceive(buffer, bufferLen - 1);
if(0 == readLength) {
return false;
}
return actOn(std::string(buffer, readLength));
}
bool actOn(const std::string& buffer) {
if(0 == buffer.find("terminate")) {
send("Requesting shutdown now!\n");
terminateRequested.store(true);
return true;
} else if(0 == buffer.find("quit")) {
send("Goodbye!\n");
return true;
}
send(buffer);
return false;
}
void send(const std::string& message) {
socket->send(message.c_str(), message.length() + 1);
}
private:
static const int timeout_ms = 500;
std::atomic_bool& terminateRequested;
};
class CommandServer {
public:
: serverSocket(ServerSocket::open(address))
{
terminateRequested.store(false);
}
void operator()() {
try {
while(!terminateRequested.load()) {
if(acceptOne()) {
break;
}
}
} catch(...) {
}
for(std::thread& acceptedThread : acceptedThreads) {
try {
acceptedThread.join();
} catch (...) {
}
}
}
private:
bool acceptOne() {
const SelectResult result = serverSocket->select(timeout_ms, true, false, true);
if(terminateRequested.load()) {
return true;
}
if(result.readyWithError) {
return true;
}
if(result.readyToRead) {
acceptCurrent();
}
return false;
}
void acceptCurrent() {
const int id = acceptedThreads.size() + 1;
acceptedThreads.push_back(serverSocket->acceptInNewThread(
const std::string greeting = std::string("Welcome, Connection #") + std::to_string(id) + "\n";
CommandHandler e(accepted, terminateRequested, greeting);
e();
}));
}
private:
static const int timeout_ms = 500;
std::vector<std::thread> acceptedThreads;
std::atomic_bool terminateRequested;
};
int main(int argc, char** argv) {
std::thread serverThread(
[=]() {
CommandServer server(addr);
server();
});
serverThread.join();
return 0;
}
std::shared_ptr< Address > Ptr
std::shared_ptr< InetAddress > Ptr
static int create(SocketType socketType, const std::string &serviceName, const std::string &address, std::vector< InetAddress::Ptr > &created, unsigned int max=0)
std::shared_ptr< ServerSocket > Ptr
std::shared_ptr< Socket > Ptr
Namespace of the BsdSockets library.