Skip to content

A Million Concurrency

Building on top of epoll and the Reactor pattern, we will further enhance our TCP server to achieve the capability of handling a million concurrent requests in a non-blocking manner with a single thread.

Starting Point

TCP Client

We'll begin with the TCP server implemented using the Reactor pattern, as discussed in Reactor. To test its capabilities, we'll create a client program that attempts to establish one million connections and then send and receive data in a non-blocking manner.

tcp_client.c
#include <cerrno>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <errno.h>

// the max number of concurrent connections (1M)
#define MAX_CONN_SIZE 1000000
#define BUFFER_SIZE 128

// Puts fd into non-blocking mode.
// A descriptor is blocking by default, so read/write style calls
// wait indefinitely when they cannot make progress.
// Returns 0 on success, or the negative value reported by fcntl
// when either the flag query or the flag update fails.
int set_non_block(int fd) {
  // read the flags currently set on the descriptor
  const int current = fcntl(fd, F_GETFL, 0);
  if (current < 0) {
    return current;
  }

  // write them back with O_NONBLOCK added
  const int rc = fcntl(fd, F_SETFL, current | O_NONBLOCK);
  return rc < 0 ? rc : 0;
}

// Turns on SO_REUSEADDR for fd.
// After a socket is closed its port can linger in TIME_WAIT, which
// stops the application from binding to that port again right away.
// With SO_REUSEADDR set, a bind to such an address is allowed, so the
// application can be restarted without waiting.
// Returns whatever setsockopt returns (0 on success, -1 on failure).
int set_reuse_addr(int fd) {
  int enabled = 1;
  const char *value = (const char *)&enabled;
  return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, value, sizeof(enabled));
}

// Buffer that epoll_wait() fills with ready events. It is defined at file
// scope because MAX_CONN_SIZE epoll_event structs are too large for a
// default-sized stack.
struct epoll_event events[MAX_CONN_SIZE] = {0};

// Test client: opens up to MAX_CONN_SIZE TCP connections to the server,
// one per loop iteration, sends a short message on each new connection
// and periodically drains whatever the server sent back.
// Returns -1 if a socket cannot be created or connected; otherwise it
// runs until the process is killed.
int main() {
  const int port = 8001; // must match the port the server listens on
  int connections = 0;   // number of currently open connections

  int epfd = epoll_create(1);
  if (epfd == -1) {
    perror("epoll_create");
    return -1;
  }

  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(addr)); // no stray bytes in the padding field
  addr.sin_family = AF_INET;
  addr.sin_port = htons((unsigned short)port);
  // NOTE(review): connecting to INADDR_ANY relies on the OS routing it to
  // the local host; INADDR_LOOPBACK would state that intent explicitly.
  addr.sin_addr.s_addr = htonl(INADDR_ANY);

  char buffer[BUFFER_SIZE] = {0};
  while (1) {
    // keep creating connections until MAX_CONN_SIZE is reached
    if (connections < MAX_CONN_SIZE) {
      int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
      if (socket_fd == -1) {
        perror("socket");
        return -1;
      }

      if (connect(socket_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("socket connect");
        close(socket_fd);
        return -1;
      }

      connections++;

      if (set_non_block(socket_fd) < 0) perror("set_non_block");
      if (set_reuse_addr(socket_fd) < 0) perror("set_reuse_addr");

      // build the greeting in a clean buffer, bounded by its size
      memset(buffer, 0, sizeof(buffer));
      snprintf(buffer, sizeof(buffer), "client connections %d\n", connections);
      if (send(socket_fd, buffer, strlen(buffer), 0) == -1) perror("send");

      // after the send above, wait for the server's reply on this socket
      struct epoll_event ev;
      memset(&ev, 0, sizeof(ev));
      ev.data.fd = socket_fd;
      ev.events = EPOLLIN;
      if (epoll_ctl(epfd, EPOLL_CTL_ADD, socket_fd, &ev) == -1) {
        // an unmonitored socket would never be drained or closed
        perror("epoll_ctl");
        close(socket_fd);
        connections--;
      }
    }

    // every once in a while process events that are ready
    if (connections % 1000 == 999 || connections >= MAX_CONN_SIZE) {
      // timeout is 100ms; the events array holds MAX_CONN_SIZE entries
      int nfds = epoll_wait(epfd, events, MAX_CONN_SIZE, 100);

      for (int i = 0; i < nfds; i++) {
        // always operate on the descriptor the event belongs to,
        // not on the most recently created socket
        int fd = events[i].data.fd;

        if (events[i].events & EPOLLOUT) {
          send(fd, buffer, strlen(buffer), 0);
        } else if (events[i].events & EPOLLIN) {
          char rbuffer[BUFFER_SIZE] = {0};
          ssize_t len = recv(fd, rbuffer, BUFFER_SIZE, 0);

          if (len > 0) continue; // reply received, keep the connection open

          if (len == 0) { // EOF, close connection
            printf("EOF, closing fd %d\n", fd);
          } else {
            // errno is only meaningful here, right after a failed recv
            if (errno == EINTR || errno == EAGAIN) continue;
            printf("error %d, closing fd %d\n", errno, fd);
          }
          connections--;
          close(fd); // closing also removes fd from the epoll set
        } else {
          // neither readable nor writable, e.g. only EPOLLERR/EPOLLHUP
          printf("unexpected events 0x%x for fd %d\n", (unsigned int)events[i].events, fd);
          connections--;
          close(fd);
        }
      }
    }

    usleep(50000); // sleep for 50 ms between connection attempts
  }
}

Core dumped?

You may encounter a Segmentation fault (core dumped) error if you move events from global scope to main().

When events is declared in main(), it is allocated on the stack, which has limited memory and may not be sufficient for a million epoll_event structs. To address this, we declare events as a global array, which is allocated in a memory segment that is not restricted by the stack size. Alternatively, you can check the current stack size limit using ulimit -a and, if needed, temporarily increase it with ulimit -s.

TCP Server

tcp_server.c
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/types.h>

#include <netinet/in.h>

#define BUFFER_LENGTH 1024
#define CONNECTION_SIZE 1000000

// Listening socket of the server; -1 until init_server() has created it.
int listen_fd = -1;

// SIGINT handler: announces the shutdown, closes the listening socket if
// there is one and terminates the process with status 0.
// Only async-signal-safe calls (write, close, _exit) are used, because the
// signal may arrive while the main loop is inside printf().
// sig: the signal number (unused).
void handle_signal(int sig) {
  (void)sig;
  if (listen_fd >= 0) {
    static const char msg[] = "\nserver closing...\n";
    ssize_t written = write(STDOUT_FILENO, msg, sizeof(msg) - 1);
    (void)written; // nothing useful can be done if the write fails
    close(listen_fd);
  }
  _exit(0);
}

// Creates a TCP socket, binds it to every local address on the given port
// and puts it into listening mode. The descriptor is also stored in the
// global listen_fd so the signal handler can close it.
// port: TCP port in host byte order, 0..65535.
// Returns the listening descriptor, or -1 on failure (a message is printed
// and any socket created along the way is closed again).
int init_server(unsigned int port) {
  if (port > 65535) {
    // htons() would silently truncate the value to 16 bits
    fprintf(stderr, "invalid port %u\n", port);
    return -1;
  }

  listen_fd = socket(AF_INET, SOCK_STREAM, 0);

  if (listen_fd == -1) {
    perror("socket");
    return -1;
  }

  // allow an immediate restart while old connections sit in TIME_WAIT;
  // failure here is not fatal, bind() below reports any real problem
  int reuse = 1;
  if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
    perror("setsockopt");
  }

  // define server address: ip + port
  struct sockaddr_in server_addr;
  memset(&server_addr, 0, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons((unsigned short)port);
  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

  // bind and listen for connection
  if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
    perror("bind socket");
    close(listen_fd);
    listen_fd = -1;
    return -1;
  }

  // SOMAXCONN asks for the largest accept queue the system offers
  if (listen(listen_fd, SOMAXCONN) == -1) {
    perror("listen");
    close(listen_fd);
    listen_fd = -1;
    return -1;
  }

  return listen_fd;
}

// epoll instance shared by the event helpers and callbacks in this file;
// assigned in main() from epoll_create()
int epfd;

// signature shared by all event callbacks: takes the ready fd, returns an int
typedef int (*RCALLBACK)(int fd);
// forward declarations so the callbacks can be referenced before their definitions
int accept_cb(int);
int recv_cb(int);
int send_cb(int);

// fd and its callbacks, along with buffers to support cbs
struct connection {
  // the descriptor this entry describes
  int fd;

  // bytes read by recv_cb and how many of them there are
  char rbuffer[BUFFER_LENGTH];
  int rlength;

  // bytes that send_cb will write and how many of them there are
  char wbuffer[BUFFER_LENGTH];
  int wlength;

  // handle EPOLLOUT
  RCALLBACK send_callback;
  // handle EPOLLIN
  // for listen_fd it's used for accepting connection
  // for client_fd it's used for receiving data
  RCALLBACK recv_callback;
};

// where we define fd and its callbacks for handling events
// use fd as index, so only descriptors below CONNECTION_SIZE have a slot
struct connection connection_list[CONNECTION_SIZE] = {0};

// Registers fd with the epoll instance (flag non-zero) or changes the
// events an already registered fd is monitored for (flag zero).
// Returns the result of epoll_ctl.
int set_event(int fd, int events, int flag) {
  struct epoll_event ev;
  ev.data.fd = fd;
  ev.events = events;

  const int op = flag ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
  return epoll_ctl(epfd, op, fd, &ev);
}

int register_event(int fd, int events) {
  if (fd < 0) return -1; // otherwise may get "Segmentation fault (core dumped)"
  connection_list[fd].fd = fd;
  connection_list[fd].recv_callback = recv_cb;
  connection_list[fd].send_callback = send_cb;

  // reset buffers since fd can be reused
  memset(connection_list[fd].rbuffer, 0, BUFFER_LENGTH);
  connection_list[fd].rlength = 0;
  memset(connection_list[fd].wbuffer, 0, BUFFER_LENGTH);
  connection_list[fd].wlength = 0;

  // listen for event (readability)
  set_event(fd, events, 1);
  return 0;
}

int accept_cb(int fd) { // listen_fd
  struct sockaddr_in client_addr;
  socklen_t len = sizeof(client_addr);
  int client_fd = accept(fd, (struct sockaddr *)&client_addr, &len);

  if (client_fd == -1) {
    perror("accept");
    return -1;
  }

  register_event(client_fd, EPOLLIN);


  printf("connection accepted, fd %d\n", client_fd);

  return 0; // success
}

int recv_cb(int fd) {
  int count = recv(fd, connection_list[fd].rbuffer, BUFFER_LENGTH, 0);
  printf("[fd %d] received: '%s' (%d bytes)\n", fd, connection_list[fd].rbuffer, count);

  if (count == 0) { // EOF, close connection
    printf("disconnect fd %d\n", fd);
    close(fd);
    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
    return 0;
  }

  connection_list[fd].rlength = count;

  // echo to wbuffer (response)
  connection_list[fd].wlength = count;
  memcpy(connection_list[fd].wbuffer, connection_list[fd].rbuffer, connection_list[fd].wlength);
  // clear the read buffer for the next read otherwise it may contains data from the last ready
  memset(connection_list[fd].rbuffer, 0, BUFFER_LENGTH);

  set_event(fd, EPOLLOUT, 0); // modify

  return count;
}

int send_cb(int fd) {
  int count = send(fd, connection_list[fd].wbuffer, connection_list[fd].wlength, 0);

  set_event(fd, EPOLLIN, 0); // modify, fd is already in epoll instance

  return count;
}

int main() {
  int listen_fd = init_server(8001);
  if (listen_fd == -1) return -1;

  signal(SIGINT, handle_signal); // handle Ctrl+C (SIGINT)

  epfd = epoll_create(1);

  connection_list[listen_fd].fd = listen_fd;
  connection_list[listen_fd].recv_callback = accept_cb;
  set_event(listen_fd, EPOLLIN, 1);

  while(1) { // main loop
    struct epoll_event events[1024] = {0};
    int nready = epoll_wait(epfd, events, 1024, -1);

    int i;
    for (i = 0; i < nready; i++) {
      int fd = events[i].data.fd;

      // may handle read and write in the same iteration

      if (events[i].events & EPOLLIN) {
        connection_list[fd].recv_callback(fd); // either accept or recv depending on lt's listen_fd or client_fd
      }

      if (events[i].events & EPOLLOUT) {
        connection_list[fd].send_callback(fd);
      }
    }
  }
}

For the server, we increased the CONNECTION_SIZE to one million with some minor fixes. For example, we introduced a fix in the register_event function to ensure it always checks if the fd is valid before proceeding.

Testing

Environment

I'm testing on WSL2 and have allocated 14 GiB of memory to it.

$ uname -a
Linux tpc1 5.15.167.4-microsoft-standard-WSL2 #1 SMP Tue Nov 5 00:21:55 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 24.04.1 LTS
Release:        24.04
Codename:       noble

Too many open files

Too many open files

Too many open files

When starting the tcp_server and then the tcp_client, you will encounter accept: Too many open files on the server side and socket: Too many open files on the client side once the total number of file descriptors (fds) reaches 1024. This happens because the default maximum number of open files is set to 1024, which can be checked using ulimit -a. Since there will be at least one million fds on either the client or server, we need to increase this limit to a value slightly above that, such as 1048576 (1024 * 1024), using ulimit -n 1048576.

After increasing the file descriptor limits in both the client and server sessions, let's try running the setup again.

Port exhaustion

Cannot assign requested address

Server Side Cannot assign required addresses - server

Client Side Cannot assign required addresses - client

Now, we can create more than 1024 fds on both the client and server, but the system stops creating new connections after reaching about 28,000. This issue occurs because, in socket programming, a client typically does not explicitly assign a port when creating a socket. Instead, the operating system automatically assigns an ephemeral (temporary) port from a predefined range, which by default is 32768 to 60999 on my Linux system. Even though we've increased the number of open file descriptors, we're still limited by the range of ephemeral ports, which can accommodate only around 28,000 connections to a single server address and port.

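A practical solution is for the server to listen on a range of ports for clients to connect to, rather than relying on a single port. A TCP connection is identified by the combination of source IP, source port, destination IP and destination port, so the same ephemeral port can be reused for connections to different server ports. By distributing connections across multiple server ports, the load is spread out, reducing competition for ephemeral ports and allowing clients to connect without exhausting the limited port range.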

#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/types.h>

#include <netinet/in.h>

#include <sys/time.h>

#define BUFFER_LENGTH 1024
// the client will try to create a million connections
// the connection size needs to be bigger than a million
// because we are also using the fd as the index and new fds
// don't start from 0
#define CONNECTION_SIZE 1012000

// for port cycling
#define MAX_PORTS 30

#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)

struct timeval ts;

// Listening sockets, one per server port; 0 marks a slot not yet filled.
int listen_fds[MAX_PORTS] = {0};

// Writes a NUL-terminated string to stdout using only write(), which is
// safe to call from a signal handler.
static void sig_put_str(const char *text) {
  size_t len = 0;
  while (text[len] != '\0') len++;
  ssize_t written = write(STDOUT_FILENO, text, len);
  (void)written; // nothing useful can be done if the write fails
}

// Writes a decimal integer to stdout using only write().
static void sig_put_int(int value) {
  char digits[12]; // sign + 10 digits fits a 32-bit int
  size_t pos = sizeof(digits);
  unsigned int magnitude = value < 0 ? 0u - (unsigned int)value : (unsigned int)value;

  do {
    digits[--pos] = (char)('0' + magnitude % 10);
    magnitude /= 10;
  } while (magnitude != 0);
  if (value < 0) digits[--pos] = '-';

  ssize_t written = write(STDOUT_FILENO, digits + pos, sizeof(digits) - pos);
  (void)written;
}

// SIGINT handler: closes every listening socket recorded in listen_fds,
// reports each one and terminates the process with status 0.
// Only async-signal-safe calls (write, close, _exit) are used, because the
// signal may arrive while the main loop is inside printf().
// sig: the signal number (unused).
void handle_signal(int sig) {
  (void)sig;
  sig_put_str("\nserver closing...\n");
  for (int i = 0; i < MAX_PORTS; i++) {
    int fd = listen_fds[i];
    if (fd > 0) {
      sig_put_str("[");
      sig_put_int(i);
      sig_put_str("] closing fd ");
      sig_put_int(fd);
      sig_put_str("\n");
      close(fd);
    }
  }
  sig_put_str("bye\n");
  _exit(0);
}

// Creates a TCP socket, binds it to every local address on the given port
// and puts it into listening mode.
// port: TCP port in host byte order, 0..65535.
// Returns the listening descriptor, or -1 on failure (a message is printed
// and any socket created along the way is closed again).
int init_server(unsigned int port) {
  if (port > 65535) {
    // htons() would silently truncate the value to 16 bits
    fprintf(stderr, "invalid port %u\n", port);
    return -1;
  }

  int listen_fd = socket(AF_INET, SOCK_STREAM, 0);

  if (listen_fd == -1) {
    perror("socket");
    return -1;
  }

  // allow an immediate restart while old connections sit in TIME_WAIT;
  // failure here is not fatal, bind() below reports any real problem
  int reuse = 1;
  if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
    perror("setsockopt");
  }

  // define server address: ip + port
  struct sockaddr_in server_addr;
  memset(&server_addr, 0, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons((unsigned short)port);
  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

  // bind and listen for connection
  if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
    perror("bind socket");
    close(listen_fd);
    return -1;
  }

  // SOMAXCONN asks for the largest accept queue the system offers
  if (listen(listen_fd, SOMAXCONN) == -1) {
    perror("listen");
    close(listen_fd);
    return -1;
  }

  return listen_fd;
}

// epoll instance shared by the event helpers and callbacks in this file;
// assigned in main() from epoll_create()
int epfd;

// signature shared by all event callbacks: takes the ready fd, returns an int
typedef int (*RCALLBACK)(int fd);
// forward declarations so the callbacks can be referenced before their definitions
int accept_cb(int);
int recv_cb(int);
int send_cb(int);

// fd and its callbacks, along with buffers to support cbs
struct connection {
  // the descriptor this entry describes
  int fd;

  // bytes read by recv_cb and how many of them there are
  char rbuffer[BUFFER_LENGTH];
  int rlength;

  // bytes that send_cb will write and how many of them there are
  char wbuffer[BUFFER_LENGTH];
  int wlength;

  // handle EPOLLOUT
  RCALLBACK send_callback;
  // handle EPOLLIN
  // for listen_fd it's used for accepting connection
  // for client_fd it's used for receiving data
  RCALLBACK recv_callback;
};

// where we define fd and its callbacks for handling events
// use fd as index, so only descriptors below CONNECTION_SIZE have a slot
struct connection connection_list[CONNECTION_SIZE] = {0};

// Adds fd to the epoll instance when flag is non-zero; otherwise modifies
// the event mask of an fd that is already registered.
// Returns the result of epoll_ctl.
int set_event(int fd, int events, int flag) {
  int op = EPOLL_CTL_MOD;
  if (flag != 0) {
    op = EPOLL_CTL_ADD;
  }

  struct epoll_event request;
  request.events = events;
  request.data.fd = fd;

  return epoll_ctl(epfd, op, fd, &request);
}

int register_event(int fd, int events) {
  if (fd < 0) return -1; // otherwise may get "Segmentation fault (core dumped)"
  connection_list[fd].fd = fd;
  connection_list[fd].recv_callback = recv_cb;
  connection_list[fd].send_callback = send_cb;

  // reset buffers since fd can be reused
  memset(connection_list[fd].rbuffer, 0, BUFFER_LENGTH);
  connection_list[fd].rlength = 0;
  memset(connection_list[fd].wbuffer, 0, BUFFER_LENGTH);
  connection_list[fd].wlength = 0;

  // listen for event (readability)
  set_event(fd, events, 1);
  return 0;
}

// number of client connections accepted and registered so far
int connections = 0;

// EPOLLIN callback for a listening socket: accepts one pending connection,
// registers it for read events and periodically prints how many
// connections exist and how long the latest batch took.
// fd: the listening descriptor.
// Returns 0 on success, -1 if accept fails or the new descriptor cannot be
// tracked (in which case it is closed again instead of being leaked).
int accept_cb(int fd) { // listen_fd
  struct sockaddr_in client_addr;
  socklen_t len = sizeof(client_addr);
  int client_fd = accept(fd, (struct sockaddr *)&client_addr, &len);

  if (client_fd == -1) {
    perror("accept");
    return -1;
  }

  // connection_list is indexed by fd, so the fd must fit into the table
  if (client_fd >= CONNECTION_SIZE || register_event(client_fd, EPOLLIN) != 0) {
    fprintf(stderr, "cannot track fd %d, closing it\n", client_fd);
    close(client_fd);
    return -1;
  }

  connections++;
  // shows the number of connections
  // and the time to create them
  // NOTE(review): the second condition compares against CONNECTION_SIZE,
  // not the one-million client target - confirm which threshold is intended
  if (connections % 1000 == 0 || connections > (CONNECTION_SIZE - 1000)) {
    struct timeval now;
    gettimeofday(&now, NULL);
    int ts_elapsed = TIME_SUB_MS(now, ts);
    ts = now; // the next report measures from this point

    printf("number of connections: %d, time elapsed: %dms\n", connections, ts_elapsed);
  }

  return 0; // success
}

int recv_cb(int fd) {
  int count = recv(fd, connection_list[fd].rbuffer, BUFFER_LENGTH, 0);

  if (count == 0) { // EOF, close connection
    printf("disconnect fd %d\n", fd);
    close(fd);
    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
    return 0;
  }

  connection_list[fd].rlength = count;

  // echo to wbuffer (response)
  connection_list[fd].wlength = count;
  memcpy(connection_list[fd].wbuffer, connection_list[fd].rbuffer, connection_list[fd].wlength);
  // clear the read buffer for the next read otherwise it may contains data from the last ready
  memset(connection_list[fd].rbuffer, 0, BUFFER_LENGTH);

  set_event(fd, EPOLLOUT, 0); // modify

  return count;
}

int send_cb(int fd) {
  int count = send(fd, connection_list[fd].wbuffer, connection_list[fd].wlength, 0);
  // printf("fd %d send %d bytes\n", fd, count);

  set_event(fd, EPOLLIN, 0); // modify, fd is already in epoll instance

  return count;
}

int main() {
  signal(SIGPIPE, SIG_IGN); // trap broken pipes
  signal(SIGINT, handle_signal); // handle Ctrl+C (SIGINT)

  gettimeofday(&ts, NULL);

  unsigned short port = 8001;
  epfd = epoll_create(1);
  int i;
  for (i = 0; i < MAX_PORTS; i++) {
    int listen_fd = init_server(port + i);
    if (listen_fd == -1) return -1;

    listen_fds[i] = listen_fd; // register for cleanup

    connection_list[listen_fd].fd = listen_fd;
    connection_list[listen_fd].recv_callback = accept_cb;
    set_event(listen_fd, EPOLLIN, 1);
  }


  while(1) { // main loop
    struct epoll_event events[1024] = {0};
    int nready = epoll_wait(epfd, events, 1024, -1);

    int i;
    for (i = 0; i < nready; i++) {
      int fd = events[i].data.fd;

      // may handle read and write in the same iteration

      if (events[i].events & EPOLLIN) {
        connection_list[fd].recv_callback(fd); // either accept or recv depending on lt's listen_fd or client_fd
      } 
      if (events[i].events & EPOLLOUT) {
        connection_list[fd].send_callback(fd);
      }
    }
  }
}
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <errno.h>

// the max number of concurrent connections, a million
#define MAX_CONN_SIZE 1000000
#define BUFFER_SIZE 128

// for port cycling
#define MAX_PORTS 30

// Makes fd non-blocking.
// Descriptors start out blocking: a read or write waits indefinitely
// until it can proceed. With O_NONBLOCK set such calls return at once.
// Returns 0 when the flag was applied, otherwise the negative result of
// the fcntl call that failed.
int set_non_block(int fd) {
  int flags = fcntl(fd, F_GETFL, 0); // current descriptor flags
  if (flags >= 0) {
    // store the flags again with O_NONBLOCK included
    int status = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    return (status < 0) ? status : 0;
  }
  return flags;
}

// Sets SO_REUSEADDR on fd.
// A closed socket can leave its port in TIME_WAIT, and a new bind to
// that port is refused until the state expires. SO_REUSEADDR permits
// the bind anyway, which lets the application restart quickly.
// Returns the setsockopt result (0 on success, -1 on failure).
int set_reuse_addr(int fd) {
  int flag = 1;
  int rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&flag, sizeof(flag));
  return rc;
}

// Buffer that epoll_wait() fills with ready events. It is defined at file
// scope because MAX_CONN_SIZE epoll_event structs are too large for a
// default-sized stack.
struct epoll_event events[MAX_CONN_SIZE] = {0};

// Test client: opens up to MAX_CONN_SIZE TCP connections, cycling through
// MAX_PORTS consecutive server ports starting at 8001, sends a short
// message on each new connection and periodically drains whatever the
// server sent back.
// Returns -1 if a socket cannot be created or connected; otherwise it
// runs until the process is killed.
int main() {
  // prevent unexpected terminations due to
  // broken pipes or closed sockets
  signal(SIGPIPE, SIG_IGN);

  const int port = 8001; // first server port
  int connections = 0;   // number of currently open connections

  int epfd = epoll_create(1);
  if (epfd == -1) {
    perror("epoll_create");
    return -1;
  }

  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(addr)); // no stray bytes in the padding field
  addr.sin_family = AF_INET;
  // NOTE(review): connecting to INADDR_ANY relies on the OS routing it to
  // the local host; INADDR_LOOPBACK would state that intent explicitly.
  addr.sin_addr.s_addr = htonl(INADDR_ANY);

  char buffer[BUFFER_SIZE] = {0};

  // for port cycling
  // offset of the server port to use next, always in [0, MAX_PORTS)
  int port_idx = 0;
  while (1) {
    // keep creating connections until MAX_CONN_SIZE is reached
    if (connections < MAX_CONN_SIZE) {
      int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
      if (socket_fd == -1) {
        perror("socket");
        return -1;
      }

      // port cycling; wrapping here keeps the index from growing unbounded
      addr.sin_port = htons((unsigned short)(port + port_idx));
      port_idx = (port_idx + 1) % MAX_PORTS;

      if (connect(socket_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("socket connect");
        close(socket_fd);
        return -1;
      }

      connections++;

      if (set_non_block(socket_fd) < 0) perror("set_non_block");
      if (set_reuse_addr(socket_fd) < 0) perror("set_reuse_addr");

      // build the greeting in a clean buffer, bounded by its size
      memset(buffer, 0, sizeof(buffer));
      snprintf(buffer, sizeof(buffer), "client connections %d\n", connections);
      if (send(socket_fd, buffer, strlen(buffer), 0) == -1) perror("send");

      // after the send above, wait for the server's reply on this socket
      struct epoll_event ev;
      memset(&ev, 0, sizeof(ev));
      ev.data.fd = socket_fd;
      ev.events = EPOLLIN;
      if (epoll_ctl(epfd, EPOLL_CTL_ADD, socket_fd, &ev) == -1) {
        // an unmonitored socket would never be drained or closed
        perror("epoll_ctl");
        close(socket_fd);
        connections--;
      }
    }

    printf("connections %d\n", connections);

    // every once in a while process events that are ready
    if (connections % 1000 == 999 || connections >= MAX_CONN_SIZE) {
      // timeout is 100ms; the events array holds MAX_CONN_SIZE entries
      int nfds = epoll_wait(epfd, events, MAX_CONN_SIZE, 100);

      for (int i = 0; i < nfds; i++) {
        // always operate on the descriptor the event belongs to,
        // not on the most recently created socket
        int fd = events[i].data.fd;

        if (events[i].events & EPOLLOUT) {
          send(fd, buffer, strlen(buffer), 0);
        } else if (events[i].events & EPOLLIN) {
          char rbuffer[BUFFER_SIZE] = {0};
          ssize_t len = recv(fd, rbuffer, BUFFER_SIZE, 0);

          if (len > 0) continue; // reply received, keep the connection open

          if (len == 0) { // EOF, close connection
            printf("EOF, closing fd %d\n", fd);
          } else {
            // errno is only meaningful here, right after a failed recv
            if (errno == EINTR || errno == EAGAIN) continue;
            printf("error %d, closing fd %d\n", errno, fd);
          }
          connections--;
          close(fd); // closing also removes fd from the epoll set
        } else {
          // neither readable nor writable, e.g. only EPOLLERR/EPOLLHUP
          printf("unexpected events 0x%x for fd %d\n", (unsigned int)events[i].events, fd);
          connections--;
          close(fd);
        }
      }
    }

    usleep(50); // sleep for 50 microseconds
  }
}

sysctl.conf

We increased the maximum number of open files using ulimit -n 1048576 in both the client and server sessions, but we may still encounter a "too many open files" error at some point while running the program. In such cases, we can increase the system-wide limit by setting fs.file-max in /etc/sysctl.conf and applying the changes with sudo sysctl -p. Additionally, we may notice performance degradation at a certain number of concurrent connections. To address this, we can further optimize system settings in /etc/sysctl.conf to enhance the system's ability to handle high traffic and improve scalability.

/etc/sysctl.conf
# Additional settings
net.ipv4.ip_local_port_range = 1024 65535
net.ipv4.tcp_wmem = 1024 1024 2048
net.ipv4.tcp_rmem = 1024 1024 2048
fs.file-max = 2097152
net.nf_conntrack_max = 2097152
net.netfilter.nf_conntrack_tcp_timeout_established = 1200

Result

In my current settings, it successfully creates connections without significant performance degradation until reaching 968,000 connections, after which performance suddenly drops.

result2

Then it slowly reaches 1,000,000 concurrent connections.

result3

The performance degradation seems to start when CPU usage reaches 200%.

I also tested with a t2.2xlarge EC2 instance, which has 8 vCPUs and 32 GiB of memory, and observed similar results.

ec2-1 ec2-2 ec2-3

Note

There are several factors to consider when testing the server, KV storage, and other components, including concurrency (the number of simultaneous connections), QPS (Queries Per Second, which measures request rate), latency, and test cases that cover business logic.