hc
2024-01-05 071106ecf68c401173c58808b1cf5f68cc50d390
kernel/net/tipc/name_distr.c
....@@ -102,7 +102,8 @@
102102 pr_warn("Publication distribution failure\n");
103103 return NULL;
104104 }
105
-
105
+ msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
106
+ msg_set_non_legacy(buf_msg(skb));
106107 item = (struct distr_item *)msg_data(buf_msg(skb));
107108 publ_to_item(item, publ);
108109 return skb;
....@@ -114,8 +115,8 @@
114115 struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ)
115116 {
116117 struct name_table *nt = tipc_name_table(net);
117
- struct sk_buff *buf;
118118 struct distr_item *item;
119
+ struct sk_buff *skb;
119120
120121 write_lock_bh(&nt->cluster_scope_lock);
121122 list_del(&publ->binding_node);
....@@ -123,15 +124,16 @@
123124 if (publ->scope == TIPC_NODE_SCOPE)
124125 return NULL;
125126
126
- buf = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0);
127
- if (!buf) {
127
+ skb = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0);
128
+ if (!skb) {
128129 pr_warn("Withdrawal distribution failure\n");
129130 return NULL;
130131 }
131
-
132
- item = (struct distr_item *)msg_data(buf_msg(buf));
132
+ msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
133
+ msg_set_non_legacy(buf_msg(skb));
134
+ item = (struct distr_item *)msg_data(buf_msg(skb));
133135 publ_to_item(item, publ);
134
- return buf;
136
+ return skb;
135137 }
136138
137139 /**
....@@ -141,14 +143,15 @@
141143 * @pls: linked list of publication items to be packed into buffer chain
142144 */
143145 static void named_distribute(struct net *net, struct sk_buff_head *list,
144
- u32 dnode, struct list_head *pls)
146
+ u32 dnode, struct list_head *pls, u16 seqno)
145147 {
146148 struct publication *publ;
147149 struct sk_buff *skb = NULL;
148150 struct distr_item *item = NULL;
149
- u32 msg_dsz = ((tipc_node_get_mtu(net, dnode, 0) - INT_H_SIZE) /
151
+ u32 msg_dsz = ((tipc_node_get_mtu(net, dnode, 0, false) - INT_H_SIZE) /
150152 ITEM_SIZE) * ITEM_SIZE;
151153 u32 msg_rem = msg_dsz;
154
+ struct tipc_msg *hdr;
152155
153156 list_for_each_entry(publ, pls, binding_node) {
154157 /* Prepare next buffer: */
....@@ -159,8 +162,11 @@
159162 pr_warn("Bulk publication failure\n");
160163 return;
161164 }
162
- msg_set_bc_ack_invalid(buf_msg(skb), true);
163
- item = (struct distr_item *)msg_data(buf_msg(skb));
165
+ hdr = buf_msg(skb);
166
+ msg_set_bc_ack_invalid(hdr, true);
167
+ msg_set_bulk(hdr);
168
+ msg_set_non_legacy(hdr);
169
+ item = (struct distr_item *)msg_data(hdr);
164170 }
165171
166172 /* Pack publication into message: */
....@@ -176,24 +182,35 @@
176182 }
177183 }
178184 if (skb) {
179
- msg_set_size(buf_msg(skb), INT_H_SIZE + (msg_dsz - msg_rem));
185
+ hdr = buf_msg(skb);
186
+ msg_set_size(hdr, INT_H_SIZE + (msg_dsz - msg_rem));
180187 skb_trim(skb, INT_H_SIZE + (msg_dsz - msg_rem));
181188 __skb_queue_tail(list, skb);
182189 }
190
+ hdr = buf_msg(skb_peek_tail(list));
191
+ msg_set_last_bulk(hdr);
192
+ msg_set_named_seqno(hdr, seqno);
183193 }
184194
185195 /**
186196 * tipc_named_node_up - tell specified node about all publications by this node
187197 */
188
-void tipc_named_node_up(struct net *net, u32 dnode)
198
+void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities)
189199 {
190200 struct name_table *nt = tipc_name_table(net);
201
+ struct tipc_net *tn = tipc_net(net);
191202 struct sk_buff_head head;
203
+ u16 seqno;
192204
193205 __skb_queue_head_init(&head);
206
+ spin_lock_bh(&tn->nametbl_lock);
207
+ if (!(capabilities & TIPC_NAMED_BCAST))
208
+ nt->rc_dests++;
209
+ seqno = nt->snd_nxt;
210
+ spin_unlock_bh(&tn->nametbl_lock);
194211
195212 read_lock_bh(&nt->cluster_scope_lock);
196
- named_distribute(net, &head, dnode, &nt->cluster_scope);
213
+ named_distribute(net, &head, dnode, &nt->cluster_scope, seqno);
197214 tipc_node_xmit(net, &head, dnode, 0);
198215 read_unlock_bh(&nt->cluster_scope_lock);
199216 }
....@@ -245,13 +262,21 @@
245262 spin_unlock_bh(&tn->nametbl_lock);
246263 }
247264
248
-void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr)
265
+void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
266
+ u32 addr, u16 capabilities)
249267 {
268
+ struct name_table *nt = tipc_name_table(net);
269
+ struct tipc_net *tn = tipc_net(net);
270
+
250271 struct publication *publ, *tmp;
251272
252273 list_for_each_entry_safe(publ, tmp, nsub_list, binding_node)
253274 tipc_publ_purge(net, publ, addr);
254275 tipc_dist_queue_purge(net, addr);
276
+ spin_lock_bh(&tn->nametbl_lock);
277
+ if (!(capabilities & TIPC_NAMED_BCAST))
278
+ nt->rc_dests--;
279
+ spin_unlock_bh(&tn->nametbl_lock);
255280 }
256281
257282 /**
....@@ -295,29 +320,70 @@
295320 return false;
296321 }
297322
323
+static struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
324
+ u16 *rcv_nxt, bool *open)
325
+{
326
+ struct sk_buff *skb, *tmp;
327
+ struct tipc_msg *hdr;
328
+ u16 seqno;
329
+
330
+ spin_lock_bh(&namedq->lock);
331
+ skb_queue_walk_safe(namedq, skb, tmp) {
332
+ if (unlikely(skb_linearize(skb))) {
333
+ __skb_unlink(skb, namedq);
334
+ kfree_skb(skb);
335
+ continue;
336
+ }
337
+ hdr = buf_msg(skb);
338
+ seqno = msg_named_seqno(hdr);
339
+ if (msg_is_last_bulk(hdr)) {
340
+ *rcv_nxt = seqno;
341
+ *open = true;
342
+ }
343
+
344
+ if (msg_is_bulk(hdr) || msg_is_legacy(hdr)) {
345
+ __skb_unlink(skb, namedq);
346
+ spin_unlock_bh(&namedq->lock);
347
+ return skb;
348
+ }
349
+
350
+ if (*open && (*rcv_nxt == seqno)) {
351
+ (*rcv_nxt)++;
352
+ __skb_unlink(skb, namedq);
353
+ spin_unlock_bh(&namedq->lock);
354
+ return skb;
355
+ }
356
+
357
+ if (less(seqno, *rcv_nxt)) {
358
+ __skb_unlink(skb, namedq);
359
+ kfree_skb(skb);
360
+ continue;
361
+ }
362
+ }
363
+ spin_unlock_bh(&namedq->lock);
364
+ return NULL;
365
+}
366
+
298367 /**
299368 * tipc_named_rcv - process name table update messages sent by another node
300369 */
301
-void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
370
+void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
371
+ u16 *rcv_nxt, bool *open)
302372 {
303
- struct tipc_net *tn = net_generic(net, tipc_net_id);
304
- struct tipc_msg *msg;
373
+ struct tipc_net *tn = tipc_net(net);
305374 struct distr_item *item;
306
- uint count;
307
- u32 node;
375
+ struct tipc_msg *hdr;
308376 struct sk_buff *skb;
309
- int mtype;
377
+ u32 count, node;
310378
311379 spin_lock_bh(&tn->nametbl_lock);
312
- for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
313
- skb_linearize(skb);
314
- msg = buf_msg(skb);
315
- mtype = msg_type(msg);
316
- item = (struct distr_item *)msg_data(msg);
317
- count = msg_data_sz(msg) / ITEM_SIZE;
318
- node = msg_orignode(msg);
380
+ while ((skb = tipc_named_dequeue(namedq, rcv_nxt, open))) {
381
+ hdr = buf_msg(skb);
382
+ node = msg_orignode(hdr);
383
+ item = (struct distr_item *)msg_data(hdr);
384
+ count = msg_data_sz(hdr) / ITEM_SIZE;
319385 while (count--) {
320
- tipc_update_nametbl(net, item, node, mtype);
386
+ tipc_update_nametbl(net, item, node, msg_type(hdr));
321387 item++;
322388 }
323389 kfree_skb(skb);
....@@ -345,6 +411,6 @@
345411 publ->node = self;
346412 list_for_each_entry_rcu(publ, &nt->cluster_scope, binding_node)
347413 publ->node = self;
348
-
414
+ nt->rc_dests = 0;
349415 spin_unlock_bh(&tn->nametbl_lock);
350416 }