A Million Concurrent Connections
Building on epoll and the Reactor pattern, we will further enhance our TCP server so that it can handle a million concurrent connections in a non-blocking manner on a single thread.
Starting Point
TCP Client
We'll begin with the TCP server implemented using the Reactor pattern, as discussed in Reactor. To test its capabilities, we'll create a client program that attempts to establish one million connections and then send and receive data in a non-blocking manner.
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <errno.h>
// the max number of concurrent connections (1M)
#define MAX_CONN_SIZE 1000000
#define BUFFER_SIZE 128
// sets the socket to non-blocking mode
// by default, fd is in blocking mode.
// operations like read or write will block (wait indefinitely)
// if no data is available
int set_non_block(int fd) {
// get the current fd flags
int flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) return flags;
// add the non-blocking flag to flags
flags |= O_NONBLOCK;
// apply the new flags
int status = fcntl(fd, F_SETFL, flags);
if (status < 0) return status;
return 0;
}
// enable address reuse for the socket
// when a socket is closed, its port might
// go into the TIME_WAIT state, preventing
// the application from immediately re-binding
// to the same port
// the SO_REUSEADDR option allows the socket to
// bind to an address that is in the TIME_WAIT
// state, enabling faster restarts of the application
int set_reuse_addr(int fd) {
int reuse = 1;
return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}
struct epoll_event events[MAX_CONN_SIZE] = {0};
int main() {
int port = 8001;
int connections = 0; // number of connections
int epfd = epoll_create(1);
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
int socket_fd;
char buffer[BUFFER_SIZE] = {0};
while(1) {
struct epoll_event ev;
// creating connections until MAX_CONN_SIZE is reached
if (connections < MAX_CONN_SIZE) {
socket_fd = socket(AF_INET, SOCK_STREAM, 0);
if (socket_fd == -1) {
perror("socket");
return -1;
}
int status = connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr));
if (status == -1) {
perror("socket connect");
return -1;
}
connections++;
set_non_block(socket_fd);
set_reuse_addr(socket_fd);
// ensure there is no old data in the buffer
// then write new data to it
memset(buffer, 0, sizeof(buffer));
sprintf(buffer, "client connections %d\n", connections);
// send it
send(socket_fd, buffer, strlen(buffer), 0);
ev.data.fd = socket_fd;
ev.events = EPOLLIN; // after the send above, we monitor for data from server
epoll_ctl(epfd, EPOLL_CTL_ADD, socket_fd, &ev);
}
// every once in a while process events that are ready
if (connections % 1000 == 999 || connections >= MAX_CONN_SIZE) {
// timeout is 100ms
int nfds = epoll_wait(epfd, events, connections, 100);
int i;
for (i = 0; i < nfds; i++) {
int fd = events[i].data.fd;
if (events[i].events & EPOLLOUT) {
send(fd, buffer, strlen(buffer), 0);
} else if (events[i].events & EPOLLIN) {
char rbuffer[BUFFER_SIZE] = {0};
ssize_t len = recv(fd, rbuffer, BUFFER_SIZE, 0);
if (len == 0) { // EOF, close connection
printf("EOF, closing fd %d\n", fd);
connections--;
close(fd);
} else if (len < 0) { // error
// ignore these errors
if (errno == EINTR || errno == EAGAIN || errno == ENOTSOCK) continue;
printf("error %d, closing fd %d\n", errno, fd);
connections--;
close(fd);
}
} else {
printf("expected error %d for fd %d\n", errno, fd);
close(fd);
}
}
}
usleep(50000); // sleep for 50ms
}
}
Core dumped?
You may encounter a Segmentation fault (core dumped) error if you move events from global scope into main(). When events is declared inside main(), it is allocated on the stack, which has limited space and may not be large enough for a million epoll_event structs. To address this, we declare events as a global array, which is placed in a memory segment that is not restricted by the stack size. Alternatively, you can check the current stack size limit using ulimit -a and, if needed, temporarily increase it with ulimit -s.
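Another option, shown only as a sketch here (it is not used in the listings in this chapter), is to allocate the array on the heap, which is likewise not bounded by the stack limit:

// sketch: heap-allocate the event array instead of declaring it globally
// requires #include <stdlib.h> for calloc/free
struct epoll_event *events = calloc(MAX_CONN_SIZE, sizeof(struct epoll_event));
if (events == NULL) {
    perror("calloc");
    return -1;
}
// ... pass events to epoll_wait exactly as before ...
free(events);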
TCP Server
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <netinet/in.h>
#define BUFFER_LENGTH 1024
#define CONNECTION_SIZE 1000000
int listen_fd = -1;
void handle_signal(int sig) {
if (listen_fd >= 0) {
printf("\nserver closing...\n");
close(listen_fd);
}
exit(0);
}
int init_server(unsigned int port) {
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd == -1) {
perror("socket");
return -1;
}
// define server address: ip + port
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
// bind and listen for connection
int status = bind(listen_fd, (struct sockaddr *) &server_addr, sizeof(server_addr));
if (status == -1) {
perror("bind socket");
return -1;
}
listen(listen_fd, 10); // allows 10 queued connections
return listen_fd;
}
int epfd;
typedef int (*RCALLBACK)(int fd);
int accept_cb(int);
int recv_cb(int);
int send_cb(int);
// fd and its callbacks, along with buffers to support cbs
struct connection {
int fd;
char rbuffer[BUFFER_LENGTH];
int rlength;
char wbuffer[BUFFER_LENGTH];
int wlength;
RCALLBACK send_callback;
// handle EPOLLIN
// for listen_fd it's used for accepting connection
// for client_fd it's used for receiving data
RCALLBACK recv_callback;
};
// where we define fd and its callbacks for handling events
// use fd as index
struct connection connection_list[CONNECTION_SIZE] = {0};
// flag determines whether ADD (if non-zero) or MODIFY
int set_event(int fd, int events, int flag) {
int ctl;
if (flag) ctl = EPOLL_CTL_ADD;
else ctl = EPOLL_CTL_MOD;
struct epoll_event ev;
ev.events = events;
ev.data.fd = fd;
return epoll_ctl(epfd, ctl, fd, &ev);
}
int register_event(int fd, int events) {
if (fd < 0) return -1; // otherwise may get "Segmentation fault (core dumped)"
connection_list[fd].fd = fd;
connection_list[fd].recv_callback = recv_cb;
connection_list[fd].send_callback = send_cb;
// reset buffers since fd can be reused
memset(connection_list[fd].rbuffer, 0, BUFFER_LENGTH);
connection_list[fd].rlength = 0;
memset(connection_list[fd].wbuffer, 0, BUFFER_LENGTH);
connection_list[fd].wlength = 0;
// listen for event (readability)
set_event(fd, events, 1);
return 0;
}
int accept_cb(int fd) { // listen_fd
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int client_fd = accept(fd, (struct sockaddr *)&client_addr, &len);
if (client_fd == -1) {
perror("accept");
return -1;
}
register_event(client_fd, EPOLLIN);
printf("connection accepted, fd %d\n", client_fd);
return 0; // success
}
int recv_cb(int fd) {
int count = recv(fd, connection_list[fd].rbuffer, BUFFER_LENGTH - 1, 0); // leave room for '\0' so rbuffer can be printed as a string
printf("[fd %d] received: '%s' (%d bytes)\n", fd, connection_list[fd].rbuffer, count);
if (count <= 0) { // EOF or error, close connection
printf("disconnect fd %d\n", fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // deregister before closing
close(fd);
return 0;
}
connection_list[fd].rlength = count;
// echo to wbuffer (response)
connection_list[fd].wlength = count;
memcpy(connection_list[fd].wbuffer, connection_list[fd].rbuffer, connection_list[fd].wlength);
// clear the read buffer for the next read, otherwise it may contain data from the last read
memset(connection_list[fd].rbuffer, 0, BUFFER_LENGTH);
set_event(fd, EPOLLOUT, 0); // modify
return count;
}
int send_cb(int fd) {
int count = send(fd, connection_list[fd].wbuffer, connection_list[fd].wlength, 0);
set_event(fd, EPOLLIN, 0); // modify, fd is already in epoll instance
return count;
}
int main() {
int listen_fd = init_server(8001);
if (listen_fd == -1) return -1;
signal(SIGINT, handle_signal); // handle Ctrl+C (SIGINT)
epfd = epoll_create(1);
connection_list[listen_fd].fd = listen_fd;
connection_list[listen_fd].recv_callback = accept_cb;
set_event(listen_fd, EPOLLIN, 1);
while(1) { // main loop
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, -1);
int i;
for (i = 0; i < nready; i++) {
int fd = events[i].data.fd;
// may handle read and write in the same iteration
if (events[i].events & EPOLLIN) {
connection_list[fd].recv_callback(fd); // either accept or recv, depending on whether it's listen_fd or a client fd
}
if (events[i].events & EPOLLOUT) {
connection_list[fd].send_callback(fd);
}
}
}
}
For the server, we increased CONNECTION_SIZE to one million and made some minor fixes. For example, register_event now checks that the fd is non-negative before it is used as an index into connection_list.
Testing
Environment
I'm testing on WSL2 and have allocated 14 GiB of memory to it.
Too many open files
When starting the tcp_server and then the tcp_client, you will encounter accept: Too many open files on the server side and socket: Too many open files on the client side once the total number of file descriptors (fds) reaches 1024. This happens because the default maximum number of open files is 1024, which can be checked with ulimit -a. Since the client and the server will each hold at least a million fds, we need to increase this limit to a value slightly above that, such as 1048576 (1024 * 1024), using ulimit -n 1048576.
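If you prefer to raise the limit from inside the program rather than from the shell, a minimal sketch using setrlimit looks like this (not part of the listings here; raising the hard limit above its current value may require root privileges):

// sketch: raise RLIMIT_NOFILE programmatically, equivalent to ulimit -n 1048576
// requires #include <sys/resource.h>
struct rlimit rl;
rl.rlim_cur = 1048576; // soft limit
rl.rlim_max = 1048576; // hard limit
if (setrlimit(RLIMIT_NOFILE, &rl) == -1) {
    perror("setrlimit");
}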
After increasing the file descriptor limits in both the client and server sessions, let's try running the setup again.
Port exhaustion
Now we can create more than 1024 fds on both the client and the server, but the system stops creating new connections after reaching about 28,000. This happens because, in socket programming, a client typically does not explicitly choose a source port when creating a socket. Instead, the operating system automatically assigns an ephemeral (temporary) port from a predefined range, which by default is 32768 to 60999 on my Linux system. Even though we've increased the number of open file descriptors, we're still limited by the range of ephemeral ports, which can accommodate only around 28,000 connections to a single server address and port.
A practical solution is for the server to listen on a range of ports rather than a single one. Because a TCP connection is identified by the four-tuple of source IP, source port, destination IP, and destination port, the same ephemeral port can be reused for connections to different server ports. Distributing connections across multiple server ports therefore spreads the load and lets the client keep connecting without exhausting the limited ephemeral port range.
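Condensed from the full listings that follow (which use a base port of 8001 and MAX_PORTS set to 30), the change boils down to this on each side:

// server side: create one listening socket per port in [port, port + MAX_PORTS)
for (i = 0; i < MAX_PORTS; i++) {
    int listen_fd = init_server(port + i);
    // ... register listen_fd with epoll, as before ...
}

// client side: cycle the destination port across the same range before connect()
addr.sin_port = htons(port + port_idx % MAX_PORTS);
port_idx++;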
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/time.h>
#define BUFFER_LENGTH 1024
// the client will try to create a million connections
// the connection size needs to be bigger than a million
// because we are also using fd as index and new fd doesn't
// start from 0
#define CONNECTION_SIZE 1012000
// for port cycling
#define MAX_PORTS 30
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
struct timeval ts;
int listen_fds[MAX_PORTS] = {0};
void handle_signal(int sig) {
int i;
printf("\nserver closing...\n");
for (i = 0; i < MAX_PORTS; i++) {
int fd = listen_fds[i];
if (fd > 0) {
printf("[%d] closing fd %d\n", i, fd);
close(fd);
}
}
printf("bye\n");
exit(0);
}
int init_server(unsigned int port) {
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd == -1) {
perror("socket");
return -1;
}
// define server address: ip + port
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
// bind and listen for connection
int status = bind(listen_fd, (struct sockaddr *) &server_addr, sizeof(server_addr));
if (status == -1) {
perror("bind socket");
return -1;
}
listen(listen_fd, 10); // allows 10 queued connections
return listen_fd;
}
int epfd;
typedef int (*RCALLBACK)(int fd);
int accept_cb(int);
int recv_cb(int);
int send_cb(int);
// fd and its callbacks, along with buffers to support cbs
struct connection {
int fd;
char rbuffer[BUFFER_LENGTH];
int rlength;
char wbuffer[BUFFER_LENGTH];
int wlength;
RCALLBACK send_callback;
// handle EPOLLIN
// for listen_fd it's used for accepting connection
// for client_fd it's used for receiving data
RCALLBACK recv_callback;
};
// where we define fd and its callbacks for handling events
// use fd as index
struct connection connection_list[CONNECTION_SIZE] = {0};
// flag determines whether ADD (if non-zero) or MODIFY
int set_event(int fd, int events, int flag) {
int ctl;
if (flag) ctl = EPOLL_CTL_ADD;
else ctl = EPOLL_CTL_MOD;
struct epoll_event ev;
ev.events = events;
ev.data.fd = fd;
return epoll_ctl(epfd, ctl, fd, &ev);
}
int register_event(int fd, int events) {
if (fd < 0) return -1; // otherwise may get "Segmentation fault (core dumped)"
connection_list[fd].fd = fd;
connection_list[fd].recv_callback = recv_cb;
connection_list[fd].send_callback = send_cb;
// reset buffers since fd can be reused
memset(connection_list[fd].rbuffer, 0, BUFFER_LENGTH);
connection_list[fd].rlength = 0;
memset(connection_list[fd].wbuffer, 0, BUFFER_LENGTH);
connection_list[fd].wlength = 0;
// listen for event (readability)
set_event(fd, events, 1);
return 0;
}
int connections = 0;
int accept_cb(int fd) { // listen_fd
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int client_fd = accept(fd, (struct sockaddr *)&client_addr, &len);
if (client_fd == -1) {
perror("accept");
return -1;
}
register_event(client_fd, EPOLLIN);
connections++;
// shows the number of connections
// and the time to create them
if (connections % 1000 == 0 || connections > (CONNECTION_SIZE - 1000)) {
struct timeval now;
gettimeofday(&now, NULL);
int ts_elapsed = TIME_SUB_MS(now, ts);
memcpy(&ts, &now, sizeof(struct timeval));
printf("number of connections: %d, time elapsed: %dms\n", connections, ts_elapsed);
}
return 0; // success
}
int recv_cb(int fd) {
int count = recv(fd, connection_list[fd].rbuffer, BUFFER_LENGTH, 0);
if (count <= 0) { // EOF or error, close connection
printf("disconnect fd %d\n", fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // deregister before closing
close(fd);
return 0;
}
connection_list[fd].rlength = count;
// echo to wbuffer (response)
connection_list[fd].wlength = count;
memcpy(connection_list[fd].wbuffer, connection_list[fd].rbuffer, connection_list[fd].wlength);
// clear the read buffer for the next read, otherwise it may contain data from the last read
memset(connection_list[fd].rbuffer, 0, BUFFER_LENGTH);
set_event(fd, EPOLLOUT, 0); // modify
return count;
}
int send_cb(int fd) {
int count = send(fd, connection_list[fd].wbuffer, connection_list[fd].wlength, 0);
// printf("fd %d send %d bytes\n", fd, count);
set_event(fd, EPOLLIN, 0); // modify, fd is already in epoll instance
return count;
}
int main() {
signal(SIGPIPE, SIG_IGN); // ignore broken pipes so writes to closed sockets don't kill the process
signal(SIGINT, handle_signal); // handle Ctrl+C (SIGINT)
gettimeofday(&ts, NULL);
unsigned short port = 8001;
epfd = epoll_create(1);
int i;
for (i = 0; i < MAX_PORTS; i++) {
int listen_fd = init_server(port + i);
if (listen_fd == -1) return -1;
listen_fds[i] = listen_fd; // register for cleanup
connection_list[listen_fd].fd = listen_fd;
connection_list[listen_fd].recv_callback = accept_cb;
set_event(listen_fd, EPOLLIN, 1);
}
while(1) { // main loop
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, -1);
int i;
for (i = 0; i < nready; i++) {
int fd = events[i].data.fd;
// may handle read and write in the same iteration
if (events[i].events & EPOLLIN) {
connection_list[fd].recv_callback(fd); // either accept or recv, depending on whether it's listen_fd or a client fd
}
if (events[i].events & EPOLLOUT) {
connection_list[fd].send_callback(fd);
}
}
}
}
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <errno.h>
// the max number of concurrent connections, a million
#define MAX_CONN_SIZE 1000000
#define BUFFER_SIZE 128
// for port cycling
#define MAX_PORTS 30
// sets the socket to non-blocking mode
// by default, fd is in blocking mode.
// operations like read or write will block (wait indefinitely)
// if no data is available
int set_non_block(int fd) {
// get the current fd flags
int flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) return flags;
// add the non-blocking flag to flags
flags |= O_NONBLOCK;
// apply the new flags
int status = fcntl(fd, F_SETFL, flags);
if (status < 0) return status;
return 0;
}
// enable address reuse for the socket
// when a socket is closed, its port might
// go into the TIME_WAIT state, preventing
// the application from immediately re-binding
// to the same port
// the SO_REUSEADDR option allows the socket to
// bind to an address that is in the TIME_WAIT
// state, enabling faster restarts of the application
int set_reuse_addr(int fd) {
int reuse = 1;
return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}
struct epoll_event events[MAX_CONN_SIZE] = {0};
int main() {
// prevent unexpected terminations due to
// broken pipes or closed sockets
signal(SIGPIPE, SIG_IGN);
int port = 8001;
int connections = 0; // number of connections
int epfd = epoll_create(1);
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
int socket_fd;
char buffer[BUFFER_SIZE] = {0};
// for port cycling
// tracks which port to use next
int port_idx = 0;
while(1) {
struct epoll_event ev;
// creating connections until MAX_CONN_SIZE is reached
if (connections < MAX_CONN_SIZE) {
socket_fd = socket(AF_INET, SOCK_STREAM, 0);
if (socket_fd == -1) {
perror("socket");
return -1;
}
// port cycling
addr.sin_port = htons(port + port_idx % MAX_PORTS);
port_idx++;
int status = connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr));
if (status == -1) {
perror("socket connect");
return -1;
// continue;
}
connections++;
set_non_block(socket_fd);
set_reuse_addr(socket_fd);
// ensure there is no old data in the buffer
// then write new data to it
memset(buffer, 0, sizeof(buffer));
sprintf(buffer, "client connections %d\n", connections);
// send it
send(socket_fd, buffer, strlen(buffer), 0);
ev.data.fd = socket_fd;
ev.events = EPOLLIN; // after the send above, we monitor for data from server
epoll_ctl(epfd, EPOLL_CTL_ADD, socket_fd, &ev);
}
printf("connections %d\n", connections);
// every once in a while process events that are ready
if (connections % 1000 == 999 || connections >= MAX_CONN_SIZE) {
// timeout is 100ms
int nfds = epoll_wait(epfd, events, connections, 100);
int i;
for (i = 0; i < nfds; i++) {
int fd = events[i].data.fd;
if (events[i].events & EPOLLOUT) {
send(fd, buffer, strlen(buffer), 0);
} else if (events[i].events & EPOLLIN) {
char rbuffer[BUFFER_SIZE] = {0};
ssize_t len = recv(fd, rbuffer, BUFFER_SIZE, 0);
if (len == 0) { // EOF, close connection
printf("EOF, closing fd %d\n", fd);
connections--;
close(fd);
} else if (len < 0) { // error
// ignore these errors
if (errno == EINTR || errno == EAGAIN || errno == ENOTSOCK) continue;
printf("error %d, closing fd %d\n", errno, fd);
connections--;
close(fd);
}
} else {
printf("expected error %d for fd %d\n", errno, fd);
close(fd);
}
}
}
usleep(50); // sleep for 50 microseconds
}
}
sysctl.conf
We increased the maximum number of open files using ulimit -n 1048576 in both the client and server sessions, but we may still encounter a "too many open files" error at some point while running the program. In such cases, we can increase the system-wide limit by setting fs.file-max in /etc/sysctl.conf and applying the changes with sudo sysctl -p. Additionally, we may notice performance degradation at a certain number of concurrent connections. To address this, we can further tune settings in /etc/sysctl.conf to improve the system's ability to handle high traffic and scale.
# Additional settings
# widen the ephemeral (client) port range
net.ipv4.ip_local_port_range = 1024 65535
# keep per-socket TCP send buffers small (min, default, max in bytes) to save memory
net.ipv4.tcp_wmem = 1024 1024 2048
# keep per-socket TCP receive buffers small (min, default, max in bytes) to save memory
net.ipv4.tcp_rmem = 1024 1024 2048
# system-wide limit on open file handles
fs.file-max = 2097152
# maximum number of connection-tracking entries
net.nf_conntrack_max = 2097152
# how long (in seconds) an established connection stays in the conntrack table
net.netfilter.nf_conntrack_tcp_timeout_established = 1200
Result
With my current settings, connections are created without significant performance degradation until about 968,000, after which performance suddenly drops.
The count then slowly climbs to 1,000,000 concurrent connections.
The degradation appears to start once CPU usage reaches 200%.
I also tested with a t2.2xlarge EC2 instance, which has 8 vCPUs and 32 GiB of memory, and observed similar results.
Note
There are several factors to consider when testing the server, KV storage, and other components, including concurrency (the number of simultaneous connections), QPS (Queries Per Second, which measures request rate), latency, and test cases that cover business logic.