Skip to content

Reactor

The Reactor pattern demultiplexes and dispatches I/O events to corresponding handlers. It enables non-blocking I/O by ensuring that a single thread, or event loop, can handle many connections or requests without waiting for individual operations to complete.

The Reactor pattern can be broken into three main parts:

  1. Event Demultiplexer: A central component (e.g., select, poll, or epoll in Linux) that monitors multiple file descriptors (such as sockets or files) for specific events, like read readiness, write readiness, or errors.

  2. Reactor (Event Loop): The core event loop that uses the Event Demultiplexer to wait for events. When an event occurs (e.g., data arriving on a socket), the Reactor identifies the event and delegates it to the appropriate handler.

  3. Handlers (Callbacks): Application-level logic designed to process specific events. When the Reactor dispatches an event, the corresponding handler is executed to handle tasks such as processing a client request or sending a response.

Reactor with epoll

Let's refactor the epoll example from I/O Multiplexing using Reactor pattern.

#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/types.h>

#include <netinet/in.h>

#define BUFFER_LENGTH 1024
#define CONNECTION_SIZE 1024

int listen_fd = -1;

void handle_signal(int sig) {
  if (listen_fd >= 0) {
    printf("\nserver closing...\n");
    close(listen_fd);
  }
  exit(0);
}

int init_server(unsigned int port) {
  listen_fd = socket(AF_INET, SOCK_STREAM, 0);

  if (listen_fd == -1) {
    perror("socket");
    return -1;
  }

  // define server address: ip + port 
  struct sockaddr_in server_addr;
  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons(port);
  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

  // bind and listen for connection
  int status = bind(listen_fd, (struct sockaddr *) &server_addr, sizeof(server_addr));

  if (status == -1) {
    perror("bind socket");
    return -1;
  }

  listen(listen_fd, 10); // allows 10 queued connections

  return listen_fd;
}

int epfd;

typedef int (*RCALLBACK)(int fd);
int accept_cb(int);
int recv_cb(int);
int send_cb(int);

// fd and its callbacks, along with buffers to support cbs
struct connection {
  int fd;

  char rbuffer[BUFFER_LENGTH];
  int rlength;

  char wbuffer[BUFFER_LENGTH];
  int wlength;

  RCALLBACK send_callback;
  // handle EPOLLIN
  // for listen_fd it's used for accepting connection
  // for client_fd it's used for receiving data
  RCALLBACK recv_callback;
};

// where we define fd and its callbacks for handling events
// use fd as index
struct connection connection_list[CONNECTION_SIZE] = {0};

// flag determines whether ADD (if non-zero) or MODIFY
int set_event(int fd, int events, int flag) {
  int ctl;
  if (flag) ctl = EPOLL_CTL_ADD;
  else ctl = EPOLL_CTL_MOD;
  struct epoll_event ev;
  ev.events = events;
  ev.data.fd = fd;
  return epoll_ctl(epfd, ctl, fd, &ev);
}

int register_event(int fd, int events) {
  connection_list[fd].fd = fd;
  connection_list[fd].recv_callback = recv_cb;
  connection_list[fd].send_callback = send_cb;

  // reset buffers since fd can be reused
  memset(connection_list[fd].rbuffer, 0, BUFFER_LENGTH);
  connection_list[fd].rlength = 0;
  memset(connection_list[fd].wbuffer, 0, BUFFER_LENGTH);
  connection_list[fd].wlength = 0;

  // listen for event (readability)
  set_event(fd, events, 1);
  return 0;
}

int accept_cb(int fd) { // listen_fd
  struct sockaddr_in client_addr;
  socklen_t len = sizeof(client_addr);
  int client_fd = accept(fd, (struct sockaddr *)&client_addr, &len);

  if (client_fd == -1) {
    perror("accept");
    return -1;
  }

  register_event(client_fd, EPOLLIN);


  printf("connection accepted, fd %d\n", client_fd);

  return 0; // success
}

int recv_cb(int fd) {
  int count = recv(fd, connection_list[fd].rbuffer, BUFFER_LENGTH, 0);
  printf("[fd %d] received: '%s' (%d bytes)\n", fd, connection_list[fd].rbuffer, count);

  if (count == 0) { // EOF, close connection
    printf("disconnect fd %d\n", fd);
    close(fd);
    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
    return 0;
  }

  connection_list[fd].rlength = count;

  // echo to wbuffer (response)
  connection_list[fd].wlength = count;
  memcpy(connection_list[fd].wbuffer, connection_list[fd].rbuffer, connection_list[fd].wlength);
  // clear the read buffer for the next read otherwise it may contains data from the last ready
  memset(connection_list[fd].rbuffer, 0, BUFFER_LENGTH);

  set_event(fd, EPOLLOUT, 0); // modify

  return count;
}

int send_cb(int fd) {
  int count = send(fd, connection_list[fd].wbuffer, connection_list[fd].wlength, 0);

  set_event(fd, EPOLLIN, 0); // modify, fd is already in epoll instance

  return count;
}

int main() {
  int listen_fd = init_server(8001);
  if (listen_fd == -1) return -1;

  signal(SIGINT, handle_signal); // handle Ctrl+C (SIGINT)

  epfd = epoll_create(1);

  connection_list[listen_fd].fd = listen_fd;
  connection_list[listen_fd].recv_callback = accept_cb;
  set_event(listen_fd, EPOLLIN, 1);

  while(1) { // main loop
    struct epoll_event events[1024] = {0};
    int nready = epoll_wait(epfd, events, 1024, -1);

    int i;
    for (i = 0; i < nready; i++) {
      int fd = events[i].data.fd;

      // may handle read and write in the same iteration

      if (events[i].events & EPOLLIN) {
        connection_list[fd].recv_callback(fd); // either accept or recv depending on lt's listen_fd or client_fd
      }

      if (events[i].events & EPOLLOUT) {
        connection_list[fd].send_callback(fd);
      }
    }
  }

}
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
int main() {
  // create a tcp socket
  int fd = socket(AF_INET, SOCK_STREAM, 0);

  if (fd == -1) {
    perror("socket");
    return -1;
  }

  // server address (ip + port)
  struct sockaddr_in server_addr;
  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons(8001);
  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

  int status = connect(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));

  if (status == -1) {
    perror("socket connection");
    return -1;
  }

  char buffer[1024] = {0};
  int nread;
  while ((nread = read(STDIN_FILENO, buffer, sizeof(buffer))) > 0) {
    /*int count = send(fd, &buffer, sizeof(buffer), 0);*/ // this will always send 1024 bytes
    printf("[client] buffer is '%s'\n", buffer);
    int count = send(fd, &buffer, strlen(buffer), 0);
    printf("[client] send %d bytes: %s\n", count, buffer);

    count = recv(fd, &buffer, sizeof(buffer), 0);
    printf("[client] received %d bytes: %s\n", count, buffer);

    // reset the buffer for next read
    memset(buffer, 0, sizeof(buffer));
  }

  printf("[client] closing connection\n");
  close(fd);
  return 0;
}
cmake_minimum_required(VERSION 3.20)

project(refactor_demo)

set(CMAKE_C_STANDARD 99)

add_executable(tcp_client tcp_client.c)
add_executable(tcp_server tcp_server.c)
cmake -Bbuild .
cmake --build build

./build/tcp_server
./build/tcp_client # client 1
./build/tcp_client # client 2

Details all the details and debatable abstractions, the essential idea is to map epoll events to their corresponding callbacks:

  • EPOLLIN triggers either aceept for listen fd or recv for client fd
  • EPLLOUT triggers send

Info

We've added a few more details to the tcp_client and tcp_server examples compared to the ones in I/O Multiplexing and Network I/O. For instance, the read buffer is always cleared first, and SIGINT is handled gracefully to prevent "Address already in use".