tipc: convert topology server to use new server facility

As the new TIPC server infrastructure has been introduced, we can
now convert the TIPC topology server to it.  We get two benefits
from doing this:

1) It simplifies the topology server locking policy.  In the
original locking policy, we placed one spin lock pointer in the
tipc_subscriber structure to reuse the lock of the subscriber's
server port, controlling access to members of tipc_subscriber
instance.  That is, we only used one lock to ensure both
tipc_port and tipc_subscriber members were safely accessed.

Now we introduce another spin lock for tipc_subscriber structure
only protecting themselves, to get a finer granularity locking
policy.  Moreover, the change will allow us to make the topology
server code more readable and maintainable.

2) It fixes a bug where sent subscription events may be lost when
the topology port is congested.  Using the new service, the
topology server now queues sent events into an outgoing buffer,
and then wakes up a sender process which has been blocked in
workqueue context.  The process will keep picking events from the
buffer and send them to their respective subscribers, using the
kernel socket interface, until the buffer is empty. Even if the
socket is congested during transmission there is no risk that
events may be dropped, since the sender process may block when
needed.

Some minor reordering of initialization is done, since we now
have a scenario where the topology server must be started after
socket initialization has taken place, as the former depends
on the latter.  And overall, we see a simplification of the
TIPC subscriber code in making this changeover.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
Ying Xue 2013-06-17 10:54:40 -04:00 committed by David S. Miller
parent c5fa7b3cf3
commit 13a2e89873
4 changed files with 104 additions and 247 deletions

View file

@ -2,7 +2,7 @@
* net/tipc/core.c: TIPC module code
*
* Copyright (c) 2003-2006, Ericsson AB
* Copyright (c) 2005-2006, 2010-2011, Wind River Systems
* Copyright (c) 2005-2006, 2010-2013, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -136,8 +136,6 @@ static int tipc_core_start(void)
res = tipc_ref_table_init(tipc_max_ports, tipc_random);
if (!res)
res = tipc_nametbl_init();
if (!res)
res = tipc_subscr_start();
if (!res)
res = tipc_cfg_init();
if (!res)
@ -146,6 +144,8 @@ static int tipc_core_start(void)
res = tipc_socket_init();
if (!res)
res = tipc_register_sysctl();
if (!res)
res = tipc_subscr_start();
if (res)
tipc_core_stop();

View file

@ -402,7 +402,8 @@ static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
return -EAFNOSUPPORT;
if (addr->addr.nameseq.type < TIPC_RESERVED_TYPES)
if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
(addr->addr.nameseq.type != TIPC_TOP_SRV))
return -EACCES;
return (addr->scope > 0) ?

View file

@ -2,7 +2,7 @@
* net/tipc/subscr.c: TIPC network topology service
*
* Copyright (c) 2000-2006, Ericsson AB
* Copyright (c) 2005-2007, 2010-2011, Wind River Systems
* Copyright (c) 2005-2007, 2010-2013, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -41,33 +41,42 @@
/**
* struct tipc_subscriber - TIPC network topology subscriber
* @port_ref: object reference to server port connecting to subscriber
* @lock: pointer to spinlock controlling access to subscriber's server port
* @subscriber_list: adjacent subscribers in top. server's list of subscribers
* @conid: connection identifier to server connecting to subscriber
* @lock: controll access to subscriber
* @subscription_list: list of subscription objects for this subscriber
*/
struct tipc_subscriber {
u32 port_ref;
spinlock_t *lock;
struct list_head subscriber_list;
int conid;
spinlock_t lock;
struct list_head subscription_list;
};
/**
* struct top_srv - TIPC network topology subscription service
* @setup_port: reference to TIPC port that handles subscription requests
* @subscription_count: number of active subscriptions (not subscribers!)
* @subscriber_list: list of ports subscribing to service
* @lock: spinlock govering access to subscriber list
*/
struct top_srv {
u32 setup_port;
atomic_t subscription_count;
struct list_head subscriber_list;
spinlock_t lock;
static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
void *usr_data, void *buf, size_t len);
static void *subscr_named_msg_event(int conid);
static void subscr_conn_shutdown_event(int conid, void *usr_data);
static atomic_t subscription_count = ATOMIC_INIT(0);
static struct sockaddr_tipc topsrv_addr __read_mostly = {
.family = AF_TIPC,
.addrtype = TIPC_ADDR_NAMESEQ,
.addr.nameseq.type = TIPC_TOP_SRV,
.addr.nameseq.lower = TIPC_TOP_SRV,
.addr.nameseq.upper = TIPC_TOP_SRV,
.scope = TIPC_NODE_SCOPE
};
static struct top_srv topsrv;
static struct tipc_server topsrv __read_mostly = {
.saddr = &topsrv_addr,
.imp = TIPC_CRITICAL_IMPORTANCE,
.type = SOCK_SEQPACKET,
.max_rcvbuf_size = sizeof(struct tipc_subscr),
.name = "topology_server",
.tipc_conn_recvmsg = subscr_conn_msg_event,
.tipc_conn_new = subscr_named_msg_event,
.tipc_conn_shutdown = subscr_conn_shutdown_event,
};
/**
* htohl - convert value to endianness used by destination
@ -81,20 +90,13 @@ static u32 htohl(u32 in, int swap)
return swap ? swab32(in) : in;
}
/**
* subscr_send_event - send a message containing a tipc_event to the subscriber
*
* Note: Must not hold subscriber's server port lock, since tipc_send() will
* try to take the lock if the message is rejected and returned!
*/
static void subscr_send_event(struct tipc_subscription *sub,
u32 found_lower,
u32 found_upper,
u32 event,
u32 port_ref,
static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
u32 found_upper, u32 event, u32 port_ref,
u32 node)
{
struct iovec msg_sect;
struct tipc_subscriber *subscriber = sub->subscriber;
struct kvec msg_sect;
int ret;
msg_sect.iov_base = (void *)&sub->evt;
msg_sect.iov_len = sizeof(struct tipc_event);
@ -104,7 +106,10 @@ static void subscr_send_event(struct tipc_subscription *sub,
sub->evt.found_upper = htohl(found_upper, sub->swap);
sub->evt.port.ref = htohl(port_ref, sub->swap);
sub->evt.port.node = htohl(node, sub->swap);
tipc_send(sub->server_ref, 1, &msg_sect, msg_sect.iov_len);
ret = tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL,
msg_sect.iov_base, msg_sect.iov_len);
if (ret < 0)
pr_err("Sending subscription event failed, no memory\n");
}
/**
@ -147,21 +152,24 @@ void tipc_subscr_report_overlap(struct tipc_subscription *sub,
subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);
}
/**
* subscr_timeout - subscription timeout has occurred
*/
static void subscr_timeout(struct tipc_subscription *sub)
{
struct tipc_port *server_port;
struct tipc_subscriber *subscriber = sub->subscriber;
/* Validate server port reference (in case subscriber is terminating) */
server_port = tipc_port_lock(sub->server_ref);
if (server_port == NULL)
/* The spin lock per subscriber is used to protect its members */
spin_lock_bh(&subscriber->lock);
/* Validate if the connection related to the subscriber is
* closed (in case subscriber is terminating)
*/
if (subscriber->conid == 0) {
spin_unlock_bh(&subscriber->lock);
return;
}
/* Validate timeout (in case subscription is being cancelled) */
if (sub->timeout == TIPC_WAIT_FOREVER) {
tipc_port_unlock(server_port);
spin_unlock_bh(&subscriber->lock);
return;
}
@ -171,8 +179,7 @@ static void subscr_timeout(struct tipc_subscription *sub)
/* Unlink subscription from subscriber */
list_del(&sub->subscription_list);
/* Release subscriber's server port */
tipc_port_unlock(server_port);
spin_unlock_bh(&subscriber->lock);
/* Notify subscriber of timeout */
subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
@ -181,64 +188,54 @@ static void subscr_timeout(struct tipc_subscription *sub)
/* Now destroy subscription */
k_term_timer(&sub->timer);
kfree(sub);
atomic_dec(&topsrv.subscription_count);
atomic_dec(&subscription_count);
}
/**
* subscr_del - delete a subscription within a subscription list
*
* Called with subscriber port locked.
* Called with subscriber lock held.
*/
static void subscr_del(struct tipc_subscription *sub)
{
tipc_nametbl_unsubscribe(sub);
list_del(&sub->subscription_list);
kfree(sub);
atomic_dec(&topsrv.subscription_count);
atomic_dec(&subscription_count);
}
/**
* subscr_terminate - terminate communication with a subscriber
*
* Called with subscriber port locked. Routine must temporarily release lock
* to enable subscription timeout routine(s) to finish without deadlocking;
* the lock is then reclaimed to allow caller to release it upon return.
* (This should work even in the unlikely event some other thread creates
* a new object reference in the interim that uses this lock; this routine will
* simply wait for it to be released, then claim it.)
* Note: Must call it in process context since it might sleep.
*/
static void subscr_terminate(struct tipc_subscriber *subscriber)
{
u32 port_ref;
tipc_conn_terminate(&topsrv, subscriber->conid);
}
static void subscr_release(struct tipc_subscriber *subscriber)
{
struct tipc_subscription *sub;
struct tipc_subscription *sub_temp;
/* Invalidate subscriber reference */
port_ref = subscriber->port_ref;
subscriber->port_ref = 0;
spin_unlock_bh(subscriber->lock);
spin_lock_bh(&subscriber->lock);
/* Sever connection to subscriber */
tipc_shutdown(port_ref);
tipc_deleteport(port_ref);
/* Invalidate subscriber reference */
subscriber->conid = 0;
/* Destroy any existing subscriptions for subscriber */
list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
subscription_list) {
if (sub->timeout != TIPC_WAIT_FOREVER) {
spin_unlock_bh(&subscriber->lock);
k_cancel_timer(&sub->timer);
k_term_timer(&sub->timer);
spin_lock_bh(&subscriber->lock);
}
subscr_del(sub);
}
/* Remove subscriber from topology server's subscriber list */
spin_lock_bh(&topsrv.lock);
list_del(&subscriber->subscriber_list);
spin_unlock_bh(&topsrv.lock);
/* Reclaim subscriber lock */
spin_lock_bh(subscriber->lock);
spin_unlock_bh(&subscriber->lock);
/* Now destroy subscriber */
kfree(subscriber);
@ -247,7 +244,7 @@ static void subscr_terminate(struct tipc_subscriber *subscriber)
/**
* subscr_cancel - handle subscription cancellation request
*
* Called with subscriber port locked. Routine must temporarily release lock
* Called with subscriber lock held. Routine must temporarily release lock
* to enable the subscription timeout routine to finish without deadlocking;
* the lock is then reclaimed to allow caller to release it upon return.
*
@ -274,10 +271,10 @@ static void subscr_cancel(struct tipc_subscr *s,
/* Cancel subscription timer (if used), then delete subscription */
if (sub->timeout != TIPC_WAIT_FOREVER) {
sub->timeout = TIPC_WAIT_FOREVER;
spin_unlock_bh(subscriber->lock);
spin_unlock_bh(&subscriber->lock);
k_cancel_timer(&sub->timer);
k_term_timer(&sub->timer);
spin_lock_bh(subscriber->lock);
spin_lock_bh(&subscriber->lock);
}
subscr_del(sub);
}
@ -285,7 +282,7 @@ static void subscr_cancel(struct tipc_subscr *s,
/**
* subscr_subscribe - create subscription for subscriber
*
* Called with subscriber port locked.
* Called with subscriber lock held.
*/
static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
struct tipc_subscriber *subscriber)
@ -304,7 +301,7 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
}
/* Refuse subscription if global limit exceeded */
if (atomic_read(&topsrv.subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
pr_warn("Subscription rejected, limit reached (%u)\n",
TIPC_MAX_SUBSCRIPTIONS);
subscr_terminate(subscriber);
@ -335,10 +332,10 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
}
INIT_LIST_HEAD(&sub->nameseq_list);
list_add(&sub->subscription_list, &subscriber->subscription_list);
sub->server_ref = subscriber->port_ref;
sub->subscriber = subscriber;
sub->swap = swap;
memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
atomic_inc(&topsrv.subscription_count);
atomic_inc(&subscription_count);
if (sub->timeout != TIPC_WAIT_FOREVER) {
k_init_timer(&sub->timer,
(Handler)subscr_timeout, (unsigned long)sub);
@ -348,196 +345,51 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
return sub;
}
/**
* subscr_conn_shutdown_event - handle termination request from subscriber
*
* Called with subscriber's server port unlocked.
*/
static void subscr_conn_shutdown_event(void *usr_handle,
u32 port_ref,
struct sk_buff **buf,
unsigned char const *data,
unsigned int size,
int reason)
/* Handle one termination request for the subscriber */
static void subscr_conn_shutdown_event(int conid, void *usr_data)
{
struct tipc_subscriber *subscriber = usr_handle;
spinlock_t *subscriber_lock;
if (tipc_port_lock(port_ref) == NULL)
return;
subscriber_lock = subscriber->lock;
subscr_terminate(subscriber);
spin_unlock_bh(subscriber_lock);
subscr_release((struct tipc_subscriber *)usr_data);
}
/**
* subscr_conn_msg_event - handle new subscription request from subscriber
*
* Called with subscriber's server port unlocked.
*/
static void subscr_conn_msg_event(void *usr_handle,
u32 port_ref,
struct sk_buff **buf,
const unchar *data,
u32 size)
/* Handle one request to create a new subscription for the subscriber */
static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
void *usr_data, void *buf, size_t len)
{
struct tipc_subscriber *subscriber = usr_handle;
spinlock_t *subscriber_lock;
struct tipc_subscriber *subscriber = usr_data;
struct tipc_subscription *sub;
/*
* Lock subscriber's server port (& make a local copy of lock pointer,
* in case subscriber is deleted while processing subscription request)
*/
if (tipc_port_lock(port_ref) == NULL)
return;
subscriber_lock = subscriber->lock;
if (size != sizeof(struct tipc_subscr)) {
subscr_terminate(subscriber);
spin_unlock_bh(subscriber_lock);
} else {
sub = subscr_subscribe((struct tipc_subscr *)data, subscriber);
spin_unlock_bh(subscriber_lock);
if (sub != NULL) {
/*
* We must release the server port lock before adding a
* subscription to the name table since TIPC needs to be
* able to (re)acquire the port lock if an event message
* issued by the subscription process is rejected and
* returned. The subscription cannot be deleted while
* it is being added to the name table because:
* a) the single-threading of the native API port code
* ensures the subscription cannot be cancelled and
* the subscriber connection cannot be broken, and
* b) the name table lock ensures the subscription
* timeout code cannot delete the subscription,
* so the subscription object is still protected.
*/
tipc_nametbl_subscribe(sub);
}
}
spin_lock_bh(&subscriber->lock);
sub = subscr_subscribe((struct tipc_subscr *)buf, subscriber);
if (sub)
tipc_nametbl_subscribe(sub);
spin_unlock_bh(&subscriber->lock);
}
/**
* subscr_named_msg_event - handle request to establish a new subscriber
*/
static void subscr_named_msg_event(void *usr_handle,
u32 port_ref,
struct sk_buff **buf,
const unchar *data,
u32 size,
u32 importance,
struct tipc_portid const *orig,
struct tipc_name_seq const *dest)
/* Handle one request to establish a new subscriber */
static void *subscr_named_msg_event(int conid)
{
struct tipc_subscriber *subscriber;
u32 server_port_ref;
/* Create subscriber object */
subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC);
if (subscriber == NULL) {
pr_warn("Subscriber rejected, no memory\n");
return;
return NULL;
}
INIT_LIST_HEAD(&subscriber->subscription_list);
INIT_LIST_HEAD(&subscriber->subscriber_list);
subscriber->conid = conid;
spin_lock_init(&subscriber->lock);
/* Create server port & establish connection to subscriber */
tipc_createport(subscriber,
importance,
NULL,
NULL,
subscr_conn_shutdown_event,
NULL,
NULL,
subscr_conn_msg_event,
NULL,
&subscriber->port_ref);
if (subscriber->port_ref == 0) {
pr_warn("Subscriber rejected, unable to create port\n");
kfree(subscriber);
return;
}
tipc_connect(subscriber->port_ref, orig);
/* Lock server port (& save lock address for future use) */
subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock;
/* Add subscriber to topology server's subscriber list */
spin_lock_bh(&topsrv.lock);
list_add(&subscriber->subscriber_list, &topsrv.subscriber_list);
spin_unlock_bh(&topsrv.lock);
/* Unlock server port */
server_port_ref = subscriber->port_ref;
spin_unlock_bh(subscriber->lock);
/* Send an ACK- to complete connection handshaking */
tipc_send(server_port_ref, 0, NULL, 0);
/* Handle optional subscription request */
if (size != 0) {
subscr_conn_msg_event(subscriber, server_port_ref,
buf, data, size);
}
return (void *)subscriber;
}
int tipc_subscr_start(void)
{
struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV};
int res;
spin_lock_init(&topsrv.lock);
INIT_LIST_HEAD(&topsrv.subscriber_list);
res = tipc_createport(NULL,
TIPC_CRITICAL_IMPORTANCE,
NULL,
NULL,
NULL,
NULL,
subscr_named_msg_event,
NULL,
NULL,
&topsrv.setup_port);
if (res)
goto failed;
res = tipc_publish(topsrv.setup_port, TIPC_NODE_SCOPE, &seq);
if (res) {
tipc_deleteport(topsrv.setup_port);
topsrv.setup_port = 0;
goto failed;
}
return 0;
failed:
pr_err("Failed to create subscription service\n");
return res;
return tipc_server_start(&topsrv);
}
void tipc_subscr_stop(void)
{
struct tipc_subscriber *subscriber;
struct tipc_subscriber *subscriber_temp;
spinlock_t *subscriber_lock;
if (topsrv.setup_port) {
tipc_deleteport(topsrv.setup_port);
topsrv.setup_port = 0;
list_for_each_entry_safe(subscriber, subscriber_temp,
&topsrv.subscriber_list,
subscriber_list) {
subscriber_lock = subscriber->lock;
spin_lock_bh(subscriber_lock);
subscr_terminate(subscriber);
spin_unlock_bh(subscriber_lock);
}
}
tipc_server_stop(&topsrv);
}

View file

@ -2,7 +2,7 @@
* net/tipc/subscr.h: Include file for TIPC network topology service
*
* Copyright (c) 2003-2006, Ericsson AB
* Copyright (c) 2005-2007, Wind River Systems
* Copyright (c) 2005-2007, 2012-2013, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -37,10 +37,14 @@
#ifndef _TIPC_SUBSCR_H
#define _TIPC_SUBSCR_H
#include "server.h"
struct tipc_subscription;
struct tipc_subscriber;
/**
* struct tipc_subscription - TIPC network topology subscription object
* @subscriber: pointer to its subscriber
* @seq: name sequence associated with subscription
* @timeout: duration of subscription (in ms)
* @filter: event filtering to be done for subscription
@ -52,13 +56,13 @@ struct tipc_subscription;
* @evt: template for events generated by subscription
*/
struct tipc_subscription {
struct tipc_subscriber *subscriber;
struct tipc_name_seq seq;
u32 timeout;
u32 filter;
struct timer_list timer;
struct list_head nameseq_list;
struct list_head subscription_list;
u32 server_ref;
int swap;
struct tipc_event evt;
};