Web Server Application (epserver)

Introduction

The epserver program is a sample HTTP web server that handles HTTP requests and serves web pages in HTTP responses. epserver uses the epoll (event poll) interface to receive network events from mTCP sockets.

Code Walkthrough

The following sections provide an explanation of the main components of the epserver code. All mOS library functions used in the sample code are prefixed with mtcp_ and are explained in detail in the Programmer’s Guide - mOS Programming API.

(1) The main() Function

The main() function performs initialization and launches an execution thread for each CPU core.

The first task is to initialize mOS from the mOS configuration file. fname holds the path to the mos.conf file, which is passed to mtcp_init().

/* parse mos configuration file */
ret = mtcp_init(fname);

The next step is to initialize the server settings by calling the GlobInitServer() function, which reads the application-specific configuration file (epserver.conf) and loads all available web pages. We describe this function in detail in the next subsection.

The last step is to create and run the per-core mTCP threads. For each CPU core enabled in the CPU mask, main() creates a new thread that runs the RunMTCP() function. We describe this function in further detail in a later subsection.

for (i = 0; i < core_limit; i++) {
        cores[i] = i;
        /* Run mtcp thread */
        if ((g_mcfg.cpu_mask & (1L << i)) &&
                pthread_create(&mtcp_thread[i], NULL, RunMTCP, (void *)&cores[i])) {
                perror("pthread_create");
                TRACE_ERROR("Failed to create msg_test thread.\n");
                exit(-1);
        }
}
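The core-selection test in the loop above can be tried in isolation. The following is a minimal sketch, assuming cpu_mask is a bitmask in which bit i is set for every enabled core. core_enabled() and select_cores() are hypothetical helpers written for this illustration; they are not part of mOS.

```c
/* Hypothetical helper: nonzero if bit `i` of `cpu_mask` is set,
 * i.e. the same test as (cpu_mask & (1L << i)) in the loop above. */
static int core_enabled(unsigned long cpu_mask, int i)
{
        return (int)((cpu_mask >> i) & 1UL);
}

/* Hypothetical helper: stores the ids of all enabled cores below
 * `core_limit` in `out` and returns how many were stored.
 * `out` must have room for `core_limit` entries. */
static int select_cores(unsigned long cpu_mask, int core_limit, int *out)
{
        int n = 0;
        int i;

        for (i = 0; i < core_limit; i++) {
                if (core_enabled(cpu_mask, i))
                        out[n++] = i;
        }
        return n;
}
```

With a mask of 0x5 and a core limit of 4, only cores 0 and 2 are selected, so only those two cores would get a thread.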

(2) The Global Parameter Initialization Function

The GlobInitServer() function loads the epserver application configuration from the epserver.conf file. The following block shows an example configuration for epserver. The www_main parameter gives the path to the directory that holds the web pages. The core_limit parameter lets the application override the number of CPU cores to use.

www_main = www
core_limit = 8
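A file in this "key = value" form could be parsed roughly as follows. This is only a sketch under stated assumptions: struct server_conf and parse_conf_line() are hypothetical names introduced here, and the actual parsing code in GlobInitServer() may differ.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical container for the two parameters shown above. */
struct server_conf {
        char www_main[128];
        int core_limit;
};

/* Parses one "key = value" line into `conf`.
 * Returns 0 if a known key was stored, -1 otherwise. */
static int parse_conf_line(const char *line, struct server_conf *conf)
{
        char key[64];
        char val[128];

        /* key: everything up to '=' or whitespace; value: next token */
        if (sscanf(line, " %63[^= \t] = %127s", key, val) != 2)
                return -1;

        if (strcmp(key, "www_main") == 0) {
                snprintf(conf->www_main, sizeof(conf->www_main), "%s", val);
                return 0;
        }
        if (strcmp(key, "core_limit") == 0) {
                conf->core_limit = atoi(val);
                return 0;
        }
        return -1;      /* unknown key */
}
```

Calling parse_conf_line() once per line of the file fills the structure; lines that are empty or carry an unknown key are reported with -1 so the caller can decide whether to ignore them.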

The GlobInitServer() function parses the configuration file, stores the parameters, and opens the directory (www_main) that holds the files it has to serve.

if (!dir) {
        TRACE_ERROR("Failed to open %s.\n", www_main);
        perror("opendir");
        exit(-1);
}

After opening each file, it loads the file contents into memory to speed up web page transfers (avoiding expensive file reads during data transmission).

fcache[nfiles].file = (char *)malloc(fcache[nfiles].size);
if (!fcache[nfiles].file) {
        TRACE_ERROR("Failed to allocate memory for file %s\n",
                        fcache[nfiles].name);
        perror("malloc");
        continue;
}

total_read = 0;
while (1) {
        ret = read(fd, fcache[nfiles].file + total_read, fcache[nfiles].size - total_read);
        if (ret <= 0) {
                break;
        }
        total_read += ret;
}
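/* close the descriptor on both the success and the failure path */
close(fd);
if (total_read < fcache[nfiles].size) {
        /* incomplete read: do not cache this file */
        free(fcache[nfiles].file);
        continue;
}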
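The caching step above can be reproduced as a standalone function. This is a minimal sketch using plain POSIX calls; load_file() is a hypothetical helper, not the function epserver uses, and it assumes the file size reported by fstat() does not change while the file is read.

```c
#include <fcntl.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <unistd.h>

/* Hypothetical helper: reads the whole file at `path` into a malloc'ed
 * buffer. On success returns the buffer and stores its length in
 * `*size_out`; returns NULL on any error, on an empty file, or on a
 * short read. The caller frees the buffer. */
static char *load_file(const char *path, size_t *size_out)
{
        struct stat st;
        size_t size, total_read = 0;
        char *buf;
        int fd;

        fd = open(path, O_RDONLY);
        if (fd < 0)
                return NULL;
        if (fstat(fd, &st) < 0 || st.st_size <= 0) {
                close(fd);
                return NULL;
        }
        size = (size_t)st.st_size;

        buf = malloc(size);
        if (!buf) {
                close(fd);
                return NULL;
        }

        /* read() may return fewer bytes than requested, so loop */
        while (total_read < size) {
                ssize_t ret = read(fd, buf + total_read, size - total_read);
                if (ret <= 0)
                        break;
                total_read += (size_t)ret;
        }
        close(fd);

        if (total_read < size) {
                free(buf);
                return NULL;
        }
        *size_out = total_read;
        return buf;
}
```

The descriptor is closed on every path, so a file that fails to load does not leak a descriptor while the remaining files are processed.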

(3) The RunMTCP() Function

The RunMTCP() function runs once per thread. First, it pins the thread to its CPU core and creates an mTCP context. Next, it calls the RunApplication() function, which opens the sockets, receives HTTP requests, and sends back HTTP responses.

/* affinitize the mTCP thread to a core */
mtcp_core_affinitize(core);

/* mTCP initialization */
mctx = mtcp_create_context(core);
if (!mctx) {
        /* log before exiting; pthread_exit() does not return */
        TRACE_ERROR("Failed to create mtcp context.\n");
        pthread_exit(NULL);
}
RunApplication(mctx);

The RunApplication() function consists of the InitServer() and RunServer() functions. InitServer() creates a thread context that holds thread-specific metadata, including epoll-related variables and per-status flow statistics (e.g., started, pending, done, errors, and incompletes).

InitServer() creates an epoll descriptor to receive read and write availability events. It then creates a listening socket to accept connections from new clients.

/* create epoll descriptor */
ctx->ep = mtcp_epoll_create(mctx, MAX_EVENTS);
if (ctx->ep < 0) {
        TRACE_ERROR("Failed to create epoll descriptor!\n");
        exit(-1);
}

...

ctx->listener = CreateListeningSocket(ctx);
if (ctx->listener < 0) {
        TRACE_ERROR("Failed to create listening socket.\n");
        exit(-1);
}

RunServer() is the core of the program. Using the epoll event API, it accepts incoming connections and receives and sends web content.

while (1) {
        nevents = mtcp_epoll_wait(mctx, ep, events, MAX_EVENTS, -1);
        if (nevents < 0) {
                if (errno != EINTR)
                        perror("mtcp_epoll_wait");
                break;
        }

        do_accept = FALSE;
        for (i = 0; i < nevents; i++) {

                if (events[i].data.sock == ctx->listener) {
                        /* if the event is for the listener, accept connection */
                        do_accept = TRUE;

                } else if (events[i].events & MOS_EPOLLERR) {
                        int err;
                        socklen_t len = sizeof(err);

                        /* error on the connection */
                        TRACE_APP("[CPU %d] Error on socket %d\n",
                                        core, events[i].data.sock);
                        if (mtcp_getsockopt(mctx, events[i].data.sock,
                                        SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) {
                                if (err != ETIMEDOUT) {
                                        fprintf(stderr, "Error on socket %d: %s\n",
                                                        events[i].data.sock, strerror(err));
                                }
                        } else {
                                fprintf(stderr, "mtcp_getsockopt: %s (for sockid: %d)\n",
                                        strerror(errno), events[i].data.sock);
                                exit(-1);
                        }
                        CloseConnection(ctx, events[i].data.sock,
                                        &ctx->svars[events[i].data.sock]);

                } else if (events[i].events & MOS_EPOLLIN) {
                        ret = HandleReadEvent(ctx, events[i].data.sock,
                                        &ctx->svars[events[i].data.sock]);

                        if (ret == 0) {
                                /* connection closed by remote host */
                                CloseConnection(ctx, events[i].data.sock,
                                                &ctx->svars[events[i].data.sock]);
                        } else if (ret < 0) {
                                /* if not EAGAIN, it's an error */
                                if (errno != EAGAIN) {
                                        CloseConnection(ctx, events[i].data.sock,
                                                        &ctx->svars[events[i].data.sock]);
                                }
                        }

                } else if (events[i].events & MOS_EPOLLOUT) {
                        struct server_vars *sv = &ctx->svars[events[i].data.sock];
                        if (sv->rspheader_sent) {
                                SendUntilAvailable(ctx, events[i].data.sock, sv);
                        } else {
                                TRACE_APP("Socket %d: Response header not sent yet.\n",
                                                events[i].data.sock);
                        }

                } else {
                        assert(0);
                }
        }

        /* if do_accept flag is set, accept connections */
        if (do_accept) {
                while (1) {
                        ret = AcceptConnection(ctx, ctx->listener);
                        if (ret < 0)
                                break;
                }
        }

}
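The same wait-then-dispatch pattern can be tried without mOS by using the standard Linux epoll API. The sketch below is an analogy only: it uses epoll_wait() and read() from Linux rather than mtcp_epoll_wait() and mtcp_read(), and watch_readable() and poll_once() are hypothetical helpers written for this illustration.

```c
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

#define MAX_EVENTS 16

/* Hypothetical helper: registers `fd` for read-availability events. */
static int watch_readable(int ep, int fd)
{
        struct epoll_event ev;

        memset(&ev, 0, sizeof(ev));
        ev.events = EPOLLIN;
        ev.data.fd = fd;
        return epoll_ctl(ep, EPOLL_CTL_ADD, fd, &ev);
}

/* Hypothetical helper: waits up to `timeout_ms` for events and
 * dispatches each one. Returns the number of bytes read across all
 * ready descriptors, or -1 if epoll_wait() fails. */
static long poll_once(int ep, int timeout_ms)
{
        struct epoll_event events[MAX_EVENTS];
        char buf[256];
        long total = 0;
        int nevents, i;

        nevents = epoll_wait(ep, events, MAX_EVENTS, timeout_ms);
        if (nevents < 0)
                return -1;

        for (i = 0; i < nevents; i++) {
                int fd = events[i].data.fd;

                if (events[i].events & EPOLLERR) {
                        /* error on the descriptor */
                        close(fd);
                } else if (events[i].events & EPOLLIN) {
                        ssize_t rd = read(fd, buf, sizeof(buf));
                        if (rd > 0)
                                total += rd;
                        else if (rd == 0)
                                close(fd);      /* closed by the peer */
                } else if (events[i].events & EPOLLHUP) {
                        /* peer hung up and nothing is left to read */
                        close(fd);
                }
        }
        return total;
}
```

As in RunServer(), error events are checked before read events, so a failed descriptor is closed instead of being read.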

Here are detailed explanations of the sub-functions called from the RunServer() loop:

  • The AcceptConnection() function accepts connections from the listen queue (through the listening socket).

    c = mtcp_accept(mctx, listener, NULL, NULL);
    
    if (c >= 0) {
            TRACE_APP("New connection %d accepted.\n", c);
            ev.events = MOS_EPOLLIN;
            ev.data.sock = c;
            mtcp_setsock_nonblock(ctx->mctx, c);
            mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_ADD, c, &ev);
            TRACE_APP("Socket %d registered.\n", c);
    } else {
            if (errno != EAGAIN) {
                    TRACE_ERROR("mtcp_accept() error %s\n", strerror(errno));
            }
    }
    
  • The HandleReadEvent() function reads the HTTP request from the socket and then responds to it.

    /* HTTP request handling */
    rd = mtcp_read(ctx->mctx, sockid, buf, HTTP_HEADER_LEN);
    if (rd <= 0) {
            return rd;
    }
    memcpy(sv->request + sv->recv_len,
                    (char *)buf, MIN(rd, HTTP_HEADER_LEN - sv->recv_len));
    sv->recv_len += rd;
    
    sv->request_len = find_http_header(sv->request, sv->recv_len);
    if (sv->request_len <= 0) {
            TRACE_ERROR("Socket %d: Failed to parse HTTP request header.\n"
                            "read bytes: %d, recv_len: %d, "
                            "request_len: %d, strlen: %ld, request: \n%s\n",
                            sockid, rd, sv->recv_len,
                            sv->request_len, strlen(sv->request), sv->request);
            return rd;
    }
    
    http_get_url(sv->request, sv->request_len, url, URL_LEN);
    TRACE_APP("Socket %d URL: %s\n", sockid, url);
    sprintf(sv->fname, "%s%s", www_main, url);
    TRACE_APP("Socket %d File name: %s\n", sockid, sv->fname);
    
    ...
    
    /* Once the HTTP request has fully arrived, create an HTTP response header and send it */
    sprintf(response, "HTTP/1.1 %d %s\r\n"
                    "Date: %s\r\n"
                    "Server: Webserver on Middlebox TCP (Ubuntu)\r\n"
                    "Content-Length: %ld\r\n"
                    "Connection: %s\r\n\r\n",
                    scode, StatusCodeToString(scode), t_str, sv->fsize, keepalive_str);
    len = strlen(response);
    TRACE_APP("Socket %d HTTP Response: \n%s", sockid, response);
    sent = mtcp_write(ctx->mctx, sockid, response, len);
    if (sent < len) {
            TRACE_ERROR("Socket %d: Sending HTTP response failed. "
                            "try: %d, sent: %d\n", sockid, len, sent);
            CloseConnection(ctx, sockid, sv);
    }
    TRACE_APP("Socket %d Sent response header: try: %d, sent: %d\n",
                    sockid, len, sent);
    
  • The SendUntilAvailable() function sends the HTTP response body to the client until the send buffer is full or the end of the file is reached. As described earlier, this step involves no disk I/O, since all files are already loaded into memory.

    sent = 0;
    ret = 1;
    while (ret > 0) {
            len = MIN(SNDBUF_SIZE, sv->fsize - sv->total_sent);
            if (len <= 0) {
                    break;
            }
            ret = mtcp_write(ctx->mctx, sockid,
                            fcache[sv->fidx].file + sv->total_sent, len);
            if (ret < 0) {
                    if (errno != EAGAIN) {
                            TRACE_ERROR("Socket %d: Sending HTTP response body failed. "
                                            "try: %d, sent: %d\n", sockid, len, ret);
                    }
                    break;
            }
            TRACE_APP("Socket %d: mtcp_write try: %d, ret: %d\n", sockid, len, ret);
            sent += ret;
            sv->total_sent += ret;
    }
    
    /* if all the data sent,
            (1) wait for next request (keep-alive) or
            (2) close the socket */
    if (sv->total_sent >= fcache[sv->fidx].size) {
            struct mtcp_epoll_event ev;
            sv->done = TRUE;
            finished++;
    
            if (sv->keep_alive) {
                    /* if keep-alive connection, wait for the incoming request */
                    ev.events = MOS_EPOLLIN;
                    ev.data.sock = sockid;
                    mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
    
                    CleanServerVariable(sv);
            } else {
                    /* else, close connection */
                    CloseConnection(ctx, sockid, sv);
            }
    }
    

(4) Multi-process Version (DPDK-only)

You can also run epserver in multi-process (single-threaded) mode. This mode works only with the Intel DPDK driver. epserver-mp is located in the same directory as epserver. The overall design of epserver-mp is similar to that of epserver, except that it does not use pthreads. One can run epserver-mp on a 4-core machine using the following script:

#!/bin/bash
./epserver-mp -f config/mos-master.conf -c 0 &
sleep 5
for i in {1..3}
do
./epserver-mp -f config/mos-slave.conf -c $i &
done

The -c switch binds the process to a specific CPU core. Under DPDK, the master process (core 0 in the example above) initializes the underlying DPDK-specific NIC resources once. The slave processes (cores 1-3) share those initialized resources with the master process.

The master process takes its configuration from the mos-master.conf file, which contains only one new keyword: multiprocess = 0 master; where 0 is the CPU core id. The mos-slave.conf configuration file has an additional line: multiprocess = slave; which, as the line suggests, sets the process as a DPDK secondary (slave) instance.

The script waits (sleep 5) between starting the master process and starting the slave processes. This wait is mandatory: it avoids potential race conditions on the resources shared between the master and slave processes.