I/O Multiplexing
Using multiple threads to handle concurrent requests is a straightforward approach, as demonstrated in Network I/O. However, this method has limitations. A process can only create a finite number of threads, and performance degrades as more resources go into creating and managing them: maintaining their state and coordinating their execution. This approach can handle on the order of 1,000 concurrent requests, but not much more. Let's discuss alternative methods: select, poll, and epoll.
Fd lifecycle
File descriptors, or fds, are integers that serve as handles to system resources such as files and sockets. Each process has a file descriptor table maintained by the kernel, which maps fds (small integers starting from 0) to files, sockets, or other resources in the kernel. fds 0, 1, and 2 are reserved for stdin, stdout, and stderr respectively.
When a file or socket is opened, a new fd is allocated. The kernel scans the file descriptor table for the lowest unused fd and assigns it to the new resource (for example, 3, 4, 5, etc.).
When a process closes an fd using close(fd), the kernel removes the mapping from the file descriptor table. After an fd is closed, it becomes available for reuse. The next call to open a file or create a socket will likely reuse the lowest available fd.
When a process terminates, the kernel automatically closes all open fds associated with that process.
TIME_WAIT
File descriptors can be reused immediately after being closed; the system does not impose any wait period. The TIME_WAIT state applies specifically to TCP connections and ensures the connection is shut down cleanly. During this time, the local address and port combination cannot be reused for a new connection, but the file descriptor itself can be reused as soon as the socket is closed.
select
The select system call is a method of multiplexing I/O operations: it allows a program to monitor multiple file descriptors and wait until one or more of them are ready for some I/O operation, such as reading, writing, or error checking.
#include <sys/select.h>
// nfds: the highest file descriptor value + 1.
// for example, if you're monitoring fd 3, 5, and 7,
// you would set nfds to 8
//
// readfds: a pointer to a set of fds that you want to check
// for readability (if data is available to read)
// or NULL
//
// writefds: a pointer to a set of fds that you want to check
// for writability (if fd is ready for writing data)
// or NULL
//
// exceptfds: a pointer to a set of fds to check for exceptional
// conditions (error conditions) or NULL
//
// timeout: the maximum amount of time select should wait for
// one or more fds to become ready
// if NULL, select will block indefinitely until at least
// one fd is ready.
// if 0, select will return immediately, performing a non-blocking check
//
// Returns:
// positive value: the number of fds that are ready for one of the
// requested I/O operations (read, write, or exception)
// 0: no fd is ready, select returns after the specified timeout expires
// -1: an error occurred, and errno is set to indicate the error
int select(int nfds,
fd_set *readfds,
fd_set *writefds,
fd_set *exceptfds,
struct timeval *timeout);
fd_set is a data structure used to represent a set of file descriptors. The FD_SET, FD_CLR, FD_ISSET, and FD_ZERO macros are used to manipulate fd_set.
#include <sys/select.h>
// removes the fd from the fd set
void FD_CLR(int fd, fd_set *fdset);
// checks whether the fd is part of the fd set
int FD_ISSET(int fd, fd_set *fdset);
// adds the fd to the fd set
void FD_SET(int fd, fd_set *fdset);
// clears all fds in the fd set
void FD_ZERO(fd_set *fdset);
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/types.h>
#include <netinet/in.h>
int main() {
// create a tcp socket
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd == -1) {
perror("socket");
return -1;
}
// define server address: ip + port
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(8001);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
// bind and listen for connection
int status = bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (status == -1) {
perror("bind socket");
return -1;
}
listen(listen_fd, 10); // allow 10 queued connections
// client address info that will be filled by accept
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr); // must be initialized before accept
fd_set fds, readfds;
FD_ZERO(&fds);
FD_SET(listen_fd, &fds); // fds is the initial set that only contains listen_fd
int fdmax = listen_fd; // the max fd is listen_fd since it's the only fd for now
while (1) {
readfds = fds; // readfds is used by the kernel in the select call
// fds stores its state in the application
// readfds is reset to fds before each iteration
// we only care about read fds
int nfds = select(fdmax + 1, &readfds, NULL, NULL, NULL);
if (nfds == -1) {
perror("select");
}
// one or more sockets are ready, check them one by one
for (int fd = 0; fd < (fdmax + 1); fd++) {
if (FD_ISSET(fd, &readfds)) { // fd is ready for reading
if (fd == listen_fd) { // a request for new connection
// accept the connection, which creates a new fd
int client_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_addr_len);
if (client_fd == -1) {
perror("accept");
continue;
}
// update the fds and fdmax
FD_SET(client_fd, &fds);
if (client_fd > fdmax) fdmax = client_fd;
printf("connection accepted, fd %d\n", client_fd);
} else { // data from existing connection is ready
// receive the data
char buffer[1024] = {0};
int count = recv(fd, buffer, sizeof(buffer) - 1, 0); // leave room for '\0'
if (count <= 0) { // EOF or error, close connection
printf("disconnect fd %d\n", fd);
close(fd);
FD_CLR(fd, &fds);
continue;
}
printf("[fd %d] received: %s\n", fd, buffer);
// send back only the bytes we received
count = send(fd, buffer, count, 0);
printf("[fd %d] sent %d bytes\n", fd, count);
}
}
}
}
}
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
int main() {
// create a tcp socket
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd == -1) {
perror("socket");
return -1;
}
// server address (ip + port)
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(8001);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int status = connect(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (status == -1) {
perror("socket connection");
return -1;
}
char buffer[1024] = {0};
int nread;
while ((nread = read(STDIN_FILENO, buffer, sizeof(buffer) - 1)) > 0) {
buffer[nread] = '\0'; // NUL-terminate for printing
int count = send(fd, buffer, nread, 0); // send only the bytes read
printf("[client] sent %d bytes: %s\n", count, buffer);
count = recv(fd, buffer, sizeof(buffer) - 1, 0);
if (count <= 0) break; // server closed the connection or an error occurred
buffer[count] = '\0';
printf("[client] received %d bytes: %s\n", count, buffer);
}
printf("[client] closing connection\n");
close(fd);
return 0;
}
Compared to a multi-threaded model, select offers the advantage of lower resource usage: it operates within a single thread, avoiding the overhead of creating and managing multiple threads, and it eliminates the need for complex synchronization mechanisms.
However, select is limited by FD_SETSIZE, which is typically 1024 on many systems. It uses fd_set bit masks for file descriptors, requiring linear scans to check readiness, which reduces efficiency as the number of file descriptors increases.
Additionally, the file descriptor sets (readfds, writefds, and exceptfds) are modified in place by the kernel, so their state must be saved in another set and reinitialized before each call in every loop, which adds further overhead.
poll
poll uses an array of struct pollfd instead of fd_set, avoiding the FD_SETSIZE limit, though it is still constrained by available memory. Unlike select, poll doesn't modify the requested events in its input array (the kernel writes only the revents field), eliminating the need for reinitialization in each loop and making it more efficient.
However, like select, it still performs a linear scan in the kernel, meaning it doesn't scale well for very large numbers of file descriptors, though it can be more efficient than select in many cases.
struct pollfd {
int fd; // the file descriptor to be monitored
short events; // events to monitor (e.g. POLLIN, POLLOUT)
short revents; // events that actually occurred (filled in by the kernel)
};
#include <poll.h>
// fds: a pointer to an array of struct pollfd, where each struct pollfd
// represents a fd and the events you want to monitor
// nfds: the number of entries in the fds array to examine
// (the example below passes fdmax + 1 because the array is indexed by fd)
// timeout: the maximum amount of time to wait for events, in ms
// if greater than 0, it specifies the maximum wait time
// if 0, poll returns immediately (non-blocking)
// if -1, poll blocks indefinitely until at least one event occurs
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <poll.h>
#include <sys/types.h>
#include <netinet/in.h>
int main() {
// create a tcp socket
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd == -1) {
perror("socket");
return -1;
}
// define server address: ip + port
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(8001);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
// bind and listen for connection
int status = bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (status == -1) {
perror("bind socket");
return -1;
}
listen(listen_fd, 10); // allow 10 queued connections
// client address info that will be filled by accept
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr); // must be initialized before accept
struct pollfd fds[1024] = {0};
// mark all entries unused; poll ignores entries with a negative fd
for (int i = 0; i < 1024; i++) fds[i].fd = -1;
// the array is indexed by fd value for easy lookup
fds[listen_fd].fd = listen_fd;
fds[listen_fd].events = POLLIN;
int fdmax = listen_fd; // the max fd is listen_fd since it's the only fd for now
while (1) {
int nfds = poll(fds, fdmax + 1, -1);
if (nfds == -1) {
perror("poll");
}
// one or more sockets are ready, check them one by one
for (int fd = 0; fd < (fdmax + 1); fd++) {
if (fds[fd].revents & POLLIN) { // fd is ready for reading
if (fd == listen_fd) { // a request for new connection
// accept the connection, which creates a new fd
int client_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_addr_len);
if (client_fd == -1) {
perror("accept");
continue;
}
// update the fd array for monitoring
fds[client_fd].fd = client_fd;
fds[client_fd].events = POLLIN;
if (client_fd > fdmax) fdmax = client_fd;
printf("connection accepted, fd %d\n", client_fd);
} else { // data from existing connection is ready
// receive the data
char buffer[1024] = {0};
int count = recv(fd, buffer, sizeof(buffer) - 1, 0); // leave room for '\0'
if (count <= 0) { // EOF or error, close connection
printf("disconnect fd %d\n", fd);
close(fd);
fds[fd].fd = -1; // remove fd from the array for monitoring
fds[fd].events = 0;
continue;
}
printf("[fd %d] received: %s\n", fd, buffer);
// send back only the bytes we received
count = send(fd, buffer, count, 0);
printf("[fd %d] sent %d bytes\n", fd, count);
}
}
}
}
}
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
int main() {
// create a tcp socket
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd == -1) {
perror("socket");
return -1;
}
// server address (ip + port)
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(8001);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int status = connect(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (status == -1) {
perror("socket connection");
return -1;
}
char buffer[1024] = {0};
int nread;
while ((nread = read(STDIN_FILENO, buffer, sizeof(buffer) - 1)) > 0) {
buffer[nread] = '\0'; // NUL-terminate for printing
int count = send(fd, buffer, nread, 0); // send only the bytes read
printf("[client] sent %d bytes: %s\n", count, buffer);
count = recv(fd, buffer, sizeof(buffer) - 1, 0);
if (count <= 0) break; // server closed the connection or an error occurred
buffer[count] = '\0';
printf("[client] received %d bytes: %s\n", count, buffer);
}
printf("[client] closing connection\n");
close(fd);
return 0;
}
epoll
epoll provides a more efficient mechanism than select and poll, especially in systems with a large number of monitored connections.
Unlike select and poll, which require scanning all file descriptors, epoll only returns the file descriptors that have triggered events, making it more efficient for handling large numbers of connections.
Its O(1) event notification mechanism ensures consistent performance regardless of the number of file descriptors being monitored. Additionally, epoll maintains a persistent interest list, so file descriptors only need to be registered once, avoiding reinitialization in each loop.
A useful comparison is a restaurant pager system: instead of the waiter (application) scanning every table (file descriptor) to check if an order (event) is ready, each table gets a pager. Only the tables with updates buzz their pagers, notifying the waiter. This event-driven approach focuses only on active events, improving efficiency, especially when managing many connections.
struct epoll_event {
__uint32_t events; // a bitmask specifying the types of events the application wants to monitor
union {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} data; // union for storing additional data alongside the event
};
// example events
EPOLLIN // ready for reading
EPOLLOUT // ready for writing
EPOLLERR // error condition
EPOLLHUP // hang-up (connection closed)
// example usage
struct epoll_event ev;
ev.events = EPOLLIN; // monitor for readability
ev.data.fd = fd; // store the file descriptor
The epoll API consists of three system calls: epoll_create, epoll_ctl, and epoll_wait.
// creates an epoll instance and returns a fd for managing events
// it is used to initialize the event notification system
// size is ignored in modern implementations as long as it's greater than 0
// returns a non-negative integer for a fd referring to the epoll instance
// or -1 on error
int epoll_create(int size);
// waits for events to occur on the monitored file descriptors.
// it blocks until one or more file descriptors are ready for
// specified event, or a timeout occurs.
// returns the number of ready events, or -1 on error
// a timeout of -1 blocks indefinitely until events occur
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// adds, modifies, or removes file descriptors to/from the epoll instance
// this system call allows you to specify which events you want to monitor for each fd
// epfd: the fd of the epoll instance created by epoll_create
// op: EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL
// fd: the fd to be added, modified, or deleted
// event: a pointer to epoll_event that contains events to monitor for the fd
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <netinet/in.h>
int main() {
// create a tcp socket
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd == -1) {
perror("socket");
return -1;
}
// define server address: ip + port
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(8001);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
// bind and listen for connection
int status = bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (status == -1) {
perror("bind socket");
return -1;
}
listen(listen_fd, 10); // allow 10 queued connections
// client address info that will be filled by accept
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr); // must be initialized before accept
// create an epoll instance
int epfd = epoll_create(1);
// add listen_fd to the epoll instance to monitor readability
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = listen_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);
while (1) {
// monitor 1024 fds in total here
struct epoll_event events[1024] = {0};
// wait indefinitely until one or more events occur
int nready = epoll_wait(epfd, events, 1024, -1); // maxevents is the array length, 1024
if (nready == -1) {
perror("epoll");
}
// one or more sockets are ready, check them one by one
// note that we loop only over nready, not fdmax + 1 as in select and poll,
// which is more efficient; the loop variable is an event index, not an fd
for (int i = 0; i < nready; i++) {
int fd = events[i].data.fd;
if (events[i].events & EPOLLIN) { // fd is ready for reading
if (fd == listen_fd) { // a request for new connection
// accept the connection, which creates a new fd
int client_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_addr_len);
if (client_fd == -1) {
perror("accept");
continue;
}
// add newly created client_fd to epoll instance for monitoring readability
ev.events = EPOLLIN;
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev);
// no need to update fdmax since the iteration is through nready not fdmax+1
printf("connection accepted, fd %d\n", client_fd);
} else { // data from existing connection is ready
// receive the data
char buffer[1024] = {0};
int count = recv(fd, buffer, sizeof(buffer) - 1, 0); // leave room for '\0'
if (count <= 0) { // EOF or error, close connection
printf("disconnect fd %d\n", fd);
close(fd);
// remove this fd from epoll monitoring
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // the event is NULL since it's a removal
continue;
}
printf("[fd %d] received: %s\n", fd, buffer);
// send back only the bytes we received
count = send(fd, buffer, count, 0);
printf("[fd %d] sent %d bytes\n", fd, count);
}
}
}
}
}
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
int main() {
// create a tcp socket
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd == -1) {
perror("socket");
return -1;
}
// server address (ip + port)
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(8001);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int status = connect(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (status == -1) {
perror("socket connection");
return -1;
}
char buffer[1024] = {0};
int nread;
while ((nread = read(STDIN_FILENO, buffer, sizeof(buffer) - 1)) > 0) {
buffer[nread] = '\0'; // NUL-terminate for printing
int count = send(fd, buffer, nread, 0); // send only the bytes read
printf("[client] sent %d bytes: %s\n", count, buffer);
count = recv(fd, buffer, sizeof(buffer) - 1, 0);
if (count <= 0) break; // server closed the connection or an error occurred
buffer[count] = '\0';
printf("[client] received %d bytes: %s\n", count, buffer);
}
printf("[client] closing connection\n");
close(fd);
return 0;
}
Reactor
select, poll, and epoll are system calls primarily focused on managing I/O by providing notifications for I/O events.
These mechanisms allow applications to detect when file descriptors are ready for actions like reading or writing.
While epoll is particularly efficient at waiting for I/O events on multiple file descriptors, the focus at this level is on monitoring and detecting these events, rather than on handling the higher-level business logic.
The Reactor pattern addresses this by shifting the focus from low-level I/O management to higher-level event management.
It is an event-driven design pattern that organizes and abstracts the handling of events triggered by an event demultiplexer like epoll, select, or poll.
When events occur, the Reactor maps them to specific event handlers, where the actual business logic for processing those events is defined.
This separation allows for scalable, efficient, and more maintainable systems that can handle asynchronous events without complicating the core application logic.