summaryrefslogtreecommitdiff
path: root/net/tipc/group.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/group.c')
-rw-r--r--net/tipc/group.c47
1 files changed, 41 insertions, 6 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
index 985e0ce32e8e..eab862e047dc 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -71,6 +71,7 @@ struct tipc_member {
u16 advertised;
u16 window;
u16 bc_rcv_nxt;
+ u16 bc_acked;
bool usr_pending;
};
@@ -87,6 +88,7 @@ struct tipc_group {
u32 portid;
u16 member_cnt;
u16 bc_snd_nxt;
+ u16 bc_ackers;
bool loopback;
bool events;
};
@@ -258,6 +260,7 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
m->group = grp;
m->node = node;
m->port = port;
+ m->bc_acked = grp->bc_snd_nxt - 1;
grp->member_cnt++;
tipc_group_add_to_tree(grp, m);
tipc_nlist_add(&grp->dests, m->node);
@@ -275,6 +278,11 @@ static void tipc_group_delete_member(struct tipc_group *grp,
{
rb_erase(&m->tree_node, &grp->members);
grp->member_cnt--;
+
+ /* Check if we were waiting for replicast ack from this member */
+ if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1))
+ grp->bc_ackers--;
+
list_del_init(&m->list);
list_del_init(&m->congested);
@@ -325,16 +333,23 @@ void tipc_group_update_member(struct tipc_member *m, int len)
list_add_tail(&m->congested, &grp->congested);
}
-void tipc_group_update_bc_members(struct tipc_group *grp, int len)
+void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
{
+ u16 prev = grp->bc_snd_nxt - 1;
struct tipc_member *m;
struct rb_node *n;
for (n = rb_first(&grp->members); n; n = rb_next(n)) {
m = container_of(n, struct tipc_member, tree_node);
- if (tipc_group_is_enabled(m))
+ if (tipc_group_is_enabled(m)) {
tipc_group_update_member(m, len);
+ m->bc_acked = prev;
+ }
}
+
+ /* Mark number of acknowledges to expect, if any */
+ if (ack)
+ grp->bc_ackers = grp->member_cnt;
grp->bc_snd_nxt++;
}
@@ -372,6 +387,10 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)
{
struct tipc_member *m = NULL;
+ /* If prev bcast was replicast, reject until all receivers have acked */
+ if (grp->bc_ackers)
+ return true;
+
if (list_empty(&grp->congested))
return false;
@@ -391,7 +410,7 @@ static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
struct sk_buff *_skb, *tmp;
int mtyp = msg_type(hdr);
- /* Bcast may be bypassed by unicast, - sort it in */
+ /* Bcast may be bypassed by unicast or other bcast, - sort it in */
if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
skb_queue_walk_safe(defq, _skb, tmp) {
_hdr = buf_msg(_skb);
@@ -412,10 +431,10 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
struct sk_buff_head *xmitq)
{
struct sk_buff *skb = __skb_dequeue(inputq);
+ bool ack, deliver, update;
struct sk_buff_head *defq;
struct tipc_member *m;
struct tipc_msg *hdr;
- bool deliver, update;
u32 node, port;
int mtyp, blks;
@@ -451,6 +470,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
hdr = buf_msg(skb);
mtyp = msg_type(hdr);
deliver = true;
+ ack = false;
update = false;
if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
@@ -466,6 +486,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
/* Fall thru */
case TIPC_GRP_BCAST_MSG:
m->bc_rcv_nxt++;
+ ack = msg_grp_bc_ack_req(hdr);
break;
case TIPC_GRP_UCAST_MSG:
break;
@@ -480,6 +501,9 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
else
kfree_skb(skb);
+ if (ack)
+ tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq);
+
if (!update)
continue;
@@ -540,6 +564,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
} else if (mtyp == GRP_ADV_MSG) {
msg_set_adv_win(hdr, adv);
m->advertised += adv;
+ } else if (mtyp == GRP_ACK_MSG) {
+ msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt);
}
__skb_queue_tail(xmitq, skb);
}
@@ -593,7 +619,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
}
/* Otherwise deliver already received WITHDRAW event */
__skb_queue_tail(inputq, m->event_msg);
- *usr_wakeup = m->usr_pending;
+ *usr_wakeup = true;
tipc_group_delete_member(grp, m);
list_del_init(&m->congested);
return;
@@ -605,6 +631,15 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
m->usr_pending = false;
list_del_init(&m->congested);
return;
+ case GRP_ACK_MSG:
+ if (!m)
+ return;
+ m->bc_acked = msg_grp_bc_acked(hdr);
+ if (--grp->bc_ackers)
+ break;
+ *usr_wakeup = true;
+ m->usr_pending = false;
+ return;
default:
pr_warn("Received unknown GROUP_PROTO message\n");
}
@@ -678,7 +713,7 @@ void tipc_group_member_evt(struct tipc_group *grp,
TIPC_SKB_CB(skb)->orig_member = m->instance;
- *usr_wakeup = m->usr_pending;
+ *usr_wakeup = true;
m->usr_pending = false;
/* Hold back event if more messages might be expected */