2024-12-19 9370bb92b2d16684ee45cf24e879c93c509162da
kernel/net/tipc/bcast.c
--- a/kernel/net/tipc/bcast.c
+++ b/kernel/net/tipc/bcast.c
@@ -46,6 +46,7 @@
 #define BCLINK_WIN_MIN 32	/* bcast minimum link window size */
 
 const char tipc_bclink_name[] = "broadcast-link";
+unsigned long sysctl_tipc_bc_retruni __read_mostly;
 
 /**
  * struct tipc_bc_base - base structure for keeping broadcast send state
@@ -54,7 +55,9 @@
  * @dests: array keeping number of reachable destinations per bearer
  * @primary_bearer: a bearer having links to all broadcast destinations, if any
  * @bcast_support: indicates if primary bearer, if any, supports broadcast
+ * @force_bcast: forces broadcast for multicast traffic
  * @rcast_support: indicates if all peer nodes support replicast
+ * @force_rcast: forces replicast for multicast traffic
  * @rc_ratio: dest count as percentage of cluster size where send method changes
  * @bc_threshold: calculated from rc_ratio; if dests > threshold use broadcast
  */
@@ -64,7 +67,9 @@
 	int dests[MAX_BEARERS];
 	int primary_bearer;
 	bool bcast_support;
+	bool force_bcast;
 	bool rcast_support;
+	bool force_rcast;
 	int rc_ratio;
 	int bc_threshold;
 };
@@ -80,12 +85,12 @@
  */
 int tipc_bcast_get_mtu(struct net *net)
 {
-	return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
+	return tipc_link_mss(tipc_bc_sndlink(net));
 }
 
-void tipc_bcast_disable_rcast(struct net *net)
+void tipc_bcast_toggle_rcast(struct net *net, bool supp)
 {
-	tipc_bc_base(net)->rcast_support = false;
+	tipc_bc_base(net)->rcast_support = supp;
 }
 
 static void tipc_bcbase_calc_bc_threshold(struct net *net)
@@ -103,6 +108,8 @@
 {
 	struct tipc_bc_base *bb = tipc_bc_base(net);
 	int all_dests = tipc_link_bc_peers(bb->link);
+	int max_win = tipc_link_max_win(bb->link);
+	int min_win = tipc_link_min_win(bb->link);
 	int i, mtu, prim;
 
 	bb->primary_bearer = INVALID_BEARER_ID;
@@ -116,8 +123,12 @@
 			continue;
 
 		mtu = tipc_bearer_mtu(net, i);
-		if (mtu < tipc_link_mtu(bb->link))
+		if (mtu < tipc_link_mtu(bb->link)) {
 			tipc_link_set_mtu(bb->link, mtu);
+			tipc_link_set_queue_limits(bb->link,
+						   min_win,
+						   max_win);
+		}
 		bb->bcast_support &= tipc_bearer_bcast_support(net, i);
 		if (bb->dests[i] < all_dests)
 			continue;
@@ -216,9 +227,24 @@
 	}
 	/* Can current method be changed ? */
 	method->expires = jiffies + TIPC_METHOD_EXPIRE;
-	if (method->mandatory || time_before(jiffies, exp))
+	if (method->mandatory)
 		return;
 
+	if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
+	    time_before(jiffies, exp))
+		return;
+
+	/* Configuration as force 'broadcast' method */
+	if (bb->force_bcast) {
+		method->rcast = false;
+		return;
+	}
+	/* Configuration as force 'replicast' method */
+	if (bb->force_rcast) {
+		method->rcast = true;
+		return;
+	}
+	/* Configuration as 'autoselect' or default method */
 	/* Determine method to use now */
 	method->rcast = dests <= bb->bc_threshold;
 }
@@ -230,8 +256,8 @@
  * Consumes the buffer chain.
  * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
  */
-static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
-			   u16 *cong_link_cnt)
+int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
+		    u16 *cong_link_cnt)
 {
 	struct tipc_link *l = tipc_bc_sndlink(net);
 	struct sk_buff_head xmitq;
@@ -281,6 +307,64 @@
 	return 0;
 }
 
+/* tipc_mcast_send_sync - deliver a dummy message with SYN bit
+ * @net: the applicable net namespace
+ * @skb: socket buffer to copy
+ * @method: send method to be used
+ * @dests: destination nodes for message.
+ * Returns 0 if success, otherwise errno
+ */
+static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
+				struct tipc_mc_method *method,
+				struct tipc_nlist *dests)
+{
+	struct tipc_msg *hdr, *_hdr;
+	struct sk_buff_head tmpq;
+	struct sk_buff *_skb;
+	u16 cong_link_cnt;
+	int rc = 0;
+
+	/* Is a cluster supporting with new capabilities ? */
+	if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL))
+		return 0;
+
+	hdr = buf_msg(skb);
+	if (msg_user(hdr) == MSG_FRAGMENTER)
+		hdr = msg_inner_hdr(hdr);
+	if (msg_type(hdr) != TIPC_MCAST_MSG)
+		return 0;
+
+	/* Allocate dummy message */
+	_skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL);
+	if (!_skb)
+		return -ENOMEM;
+
+	/* Preparing for 'synching' header */
+	msg_set_syn(hdr, 1);
+
+	/* Copy skb's header into a dummy header */
+	skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE);
+	skb_orphan(_skb);
+
+	/* Reverse method for dummy message */
+	_hdr = buf_msg(_skb);
+	msg_set_size(_hdr, MCAST_H_SIZE);
+	msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
+	msg_set_errcode(_hdr, TIPC_ERR_NO_PORT);
+
+	__skb_queue_head_init(&tmpq);
+	__skb_queue_tail(&tmpq, _skb);
+	if (method->rcast)
+		rc = tipc_bcast_xmit(net, &tmpq, &cong_link_cnt);
+	else
+		rc = tipc_rcast_xmit(net, &tmpq, dests, &cong_link_cnt);
+
+	/* This queue should normally be empty by now */
+	__skb_queue_purge(&tmpq);
+
+	return rc;
+}
+
 /* tipc_mcast_xmit - deliver message to indicated destination nodes
  * and to identified node local sockets
  * @net: the applicable net namespace
@@ -296,6 +380,9 @@
 		    u16 *cong_link_cnt)
 {
 	struct sk_buff_head inputq, localq;
+	bool rcast = method->rcast;
+	struct tipc_msg *hdr;
+	struct sk_buff *skb;
 	int rc = 0;
 
 	skb_queue_head_init(&inputq);
@@ -309,14 +396,33 @@
 	/* Send according to determined transmit method */
 	if (dests->remote) {
 		tipc_bcast_select_xmit_method(net, dests->remote, method);
+
+		skb = skb_peek(pkts);
+		hdr = buf_msg(skb);
+		if (msg_user(hdr) == MSG_FRAGMENTER)
+			hdr = msg_inner_hdr(hdr);
+		msg_set_is_rcast(hdr, method->rcast);
+
+		/* Switch method ? */
+		if (rcast != method->rcast) {
+			rc = tipc_mcast_send_sync(net, skb, method, dests);
+			if (unlikely(rc)) {
+				pr_err("Unable to send SYN: method %d, rc %d\n",
+				       rcast, rc);
+				goto exit;
+			}
+		}
+
 		if (method->rcast)
 			rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
 		else
 			rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
 	}
 
-	if (dests->local)
+	if (dests->local) {
+		tipc_loopback_trace(net, &localq);
 		tipc_sk_mcast_rcv(net, &localq, &inputq);
+	}
 exit:
 	/* This queue should normally be empty by now */
 	__skb_queue_purge(pkts);
@@ -375,7 +481,7 @@
 	__skb_queue_head_init(&xmitq);
 
 	tipc_bcast_lock(net);
-	tipc_link_bc_ack_rcv(l, acked, &xmitq);
+	tipc_link_bc_ack_rcv(l, acked, 0, NULL, &xmitq, NULL);
 	tipc_bcast_unlock(net);
 
 	tipc_bcbase_xmit(net, &xmitq);
@@ -390,9 +496,11 @@
  * RCU is locked, no other locks set
  */
 int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
-			struct tipc_msg *hdr)
+			struct tipc_msg *hdr,
+			struct sk_buff_head *retrq)
 {
 	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+	struct tipc_gap_ack_blks *ga;
 	struct sk_buff_head xmitq;
 	int rc = 0;
 
@@ -402,8 +510,13 @@
 	if (msg_type(hdr) != STATE_MSG) {
 		tipc_link_bc_init_rcv(l, hdr);
 	} else if (!msg_bc_ack_invalid(hdr)) {
-		tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
-		rc = tipc_link_bc_sync_rcv(l, hdr, &xmitq);
+		tipc_get_gap_ack_blks(&ga, l, hdr, false);
+		if (!sysctl_tipc_bc_retruni)
+			retrq = &xmitq;
+		rc = tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr),
+					  msg_bc_gap(hdr), ga, &xmitq,
+					  retrq);
+		rc |= tipc_link_bc_sync_rcv(l, hdr, &xmitq);
 	}
 	tipc_bcast_unlock(net);
 
@@ -456,10 +569,8 @@
 	tipc_sk_rcv(net, inputq);
 }
 
-int tipc_bclink_reset_stats(struct net *net)
+int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l)
 {
-	struct tipc_link *l = tipc_bc_sndlink(net);
-
 	if (!l)
 		return -ENOPROTOOPT;
 
@@ -469,19 +580,70 @@
 	return 0;
 }
 
-static int tipc_bc_link_set_queue_limits(struct net *net, u32 limit)
+static int tipc_bc_link_set_queue_limits(struct net *net, u32 max_win)
 {
 	struct tipc_link *l = tipc_bc_sndlink(net);
 
 	if (!l)
 		return -ENOPROTOOPT;
-	if (limit < BCLINK_WIN_MIN)
-		limit = BCLINK_WIN_MIN;
-	if (limit > TIPC_MAX_LINK_WIN)
+	if (max_win < BCLINK_WIN_MIN)
+		max_win = BCLINK_WIN_MIN;
+	if (max_win > TIPC_MAX_LINK_WIN)
 		return -EINVAL;
 	tipc_bcast_lock(net);
-	tipc_link_set_queue_limits(l, limit);
+	tipc_link_set_queue_limits(l, tipc_link_min_win(l), max_win);
 	tipc_bcast_unlock(net);
+	return 0;
+}
+
+static int tipc_bc_link_set_broadcast_mode(struct net *net, u32 bc_mode)
+{
+	struct tipc_bc_base *bb = tipc_bc_base(net);
+
+	switch (bc_mode) {
+	case BCLINK_MODE_BCAST:
+		if (!bb->bcast_support)
+			return -ENOPROTOOPT;
+
+		bb->force_bcast = true;
+		bb->force_rcast = false;
+		break;
+	case BCLINK_MODE_RCAST:
+		if (!bb->rcast_support)
+			return -ENOPROTOOPT;
+
+		bb->force_bcast = false;
+		bb->force_rcast = true;
+		break;
+	case BCLINK_MODE_SEL:
+		if (!bb->bcast_support || !bb->rcast_support)
+			return -ENOPROTOOPT;
+
+		bb->force_bcast = false;
+		bb->force_rcast = false;
+		break;
+	default:
+		return -EINVAL;
+	}
+
+	return 0;
+}
+
+static int tipc_bc_link_set_broadcast_ratio(struct net *net, u32 bc_ratio)
+{
+	struct tipc_bc_base *bb = tipc_bc_base(net);
+
+	if (!bb->bcast_support || !bb->rcast_support)
+		return -ENOPROTOOPT;
+
+	if (bc_ratio > 100 || bc_ratio <= 0)
+		return -EINVAL;
+
+	bb->rc_ratio = bc_ratio;
+	tipc_bcast_lock(net);
+	tipc_bcbase_calc_bc_threshold(net);
+	tipc_bcast_unlock(net);
+
 	return 0;
 }
 
@@ -489,6 +651,8 @@
 {
 	int err;
 	u32 win;
+	u32 bc_mode;
+	u32 bc_ratio;
 	struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
 
 	if (!attrs[TIPC_NLA_LINK_PROP])
@@ -498,12 +662,28 @@
 	if (err)
 		return err;
 
-	if (!props[TIPC_NLA_PROP_WIN])
+	if (!props[TIPC_NLA_PROP_WIN] &&
+	    !props[TIPC_NLA_PROP_BROADCAST] &&
+	    !props[TIPC_NLA_PROP_BROADCAST_RATIO]) {
 		return -EOPNOTSUPP;
+	}
 
-	win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
+	if (props[TIPC_NLA_PROP_BROADCAST]) {
+		bc_mode = nla_get_u32(props[TIPC_NLA_PROP_BROADCAST]);
+		err = tipc_bc_link_set_broadcast_mode(net, bc_mode);
+	}
 
-	return tipc_bc_link_set_queue_limits(net, win);
+	if (!err && props[TIPC_NLA_PROP_BROADCAST_RATIO]) {
+		bc_ratio = nla_get_u32(props[TIPC_NLA_PROP_BROADCAST_RATIO]);
+		err = tipc_bc_link_set_broadcast_ratio(net, bc_ratio);
+	}
+
+	if (!err && props[TIPC_NLA_PROP_WIN]) {
+		win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
+		err = tipc_bc_link_set_queue_limits(net, win);
+	}
+
+	return err;
 }
 
 int tipc_bcast_init(struct net *net)
@@ -518,8 +698,9 @@
 	tn->bcbase = bb;
 	spin_lock_init(&tipc_net(net)->bclock);
 
-	if (!tipc_link_bc_create(net, 0, 0,
-				 FB_MTU,
+	if (!tipc_link_bc_create(net, 0, 0, NULL,
+				 one_page_mtu,
+				 BCLINK_WIN_DEFAULT,
 				 BCLINK_WIN_DEFAULT,
 				 0,
 				 &bb->inputq,
@@ -529,7 +710,7 @@
 		goto enomem;
 	bb->link = l;
 	tn->bcl = l;
-	bb->rc_ratio = 25;
+	bb->rc_ratio = 10;
 	bb->rcast_support = true;
 	return 0;
 enomem:
@@ -576,3 +757,108 @@
 	nl->remote = 0;
 	nl->local = false;
 }
+
+u32 tipc_bcast_get_mode(struct net *net)
+{
+	struct tipc_bc_base *bb = tipc_bc_base(net);
+
+	if (bb->force_bcast)
+		return BCLINK_MODE_BCAST;
+
+	if (bb->force_rcast)
+		return BCLINK_MODE_RCAST;
+
+	if (bb->bcast_support && bb->rcast_support)
+		return BCLINK_MODE_SEL;
+
+	return 0;
+}
+
+u32 tipc_bcast_get_broadcast_ratio(struct net *net)
+{
+	struct tipc_bc_base *bb = tipc_bc_base(net);
+
+	return bb->rc_ratio;
+}
+
+void tipc_mcast_filter_msg(struct net *net, struct sk_buff_head *defq,
+			   struct sk_buff_head *inputq)
+{
+	struct sk_buff *skb, *_skb, *tmp;
+	struct tipc_msg *hdr, *_hdr;
+	bool match = false;
+	u32 node, port;
+
+	skb = skb_peek(inputq);
+	if (!skb)
+		return;
+
+	hdr = buf_msg(skb);
+
+	if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
+		return;
+
+	node = msg_orignode(hdr);
+	if (node == tipc_own_addr(net))
+		return;
+
+	port = msg_origport(hdr);
+
+	/* Has the twin SYN message already arrived ? */
+	skb_queue_walk(defq, _skb) {
+		_hdr = buf_msg(_skb);
+		if (msg_orignode(_hdr) != node)
+			continue;
+		if (msg_origport(_hdr) != port)
+			continue;
+		match = true;
+		break;
+	}
+
+	if (!match) {
+		if (!msg_is_syn(hdr))
+			return;
+		__skb_dequeue(inputq);
+		__skb_queue_tail(defq, skb);
+		return;
+	}
+
+	/* Deliver non-SYN message from other link, otherwise queue it */
+	if (!msg_is_syn(hdr)) {
+		if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
+			return;
+		__skb_dequeue(inputq);
+		__skb_queue_tail(defq, skb);
+		return;
+	}
+
+	/* Queue non-SYN/SYN message from same link */
+	if (msg_is_rcast(hdr) == msg_is_rcast(_hdr)) {
+		__skb_dequeue(inputq);
+		__skb_queue_tail(defq, skb);
+		return;
+	}
+
+	/* Matching SYN messages => return the one with data, if any */
+	__skb_unlink(_skb, defq);
+	if (msg_data_sz(hdr)) {
+		kfree_skb(_skb);
+	} else {
+		__skb_dequeue(inputq);
+		kfree_skb(skb);
+		__skb_queue_tail(inputq, _skb);
+	}
+
+	/* Deliver subsequent non-SYN messages from same peer */
+	skb_queue_walk_safe(defq, _skb, tmp) {
+		_hdr = buf_msg(_skb);
+		if (msg_orignode(_hdr) != node)
+			continue;
+		if (msg_origport(_hdr) != port)
+			continue;
+		if (msg_is_syn(_hdr))
+			break;
+		__skb_unlink(_skb, defq);
+		__skb_queue_tail(inputq, _skb);
+	}
+}
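
Note on the 'autoselect' path touched above: once neither force_bcast nor force_rcast is set, the send method falls back to "method->rcast = dests <= bb->bc_threshold", and this patch lowers the default rc_ratio from 25 to 10. The standalone user-space sketch below (not part of the patch) shows how that crossover behaves; the threshold formula is assumed to follow mainline's tipc_bcbase_calc_bc_threshold(), i.e. 1 + cluster_size * rc_ratio / 100, so verify it against your tree before relying on the exact numbers.

	/* Hypothetical illustration only, compiled as ordinary C. */
	#include <stdio.h>

	/* Assumed to mirror tipc_bcbase_calc_bc_threshold() */
	static int bc_threshold(int cluster_size, int rc_ratio)
	{
		return 1 + (cluster_size * rc_ratio / 100);
	}

	int main(void)
	{
		int cluster_size = 40;	/* reachable broadcast peers */
		int rc_ratio = 10;	/* new default set by this patch (was 25) */
		int thresh = bc_threshold(cluster_size, rc_ratio);
		int dests;

		/* Corresponds to: method->rcast = dests <= bb->bc_threshold */
		for (dests = 1; dests <= 8; dests++)
			printf("dests=%d -> %s\n", dests,
			       dests <= thresh ? "replicast" : "broadcast");
		return 0;
	}

With 40 reachable peers and the new default ratio, the crossover lands at five destinations: smaller groups are served by replicast, larger ones by true broadcast. Forcing BCLINK_MODE_BCAST or BCLINK_MODE_RCAST through the new TIPC_NLA_PROP_BROADCAST property bypasses this calculation entirely, as the early returns in tipc_bcast_select_xmit_method() show.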