freebsd-ports/net/openvswitch/files/threaded.diff
Bryan Drewery 23b5323f9d - Update to 1.7.1
- Add a regression-test target, but leave it commented
  out for now as it is failing
- Fix build with clang

PR:		ports/171544
Submitted by:	emaste (maintainer)
2012-09-11 17:24:59 +00:00

1292 lines
38 KiB
Diff
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

diff --git configure.ac configure.ac
index 5692b86..ff62627 100644
--- configure.ac
+++ configure.ac
@@ -43,6 +43,7 @@ AC_SEARCH_LIBS([clock_gettime], [rt])
AC_SEARCH_LIBS([timer_create], [rt])
AC_SEARCH_LIBS([pcap_open_live], [pcap])
+OVS_CHECK_THREADED
OVS_CHECK_COVERAGE
OVS_CHECK_NDEBUG
OVS_CHECK_NETLINK
diff --git lib/automake.mk lib/automake.mk
index 13622b3..87bdd8d 100644
--- lib/automake.mk
+++ lib/automake.mk
@@ -37,6 +37,7 @@ lib_libopenvswitch_a_SOURCES = \
lib/daemon.c \
lib/daemon.h \
lib/dhcp.h \
+ lib/dispatch.h \
lib/dummy.c \
lib/dummy.h \
lib/dhparams.h \
diff --git lib/dispatch.h lib/dispatch.h
new file mode 100644
index 0000000..80ac9c7
--- /dev/null
+++ lib/dispatch.h
@@ -0,0 +1,9 @@
+#ifndef DISPATCH_H
+#define DISPATCH_H 1
+#include <sys/types.h>
+#include "ofpbuf.h"
+
+/* Packet callback: 'user' is caller-supplied context, 'buf' the packet. */
+typedef void (*pkt_handler)(u_char *user, struct ofpbuf *buf);
+
+#endif /* DISPATCH_H */
diff --git lib/dpif-netdev.c lib/dpif-netdev.c
index cade79e..509e2ef 100644
--- lib/dpif-netdev.c
+++ lib/dpif-netdev.c
@@ -32,6 +32,15 @@
#include <sys/stat.h>
#include <unistd.h>
+#ifdef THREADED
+#include <signal.h>
+#include <pthread.h>
+
+#include "socket-util.h"
+#include "fatal-signal.h"
+#include "dispatch.h"
+#endif
+
#include "csum.h"
#include "dpif.h"
#include "dpif-provider.h"
@@ -55,6 +64,16 @@
#include "vlog.h"
VLOG_DEFINE_THIS_MODULE(dpif_netdev);
+/* We could use these macros instead of using #ifdef and #endif every time we
+ * need to call the pthread_mutex_lock/unlock.
+#ifdef THREADED
+#define LOCK(mutex) pthread_mutex_lock(mutex)
+#define UNLOCK(mutex) pthread_mutex_unlock(mutex)
+#else
+#define LOCK(mutex)
+#define UNLOCK(mutex)
+#endif
+*/
/* Configuration parameters. */
enum { MAX_PORTS = 256 }; /* Maximum number of ports. */
@@ -82,6 +101,21 @@ struct dp_netdev {
int open_cnt;
bool destroyed;
+#ifdef THREADED
+ /* The pipe is used to signal the presence of a packet on the queue.
+ * - dpif_netdev_recv_wait() waits on pipe[0]
+ * - dpif_netdev_recv() extracts from the queue and reads pipe[0]
+ * - dp_netdev_output_userspace() adds to the queue and writes pipe[1]
+ */
+
+ int pipe[2]; /* signal a packet on the queue */
+ struct pollfd *pipe_fd;
+
+ pthread_mutex_t table_mutex; /* mutex for the flow table */
+ pthread_mutex_t port_list_mutex; /* port list mutex */
+
+ /* The access to this queue is protected by the table_mutex mutex */
+#endif
struct dp_netdev_queue queues[N_QUEUES];
struct hmap flow_table; /* Flow table. */
@@ -102,6 +136,9 @@ struct dp_netdev_port {
struct list node; /* Element in dp_netdev's 'port_list'. */
struct netdev *netdev;
char *type; /* Port type as requested by user. */
+#ifdef THREADED
+ struct pollfd *poll_fd; /* To manage the poll loop in the thread. */
+#endif
};
/* A flow in dp_netdev's 'flow_table'. */
@@ -127,6 +164,11 @@ struct dpif_netdev {
unsigned int dp_serial;
};
+#ifdef THREADED
+/* XXX global Descriptor of the thread that manages the datapaths. */
+pthread_t thread_p;
+#endif
+
/* All netdev-based datapaths. */
static struct shash dp_netdevs = SHASH_INITIALIZER(&dp_netdevs);
@@ -204,6 +246,23 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
dp->class = class;
dp->name = xstrdup(name);
dp->open_cnt = 0;
+#ifdef THREADED
+ error = pipe(dp->pipe);
+ if (error) {
+ VLOG_ERR("Unable to create datapath thread pipe: %s", strerror(errno));
+ return errno;
+ }
+ if (set_nonblocking(dp->pipe[0]) || set_nonblocking(dp->pipe[1])) {
+ VLOG_ERR("Unable to set nonblocking on datapath thread pipe: %s",
+ strerror(errno));
+ return errno;
+ }
+ dp->pipe_fd = NULL;
+ VLOG_DBG("Datapath thread pipe created (%d, %d)", dp->pipe[0], dp->pipe[1]);
+
+ pthread_mutex_init(&dp->table_mutex, NULL);
+ pthread_mutex_init(&dp->port_list_mutex, NULL);
+#endif
for (i = 0; i < N_QUEUES; i++) {
dp->queues[i].head = dp->queues[i].tail = 0;
}
@@ -221,6 +280,38 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
return 0;
}
+#ifdef THREADED
+static void * dp_thread_body(void *args OVS_UNUSED);
+
+/* Fatal-signal hook (e.g. SIGTERM): cancels the datapath thread and joins it. */
+static void
+dpif_netdev_exit_hook(void *aux OVS_UNUSED)
+{
+    if (pthread_cancel(thread_p)) {
+        return;     /* Cancellation failed: there is nothing to join. */
+    }
+    pthread_join(thread_p, NULL);
+}
+
+static int
+dpif_netdev_init(void)
+{
+ static int error = -1;
+
+ if (error < 0) {
+ fatal_signal_add_hook(dpif_netdev_exit_hook, NULL, NULL, true);
+ error = pthread_create(&thread_p, NULL, dp_thread_body, NULL);
+ if (error != 0) {
+ VLOG_ERR("Unable to create datapath thread: %s", strerror(errno));
+ error = errno;
+ } else {
+ VLOG_DBG("Datapath thread started");
+ }
+ }
+ return error;
+}
+#endif
+
static int
dpif_netdev_open(const struct dpif_class *class, const char *name,
bool create, struct dpif **dpifp)
@@ -247,9 +338,14 @@ dpif_netdev_open(const struct dpif_class *class, const char *name,
}
*dpifp = create_dpif_netdev(dp);
+#ifdef THREADED
+ dpif_netdev_init();
+#endif
return 0;
}
+/* table_mutex must be locked in THREADED mode.
+ */
static void
dp_netdev_purge_queues(struct dp_netdev *dp)
{
@@ -273,11 +369,23 @@ dp_netdev_free(struct dp_netdev *dp)
struct dp_netdev_port *port, *next;
dp_netdev_flow_flush(dp);
+#ifdef THREADED
+ pthread_mutex_lock(&dp->port_list_mutex);
+#endif
LIST_FOR_EACH_SAFE (port, next, node, &dp->port_list) {
do_del_port(dp, port->port_no);
}
+#ifdef THREADED
+ pthread_mutex_unlock(&dp->port_list_mutex);
+ pthread_mutex_lock(&dp->table_mutex);
+#endif
dp_netdev_purge_queues(dp);
hmap_destroy(&dp->flow_table);
+#ifdef THREADED
+ pthread_mutex_unlock(&dp->table_mutex);
+ pthread_mutex_destroy(&dp->table_mutex);
+ pthread_mutex_destroy(&dp->port_list_mutex);
+#endif
free(dp->name);
free(dp);
}
@@ -306,7 +414,13 @@ static int
dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
+#ifdef THREADED
+ pthread_mutex_lock(&dp->table_mutex);
+#endif
stats->n_flows = hmap_count(&dp->flow_table);
+#ifdef THREADED
+ pthread_mutex_unlock(&dp->table_mutex);
+#endif
stats->n_hit = dp->n_hit;
stats->n_missed = dp->n_missed;
stats->n_lost = dp->n_lost;
@@ -354,13 +468,22 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
port->port_no = port_no;
port->netdev = netdev;
port->type = xstrdup(type);
+#ifdef THREADED
+ port->poll_fd = NULL;
+#endif
error = netdev_get_mtu(netdev, &mtu);
if (!error) {
max_mtu = mtu;
}
+#ifdef THREADED
+ pthread_mutex_lock(&dp->port_list_mutex);
+#endif
list_push_back(&dp->port_list, &port->node);
+#ifdef THREADED
+ pthread_mutex_unlock(&dp->port_list_mutex);
+#endif
dp->ports[port_no] = port;
dp->serial++;
@@ -448,15 +571,25 @@ get_port_by_name(struct dp_netdev *dp,
{
struct dp_netdev_port *port;
+#ifdef THREADED
+ pthread_mutex_lock(&dp->port_list_mutex);
+#endif
LIST_FOR_EACH (port, node, &dp->port_list) {
if (!strcmp(netdev_get_name(port->netdev), devname)) {
*portp = port;
+#ifdef THREADED
+ pthread_mutex_unlock(&dp->port_list_mutex);
+#endif
return 0;
}
}
+#ifdef THREADED
+ pthread_mutex_unlock(&dp->port_list_mutex);
+#endif
return ENOENT;
}
+/* In THREADED mode, must be called with port_list_mutex held. */
static int
do_del_port(struct dp_netdev *dp, uint16_t port_no)
{
@@ -531,7 +664,13 @@ dpif_netdev_get_max_ports(const struct dpif *dpif OVS_UNUSED)
static void
dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
{
+#ifdef THREADED
+ pthread_mutex_lock(&dp->table_mutex);
+#endif
hmap_remove(&dp->flow_table, &flow->node);
+#ifdef THREADED
+ pthread_mutex_unlock(&dp->table_mutex);
+#endif
free(flow->actions);
free(flow);
}
@@ -620,7 +759,11 @@ dpif_netdev_port_poll_wait(const struct dpif *dpif_)
}
static struct dp_netdev_flow *
-dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key)
+#ifdef THREADED
+dp_netdev_lookup_flow_locked(struct dp_netdev *dp, const struct flow *key)
+#else
+dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key)
+#endif
{
struct dp_netdev_flow *flow;
@@ -632,6 +775,19 @@ dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key)
return NULL;
}
+#ifdef THREADED
+/* Locked lookup; the mutex is dropped before the result is returned. */
+static struct dp_netdev_flow *
+dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key)
+{
+    struct dp_netdev_flow *match;
+    pthread_mutex_lock(&dp->table_mutex);
+    match = dp_netdev_lookup_flow_locked(dp, key);
+    pthread_mutex_unlock(&dp->table_mutex);
+    return match;
+}
+#endif
+
static void
get_dpif_flow_stats(struct dp_netdev_flow *flow, struct dpif_flow_stats *stats)
{
@@ -729,7 +885,13 @@ add_flow(struct dpif *dpif, const struct flow *key,
return error;
}
+#ifdef THREADED
+ pthread_mutex_lock(&dp->table_mutex);
+#endif
hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0));
+#ifdef THREADED
+ pthread_mutex_unlock(&dp->table_mutex);
+#endif
return 0;
}
@@ -749,6 +911,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
struct dp_netdev_flow *flow;
struct flow key;
int error;
+ int n_flows;
error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &key);
if (error) {
@@ -758,7 +921,14 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
flow = dp_netdev_lookup_flow(dp, &key);
if (!flow) {
if (put->flags & DPIF_FP_CREATE) {
- if (hmap_count(&dp->flow_table) < MAX_FLOWS) {
+#ifdef THREADED
+ pthread_mutex_lock(&dp->table_mutex);
+#endif
+ n_flows = hmap_count(&dp->flow_table);
+#ifdef THREADED
+ pthread_mutex_unlock(&dp->table_mutex);
+#endif
+ if (n_flows < MAX_FLOWS) {
if (put->stats) {
memset(put->stats, 0, sizeof *put->stats);
}
@@ -843,7 +1013,13 @@ dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_,
struct dp_netdev_flow *flow;
struct hmap_node *node;
+#ifdef THREADED
+ pthread_mutex_lock(&dp->table_mutex);
+#endif
node = hmap_at_position(&dp->flow_table, &state->bucket, &state->offset);
+#ifdef THREADED
+ pthread_mutex_unlock(&dp->table_mutex);
+#endif
if (!node) {
return EOF;
}
@@ -949,7 +1125,13 @@ static int
dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
struct ofpbuf *buf)
{
- struct dp_netdev_queue *q = find_nonempty_queue(dpif);
+ struct dp_netdev_queue *q;
+#ifdef THREADED
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+ char c;
+ pthread_mutex_lock(&dp->table_mutex);
+#endif
+ q = find_nonempty_queue(dpif);
if (q) {
struct dpif_upcall *u = q->upcalls[q->tail++ & QUEUE_MASK];
*upcall = *u;
@@ -958,8 +1140,19 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
ofpbuf_uninit(buf);
*buf = *upcall->packet;
+#ifdef THREADED
+ /* Read a byte from the pipe to signal that a packet has been
+ * received. */
+ if (read(dp->pipe[0], &c, 1) < 0) {
+ VLOG_ERR("Pipe read error (from datapath): %s", strerror(errno));
+ }
+ pthread_mutex_unlock(&dp->table_mutex);
+#endif
return 0;
} else {
+#ifdef THREADED
+ pthread_mutex_unlock(&dp->table_mutex);
+#endif
return EAGAIN;
}
}
@@ -967,19 +1160,32 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
static void
dpif_netdev_recv_wait(struct dpif *dpif)
{
+#ifdef THREADED
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+
+ poll_fd_wait(dp->pipe[0], POLLIN);
+#else
if (find_nonempty_queue(dpif)) {
poll_immediate_wake();
} else {
/* No messages ready to be received, and dp_wait() will ensure that we
* wake up to queue new messages, so there is nothing to do. */
}
+#endif
}
static void
dpif_netdev_recv_purge(struct dpif *dpif)
{
struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
+#ifdef THREADED
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+ pthread_mutex_lock(&dp->table_mutex);
+#endif
dp_netdev_purge_queues(dpif_netdev->dp);
+#ifdef THREADED
+ pthread_mutex_unlock(&dp->table_mutex);
+#endif
}
static void
@@ -1003,7 +1209,12 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
return;
}
flow_extract(packet, 0, 0, odp_port_to_ofp_port(port->port_no), &key);
+#ifdef THREADED
+ pthread_mutex_lock(&dp->table_mutex);
+ flow = dp_netdev_lookup_flow_locked(dp, &key);
+#else
flow = dp_netdev_lookup_flow(dp, &key);
+#endif
if (flow) {
dp_netdev_flow_used(flow, &key, packet);
dp_netdev_execute_actions(dp, packet, &key,
@@ -1013,8 +1224,22 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
dp->n_missed++;
dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, 0);
}
+#ifdef THREADED
+ pthread_mutex_unlock(&dp->table_mutex);
+#endif
}
+#ifdef THREADED
+static void
+dpif_netdev_run(struct dpif *dpif OVS_UNUSED)
+{
+}
+
+static void
+dpif_netdev_wait(struct dpif *dpif OVS_UNUSED)
+{
+}
+#else
static void
dpif_netdev_run(struct dpif *dpif)
{
@@ -1053,6 +1278,144 @@ dpif_netdev_wait(struct dpif *dpif)
netdev_recv_wait(port->netdev);
}
}
+#endif
+
+#ifdef THREADED
+/*
+ * Argument handed to process_pkt() through netdev_dispatch().
+ */
+struct dispatch_arg {
+ struct dp_netdev *dp; /* datapath that 'port' belongs to */
+ struct dp_netdev_port *port; /* port on which the packet arrived */
+};
+
+/* Handles one packet received by the datapath thread.
+ *
+ * 'user' points to the struct dispatch_arg that names the datapath and the
+ * input port.  The packet is first padded to ETH_TOTAL_MIN bytes and then
+ * handed to dp_netdev_port_input().
+ */
+static void
+process_pkt(u_char *user, struct ofpbuf *buf)
+{
+    const struct dispatch_arg *src = (const struct dispatch_arg *) user;
+
+    ofpbuf_padto(buf, ETH_TOTAL_MIN);
+    dp_netdev_port_input(src->dp, src->port, buf);
+}
+
+/* Adds 'fd' to 'fds' for POLLIN and returns its entry, or logs and returns
+ * NULL when all 'max_fds' slots are in use. */
+static struct pollfd *
+dp_thread_add_fd(struct pollfd *fds, size_t max_fds, size_t *n_fds, int fd)
+{
+    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+    struct pollfd *pfd;
+
+    if (*n_fds >= max_fds) {
+        VLOG_ERR_RL(&rl, "Too many fds for poll, ignoring fd %d", fd);
+        return NULL;
+    }
+    pfd = &fds[(*n_fds)++];
+    pfd->fd = fd;
+    pfd->events = POLLIN;
+    return pfd;
+}
+
+/* Body of the thread that manages the datapaths: polls the signal pipe and
+ * port fds of every datapath and passes received packets to process_pkt(). */
+static void *
+dp_thread_body(void *args OVS_UNUSED)
+{
+    /* XXX One array serves all datapaths, so MAX_PORTS + 1 can be too small. */
+    struct pollfd fds[MAX_PORTS + 1];
+    const size_t max_fds = sizeof fds / sizeof fds[0];
+    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+    const int batch = 50;   /* Max packets processed per dispatch call. */
+    struct dispatch_arg arg;
+    char readbuf[1024];
+    sigset_t sigmask;
+    int error;
+
+    /* Block the fatal signals so that the main thread handles them.  Note that
+     * pthread_sigmask() returns an error number and does not set errno. */
+    sigemptyset(&sigmask);
+    sigaddset(&sigmask, SIGTERM);
+    sigaddset(&sigmask, SIGALRM);
+    sigaddset(&sigmask, SIGINT);
+    sigaddset(&sigmask, SIGHUP);
+    error = pthread_sigmask(SIG_BLOCK, &sigmask, NULL);
+    if (error) {
+        VLOG_ERR("Error setting thread sigmask: %s", strerror(error));
+    }
+
+    for (;;) {
+        struct dp_netdev_port *port;
+        struct shash_node *node;
+        struct dp_netdev *dp;
+        size_t n_fds = 0;
+        ssize_t n_read;
+        int n_ready;
+
+        /* Rebuild the poll set; entries that do not fit get a NULL pointer
+         * rather than a stale or out-of-bounds one. */
+        SHASH_FOR_EACH (node, &dp_netdevs) {
+            dp = node->data;
+            dp->pipe_fd = dp_thread_add_fd(fds, max_fds, &n_fds, dp->pipe[1]);
+            pthread_mutex_lock(&dp->port_list_mutex);
+            LIST_FOR_EACH (port, node, &dp->port_list) {
+                port->poll_fd = dp_thread_add_fd(fds, max_fds, &n_fds,
+                                                 netdev_get_fd(port->netdev));
+            }
+            pthread_mutex_unlock(&dp->port_list_mutex);
+        }
+
+        n_ready = poll(fds, n_fds, 2000);
+        VLOG_DBG("dp_thread_body poll wakeup with cnt=%d", n_ready);
+        if (n_ready < 0) {
+            if (errno == EINTR) {
+                /* XXX get this case in detach mode */
+                continue;
+            }
+            VLOG_ERR("Datapath thread poll() error: %s", strerror(errno));
+            /* XXX terminating the thread is probably not right */
+            break;
+        }
+        pthread_testcancel();
+
+        SHASH_FOR_EACH (node, &dp_netdevs) {
+            dp = node->data;
+            if (dp->pipe_fd && (dp->pipe_fd->revents & POLLIN)) {
+                VLOG_DBG("Signalled from main thread");
+                do {
+                    n_read = read(dp->pipe[1], readbuf, sizeof readbuf);
+                } while (n_read > 0);
+                if (n_read < 0 && errno != EAGAIN) {
+                    VLOG_ERR("Pipe read error (to datapath): %s", strerror(errno));
+                }
+            }
+            arg.dp = dp;
+            pthread_mutex_lock(&dp->port_list_mutex);
+            LIST_FOR_EACH (port, node, &dp->port_list) {
+                if (!port->poll_fd) {
+                    continue;
+                }
+                VLOG_DBG("fd %d revents 0x%x", port->poll_fd->fd, port->poll_fd->revents);
+                arg.port = port;
+                if ((port->poll_fd->revents & POLLIN)
+                    && netdev_dispatch(port->netdev, batch, process_pkt,
+                                       (u_char *) &arg) < 0) {
+                    VLOG_ERR_RL(&rl, "error receiving data from %s",
+                                netdev_get_name(port->netdev));
+                }
+            }
+            pthread_mutex_unlock(&dp->port_list_mutex);
+        }
+    }
+    return NULL;
+}
+
+#endif /* THREADED */
static void
dp_netdev_set_dl(struct ofpbuf *packet, const struct ovs_key_ethernet *eth_key)
@@ -1068,11 +1431,19 @@ dp_netdev_output_port(struct dp_netdev *dp, struct ofpbuf *packet,
uint16_t out_port)
{
struct dp_netdev_port *p = dp->ports[out_port];
+ char c = 0;
+
if (p) {
netdev_send(p->netdev, packet);
+#ifdef THREADED
+ if (write(dp->pipe[0], &c, 1) < 0) {
+ VLOG_ERR("Pipe write error (to datapath): %s", strerror(errno));
+ }
+#endif
}
}
+/* In THREADED mode, must be called with table_mutex held. */
static int
dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
int queue_no, const struct flow *flow, uint64_t arg)
@@ -1081,6 +1452,9 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
struct dpif_upcall *upcall;
struct ofpbuf *buf;
size_t key_len;
+#ifdef THREADED
+ char c = 0;
+#endif
if (q->head - q->tail >= MAX_QUEUE_LEN) {
dp->n_lost++;
@@ -1102,6 +1476,12 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
upcall->userdata = arg;
q->upcalls[q->head++ & QUEUE_MASK] = upcall;
+#ifdef THREADED
+ /* Write a byte on the pipe to advertise that a packet is ready. */
+ if (write(dp->pipe[1], &c, 1) < 0) {
+ VLOG_ERR("Pipe write error (from datapath): %s", strerror(errno));
+ }
+#endif
return 0;
}
@@ -1150,7 +1530,13 @@ dp_netdev_action_userspace(struct dp_netdev *dp,
userdata_attr = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
userdata = userdata_attr ? nl_attr_get_u64(userdata_attr) : 0;
+#ifdef THREADED
+ pthread_mutex_lock(&dp->table_mutex);
+#endif
dp_netdev_output_userspace(dp, packet, DPIF_UC_ACTION, key, userdata);
+#ifdef THREADED
+ pthread_mutex_unlock(&dp->table_mutex);
+#endif
}
static void
diff --git lib/netdev-bsd.c lib/netdev-bsd.c
index 0b1a37c..ff79367 100644
--- lib/netdev-bsd.c
+++ lib/netdev-bsd.c
@@ -667,6 +667,89 @@ netdev_bsd_recv_wait(struct netdev *netdev_)
}
}
+#ifdef THREADED
+
+struct dispatch_arg {
+ pkt_handler h; /* handler that dispatch_handler() invokes per packet */
+ u_char *user; /* opaque argument passed through to 'h' */
+};
+
+/* pcap_dispatch() callback: wraps the captured bytes and calls the handler. */
+static void
+dispatch_handler(u_char *user, const struct pcap_pkthdr *phdr, const u_char *pdata)
+{
+    struct dispatch_arg *parg = (struct dispatch_arg *) user;
+    struct ofpbuf buf;
+    ofpbuf_use_stub(&buf, (void *) pdata, phdr->caplen);
+    buf.size = phdr->caplen;
+    parg->h(parg->user, &buf);
+    ofpbuf_uninit(&buf);
+}
+
+/* Lets pcap deliver up to 'batch' packets to 'h' via dispatch_handler(). */
+static int
+netdev_bsd_dispatch_system(struct netdev_bsd *netdev, int batch, pkt_handler h,
+                           u_char *user)
+{
+    struct dispatch_arg arg;
+
+    arg.h = h;
+    arg.user = user;
+    return pcap_dispatch(netdev->pcap_handle, batch, dispatch_handler,
+                         (u_char *) &arg);
+}
+
+/* Reads up to 'batch' packets from the tap device and passes each to 'h'.
+ * Returns the number handled, or -1 on a receive error other than EAGAIN. */
+static int
+netdev_bsd_dispatch_tap(struct netdev_bsd *netdev, int batch, pkt_handler h,
+                        u_char *user)
+{
+    const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX;
+    OFPBUF_STACK_BUFFER(buf_, size);
+    struct ofpbuf buf;
+    int ret = 0;
+    int i;
+
+    ofpbuf_use_stub(&buf, buf_, size);
+    for (i = 0; i < batch; i++) {
+        ret = netdev_bsd_recv_tap(netdev, buf.data, ofpbuf_tailroom(&buf));
+        if (ret < 0) {
+            break;
+        }
+        buf.size += ret;
+        h(user, &buf);
+        ofpbuf_clear(&buf);
+    }
+    /* Release 'buf' on every path, error exits included. */
+    ofpbuf_uninit(&buf);
+    return ret < 0 && ret != -EAGAIN ? -1 : i;
+}
+
+/* Dispatches via the tap descriptor when 'netdev_' is a "tap" device reading
+ * from its tap_fd, and via pcap otherwise. */
+static int
+netdev_bsd_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
+                    u_char *user)
+{
+    struct netdev_bsd *netdev = netdev_bsd_cast(netdev_);
+    struct netdev_dev_bsd *dev = netdev_dev_bsd_cast(netdev_get_dev(netdev_));
+    const char *type = netdev_get_type(netdev_);
+
+    if (strcmp(type, "tap") || netdev->netdev_fd != dev->tap_fd) {
+        return netdev_bsd_dispatch_system(netdev, batch, h, user);
+    }
+    return netdev_bsd_dispatch_tap(netdev, batch, h, user);
+}
+
+/* Returns the 'netdev_fd' descriptor of 'netdev_'. */
+static int
+netdev_bsd_get_fd(struct netdev *netdev_)
+{
+    return netdev_bsd_cast(netdev_)->netdev_fd;
+}
+#endif
+
/* Discards all packets waiting to be received from 'netdev'. */
static int
netdev_bsd_drain(struct netdev *netdev_)
@@ -1263,6 +1346,10 @@ const struct netdev_class netdev_bsd_class = {
netdev_bsd_recv,
netdev_bsd_recv_wait,
+#ifdef THREADED
+ netdev_bsd_dispatch,
+ netdev_bsd_get_fd,
+#endif
netdev_bsd_drain,
netdev_bsd_send,
@@ -1323,6 +1410,10 @@ const struct netdev_class netdev_tap_class = {
netdev_bsd_recv,
netdev_bsd_recv_wait,
+#ifdef THREADED
+ netdev_bsd_dispatch,
+ netdev_bsd_get_fd,
+#endif
netdev_bsd_drain,
netdev_bsd_send,
diff --git lib/netdev-dummy.c lib/netdev-dummy.c
index b8c23c5..4e4801c 100644
--- lib/netdev-dummy.c
+++ lib/netdev-dummy.c
@@ -20,6 +20,12 @@
#include <errno.h>
+#ifdef THREADED
+#include <pthread.h>
+#include <unistd.h>
+#include "socket-util.h"
+#endif
+
#include "flow.h"
#include "list.h"
#include "netdev-provider.h"
@@ -51,6 +57,10 @@ struct netdev_dummy {
struct list node; /* In netdev_dev_dummy's "devs" list. */
struct list recv_queue;
bool listening;
+#ifdef THREADED
+ pthread_mutex_t queue_mutex;
+ int s_pipe[2]; /* used to signal packet arrivals */
+#endif
};
static struct shash dummy_netdev_devs = SHASH_INITIALIZER(&dummy_netdev_devs);
@@ -124,11 +134,30 @@ netdev_dummy_open(struct netdev_dev *netdev_dev_, struct netdev **netdevp)
{
struct netdev_dev_dummy *netdev_dev = netdev_dev_dummy_cast(netdev_dev_);
struct netdev_dummy *netdev;
+#ifdef THREADED
+ int error;
+#endif
netdev = xmalloc(sizeof *netdev);
netdev_init(&netdev->netdev, netdev_dev_);
list_init(&netdev->recv_queue);
netdev->listening = false;
+#ifdef THREADED
+ error = pipe(netdev->s_pipe);
+ if (error) {
+ VLOG_ERR("Unable to create dummy pipe: %s", strerror(errno));
+ free(netdev);
+ return errno;
+ }
+ if (set_nonblocking(netdev->s_pipe[0]) ||
+ set_nonblocking(netdev->s_pipe[1])) {
+ VLOG_ERR("Unable to set nonblocking on dummy pipe: %s",
+ strerror(errno));
+ free(netdev);
+ return errno;
+ }
+ pthread_mutex_init(&netdev->queue_mutex, NULL);
+#endif
*netdevp = &netdev->netdev;
list_push_back(&netdev_dev->devs, &netdev->node);
@@ -141,6 +170,13 @@ netdev_dummy_close(struct netdev *netdev_)
struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
list_remove(&netdev->node);
ofpbuf_list_delete(&netdev->recv_queue);
+#ifdef THREADED
+    /* netdev_dummy_open() creates the pipe unconditionally, so close it
+     * unconditionally as well; otherwise both descriptors leak. */
+    close(netdev->s_pipe[0]);
+    close(netdev->s_pipe[1]);
+    pthread_mutex_destroy(&netdev->queue_mutex);
+#endif
free(netdev);
}
@@ -158,12 +194,29 @@ netdev_dummy_recv(struct netdev *netdev_, void *buffer, size_t size)
struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
struct ofpbuf *packet;
size_t packet_size;
+#ifdef THREADED
+ char c;
+#endif
+#ifdef THREADED
+ pthread_mutex_lock(&netdev->queue_mutex);
+#endif
if (list_is_empty(&netdev->recv_queue)) {
+#ifdef THREADED
+ pthread_mutex_unlock(&netdev->queue_mutex);
+#endif
return -EAGAIN;
}
+#ifdef THREADED
+ if (read(netdev->s_pipe[0], &c, 1) < 0) {
+ VLOG_ERR("Error reading dummy pipe: %s", strerror(errno));
+ }
+#endif
packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue));
+#ifdef THREADED
+ pthread_mutex_unlock(&netdev->queue_mutex);
+#endif
if (packet->size > size) {
return -EMSGSIZE;
}
@@ -179,11 +232,60 @@ static void
netdev_dummy_recv_wait(struct netdev *netdev_)
{
struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
- if (!list_is_empty(&netdev->recv_queue)) {
+ int empty;
+
+#ifdef THREADED
+ pthread_mutex_lock(&netdev->queue_mutex);
+#endif
+ empty = list_is_empty(&netdev->recv_queue);
+#ifdef THREADED
+ pthread_mutex_unlock(&netdev->queue_mutex);
+#endif
+ if (!empty) {
poll_immediate_wake();
}
}
+#ifdef THREADED
+/* Passes up to 'batch' queued packets to 'h'; returns how many, or -1. */
+static int
+netdev_dummy_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
+                      u_char *user)
+{
+    int i;
+    struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
+    struct ofpbuf *packet;
+    VLOG_DBG("dispatch %d", batch);
+    for (i = 0; i < batch; i++) {
+        char c;
+        if (read(netdev->s_pipe[0], &c, 1) < 0) {
+            if (errno == EAGAIN)
+                break;
+            VLOG_ERR("%s: error reading from the pipe: %s",
+                     netdev_get_name(netdev_), strerror(errno));
+            return -1;
+        }
+        pthread_mutex_lock(&netdev->queue_mutex);
+        if (list_is_empty(&netdev->recv_queue)) {
+            pthread_mutex_unlock(&netdev->queue_mutex);
+            break;      /* Nothing queued is not an error: report the count. */
+        }
+        packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue));
+        pthread_mutex_unlock(&netdev->queue_mutex);
+        h(user, packet);
+        ofpbuf_delete(packet);
+    }
+    return i;
+}
+
+/* Returns the read end of the pipe that netdev_dummy_receive() writes to. */
+static int
+netdev_dummy_get_fd(struct netdev *netdev_)
+{
+    return netdev_dummy_cast(netdev_)->s_pipe[0];
+}
+#endif
+
static int
netdev_dummy_drain(struct netdev *netdev_)
{
@@ -316,6 +418,10 @@ static const struct netdev_class dummy_class = {
netdev_dummy_listen,
netdev_dummy_recv,
netdev_dummy_recv_wait,
+#ifdef THREADED
+ netdev_dummy_dispatch, /* dispatch */
+ netdev_dummy_get_fd, /* get_fd */
+#endif
netdev_dummy_drain,
NULL, /* send */
@@ -407,6 +513,9 @@ netdev_dummy_receive(struct unixctl_conn *conn,
struct netdev_dev_dummy *dummy_dev;
int n_listeners;
int i;
+#ifdef THREADED
+ char c = 0;
+#endif
dummy_dev = shash_find_data(&dummy_netdev_devs, argv[1]);
if (!dummy_dev) {
@@ -414,6 +523,7 @@ netdev_dummy_receive(struct unixctl_conn *conn,
return;
}
+
n_listeners = 0;
for (i = 2; i < argc; i++) {
struct netdev_dummy *dev;
@@ -429,7 +539,16 @@ netdev_dummy_receive(struct unixctl_conn *conn,
LIST_FOR_EACH (dev, node, &dummy_dev->devs) {
if (dev->listening) {
struct ofpbuf *copy = ofpbuf_clone(packet);
+#ifdef THREADED
+ pthread_mutex_lock(&dev->queue_mutex);
+#endif
list_push_back(&dev->recv_queue, &copy->list_node);
+#ifdef THREADED
+ pthread_mutex_unlock(&dev->queue_mutex);
+ if (write(dev->s_pipe[1], &c, 1) < 0) {
+ VLOG_ERR("Error writing dummy pipe: %s", strerror(errno));
+ }
+#endif
n_listeners++;
}
}
diff --git lib/netdev-linux.c lib/netdev-linux.c
index 4d2f3ac..c33a801 100644
--- lib/netdev-linux.c
+++ lib/netdev-linux.c
@@ -891,6 +891,43 @@ netdev_linux_recv_wait(struct netdev *netdev_)
}
}
+#ifdef THREADED
+/* Receives up to 'batch' packets from 'netdev_' and passes each to 'h'.
+ * Returns the number handled, or -1 on a receive error other than EAGAIN. */
+static int
+netdev_linux_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
+                      u_char *user)
+{
+    const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX;
+    OFPBUF_STACK_BUFFER(buf_, size);
+    struct ofpbuf buf;
+    int ret = 0;
+    int i;
+
+    VLOG_DBG("dispatch %d", batch);
+    ofpbuf_use_stub(&buf, buf_, size);
+    for (i = 0; i < batch; i++) {
+        ret = netdev_linux_recv(netdev_, buf.data, ofpbuf_tailroom(&buf));
+        if (ret < 0) {
+            break;
+        }
+        buf.size += ret;
+        h(user, &buf);
+        ofpbuf_clear(&buf);
+    }
+    /* Release 'buf' on every path, error exits included. */
+    ofpbuf_uninit(&buf);
+    return ret < 0 && ret != -EAGAIN ? -1 : i;
+}
+
+/* Returns the 'fd' member of the netdev_linux behind 'netdev_'. */
+static int
+netdev_linux_get_fd(struct netdev *netdev_)
+{
+    return netdev_linux_cast(netdev_)->fd;
+}
+#endif
+
/* Discards all packets waiting to be received from 'netdev'. */
static int
netdev_linux_drain(struct netdev *netdev_)
@@ -2376,6 +2413,12 @@ netdev_linux_change_seq(const struct netdev *netdev)
return netdev_dev_linux_cast(netdev_get_dev(netdev))->change_seq;
}
+#ifdef THREADED
+#define THREADED_NULL NULL, NULL,
+#else
+#define THREADED_NULL
+#endif
+
#define NETDEV_LINUX_CLASS(NAME, CREATE, GET_STATS, SET_STATS, \
GET_FEATURES, GET_STATUS) \
{ \
@@ -2396,6 +2439,7 @@ netdev_linux_change_seq(const struct netdev *netdev)
netdev_linux_listen, \
netdev_linux_recv, \
netdev_linux_recv_wait, \
+ THREADED_NULL \
netdev_linux_drain, \
\
netdev_linux_send, \
diff --git lib/netdev-provider.h lib/netdev-provider.h
index 2a91f05..ee4e757 100644
--- lib/netdev-provider.h
+++ lib/netdev-provider.h
@@ -24,6 +24,9 @@
#include "netdev.h"
#include "list.h"
#include "shash.h"
+#ifdef THREADED
+#include "dispatch.h"
+#endif
#ifdef __cplusplus
extern "C" {
@@ -190,6 +193,22 @@ struct netdev_class {
* implement packet reception through the 'recv' member function. */
void (*recv_wait)(struct netdev *netdev);
+#ifdef THREADED
+ /* Attempts to receive 'batch' packets from 'netdev' and process them
+ * through the 'handler' callback. This function is used in the 'THREADED'
+ * version in order to optimize the forwarding process, since it permits to
+ * process packets directly in the netdev memory.
+ *
+ * Returns the number of packets processed on success; this can be 0 if no
+ * packets are available to be read. Returns -1 if an error occurred.
+ */
+ int (*dispatch)(struct netdev *netdev, int batch, pkt_handler handler,
+ u_char *user);
+
+ /* Return the file descriptor of the device */
+ int (*get_fd)(struct netdev *netdev);
+#endif
+
/* Discards all packets waiting to be received from 'netdev'.
*
* May be null if not needed, such as for a network device that does not
diff --git lib/netdev-vport.c lib/netdev-vport.c
index 1721f6b..39b26de 100644
--- lib/netdev-vport.c
+++ lib/netdev-vport.c
@@ -889,6 +889,13 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED,
return 0;
}
+
+#ifdef THREADED
+# define THREADED_NULL NULL, NULL,
+#else
+# define THREADED_NULL
+#endif
+
#define VPORT_FUNCTIONS(GET_STATUS) \
NULL, \
netdev_vport_run, \
@@ -905,6 +912,7 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED,
NULL, /* listen */ \
NULL, /* recv */ \
NULL, /* recv_wait */ \
+ THREADED_NULL \
NULL, /* drain */ \
\
netdev_vport_send, /* send */ \
diff --git lib/netdev.c lib/netdev.c
index be7cdd2..0c54e1e 100644
--- lib/netdev.c
+++ lib/netdev.c
@@ -423,6 +423,28 @@ netdev_recv_wait(struct netdev *netdev)
}
}
+#ifdef THREADED
+/* Lets 'netdev' receive up to 'batch' packets and pass each one, with 'user',
+ * to 'h'.  Yields 0 when the netdev class provides no 'dispatch' function. */
+int
+netdev_dispatch(struct netdev *netdev, int batch, pkt_handler h, u_char *user)
+{
+    const struct netdev_class *class = netdev_get_dev(netdev)->netdev_class;
+
+    return class->dispatch ? class->dispatch(netdev, batch, h, user) : 0;
+}
+
+/* Returns the file descriptor to poll for 'netdev', or -1 if its class has
+ * no 'get_fd' (poll() ignores negative fds, whereas 0 would be stdin). */
+int
+netdev_get_fd(struct netdev *netdev)
+{
+    int (*get_fd)(struct netdev *);
+    get_fd = netdev_get_dev(netdev)->netdev_class->get_fd;
+    return get_fd ? get_fd(netdev) : -1;
+}
+#endif
+
/* Discards all packets waiting to be received from 'netdev'. */
int
netdev_drain(struct netdev *netdev)
diff --git lib/netdev.h lib/netdev.h
index 4b86c21..4fad796 100644
--- lib/netdev.h
+++ lib/netdev.h
@@ -21,6 +21,9 @@
#include <stddef.h>
#include <stdint.h>
#include "openvswitch/types.h"
+#ifdef THREADED
+#include "dispatch.h"
+#endif
#ifdef __cplusplus
extern "C" {
@@ -107,6 +110,10 @@ int netdev_get_ifindex(const struct netdev *);
int netdev_listen(struct netdev *);
int netdev_recv(struct netdev *, struct ofpbuf *);
void netdev_recv_wait(struct netdev *);
+#ifdef THREADED
+int netdev_dispatch(struct netdev *, int, pkt_handler, u_char *);
+int netdev_get_fd(struct netdev *);
+#endif
int netdev_drain(struct netdev *);
int netdev_send(struct netdev *, const struct ofpbuf *);
diff --git lib/vlog.c lib/vlog.c
index 899072e..b6bd4ef 100644
--- lib/vlog.c
+++ lib/vlog.c
@@ -34,6 +34,9 @@
#include "timeval.h"
#include "unixctl.h"
#include "util.h"
+#ifdef THREADED
+#include <pthread.h>
+#endif
VLOG_DEFINE_THIS_MODULE(vlog);
@@ -89,6 +92,10 @@ static FILE *log_file;
/* vlog initialized? */
static bool vlog_inited;
+#ifdef THREADED
+static pthread_mutex_t vlog_mutex;
+#endif
+
static void format_log_message(const struct vlog_module *, enum vlog_level,
enum vlog_facility, unsigned int msg_num,
const char *message, va_list, struct ds *)
@@ -484,6 +491,9 @@ vlog_init(void)
return;
}
vlog_inited = true;
+#ifdef THREADED
+ pthread_mutex_init(&vlog_mutex, NULL);
+#endif
/* openlog() is allowed to keep the pointer passed in, without making a
* copy. The daemonize code sometimes frees and replaces 'program_name',
@@ -691,6 +701,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level,
ds_init(&s);
ds_reserve(&s, 1024);
+#ifdef THREADED
+ pthread_mutex_lock(&vlog_mutex);
+#endif
msg_num++;
if (log_to_console) {
@@ -721,6 +734,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level,
fflush(log_file);
}
+#ifdef THREADED
+ pthread_mutex_unlock(&vlog_mutex);
+#endif
ds_destroy(&s);
errno = save_errno;
}
diff --git m4/openvswitch.m4 m4/openvswitch.m4
index dca9f5f..084ceff 100644
--- m4/openvswitch.m4
+++ m4/openvswitch.m4
@@ -14,6 +14,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+dnl Checks for --enable-threaded and, if it is enabled, defines THREADED.
+AC_DEFUN([OVS_CHECK_THREADED],
+ [AC_REQUIRE([AC_PROG_CC])
+ AC_ARG_ENABLE(
+ [threaded],
+ [AC_HELP_STRING([--enable-threaded],
+ [Enable threaded version of userspace implementation])],
+ [case "${enableval}" in
+ (yes) threaded=true ;;
+ (no) threaded=false ;;
+ (*) AC_MSG_ERROR([bad value ${enableval} for --enable-threaded]) ;;
+ esac],
+ [threaded=false])
+ if $threaded; then
+ AC_DEFINE([THREADED], [1],
+ [Define to 1 if the threaded version of userspace
+ implementation is enabled.])
+ fi])
+
dnl Checks for --enable-coverage and updates CFLAGS and LDFLAGS appropriately.
AC_DEFUN([OVS_CHECK_COVERAGE],
[AC_REQUIRE([AC_PROG_CC])