Reactor
The Reactor pattern demultiplexes and dispatches I/O events to corresponding handlers. It enables non-blocking I/O by ensuring that a single thread, or event loop, can handle many connections or requests without waiting for individual operations to complete.
The Reactor pattern can be broken into three main parts:
- Event Demultiplexer: A central component (e.g., `select`, `poll`, or `epoll` in Linux) that monitors multiple file descriptors (such as sockets or files) for specific events, like read readiness, write readiness, or errors.
- Reactor (Event Loop): The core event loop that uses the Event Demultiplexer to wait for events. When an event occurs (e.g., data arriving on a socket), the Reactor identifies the event and delegates it to the appropriate handler.
- Handlers (Callbacks): Application-level logic designed to process specific events. When the Reactor dispatches an event, the corresponding handler is executed to handle tasks such as processing a client request or sending a response.
Reactor with epoll
Let's refactor the `epoll` example from I/O Multiplexing using the Reactor pattern.
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <netinet/in.h>
// size in bytes of each per-connection read buffer and write buffer
#define BUFFER_LENGTH 1024
// number of slots in the fd-indexed connection table
#define CONNECTION_SIZE 1024
// listening socket: assigned by init_server, closed by handle_signal;
// stays -1 until a socket has been created
int listen_fd = -1;
/*
 * Signal handler (installed for SIGINT): closes the listening socket, if one
 * is open, and terminates the process with status 0.
 *
 * Only async-signal-safe functions are called here. printf(3) and exit(3)
 * are not async-signal-safe: if the signal interrupts the main loop in the
 * middle of a stdio call, re-entering stdio can deadlock or corrupt its
 * buffers. write(2), close(2) and _exit(2) are safe.
 *
 * NOTE: _exit() does not flush stdio buffers, so output that is still
 * buffered (e.g. when stdout is redirected to a file) is discarded.
 *
 * sig: the signal number delivered; unused.
 */
void handle_signal(int sig) {
    (void)sig;

    if (listen_fd >= 0) {
        static const char msg[] = "\nserver closing...\n";
        // best-effort message; nothing useful can be done if it fails
        if (write(STDOUT_FILENO, msg, sizeof(msg) - 1) < 0) {
            /* ignored on purpose */
        }
        close(listen_fd);
    }
    _exit(0);
}
/*
 * Creates a TCP socket bound to INADDR_ANY:port and puts it in listening
 * state. The descriptor is also stored in the global listen_fd so the signal
 * handler can close it.
 *
 * port: TCP port in host byte order; must fit in 16 bits.
 *
 * Returns the listening descriptor on success. On failure prints a
 * diagnostic, closes the socket if it was created, resets listen_fd to -1
 * and returns -1.
 */
int init_server(unsigned int port) {
    if (port > 65535) {
        // htons() takes a 16-bit value; reject instead of silently truncating
        fprintf(stderr, "init_server: invalid port %u\n", port);
        return -1;
    }

    listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_fd == -1) {
        perror("socket");
        return -1;
    }

    // let the server rebind right after a restart even if old connections
    // are still in TIME_WAIT; failure here is not fatal
    int reuse = 1;
    if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
        perror("setsockopt");
    }

    // define server address: ip + port (zeroed first so padding is defined)
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons((unsigned short)port);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    // bind and listen for connection
    if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
        perror("bind socket");
        goto fail;
    }
    if (listen(listen_fd, 10) == -1) { // allows 10 queued connections
        perror("listen");
        goto fail;
    }
    return listen_fd;

fail:
    // do not leak the descriptor, and keep the global consistent
    close(listen_fd);
    listen_fd = -1;
    return -1;
}
// epoll instance descriptor; assigned in main and used by set_event and the callbacks
int epfd;
// event handler signature: receives the fd the event fired on, returns an int status/count
typedef int (*RCALLBACK)(int fd);
// forward declarations of the handlers, so they can be referenced before their definitions
int accept_cb(int);
int recv_cb(int);
int send_cb(int);
// Per-fd state: the descriptor, its event callbacks, and the buffers the
// callbacks work on.
struct connection {
// descriptor this entry belongs to
int fd;
// bytes most recently received, and how many
char rbuffer[BUFFER_LENGTH];
int rlength;
// bytes waiting to be sent, and how many
char wbuffer[BUFFER_LENGTH];
int wlength;
// invoked by the main loop on EPOLLOUT
RCALLBACK send_callback;
// invoked by the main loop on EPOLLIN:
// for listen_fd it's used for accepting connection
// for client_fd it's used for receiving data
RCALLBACK recv_callback;
};
// Table of per-fd connection state (descriptor, callbacks, buffers).
// The fd itself is used as the index, so only descriptors smaller than
// CONNECTION_SIZE have a slot.
struct connection connection_list[CONNECTION_SIZE] = {0};
/*
 * Sets the epoll interest mask for `fd` on the global epoll instance.
 *
 * fd:     descriptor to watch; also stored as the event's user data
 * events: epoll event mask (e.g. EPOLLIN, EPOLLOUT)
 * flag:   non-zero adds fd to the instance, zero modifies an existing entry
 *
 * Returns whatever epoll_ctl returns.
 */
int set_event(int fd, int events, int flag) {
    struct epoll_event interest;
    interest.events = events;
    interest.data.fd = fd;

    const int op = flag ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    return epoll_ctl(epfd, op, fd, &interest);
}
int register_event(int fd, int events) {
connection_list[fd].fd = fd;
connection_list[fd].recv_callback = recv_cb;
connection_list[fd].send_callback = send_cb;
// reset buffers since fd can be reused
memset(connection_list[fd].rbuffer, 0, BUFFER_LENGTH);
connection_list[fd].rlength = 0;
memset(connection_list[fd].wbuffer, 0, BUFFER_LENGTH);
connection_list[fd].wlength = 0;
// listen for event (readability)
set_event(fd, events, 1);
return 0;
}
int accept_cb(int fd) { // listen_fd
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int client_fd = accept(fd, (struct sockaddr *)&client_addr, &len);
if (client_fd == -1) {
perror("accept");
return -1;
}
register_event(client_fd, EPOLLIN);
printf("connection accepted, fd %d\n", client_fd);
return 0; // success
}
int recv_cb(int fd) {
int count = recv(fd, connection_list[fd].rbuffer, BUFFER_LENGTH, 0);
printf("[fd %d] received: '%s' (%d bytes)\n", fd, connection_list[fd].rbuffer, count);
if (count == 0) { // EOF, close connection
printf("disconnect fd %d\n", fd);
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return 0;
}
connection_list[fd].rlength = count;
// echo to wbuffer (response)
connection_list[fd].wlength = count;
memcpy(connection_list[fd].wbuffer, connection_list[fd].rbuffer, connection_list[fd].wlength);
// clear the read buffer for the next read otherwise it may contains data from the last ready
memset(connection_list[fd].rbuffer, 0, BUFFER_LENGTH);
set_event(fd, EPOLLOUT, 0); // modify
return count;
}
int send_cb(int fd) {
int count = send(fd, connection_list[fd].wbuffer, connection_list[fd].wlength, 0);
set_event(fd, EPOLLIN, 0); // modify, fd is already in epoll instance
return count;
}
int main() {
int listen_fd = init_server(8001);
if (listen_fd == -1) return -1;
signal(SIGINT, handle_signal); // handle Ctrl+C (SIGINT)
epfd = epoll_create(1);
connection_list[listen_fd].fd = listen_fd;
connection_list[listen_fd].recv_callback = accept_cb;
set_event(listen_fd, EPOLLIN, 1);
while(1) { // main loop
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, -1);
int i;
for (i = 0; i < nready; i++) {
int fd = events[i].data.fd;
// may handle read and write in the same iteration
if (events[i].events & EPOLLIN) {
connection_list[fd].recv_callback(fd); // either accept or recv depending on lt's listen_fd or client_fd
}
if (events[i].events & EPOLLOUT) {
connection_list[fd].send_callback(fd);
}
}
}
}
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
/*
 * Client entry point: connects to the echo server on local port 8001, then
 * repeatedly reads a chunk from stdin, sends it, and prints the reply.
 * Stops on stdin EOF/error, on a send/recv error, or when the server closes
 * the connection.
 *
 * Returns 0 after a normal shutdown, -1 if the socket cannot be created or
 * connected.
 */
int main() {
    // create a tcp socket
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        perror("socket");
        return -1;
    }

    // server address (ip + port); zeroed first so padding is defined
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(8001);
    // connect to this host explicitly; INADDR_ANY is a bind address and is
    // only treated as "local host" for connect() on some systems
    server_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    if (connect(fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
        perror("socket connection");
        close(fd); // do not leak the socket on failure
        return -1;
    }

    char buffer[1024] = {0};
    int nread;
    // read at most sizeof(buffer) - 1 bytes so there is always room for the
    // terminating NUL needed by the "%s" prints below
    while ((nread = (int)read(STDIN_FILENO, buffer, sizeof(buffer) - 1)) > 0) {
        buffer[nread] = '\0';
        printf("[client] buffer is '%s'\n", buffer);

        // send exactly what was read rather than strlen(buffer), which would
        // stop at an embedded NUL byte
        int count = (int)send(fd, buffer, (size_t)nread, 0);
        if (count == -1) {
            perror("send");
            break;
        }
        printf("[client] send %d bytes: %s\n", count, buffer);

        count = (int)recv(fd, buffer, sizeof(buffer) - 1, 0);
        if (count <= 0) {
            if (count == -1) {
                perror("recv");
            } else {
                printf("[client] server closed the connection\n");
            }
            break;
        }
        buffer[count] = '\0';
        printf("[client] received %d bytes: %s\n", count, buffer);

        // reset the buffer for next read
        memset(buffer, 0, sizeof(buffer));
    }

    printf("[client] closing connection\n");
    close(fd);
    return 0;
}
Despite all the details and debatable abstractions, the essential idea is to map `epoll` events to their corresponding callbacks:
- `EPOLLIN` triggers either `accept` for the listen fd or `recv` for a client fd
- `EPOLLOUT` triggers `send`
Info
We've added a few more details to the `tcp_client` and `tcp_server` examples compared to the ones in I/O Multiplexing and Network I/O. For instance, the read buffer is always cleared first, and `SIGINT` is handled gracefully to prevent "Address already in use".