-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdisruptor.drawio
759 lines (759 loc) · 155 KB
/
disruptor.drawio
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
<mxfile host="Electron" modified="2024-10-28T18:23:47.417Z" agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/21.6.5 Chrome/114.0.5735.243 Electron/25.3.1 Safari/537.36" etag="sZo5zWSHy_cI_9s3xL13" version="21.6.5" type="device">
<diagram name="第 1 页" id="EZIj7stLJ7IglIkDNbZx">
<mxGraphModel dx="4168" dy="1034" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="827" pageHeight="1169" math="0" shadow="0">
<root>
<mxCell id="0" />
<mxCell id="1" parent="0" />
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-67" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.75;entryY=1;entryDx=0;entryDy=0;dashed=1;fillColor=#60a917;strokeColor=#2D7600;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-55" target="ZpwxWgh1Q2fkPOIGbowT-20">
<mxGeometry relative="1" as="geometry">
<Array as="points">
<mxPoint x="2341" y="1710" />
<mxPoint x="1770" y="1710" />
</Array>
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-68" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.75;exitY=0;exitDx=0;exitDy=0;entryX=0.75;entryY=0;entryDx=0;entryDy=0;dashed=1;fillColor=#60a917;strokeColor=#2D7600;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-55" target="ZpwxWgh1Q2fkPOIGbowT-19">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-1" value="<h1 style="font-size: 16px;"><font style="font-size: 16px;">Disruptor (4.0.0)</font></h1><div style=""><font style="">LMAX Disruptor 的定位是 线程间高性能的消息库。<br></font></div><div style="">核心源码 7K 行(70个源码文件),另外配了大量的测试代码(2W行)。</div><div style="">这里结合官方的几个Demo过一下源码流程。</div><div style="">注意 Disruptor 4.0.0 要求JDK版本最低为11,另外 4.x 和 3.x 变化还是挺大的。</div>" style="text;html=1;strokeColor=none;fillColor=none;spacing=5;spacingTop=-20;whiteSpace=wrap;overflow=hidden;rounded=0;" parent="1" vertex="1">
<mxGeometry x="40" y="10" width="440" height="90" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-4" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-2" target="fhfUE1HdMHP-IcORmnVZ-3" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-2" value="<b>LongEventMain</b><br><font color="#007fff">官方提供的第一个例子</font>" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1">
<mxGeometry x="40" y="230" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-9" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.45;exitDx=0;exitDy=0;exitPerimeter=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-3" target="fhfUE1HdMHP-IcORmnVZ-8" edge="1">
<mxGeometry relative="1" as="geometry">
<Array as="points">
<mxPoint x="740" y="246" />
<mxPoint x="740" y="770" />
</Array>
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-10" value="3" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="fhfUE1HdMHP-IcORmnVZ-9" vertex="1" connectable="0">
<mxGeometry x="0.4417" y="-3" relative="1" as="geometry">
<mxPoint x="3" y="137" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-31" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.25;exitDx=0;exitDy=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-3" target="fhfUE1HdMHP-IcORmnVZ-32" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="750" y="160" as="targetPoint" />
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-63" value="1" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="fhfUE1HdMHP-IcORmnVZ-31" vertex="1" connectable="0">
<mxGeometry x="0.555" y="-3" relative="1" as="geometry">
<mxPoint as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-60" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=1.002;exitY=0.376;exitDx=0;exitDy=0;exitPerimeter=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-3" target="fhfUE1HdMHP-IcORmnVZ-59" edge="1">
<mxGeometry relative="1" as="geometry">
<Array as="points">
<mxPoint x="750" y="225" />
<mxPoint x="750" y="330" />
</Array>
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-64" value="2" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="fhfUE1HdMHP-IcORmnVZ-60" vertex="1" connectable="0">
<mxGeometry x="0.8247" y="-2" relative="1" as="geometry">
<mxPoint as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-67" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.998;exitY=0.888;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;exitPerimeter=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-3" target="fhfUE1HdMHP-IcORmnVZ-66" edge="1">
<mxGeometry relative="1" as="geometry">
<Array as="points">
<mxPoint x="730" y="369" />
<mxPoint x="730" y="1030" />
</Array>
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-70" value="4" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="fhfUE1HdMHP-IcORmnVZ-67" vertex="1" connectable="0">
<mxGeometry x="0.9327" y="1" relative="1" as="geometry">
<mxPoint as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-3" value="<div><font color="#007fff">// 事件工厂,生产事件</font></div><div>LongEventFactory factory = new LongEventFactory();</div><div>int bufferSize = 1024;</div><div><font color="#007fff">// 1&nbsp;</font></div><div><span style="background-color: initial;">Disruptor&lt;LongEvent&gt; disruptor =</span><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; new <b>Disruptor</b>&lt;&gt;(factory, bufferSize, DaemonThreadFactory.INSTANCE);</div><div><font color="#007fff">// 2 绑定事件消费者 (这里实际是创建了两组消费者,第一组包含两个消费者)但从后面处理模式看这种实现只是<b>广播</b></font></div><div>disruptor.<b>handleEventsWith</b>(new LongEventHandler(1), new LongEventHandler(2));</div><div>disruptor.<b>handleEventsWith</b>(new LongEventHandler(3));</div><div><b style="color: rgb(0, 127, 255); background-color: initial;">// 3 启动 Disruptor,为每个消费者创建线程并启动</b><br></div><div>disruptor.<b>start</b>();</div><div><br></div><div>RingBuffer&lt;LongEvent&gt; ringBuffer = disruptor.<b>getRingBuffer</b>();</div><div>LongEventProducer producer = new <b>LongEventProducer</b>(ringBuffer);</div><div>ByteBuffer bb = ByteBuffer.allocate(8);</div><div>for (long l = 0; true; l++)&nbsp; <font color="#007fff">// 每隔1s发布一次 LongEvent</font></div><div>{</div><div>&nbsp; &nbsp; bb.putLong(0, l);</div><div><font color="#007fff"><b>&nbsp; &nbsp; // 4 发布 LongEvent</b></font></div><div>&nbsp; &nbsp; producer.<b>onData</b>(bb);</div><div>&nbsp; &nbsp; Thread.sleep(1000);</div><div>}</div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=3;" parent="1" vertex="1">
<mxGeometry x="280" y="120" width="440" height="280" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-14" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;entryX=1;entryY=0.5;entryDx=0;entryDy=0;endArrow=open;endFill=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-7" target="fhfUE1HdMHP-IcORmnVZ-13" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-39" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;entryX=1;entryY=0.5;entryDx=0;entryDy=0;endArrow=open;endFill=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-7" target="fhfUE1HdMHP-IcORmnVZ-38" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-7" value="<p style="margin: 4px 0px 0px; text-align: center; font-size: 11px;"><b style="font-size: 11px;">Disruptor&lt;T&gt;</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff">//消息发布的环形缓冲</font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;">private final <b>RingBuffer</b>&lt;T&gt; <b>ringBuffer</b>;</p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff">// 自定义线程工厂</font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;">private final ThreadFactory threadFactory;</p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff">// 里面主要是消费者集合</font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;">private final <b>ConsumerRepository</b> <b>consumerRepository</b>;</p><p style="margin: 0px 0px 0px 4px; font-size: 11px;">private final AtomicBoolean started;</p><p style="margin: 0px 0px 0px 4px; font-size: 11px;">private ExceptionHandler&lt;? super T&gt; exceptionHandler;</p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;">重要方法:</font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;">// 注册Event消费者组</font></p><p style="margin: 0px 0px 0px 4px;">public final <b>EventHandlerGroup</b>&lt;T&gt; <b>handleEventsWith</b>(EventHandler&lt;? super T&gt;... handlers)<br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;fillColor=#fff2cc;strokeColor=#d6b656;" parent="1" vertex="1">
<mxGeometry x="-440" y="520" width="400" height="240" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-12" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-8" target="fhfUE1HdMHP-IcORmnVZ-11" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-8" value="<div>public RingBuffer&lt;T&gt; start() {</div><div><font color="#007fff">&nbsp; &nbsp; // CAS + AtomicBoolean</font><font color="#007fff">状态字段防止重复启动</font></div><div>&nbsp; &nbsp; this.checkOnlyStartedOnce();</div><div><font color="#007fff">&nbsp; &nbsp; // 即遍历消费者集合为每个消费者通过&nbsp; threadFactory 创建线程并启动</font></div><div>&nbsp; &nbsp; this.<b>consumerRepository</b>.<b>startAll</b>(<span style="background-color: initial;">this.</span><b style="background-color: initial;">threadFactory</b><span style="background-color: initial;">);</span></div><div>&nbsp; &nbsp; return this.ringBuffer;</div><div>}</div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=3;" parent="1" vertex="1">
<mxGeometry x="760" y="720" width="440" height="100" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-28" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-11" target="fhfUE1HdMHP-IcORmnVZ-27" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-11" value="<div>this.<b>consumerInfos</b>.forEach((c) -&gt; {</div><div>&nbsp; &nbsp; c.<b>start</b>(threadFactory);</div><div>});</div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=9;" parent="1" vertex="1">
<mxGeometry x="1240" y="740" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-17" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;entryX=1;entryY=0.5;entryDx=0;entryDy=0;endArrow=open;endFill=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-13" target="fhfUE1HdMHP-IcORmnVZ-16" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-13" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>ConsumerRepository</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><b><font color="#007fff">消费者仓库</font></b></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><b><font color="#007fff"><br></font></b></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff">//这两个Map分别用于从 EventHandlerIndentity、Sequence 查找对应的EventProcessor</font></p><p style="margin: 0px 0px 0px 4px;">private final Map&lt;EventHandlerIdentity, EventProcessorInfo&gt; <br><b>&nbsp; &nbsp; eventProcessorInfoByEventHandler</b> = new IdentityHashMap();</p><p style="margin: 0px 0px 0px 4px;">private final Map&lt;Sequence, ConsumerInfo&gt; <b>eventProcessorInfoBySequence</b>&nbsp; <br>&nbsp; &nbsp; = new IdentityHashMap();</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff"><b>// 消费者集合</b></font></p><p style="margin: 0px 0px 0px 4px;">private final Collection&lt;ConsumerInfo&gt; <b>consumerInfos</b> = new ArrayList();</p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;">重要方法:</font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;fillColor=#fad7ac;strokeColor=#b46504;" parent="1" vertex="1">
<mxGeometry x="-880" y="1000" width="400" height="240" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-15" value="ConsumerRepository" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" parent="1" vertex="1">
<mxGeometry x="1270" y="710" width="140" height="30" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-16" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>ConsumerInfo (I)</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff"><b>消费者接口</b></font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff"><br></font></p><p style="margin: 0px 0px 0px 4px;">Sequence[] getSequences();</p><p style="margin: 0px 0px 0px 4px;">SequenceBarrier getBarrier();</p><p style="margin: 0px 0px 0px 4px;">boolean isEndOfChain();</p><p style="margin: 0px 0px 0px 4px;">void start(ThreadFactory var1);</p><p style="margin: 0px 0px 0px 4px;">void halt();</p><p style="margin: 0px 0px 0px 4px;">void markAsUsedInBarrier();</p><p style="margin: 0px 0px 0px 4px;">boolean isRunning();</p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;fillColor=#d5e8d4;strokeColor=#82b366;" parent="1" vertex="1">
<mxGeometry x="-1320" y="1000" width="400" height="240" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-19" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;endArrow=block;endFill=1;dashed=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-18" target="fhfUE1HdMHP-IcORmnVZ-16" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-21" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;endArrow=open;endFill=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-18" target="fhfUE1HdMHP-IcORmnVZ-20" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-26" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;endArrow=open;endFill=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-18" target="fhfUE1HdMHP-IcORmnVZ-22" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-18" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>EventProcessorInfo</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff"><b>ConsumerInfo 唯一的实现类,定义消费者实现,包括事件处理器和序列屏障</b></font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff"><br></font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff">//<b>事件处理器</b></font></p><p style="margin: 0px 0px 0px 4px;">private final <b>EventProcessor</b> <b>eventprocessor</b>;</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//<b>序列屏障</b>,保存消费者消费记录信息&nbsp;</font></p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//每组消费者(EventHandlerGroup)的 SequcenBarrier 实例相同</font></p><p style="margin: 0px 0px 0px 4px;">private final <b>SequenceBarrier</b> <b>barrier</b>;</p><p style="margin: 0px 0px 0px 4px;">private boolean endOfChain = true;</p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;">重要方法:</font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1">
<mxGeometry x="-1320" y="1300" width="400" height="200" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-20" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>EventProcessor</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><span style="color: rgb(0, 127, 255); background-color: initial;"><b>事件处理器,真正处理事件的处理器</b></span></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><span style="color: rgb(0, 127, 255); background-color: initial;"><br></span></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><span style="color: rgb(0, 127, 255); background-color: initial;">重要方法:</span><br></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p><p style="margin: 0px 0px 0px 4px;">Sequence getSequence();</p><p style="margin: 0px 0px 0px 4px;">void halt();</p><p style="margin: 0px 0px 0px 4px;">boolean isRunning();</p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;fillColor=#d5e8d4;strokeColor=#82b366;" parent="1" vertex="1">
<mxGeometry x="-1760" y="1320" width="400" height="160" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-22" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>SequenceBarrier (I)</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font style="" color="#007fff"><b>序列屏障,</b>不太明白为何叫这个名字,看内部逻辑是用于<b>借助 WaitStrategy 实现消费者线程对事件的等待与唤醒</b>,以及<b>记录与消费者绑定的 RingBuffer 的 cursor 指针当被唤醒后提取可消费的消息,还有支持多消费者顺序消费</b></font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;"><br></font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;">重要方法:</font></p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">// 内部调用 WaitStrategy 等待方法</font></p><p style="margin: 0px 0px 0px 4px;">long waitFor(long var1) throws AlertException, InterruptedException, TimeoutException;</p><p style="margin: 0px 0px 0px 4px;">long getCursor();</p><p style="margin: 0px 0px 0px 4px;">boolean isAlerted();</p><p style="margin: 0px 0px 0px 4px;">void alert();</p><p style="margin: 0px 0px 0px 4px;">void clearAlert();</p><p style="margin: 0px 0px 0px 4px;">void checkAlert() throws AlertException;</p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;fillColor=#d5e8d4;strokeColor=#82b366;" parent="1" vertex="1">
<mxGeometry x="-880" y="1280" width="400" height="240" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-25" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;endArrow=block;endFill=1;dashed=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-23" target="fhfUE1HdMHP-IcORmnVZ-20" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-118" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.001;exitY=0.805;exitDx=0;exitDy=0;endArrow=open;endFill=0;exitPerimeter=0;" edge="1" parent="1" source="fhfUE1HdMHP-IcORmnVZ-23" target="ZpwxWgh1Q2fkPOIGbowT-117">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-23" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>BatchEventProcessor&lt;T&gt;</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><b><font color="#007fff">事件批处理器,消费线程被唤醒后会取所有可用的事件进行遍历处理</font></b></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff">// 处理器状态</font></p><p style="margin: 0px 0px 0px 4px;">private static final int IDLE = 0;</p><p style="margin: 0px 0px 0px 4px;">private static final int HALTED = 1;</p><p style="margin: 0px 0px 0px 4px;">private static final int RUNNING = 2;</p><p style="margin: 0px 0px 0px 4px;">private final AtomicInteger running = new AtomicInteger(0);</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">// 处理除了超时、告警外的异常</font></p><p style="margin: 0px 0px 0px 4px;">private ExceptionHandler&lt;? super T&gt; <b>exceptionHandler</b>;</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff"><b>// 事件数据提供者,即绑定的 RingBuffer 实例</b></font></p><p style="margin: 0px 0px 0px 4px;">private final DataProvider&lt;T&gt; <b>dataProvider</b>;</p><p style="margin: 0px 0px 0px 4px;">private final SequenceBarrier <b>sequenceBarrier</b>;</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//&nbsp;<span style="background-color: initial;">RewindableEventHandler&lt;T&gt;&nbsp;&nbsp;</span></font><span style="background-color: initial;"><font color="#007fff">EventHandler&lt;T&gt;</font></span></p><p style="margin: 0px 0px 0px 4px;">private final EventHandlerBase&lt;? super T&gt; <b>eventHandler</b>;</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">// 批处理事件数量上限,默认 Integer.MAX_VALUE</font></p><p style="margin: 0px 0px 0px 4px;">private final int <b>batchLimitOffset</b>;</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff"><b>// 每个事件处理器自己的消费指针</b></font></p><p style="margin: 0px 0px 0px 4px; font-size: 13px;"><font style="font-size: 13px;">private final Sequence <b style="">sequence</b> = new Sequence(-1L);</font></p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff" style="font-size: 11px;">// 倒带处理器,要么返回要到带到的索引要么抛出异常</font></p><p style="margin: 0px 0px 0px 4px;"><font style="font-size: 11px;">private final RewindHandler <b style="">rewindHandler</b>;</font></p><p style="margin: 0px 0px 0px 4px;">private int retriesAttempted = 0;</p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;">重要方法:</font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;fillColor=#fff2cc;strokeColor=#d6b656;" parent="1" vertex="1">
<mxGeometry x="-1760" y="1520" width="400" height="360" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-84" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;dashed=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-27" target="fhfUE1HdMHP-IcORmnVZ-83" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-27" value="Thread thread = threadFactory<br>.<b>newThread</b>(this.eventprocessor);<br>...<br>thread.<b>start</b>();" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=9;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1">
<mxGeometry x="1480" y="740" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-29" value="EventProcessorInfo" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" parent="1" vertex="1">
<mxGeometry x="1515" y="710" width="130" height="30" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-95" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-30" target="fhfUE1HdMHP-IcORmnVZ-94" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-30" value="MultiProducerSequencer sequencer = new <b>MultiProducerSequencer</b>(bufferSize, waitStrategy);<br>return new <b>RingBuffer</b>(factory, sequencer);" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=10;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1">
<mxGeometry x="1720" y="120" width="440" height="60" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-33" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-80" target="fhfUE1HdMHP-IcORmnVZ-30" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-35" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-32" target="fhfUE1HdMHP-IcORmnVZ-34" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-32" value="this(RingBuffer.<b>createMultiProducer</b>(eventFactory, ringBufferSize), threadFactory);" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=10;" parent="1" vertex="1">
<mxGeometry x="760" y="120" width="440" height="60" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-34" value="<div>this.consumerRepository = new <b>ConsumerRepository</b>();</div><div>this.started = new AtomicBoolean(false);</div><div>this.exceptionHandler = new ExceptionHandlerWrapper();</div><div>this.ringBuffer = <b>ringBuffer</b>;</div><div>this.threadFactory = threadFactory;</div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=10;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1">
<mxGeometry x="1241" y="200" width="440" height="80" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-42" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;endArrow=block;endFill=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-36" target="fhfUE1HdMHP-IcORmnVZ-41" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-46" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;endArrow=open;endFill=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-36" target="fhfUE1HdMHP-IcORmnVZ-45" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-36" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>RingBufferFields&lt;E&gt;</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p><p style="margin: 0px 0px 0px 4px;">private static final int BUFFER_PAD = 32;</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">// 索引掩码</font></p><p style="margin: 0px 0px 0px 4px;">private final long indexMask;</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff"><b>// 环形缓冲元素节点容器,E是存储的元素的数据类型, 发布的事件对象是先放到这里,这个数组的大小是 bufferSize + 2 * BUFFER_PAD, 因为要在前后都加上32字节的PAD留白</b></font></p><p style="margin: 0px 0px 0px 4px;">private final E[] <b>entries</b>;</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//缓存的容量,必须是2的幂指数</font></p><p style="margin: 0px 0px 0px 4px;">protected final int <b>bufferSize</b>;</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">// 意译为序列化器,用于标记缓冲当前<b>写指针、</b>各个消费者的<b>消费指针</b>、可用元素节点等信息</font></p><p style="margin: 0px 0px 0px 4px;">protected final <b>Sequencer</b> <b>sequencer</b>;</p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="-880" y="240" width="400" height="240" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-40" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;endArrow=block;endFill=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-38" target="fhfUE1HdMHP-IcORmnVZ-36" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-93" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.75;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;endArrow=block;endFill=1;dashed=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-38" target="fhfUE1HdMHP-IcORmnVZ-92" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-38" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>RingBuffer</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><b><font color="#007fff">环形缓冲</font></b></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><b><font color="#007fff"><br></font></b></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff">//&nbsp; 8 * 7 个字段(一共56个字节),TODO</font></p><p style="margin: 0px 0px 0px 4px;">protected byte&nbsp;p10, ..., p17, p20, ..., p77;</p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;">重要方法:</font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;fillColor=#fad7ac;strokeColor=#b46504;" parent="1" vertex="1">
<mxGeometry x="-880" y="520" width="400" height="160" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-41" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>RingBufferPad</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p><p style="margin: 0px 0px 0px 4px;"><font style="" color="#007fff"><b>//&nbsp; 8 * 7 个字段(一共56个字节),TODO</b><br></font></p><p style="margin: 0px 0px 0px 4px;">protected byte p10, ..., p17, p20, ..., p77;</p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="-880" y="120" width="400" height="80" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-44" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;endArrow=block;endFill=1;dashed=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-43" target="fhfUE1HdMHP-IcORmnVZ-22" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-75" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;endArrow=open;endFill=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-43" target="fhfUE1HdMHP-IcORmnVZ-74" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-43" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>ProcessingSequenceBarrier</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff">// 等待策略,默认 BlockingWaitStrategy</font></p><p style="margin: 0px 0px 0px 4px;">private final WaitStrategy <b>waitStrategy</b>;</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">// 依赖的缓冲序列索引,默认和&nbsp;</font><font color="#007fff">&nbsp;cursorSequence 一致</font></p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff"><b>// 当和cursorSequence不同时用于消费者的顺序消费</b></font></p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff"><b>// 当有多个依赖时使用 FixedSequenceGroup 包装(本质是 Sequence数组)</b></font></p><p style="margin: 0px 0px 0px 4px; font-size: 13px;"><font style="font-size: 13px;">private final Sequence <b style="">dependentSequence</b>;</font></p><p style="margin: 0px 0px 0px 4px;"><font style="font-size: 11px;" color="#007fff">// 是否有告警,反向搜索代码发现处理器抛异常会告警,Disruptor关闭时会告警</font></p><p style="margin: 0px 0px 0px 4px;">private volatile boolean <b>alerted</b> = false;</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff"><b>// 即与当前消费者绑定的 RingBuffer 的 sequencer.cursor</b></font></p><p style="margin: 0px 0px 0px 4px;">private final Sequence <b>cursorSequence</b>;</p><p style="margin: 0px 0px 0px 4px;">private final Sequencer sequencer;</p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;">重要方法:</font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;fillColor=#fff2cc;strokeColor=#d6b656;" parent="1" vertex="1">
<mxGeometry x="-880" y="1560" width="400" height="240" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-45" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>Sequencer</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px;">void claim(long sequence);<br></p><p style="margin: 0px 0px 0px 4px;">boolean isAvailable(long sequence);<br></p><p style="margin: 0px 0px 0px 4px;">void addGatingSequences(Sequence... gatingSequences);<br></p><p style="margin: 0px 0px 0px 4px;"><b>SequenceBarrier</b> <b>newBarrier</b>(Sequence... sequencesToTrack);<br></p><p style="margin: 0px 0px 0px 4px;">long getMinimumSequence();<br></p><p style="margin: 0px 0px 0px 4px;">long getHighestPublishedSequence(long nextSequence, long availableSequence);<br></p><p style="margin: 0px 0px 0px 4px;">&lt;T&gt; EventPoller&lt;T&gt; newPoller(DataProvider&lt;T&gt; provider, Sequence... gatingSequences);<br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;fillColor=#d5e8d4;strokeColor=#82b366;" parent="1" vertex="1">
<mxGeometry x="-1320" y="280" width="400" height="160" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-48" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;dashed=1;endArrow=block;endFill=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-47" target="fhfUE1HdMHP-IcORmnVZ-45" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-52" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;endArrow=open;endFill=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-47" target="fhfUE1HdMHP-IcORmnVZ-51" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-103" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.002;entryY=0.206;entryDx=0;entryDy=0;entryPerimeter=0;shape=link;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-47" target="fhfUE1HdMHP-IcORmnVZ-43" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-106" value="<font color="#007fff">等待唤醒通道</font>" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="fhfUE1HdMHP-IcORmnVZ-103" vertex="1" connectable="0">
<mxGeometry x="-0.6853" y="4" relative="1" as="geometry">
<mxPoint x="16" y="-13" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-47" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>AbstractSequencer</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p><p style="margin: 0px 0px 0px 4px;">private static final AtomicReferenceFieldUpdater&lt;AbstractSequencer, Sequence[]&gt; SEQUENCE_UPDATER = ...</p><p style="margin: 0px 0px 0px 4px;">protected final int bufferSize;</p><p style="margin: 0px 0px 0px 4px;"><b><font color="#007fff">// 这个添加这个字段是为了在RingBuffer中有数据可消费时可以及时通知与RingBuffer绑定的所有消费者组进行消费</font></b></p><p style="margin: 0px 0px 0px 4px;">protected final WaitStrategy <b>waitStrategy</b>;</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">// <b>当前缓冲写指针</b>,初始为-1是因为获取RingBuffer下一个元素指针能是通过next()获取的,</font><span style="color: rgb(0, 127, 255); background-color: initial;">next() 中通过 VarHandle 并发安全的执行 CAS getAndAdd(this, 1) 方法后再加1</span></p><p style="margin: 0px 0px 0px 4px; font-size: 13px;"><font style="font-size: 13px;">protected final <b style="">Sequence cursor</b> = new Sequence(-1L);</font></p><p style="margin: 0px 0px 0px 4px;"><font style="font-size: 11px;" color="#007fff">// 存储消费者的<b style="">消费指针</b></font></p><p style="margin: 0px 0px 0px 4px; font-size: 12px;"><font style="font-size: 12px;">protected volatile <b style="">Sequence[] gatingSequences</b> = new Sequence[0];</font></p><hr style="font-size: 12px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="-1320" y="500" width="400" height="220" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-50" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;endArrow=block;endFill=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-49" target="fhfUE1HdMHP-IcORmnVZ-47" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-49" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>MultiProducerSequencer</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><span style="background-color: initial;">private static final VarHandle </span><b style="background-color: initial;">AVAILABLE_ARRAY</b><span style="background-color: initial;"> =&nbsp;</span><br></p><p style="margin: 0px 0px 0px 4px;">&nbsp; &nbsp; MethodHandles.arrayElementVarHandle(int[].class);</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff"><b>//存储所有消费者中消费最慢的消费者的消费值指针</b></font></p><p style="margin: 0px 0px 0px 4px;">private final Sequence <b>gatingSequenceCache</b> = new Sequence(-1L);</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">// 用于标记环形缓冲中哪些位置的元素是可用的,每往RingBuffer中添加一个值都要标记一下</font></p><font color="#007fff">&nbsp;// cursor 在生产者抢占位置成功后就移动了,但是发布数据可能失败,所以还需要这个标记下是否发布成功</font><p style="margin: 0px 0px 0px 4px;">private final int[] <b>availableBuffer</b>;</p><p style="margin: 0px 0px 0px 4px;">private final int indexMask;</p><p style="margin: 0px 0px 0px 4px;">private final int indexShift;</p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1">
<mxGeometry x="-1320" y="760" width="400" height="200" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-56" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;endArrow=block;endFill=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-51" target="fhfUE1HdMHP-IcORmnVZ-53" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-51" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>Sequence <font color="#007fff">用作索引(指针)</font></b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">官方说使用Sequence作为一种手段来<b>识别特定组件的位置</b>。每个消费者(事件处理器)都像Disruptor本身一样维护一个Sequence。大多数并发代码依赖于这些Sequence值的移动,因此Sequence支持AtomicLong的许多当前特性。事实上,两者之间<b>唯一的真实的区别是Sequence包含了额外的功能来防止Sequence和其他值之间的错误共享</b>。</font><br></p><p style="margin: 0px 0px 0px 4px;"><br></p><p style="margin: 0px 0px 0px 4px;">static final long INITIAL_VALUE = -1L;</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//&nbsp;VarHandle 提供了一种类型安全的方式来访问和修改内存中的变量,避免了 Unsafe 类中的一些不安全操作</font></p><p style="margin: 0px 0px 0px 4px;">private static final <b>VarHandle</b> VALUE_FIELD;</p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1">
<mxGeometry x="-1760" y="500" width="400" height="220" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-57" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;endArrow=block;endFill=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-53" target="fhfUE1HdMHP-IcORmnVZ-55" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-53" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>RhsPadding</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><b><font color="#007fff">RhsPadding 中也是56个字节字段, p90-p157,用于防止内存伪共享</font></b></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="-1760" y="380" width="400" height="80" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-54" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>LhsPadding</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><font color="#007fff"><b>LhsPadding</b></font><b style="background-color: initial;"><font color="#007fff">中也是56个字节字段, p10-p77,用于防止内存伪共享<br></font></b><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="-1760" y="140" width="400" height="80" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-58" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;endArrow=block;endFill=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-55" target="fhfUE1HdMHP-IcORmnVZ-54" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-55" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>Value</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><b style=""><font color="#007fff">// 3.x 版本是借助 volatile + UNSAFE<br>// 4.x 版本借助 VarHandle&nbsp;</font><br>protected long value;</b><br><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="-1760" y="260" width="400" height="80" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-62" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-59" target="fhfUE1HdMHP-IcORmnVZ-61" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-59" value="<font color="#007fff">// 这里传空 Sequence 数组,后面创建 SequenceBarrier 实例的时候就是使用RingBuffer sequencer<br>实例的 cursor 作为 dependentSequence</font><br>return createEventProcessors(<b>new Sequence[0]</b>, handlers);" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=10;" parent="1" vertex="1">
<mxGeometry x="761" y="300" width="440" height="60" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-82" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.999;exitY=0.721;exitDx=0;exitDy=0;entryX=0;entryY=0.25;entryDx=0;entryDy=0;fillColor=#dae8fc;strokeColor=#6c8ebf;exitPerimeter=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-61" target="fhfUE1HdMHP-IcORmnVZ-78" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-97" value="<font color="#007fff">这里多个事件处理器<br>用同一个 SequenceBarrier</font>" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="fhfUE1HdMHP-IcORmnVZ-82" vertex="1" connectable="0">
<mxGeometry x="-0.8473" y="2" relative="1" as="geometry">
<mxPoint x="47" y="-17" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-98" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.999;exitY=0.848;exitDx=0;exitDy=0;exitPerimeter=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-61" target="fhfUE1HdMHP-IcORmnVZ-96" edge="1">
<mxGeometry relative="1" as="geometry">
<Array as="points">
<mxPoint x="1690" y="605" />
<mxPoint x="1690" y="630" />
</Array>
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-61" value="<div>EventHandlerGroup&lt;T&gt; createEventProcessors(</div><div>&nbsp; &nbsp; &nbsp; &nbsp; final Sequence[] <b>barrierSequences</b>,</div><div>&nbsp; &nbsp; &nbsp; &nbsp; final EventHandler&lt;? super T&gt;[] eventHandlers)&nbsp;<span style="background-color: initial;">{</span></div><div>&nbsp; &nbsp; ...</div><div><font color="#007fff">&nbsp; &nbsp; // 每个EventHandler 对应一个Sequence, 即<b>每个 EventHandler 都有自己的消费指针</b></font></div><div><font color="#007fff"><b>&nbsp; &nbsp; </b>// 也就是说每个 EventHandler 对事件的消费互不影响,这种消费方式就是<b>广播</b></font></div><div>&nbsp; &nbsp; final Sequence[] processorSequences = new Sequence[eventHandlers.length];</div><div><font color="#007fff">&nbsp; &nbsp; // 使用 barrierSequences&nbsp;创建序列屏障,如果为空数组就用RingBuffer的 cursor</font></div><div><span style="background-color: initial;">&nbsp; &nbsp; final SequenceBarrier </span><b style="background-color: initial;">barrier</b><span style="background-color: initial;"> = ringBuffer.</span><b style="background-color: initial;">newBarrier</b><span style="background-color: initial;">(barrierSequences);</span><br></div><div><b><font color="#007fff">&nbsp; &nbsp; // 遍历将每个 EventHandler 封装成 BatchEventProcessor 注册到 ConsumerRepository</font></b></div><div><b><font color="#007fff">&nbsp; &nbsp; // 同时绑定的多个消费者中的 SequenceBarrier 实例相同(即监听同一个 mutex)&nbsp;</font></b></div><div>&nbsp; &nbsp; for (int i = 0, eventHandlersLength = eventHandlers.length; i &lt; eventHandlersLength; i++)&nbsp;<span style="background-color: initial;">{</span></div><div>&nbsp; &nbsp; &nbsp; &nbsp; final EventHandler&lt;? super T&gt; eventHandler = eventHandlers[i];</div><div>&nbsp; &nbsp; &nbsp; &nbsp; final BatchEventProcessor&lt;T&gt; <b>batchEventProcessor</b> =</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new BatchEventProcessorBuilder().build(ringBuffer, <b>barrier</b>, eventHandler);</div><div>&nbsp; &nbsp; &nbsp; &nbsp; if (exceptionHandler != null)&nbsp;<span style="background-color: initial;">{</span></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; batchEventProcessor.setExceptionHandler(exceptionHandler);</div><div>&nbsp; &nbsp; &nbsp; &nbsp; }</div><div><b><span style=""><span style="">&nbsp; &nbsp; &nbsp; &nbsp; </span></span><font color="#007fff">//注册到消费者集合</font></b><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; <b>consumerRepository</b>.<b>add</b>(batchEventProcessor, eventHandler, <b>barrier</b>);</div><div>&nbsp; &nbsp; &nbsp; &nbsp; <b>processorSequences</b>[i] = batchEventProcessor.getSequence();</div><div>&nbsp; &nbsp; }</div><div>&nbsp; &nbsp; <font color="#007fff">// 将&nbsp;processorSequences 加入到 RingBuffer 内部 Sequencer 的&nbsp;gatingSequences</font></div><div>&nbsp; &nbsp; <b>updateGatingSequencesForNextInChain</b>(barrierSequences, processorSequences);</div><div><br></div><div>&nbsp; &nbsp; return new <b>EventHandlerGroup</b>&lt;&gt;(this, consumerRepository, processorSequences);</div><div>}</div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=3;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1">
<mxGeometry x="1241" y="300" width="440" height="360" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-65" value="<div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">disruptor = {Disruptor@860}</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp;<b>ringBuffer</b> = {RingBuffer@863}</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; p10 = 0</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; ...</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; p77 = 0</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; indexMask = 1023</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; <b>entries</b> = {Object[1088]@1038}&nbsp; &nbsp;<b>1088 = bufferSize + 2*BUFFER_PAD</b></font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; bufferSize = 1024</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; <b>sequencer</b> = {MultiProducerSequencer@1039}</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp;gatingSequenceCache = {Sequence@1041} "-1"</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp;<b>availableBuffer</b> = {int[1024]@1042} [-1, ...]</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp;indexMask = 1023</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp;indexShift = 10</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp;bufferSize = 1024</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp;waitStrategy = {BlockingWaitStrategy@1043}</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp;cursor = {Sequence@1044} "-1"</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp;<b>gatingSequences</b> = {Sequence[1]@1045}&nbsp;</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp; 0 = {Sequence@1009} "-1"</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; RingBufferPad.p10 = 0</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; ...</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; RingBufferPad.p77 = 0</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp;threadFactory = {DaemonThreadFactory@871} "INSTANCE"</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp;<b>consumerRepository</b> = {ConsumerRepository@865}&nbsp;</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; eventProcessorInfoByEventHandler = {IdentityHashMap@1016}&nbsp; size = 1</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp;{LongEventHandler@980}&nbsp; -&gt; {EventProcessorInfo@1054}&nbsp;</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; eventProcessorInfoBySequence = {IdentityHashMap@1017}&nbsp; size = 1</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp;{Sequence@1009} "-1" -&gt; {EventProcessorInfo@1054}&nbsp;</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; <b>consumerInfos</b> = {ArrayList@1018}&nbsp; size = 1</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp;0 = {EventProcessorInfo@1054}&nbsp;</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp; eventprocessor = {BatchEventProcessor@991}&nbsp;</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp; <b>barrier</b> = {ProcessingSequenceBarrier@977}&nbsp;</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp; endOfChain = true</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp;started = {AtomicBoolean@872} "false"</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff">&nbsp;exceptionHandler = {ExceptionHandlerWrapper@864}&nbsp;</font></div>" style="text;html=1;align=left;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" parent="1" vertex="1">
<mxGeometry x="2200" y="280" width="350" height="420" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-69" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-66" target="fhfUE1HdMHP-IcORmnVZ-68" edge="1">
<mxGeometry relative="1" as="geometry">
<Array as="points">
<mxPoint x="1220" y="1030" />
<mxPoint x="1220" y="1150" />
</Array>
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-100" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=1.003;exitY=0.112;exitDx=0;exitDy=0;exitPerimeter=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-66" target="fhfUE1HdMHP-IcORmnVZ-99" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-101" value="..." style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="fhfUE1HdMHP-IcORmnVZ-100" vertex="1" connectable="0">
<mxGeometry x="0.0445" relative="1" as="geometry">
<mxPoint as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-66" value="<div>long sequence = ringBuffer.<b>next</b>(); <font color="#007fff">//获取 cursor 指针的下一可写索引,本质是CAS getAndAdd(1)</font>&nbsp;<br></div><div>try {</div><div><font color="#007fff">&nbsp; &nbsp; //获取 sequence 指针指向的元素节点</font></div><div>&nbsp; &nbsp; LongEvent event = ringBuffer.<b>get</b>(sequence);</div><div>&nbsp; &nbsp; event.<b>set</b>(bb.getLong(0));</div><div>} finally {</div><div>&nbsp; &nbsp; ringBuffer.<b>publish</b>(sequence);</div><div>}</div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=7;" parent="1" vertex="1">
<mxGeometry x="761" y="980" width="440" height="100" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-73" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-68" target="fhfUE1HdMHP-IcORmnVZ-72" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-68" value="sequencer.publish(sequence);" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=center;arcSize=7;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1">
<mxGeometry x="1241" y="1120" width="199" height="60" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-71" value="看源码过程中的疑问:<br><ul><li>Disruptor 生产消费逻辑<br><font color="#007fff">整体和其他地方见到的环形缓冲、滑动窗口操作类似,通过“双”指针读写。</font><br><font color="#007fff">事件生产:<br>生产者首先向 RingBuffer 中申请占位,多个生产者都通过 RingBuffer next() 占位,内部通过 CAS 抢占位置,如果缓冲写满就根据选择的等待策略处理;<br>生产者占位成功后,RingBuffer cursor 指针+1, 生产者向对应位置写数据成功后在&nbsp;availableBuffer 标记此位置数据可用;然后通过 Sequencer 中的 waitStrategy 唤醒对应的消费者组进行消费<br>事件消费:<br>消费者线程被唤醒后,结合 RingBuffer 当前可读位置和自己的消费进度,读取所有可以被消费的事件,遍历事件进行消费,消费完成后更新自己的消费进度。</font></li><li>RingBuffer 存满后怎么处理的?<br><font color="#007fff">修改测试将bufferSize设置为4, 消费者线程全部通过断点阻塞,然后生产者不断往RingBuffer 发布数据,当写指针比消费指针快一轮之后会根据阻塞策略选择不同方式处理,比如默认的BlockingWaitStrategy会<b>自旋等待</b>,直到有空位可写。<br>实际使用时需要根据业务量调整缓冲大小,尽量不要让缓冲占尽,否则后面写入会被阻塞。</font></li><li>为何要每个消费者都创建一个线程?不会造成线程数量太多么?<br><font color="#007fff">感觉是受等待策略的限制导致的,比如使用 BlockingWaitStrategy,使用的 Java 等待唤醒机制,就没法再使用线程池了。<br>不过这个问题确实存在。不知能否使用 JDK21的虚线程优化,TODO。</font></li><li>为何 Sequence value 前后要填充 56 字节?<br><font color="#007fff">填充字节是为了解决伪共享内存问题,但是为何要前后分别填充 56 字节?因为 value 本身是long类型占8个字节,缓冲行的长度是64字节,所以前后分别填充56字节可以与其他数据拉开足够的距离避免出现伪共享。<br>RingBuffer 中的填充方式呢?</font></li><li>多生产者与多消费者怎么保持 Sequence 指针线程安全的?<br><font color="#007fff">多个生产者发布数据到同一个 RingBuffer&nbsp; 实例前,需要先通过 RingBuffer next() 方法占位,这个方法内部基于CAS是线程安全的;成功占位后再在 RingBuffer 相应的位置写数据;<br>对于多消费者由于当前测试每个消费者维护自己的消费进度(即<b>广播</b>模式,官方称<b>组播</b>),根本不存在并发问题;<br>那么Disruptor支持分摊消费的模式么?<br>从当前版本事件处理器唯一的实现类&nbsp;BatchEventProcessor 看暂时不支持分摊消费,因为<br>其记录消费进度是靠私有成员变量 sequence,而这个值不会被外部修改,只有消费者本身消费事件后会更新,要实现分摊消费,这些消费者需要共享一个消费进度。</font></li></ul><div>Disruptor 实现高性能的原因 (并没有什么新的技术,都是之前碰到过的优化方式):</div><div><ul><li>事件存储预分配<br><font color="#007fff">存储事件数据的 RingBuffer 元素数组是预先分配好的,且数组元素对象不会被回收,避免了频繁的GC,且是固定大小的(一旦指定不能运行时修改)不存在扩容复制。<br>数组契合CPU的高速缓存空间局部性原理。<br></font></li><li>无锁设计<br><font color="#007fff">生产者发布事件前需要先通过 RingBuffer next() 占位,内部借助 CAS 实现,占位成功后再写数据。Disruptor 团队测试两个线程并发CAS性能比加锁性能大概快1个数量级。</font></li><li>消除伪共享问题,避免因为缓存失效重新读主内存引起的性能损失<br></li><li>数组长度为2的指数幂,可以用位运算计算索引<br></li><li>可选单生产者模式<br><font color="#007fff">如果业务中只需要一个生产者就在 Disruptor 中配置 ProducerType.SINGLE,这样生产过程没有并发问题连CAS也不需要事件发布性能最高。</font></li></ul></div><div><font color="#007fff"><br></font></div>" style="text;html=1;strokeColor=none;fillColor=none;align=left;verticalAlign=top;whiteSpace=wrap;rounded=0;" parent="1" vertex="1">
<mxGeometry x="40" y="420" width="600" height="720" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-79" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-72" target="fhfUE1HdMHP-IcORmnVZ-78" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-72" value="<div><font color="#007fff">//在 Sequencer <b>availableBuffer</b> 中标记此位置的元素可用</font></div><div><b>setAvailable</b>(sequence);</div><div><font color="#007fff">//唤醒所有等待此 mutex 的线程</font></div><div>waitStrategy.<b>signalAllWhenBlocking</b>();</div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=7;" parent="1" vertex="1">
<mxGeometry x="1480" y="1120" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-74" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>WaitStrategy (I)</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff"><b>定义线程没有事件消费时的等待策略,有8种实现</b></font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;"><br></font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;">重要方法:</font></p><p style="margin: 0px 0px 0px 4px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;fillColor=#d5e8d4;strokeColor=#82b366;" parent="1" vertex="1">
<mxGeometry x="-1320" y="1560" width="400" height="240" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-77" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;endArrow=block;endFill=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-76" target="fhfUE1HdMHP-IcORmnVZ-74" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-76" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>BlockingWaitStrategy</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff"><b>阻塞等待策略,借助 Java 对象的等待唤醒机制</b></font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff"><br></font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff">// 借助对象的 wait() notifyXxx() 实现</font></p><p style="margin: 0px 0px 0px 4px;">private final Object <b>mutex</b> = new Object();<br></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;">重要方法:</font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="-1320" y="1880" width="400" height="120" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-89" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;fillColor=#dae8fc;strokeColor=#6c8ebf;entryX=0;entryY=0.25;entryDx=0;entryDy=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-78" target="fhfUE1HdMHP-IcORmnVZ-87" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="2420" y="850" as="targetPoint" />
<Array as="points">
<mxPoint x="2410" y="1150" />
<mxPoint x="2410" y="835" />
</Array>
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-90" value="<font color="#007fff">唤醒 (BlockingWaitStrategy)</font>" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="fhfUE1HdMHP-IcORmnVZ-89" vertex="1" connectable="0">
<mxGeometry x="-0.9342" y="-1" relative="1" as="geometry">
<mxPoint x="67" y="-1" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-78" value="<div>synchronized (mutex){</div><div>&nbsp; &nbsp; mutex.<b>notifyAll</b>();</div><div>}</div><div><font color="#007fff">从前面可以看到事件消费者绑定逻辑里面,<b>一起绑定的多个消费者监听同一个 mutext</b></font></div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=7;fillColor=#d5e8d4;strokeColor=#82b366;" parent="1" vertex="1">
<mxGeometry x="1721" y="1120" width="199" height="60" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-81" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-32" target="fhfUE1HdMHP-IcORmnVZ-80" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="1200" y="150" as="sourcePoint" />
<mxPoint x="1720" y="150" as="targetPoint" />
</mxGeometry>
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-80" value="return createMultiProducer(factory, bufferSize, new <b>BlockingWaitStrategy</b>());<br><b><font color="#007fff">默认使用&nbsp; BlockingWaitStrategy 策略,即Java对象的 wait notify 机制</font></b>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=10;" parent="1" vertex="1">
<mxGeometry x="1241" y="120" width="440" height="60" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-86" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-83" target="fhfUE1HdMHP-IcORmnVZ-85" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-83" value="BatchEventProcessor#run()" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=center;arcSize=9;fillColor=#fff2cc;strokeColor=#d6b656;" parent="1" vertex="1">
<mxGeometry x="1720" y="740" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-88" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-85" target="fhfUE1HdMHP-IcORmnVZ-87" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-85" value="<div>int witnessValue = running.compareAndExchange(IDLE, RUNNING);</div><div>if (witnessValue == IDLE) { // Successful CAS</div><div>&nbsp; &nbsp; sequenceBarrier.<b>clearAlert</b>(); <font color="#007fff">// 将 BatchEventProcess alerted 设置为 false</font></div><div>&nbsp; &nbsp; <b>notifyStart</b>(); <font color="#007fff">//回调事件处理器启动钩子,做些处理器初始化</font></div><div>&nbsp; &nbsp; try {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; if (running.get() == RUNNING)&nbsp;<span style="background-color: initial;">{</span></div><div><span style="background-color: initial;"><font color="#007fff"><span style="white-space: pre;">	</span>&nbsp; &nbsp; // 内部死循环,等待下个索引的事件可用,有事件会被唤醒<br></font></span></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <b>processEvents</b>();</div><div>&nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; } finally {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; <b>notifyShutdown</b>(); <font color="#007fff">// 回调关闭钩子,后事处理</font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; running.set(IDLE);</div><div>&nbsp; &nbsp; }</div><div>} else {</div><div>&nbsp; &nbsp; if (witnessValue == RUNNING) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; throw new IllegalStateException("Thread is already running");</div><div>&nbsp; &nbsp; } else {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; earlyExit();</div><div>&nbsp; &nbsp; }</div><div>}</div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=3;" parent="1" vertex="1">
<mxGeometry x="1960" y="740" width="440" height="240" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-86" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;" edge="1" parent="1" source="fhfUE1HdMHP-IcORmnVZ-87" target="ZpwxWgh1Q2fkPOIGbowT-85">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-87" value="<div style="font-size: 10px;"><font style="font-size: 10px;">private void <b style="">processEvents</b>() {</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; T event = null;</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; long nextSequence = sequence.get() + 1L;</font></div><div style="font-size: 10px;"><font style="font-size: 10px;"><br style=""></font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; <b>while</b> (true) {</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; final long <b>startOfBatchSequence</b> = nextSequence;</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; try {</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try {</font></div><div style="font-size: 10px;"><font style="font-size: 10px;"><span style="">&nbsp;&nbsp;&nbsp;&nbsp;</span><span style="">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;</span><font style="font-size: 10px;" color="#007fff">// 等待被唤醒,被<b style="">唤醒后可能缓冲中已经填充了多个事件(比如被事件1唤醒但是读</b><b>dependentSequence前又有事件2、事件3插入了环形缓冲</b><b style="">),返回当前可用的最大的 Sequence 的索引值</b></font><br></font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; final long availableSequence = sequenceBarrier.<b>waitFor</b>(nextSequence);</font></div><div style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff"><b>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // batchLimitOffset 默认是 Integer.MAX_VALUE</b></font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; final long endOfBatchSequence = min(nextSequence + batchLimitOffset, availableSequence);</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (nextSequence &lt;= endOfBatchSequence) {</font></div><div style="font-size: 10px;"><font style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff"><span style=""><span style="white-space: pre;">&nbsp;&nbsp;&nbsp;&nbsp;</span></span><span style=""><span style="white-space: pre;">&nbsp;&nbsp;&nbsp;&nbsp;</span></span>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //调用自定义 EventHandler 的批处理前置处理</font><br></font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; eventHandler.<b>onBatchStart</b>(endOfBatchSequence - nextSequence + 1, availableSequence - nextSequence + 1);</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }</font></div><div style="font-size: 10px;"><font style="font-size: 10px;"><font style="font-size: 10px;" color="#007fff"><span style=""><span style="white-space: pre;">&nbsp;&nbsp;&nbsp;&nbsp;</span></span><span style=""><span style="white-space: pre;">&nbsp;&nbsp;&nbsp;&nbsp; </span></span>// 轮询取这些事件的事件对象,回调自定义 EventHandler onEvent 事件处理器方法</font><br style=""></font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (nextSequence &lt;= endOfBatchSequence) {</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; event = <b>dataProvider</b>.get(nextSequence);</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; eventHandler.<b>onEvent</b>(event, nextSequence, nextSequence == endOfBatchSequence);</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; nextSequence++;</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }</font></div><div style="font-size: 10px;"><font style="font-size: 10px;"><br style=""></font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; retriesAttempted = 0;</font></div><div style="font-size: 10px;"><b><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //消费者更新自己的消费进度</font></b></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; sequence.set(endOfBatchSequence);</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp;<span style="background-color: initial;">catch (final <b>RewindableException</b> e) { <font style="font-size: 10px;" color="#007fff">// RewindableEventHandler 可以抛出 RewindableException</font></span></font></div><div style="font-size: 10px;"><font style="font-size: 10px;"><span style="background-color: initial;"><font style="font-size: 10px;" color="#007fff">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // RewindHandler 处理 RewindableException REWIND: 返回倒带的索引 THROW: 抛出异常</font></span></font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; nextSequence = rewindHandler.<b>attemptRewindGetNextSequence</b>(e, <b>startOfBatchSequence</b>);</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp;<span style="background-color: initial;">catch (final <b>TimeoutException</b> e) {</span></font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <b>notifyTimeout</b>(sequence.get());</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp;<span style="background-color: initial;">catch (final <b>AlertException</b> ex) {</span></font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (running.get() != RUNNING) {</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break;</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp;<span style="background-color: initial;">catch (final Throwable ex) {</span></font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; handleEventException(ex, nextSequence, event);</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; sequence.set(nextSequence);</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; nextSequence++;</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; &nbsp; &nbsp; }</font></div><div style="font-size: 10px;"><font style="font-size: 10px;">&nbsp; &nbsp; }<font style="font-size: 10px;" color="#007fff"> //while end</font></font></div><div style="font-size: 10px;"><font style="font-size: 10px;">}</font></div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=9;align=left;arcSize=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1">
<mxGeometry x="2440" y="700" width="520" height="540" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-92" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>Cursored, EventSequencer&lt;E&gt;, EventSink&lt;E&gt;</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px;">Cursored 接口:</p><p style="margin: 0px 0px 0px 4px;">long getCursor();<br></p><p style="margin: 0px 0px 0px 4px;"><br></p><p style="margin: 0px 0px 0px 4px;">EventSequencer&lt;E&gt; 接口继承&nbsp;DataProvider&lt;T&gt;, Sequenced<br></p><p style="margin: 0px 0px 0px 4px;">T get(long sequence);<br></p><p style="margin: 0px 0px 0px 4px;">long next();<br></p><p style="margin: 0px 0px 0px 4px;">void publish(long sequence);<br></p><p style="margin: 0px 0px 0px 4px;">...</p><p style="margin: 0px 0px 0px 4px;"><br></p><p style="margin: 0px 0px 0px 4px;">EventSink&lt;E&gt; 接口:<br></p><p style="margin: 0px 0px 0px 4px;">定义了一批事件发布方法</p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;fillColor=#d5e8d4;strokeColor=#82b366;" parent="1" vertex="1">
<mxGeometry x="-440" y="280" width="400" height="200" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-94" value="<div>super(bufferSize, waitStrategy);</div><div>availableBuffer = new int[bufferSize];</div><div>Arrays.fill(availableBuffer, -1);</div><div>indexMask = bufferSize - 1;</div><div>indexShift = Util.log2(bufferSize);</div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=10;" parent="1" vertex="1">
<mxGeometry x="2200" y="120" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-96" value="<div>ringBuffer.<b>addGatingSequences</b>(processorSequences);</div><div>for (final Sequence barrierSequence : barrierSequences) {</div><div>&nbsp; &nbsp; ringBuffer.<b>removeGatingSequence</b>(barrierSequence);</div><div>}</div><div>consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);</div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=10;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1">
<mxGeometry x="1720" y="600" width="440" height="60" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-99" value="<div>long current = cursor.getAndAdd(n);</div><div>long <b>nextSequence</b> = current + n;<span style="white-space: pre;">	</span><span style="white-space: pre;">	</span><font color="#007fff">//下一个写指针</font></div><div><font color="#007fff">//</font></div><div>long wrapPoint = nextSequence - bufferSize;</div><div>long cachedGatingSequence = gatingSequenceCache.get();</div><div><b><font color="#007fff">//&nbsp;写指针比最慢的消费者消费指针快了一轮 || 消费指针比当前已写指针还大?什么时候会出现这种情况?</font></b></div><div>if (<b>wrapPoint &gt; cachedGatingSequence || cachedGatingSequence &gt; current</b>)</div><div>{</div><div>&nbsp; &nbsp; long gatingSequence;</div><div>&nbsp; &nbsp; <font color="#007fff">//&nbsp; <b>自旋等待</b></font></div><div>&nbsp; &nbsp; while (wrapPoint &gt; (gatingSequence = Util.getMinimumSequence(gatingSequences, current)))</div><div>&nbsp; &nbsp; {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; LockSupport.<b>parkNanos</b>(1L); // TODO, should we spin based on the wait strategy?</div><div>&nbsp; &nbsp; }</div><div>&nbsp; &nbsp; gatingSequenceCache.<b>set</b>(gatingSequence);</div><div>}</div><div><br></div><div style="">return nextSequence;</div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=3;" parent="1" vertex="1">
<mxGeometry x="1240" y="840" width="440" height="240" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-102" value="MultiProducerSequencer" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" parent="1" vertex="1">
<mxGeometry x="1380" y="810" width="160" height="30" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-105" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;endArrow=block;endFill=1;" parent="1" source="fhfUE1HdMHP-IcORmnVZ-104" target="fhfUE1HdMHP-IcORmnVZ-74" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="fhfUE1HdMHP-IcORmnVZ-104" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>LiteTimeoutBlockingWaitStrategy</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><font color="#007fff"><b>&nbsp;基于CAS + 带超时的等待唤醒机制<br></b></font><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><span style="background-color: initial;"><br></span></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><span style="background-color: initial;">private final Object mutex = new Object();</span><br></p><p style="margin: 0px 0px 0px 4px;">private final AtomicBoolean signalNeeded = new AtomicBoolean(false);</p><p style="margin: 0px 0px 0px 4px;">private final long timeoutInNanos;</p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;">重要方法:</font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="-880" y="1880" width="400" height="120" as="geometry" />
</mxCell>
<mxCell id="JMJFPlz6VKTEmIA0QOwe-1" value="Text" style="text;html=1;strokeColor=none;fillColor=none;align=center;verticalAlign=middle;whiteSpace=wrap;rounded=0;" parent="1" vertex="1">
<mxGeometry x="480" y="170" width="60" height="30" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-3" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-1" target="ZpwxWgh1Q2fkPOIGbowT-2">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-4" value="..." style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" vertex="1" connectable="0" parent="ZpwxWgh1Q2fkPOIGbowT-3">
<mxGeometry x="0.0118" y="-3" relative="1" as="geometry">
<mxPoint y="-3" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-88" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-1" target="ZpwxWgh1Q2fkPOIGbowT-11">
<mxGeometry relative="1" as="geometry">
<Array as="points">
<mxPoint x="250" y="1270" />
<mxPoint x="250" y="2020" />
</Array>
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-89" value="..." style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" vertex="1" connectable="0" parent="ZpwxWgh1Q2fkPOIGbowT-88">
<mxGeometry x="0.8785" y="1" relative="1" as="geometry">
<mxPoint x="9" y="18" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-1" value="<b>多消费者竞争模式<br><font color="#007fff">3.x 版本, 大体和上面一致,只是有些小差异</font><br></b>" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" vertex="1" parent="1">
<mxGeometry x="40" y="1240" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-6" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-2" target="ZpwxWgh1Q2fkPOIGbowT-7">
<mxGeometry relative="1" as="geometry">
<mxPoint x="520" y="1270" as="targetPoint" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-2" value="<b>事件处理器<br>WorkProcessor</b>" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" vertex="1" parent="1">
<mxGeometry x="280" y="1240" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-10" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-7" target="ZpwxWgh1Q2fkPOIGbowT-9">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-7" value="WorkProcessor#run()<br><font color="#007fff">每个事件处理器同样单线程启动</font>" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#fff2cc;strokeColor=#d6b656;" vertex="1" parent="1">
<mxGeometry x="520" y="1240" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-9" value="<div style="font-size: 11px;">if (!running.compareAndSet(false, true)) {</div><div style="font-size: 11px;">&nbsp; &nbsp; throw new IllegalStateException("Thread is already running");</div><div style="font-size: 11px;">}</div><div style="font-size: 11px;">sequenceBarrier.clearAlert();</div><div style="font-size: 11px;">notifyStart();</div><div style="font-size: 11px;"><br style="font-size: 11px;"></div><div style="font-size: 11px;">boolean processedSequence = true;</div><div style="font-size: 11px;">long cachedAvailableSequence = Long.MIN_VALUE;</div><div style="font-size: 11px;"><font color="#007fff" style="font-size: 11px;">// sequence 是 WorkProcessor 初始处理进度,nextSequence 是分配给此处理器的待处理的RingBuffer元素索引,workSequence 的值为何要是所有处理器sequence最大值+1, 是因为初始的几个元素已经分配好了消费者,比如3个消费者,索引 0 1 2 的元素给谁处理已经定好了</font></div><div style="font-size: 11px;">long nextSequence = sequence.get();</div><div style="font-size: 11px;">T event = null;</div><div style="font-size: 11px;"><b style="font-size: 11px;">while</b> (true) {</div><div style="font-size: 11px;">&nbsp; &nbsp; try {</div><div style="font-size: 11px;"><font color="#007fff" style="font-size: 11px;"><b>&nbsp; &nbsp; &nbsp; &nbsp; // 处理器处理完一个事件再去竞争下一个未分配的元素</b></font></div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; <b style="font-size: 11px;">if</b> (processedSequence) {</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; processedSequence = false;</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; do {</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; nextSequence = <b style="font-size: 11px;">workSequence</b>.get() + 1L;</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; sequence.set(nextSequence - 1L);</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } <font color="#007fff" style="font-size: 11px;">// 通过CAS竞争下一个待分配的事件, 返回true 的处理器获取对nextSequence 元素的处理权</font></div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (!workSequence.<b style="font-size: 11px;">compareAndSet</b>(nextSequence - 1L, nextSequence));</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; }</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;</div><div style="font-size: 11px;"><b><font color="#007fff">&nbsp; &nbsp; &nbsp; &nbsp; // cachedAvaliableSequence 是</font><font color="#007fff">处理器线程被唤醒后可处理最大元素索引</font></b></div><div style=""><font style="" color="#007fff"><b>&nbsp; &nbsp; &nbsp; &nbsp; // nextSequence 是分配给此处理器的待处理的RingBuffer元素索引</b><br></font></div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; <b style="font-size: 11px;">if</b> (cachedAvailableSequence &gt;= <b>nextSequence</b>) {</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; event = ringBuffer.get(nextSequence);</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; workHandler.<b>onEvent</b>(event); <font color="#007fff">// 回调 onEvent() 处理事件</font></div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; processedSequence = true;</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; } else {</div><div style="font-size: 11px;"><font color="#007fff"><b>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // 没有更多可处理的事件就继续等待</b></font></div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; cachedAvailableSequence = sequenceBarrier.<b>waitFor</b>(nextSequence);</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; }</div><div style="font-size: 11px;">&nbsp; &nbsp; } catch (final TimeoutException e) {</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; notifyTimeout(sequence.get());</div><div style="font-size: 11px;">&nbsp; &nbsp; } catch (final AlertException ex) {</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; if (!running.get()) {</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break;</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; }</div><div style="font-size: 11px;">&nbsp; &nbsp; } catch (final Throwable ex) {</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; // handle, mark as processed, unless the exception handler threw an exception</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; exceptionHandler.handleEventException(ex, nextSequence, event);</div><div style="font-size: 11px;">&nbsp; &nbsp; &nbsp; &nbsp; processedSequence = true;</div><div style="font-size: 11px;">&nbsp; &nbsp; }</div><div style="font-size: 11px;">}</div><div style="font-size: 11px;"><br style="font-size: 11px;"></div><div style="font-size: 11px;">notifyShutdown();</div><div style="font-size: 11px;">running.set(false);</div>" style="rounded=1;whiteSpace=wrap;html=1;align=left;arcSize=2;fontSize=11;" vertex="1" parent="1">
<mxGeometry x="760" y="1240" width="520" height="680" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-13" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-14" target="ZpwxWgh1Q2fkPOIGbowT-12">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-11" value="<b>唤醒处理器线程</b><br><b>WaitStrategy</b>" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" vertex="1" parent="1">
<mxGeometry x="280" y="1990" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-16" value="<font color="#007fff">唤醒</font>" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=1;entryY=0.662;entryDx=0;entryDy=0;entryPerimeter=0;fillColor=#dae8fc;strokeColor=#6c8ebf;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-12" target="ZpwxWgh1Q2fkPOIGbowT-9">
<mxGeometry x="-0.8666" relative="1" as="geometry">
<Array as="points">
<mxPoint x="1300" y="2020" />
<mxPoint x="1300" y="1690" />
</Array>
<mxPoint x="1" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-12" value="<div>lock.lock();</div><div>try {</div><div>&nbsp; &nbsp; processorNotifyCondition.<b>signalAll</b>();</div><div>} finally {</div><div>&nbsp; &nbsp; lock.unlock();</div><div>}</div>" style="rounded=1;whiteSpace=wrap;html=1;align=left;" vertex="1" parent="1">
<mxGeometry x="760" y="1960" width="440" height="120" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-15" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-11" target="ZpwxWgh1Q2fkPOIGbowT-14">
<mxGeometry relative="1" as="geometry">
<mxPoint x="480" y="2020" as="sourcePoint" />
<mxPoint x="760" y="2020" as="targetPoint" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-14" value="<b>BlockingWaitStrategy</b>#<br>signalAllWhenBlocking()<br><font color="#007fff">举一个例子</font>" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" vertex="1" parent="1">
<mxGeometry x="520" y="1990" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-30" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-18" target="ZpwxWgh1Q2fkPOIGbowT-19">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-60" value="分配" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];fontColor=#007FFF;" vertex="1" connectable="0" parent="ZpwxWgh1Q2fkPOIGbowT-30">
<mxGeometry x="-0.176" y="-1" relative="1" as="geometry">
<mxPoint x="41" y="-1" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-18" value="0" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="1480" y="1400" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-35" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-19" target="ZpwxWgh1Q2fkPOIGbowT-33">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-19" value="消费者A<br>初始 squence=-1" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" vertex="1" parent="1">
<mxGeometry x="1680.5" y="1400" width="120" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-34" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-20" target="ZpwxWgh1Q2fkPOIGbowT-33">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-20" value="消费者B<br>初始 squence=0" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" vertex="1" parent="1">
<mxGeometry x="1680" y="1520" width="120" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-36" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=1;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-21" target="ZpwxWgh1Q2fkPOIGbowT-33">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-21" value="消费者C<br>初始 squence=1" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" vertex="1" parent="1">
<mxGeometry x="1680" y="1640" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-31" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-22" target="ZpwxWgh1Q2fkPOIGbowT-20">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-61" value="分配" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];fontColor=#007FFF;rotation=0;" vertex="1" connectable="0" parent="ZpwxWgh1Q2fkPOIGbowT-31">
<mxGeometry x="-0.2271" y="2" relative="1" as="geometry">
<mxPoint x="43" y="31" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-22" value="1" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="1480" y="1440" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-32" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-23" target="ZpwxWgh1Q2fkPOIGbowT-21">
<mxGeometry relative="1" as="geometry">
<Array as="points">
<mxPoint x="1640" y="1570" />
</Array>
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-62" value="分配" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];fontColor=#007FFF;" vertex="1" connectable="0" parent="ZpwxWgh1Q2fkPOIGbowT-32">
<mxGeometry x="-0.3076" relative="1" as="geometry">
<mxPoint x="44" y="71" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-23" value="2" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="1480" y="1480" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-70" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;dashed=1;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-24" target="ZpwxWgh1Q2fkPOIGbowT-19">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-75" value="事件<br>可能分配<br>给ABC<br>任一一个" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];fontColor=#007FFF;" vertex="1" connectable="0" parent="ZpwxWgh1Q2fkPOIGbowT-70">
<mxGeometry x="0.1163" relative="1" as="geometry">
<mxPoint x="-46" y="97" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-71" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;dashed=1;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-24" target="ZpwxWgh1Q2fkPOIGbowT-20">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-72" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;dashed=1;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-24" target="ZpwxWgh1Q2fkPOIGbowT-21">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-24" value="3" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="1480" y="1520" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-25" value="4" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="1480" y="1560" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-26" value="0" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" vertex="1" parent="1">
<mxGeometry x="1445" y="1405" width="30" height="30" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-27" value="1" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" vertex="1" parent="1">
<mxGeometry x="1445" y="1445" width="30" height="30" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-29" value="5" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="1480" y="1600" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-76" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-33" target="ZpwxWgh1Q2fkPOIGbowT-37">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-77" value="Y" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" vertex="1" connectable="0" parent="ZpwxWgh1Q2fkPOIGbowT-76">
<mxGeometry x="-0.1647" relative="1" as="geometry">
<mxPoint as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-78" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=1;exitDx=0;exitDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-33" target="ZpwxWgh1Q2fkPOIGbowT-40">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-33" value="之前分配的元素事件<br>已处理完毕?" style="rhombus;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="1880" y="1510" width="160" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-57" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-37" target="ZpwxWgh1Q2fkPOIGbowT-24">
<mxGeometry relative="1" as="geometry">
<Array as="points">
<mxPoint x="2260" y="1540" />
<mxPoint x="2260" y="1720" />
<mxPoint x="1460" y="1720" />
<mxPoint x="1460" y="1540" />
</Array>
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-37" value="CAS 竞争下一个可处理待分配元素<br><font color="#007fff">CAS 返回true的消费者线程获取元素处理权</font>" style="rounded=1;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="2080" y="1510" width="160" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-80" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-40" target="ZpwxWgh1Q2fkPOIGbowT-79">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-81" value="Y" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" vertex="1" connectable="0" parent="ZpwxWgh1Q2fkPOIGbowT-80">
<mxGeometry x="-0.0471" y="-3" relative="1" as="geometry">
<mxPoint y="-3" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-82" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=1;exitDx=0;exitDy=0;entryX=1;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-40" target="ZpwxWgh1Q2fkPOIGbowT-44">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-83" value="N" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" vertex="1" connectable="0" parent="ZpwxWgh1Q2fkPOIGbowT-82">
<mxGeometry x="-0.9289" y="2" relative="1" as="geometry">
<mxPoint x="12" y="2" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-40" value="是否还有可处理<br>已分配元素事件?" style="rhombus;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="1880" y="1760" width="160" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-47" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=1;entryY=0.25;entryDx=0;entryDy=0;dashed=1;fillColor=#fa6800;strokeColor=#C73500;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-44" target="ZpwxWgh1Q2fkPOIGbowT-19">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-63" value="<font color="#007fff">消费者睡眠</font>" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" vertex="1" connectable="0" parent="ZpwxWgh1Q2fkPOIGbowT-47">
<mxGeometry x="-0.8268" y="-2" relative="1" as="geometry">
<mxPoint x="-2" y="16" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-48" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=0;exitDx=0;exitDy=0;entryX=0.75;entryY=0;entryDx=0;entryDy=0;dashed=1;fillColor=#fa6800;strokeColor=#C73500;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-44" target="ZpwxWgh1Q2fkPOIGbowT-20">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-64" value="<font color="#007fff">消费者睡眠</font>" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" vertex="1" connectable="0" parent="ZpwxWgh1Q2fkPOIGbowT-48">
<mxGeometry x="-0.8332" y="-2" relative="1" as="geometry">
<mxPoint as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-49" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=1;exitDx=0;exitDy=0;entryX=1;entryY=0.25;entryDx=0;entryDy=0;dashed=1;fillColor=#fa6800;strokeColor=#C73500;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-44" target="ZpwxWgh1Q2fkPOIGbowT-21">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-65" value="<font color="#007fff">消费者睡眠</font>" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" vertex="1" connectable="0" parent="ZpwxWgh1Q2fkPOIGbowT-49">
<mxGeometry x="-0.9156" y="-1" relative="1" as="geometry">
<mxPoint as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-44" value="消费者睡眠" style="whiteSpace=wrap;html=1;rounded=1;" vertex="1" parent="1">
<mxGeometry x="2281" y="1510" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-52" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-50" target="ZpwxWgh1Q2fkPOIGbowT-51">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-50" value="生产者" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" vertex="1" parent="1">
<mxGeometry x="1679.5" y="1884.12" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-54" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-51" target="ZpwxWgh1Q2fkPOIGbowT-53">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-59" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-51" target="ZpwxWgh1Q2fkPOIGbowT-58">
<mxGeometry relative="1" as="geometry">
<Array as="points">
<mxPoint x="1980" y="1914" />
<mxPoint x="1980" y="1870" />
<mxPoint x="1440" y="1870" />
<mxPoint x="1440" y="1660" />
</Array>
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-66" value="<font color="#007fff">写元素值</font>" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" vertex="1" connectable="0" parent="ZpwxWgh1Q2fkPOIGbowT-59">
<mxGeometry x="-0.8797" y="-1" relative="1" as="geometry">
<mxPoint x="12" y="1" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-51" value="写RingBuffer" style="rounded=1;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="1839" y="1884.12" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-56" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-53" target="ZpwxWgh1Q2fkPOIGbowT-55">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-53" value="发布事件" style="rounded=1;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="1839" y="1964.12" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-69" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=0;exitDx=0;exitDy=0;entryX=1;entryY=0.75;entryDx=0;entryDy=0;dashed=1;fillColor=#60a917;strokeColor=#2D7600;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-55" target="ZpwxWgh1Q2fkPOIGbowT-21">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-73" value="<font color="#007fff">&nbsp;消费者唤醒</font>" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" vertex="1" connectable="0" parent="ZpwxWgh1Q2fkPOIGbowT-69">
<mxGeometry x="-0.9278" y="4" relative="1" as="geometry">
<mxPoint x="34" as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-55" value="唤醒所有消费者" style="rounded=1;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="2280" y="1964.12" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-58" value="6" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="1480" y="1640" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-74" value="RingBuffer" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" vertex="1" parent="1">
<mxGeometry x="1480" y="1370" width="80" height="30" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-79" value="事件处理" style="rounded=1;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="2080" y="1760" width="160" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-84" value="dependentSequence 主要用于实现<b>消费者的顺序消费<br>菱形消费是由顺序消费和广播消费组成<br></b>" style="text;html=1;align=left;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" vertex="1" parent="1">
<mxGeometry x="-470" y="1600" width="310" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-85" value="<div style=""><div style="">public long <b>waitFor</b>(final long sequence, final Sequence cursorSequence, final Sequence dependentSequence, <span style="">&nbsp;</span>final <span style=""><span style="white-space: pre;">&nbsp;&nbsp;&nbsp;&nbsp;</span></span>SequenceBarrier barrier)&nbsp;<span style="background-color: initial;">throws AlertException, InterruptedException {</span></div><div style="">&nbsp; &nbsp; long availableSequence;</div><div style="">&nbsp; &nbsp; if (cursorSequence.get() &lt; sequence) {</div><div style="">&nbsp; &nbsp; &nbsp; &nbsp; synchronized (mutex) {</div><div style="">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (cursorSequence.get() &lt; sequence) {</div><div style="">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; barrier.checkAlert();</div><div style="">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; mutex.wait();</div><div style="">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }</div><div style="">&nbsp; &nbsp; &nbsp; &nbsp; }</div><div style="">&nbsp; &nbsp; }</div><div style=""><span style=""><font color="#007fff"><b><span style=""><span style="">&nbsp;&nbsp;&nbsp;&nbsp;</span></span>// 被唤醒后还需要当前消费者的消费进度(sequence)不大于依赖进度(dependentSequence)才能跳出等待去处理事件,否则会自旋等待</b></font></span></div><div style="">&nbsp; &nbsp; while ((availableSequence = <b>dependentSequence</b>.get()) &lt; <b>sequence</b>) { <font color="#007fff">//如果依赖多个进度dependentSequence.get()返回最小的,变相实现等待所有依赖的进度</font></div><div style="">&nbsp; &nbsp; &nbsp; &nbsp; barrier.checkAlert();</div><div style="">&nbsp; &nbsp; &nbsp; &nbsp; Thread.onSpinWait();</div><div style="">&nbsp; &nbsp; }</div><div style=""><br></div><div style="">&nbsp; &nbsp; return availableSequence;</div><div style="">}</div></div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=9;align=left;arcSize=4;" vertex="1" parent="1">
<mxGeometry x="3000" y="860" width="520" height="220" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-87" value="<b>多消费者顺序消费模式<br></b>" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" vertex="1" parent="1">
<mxGeometry x="40" y="2120" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-90" value="<div><font color="#007fff">原理是<b>将上一个消费者的消费进度作为下一个消费者的依赖进度</b>,在等待策略 waitFor() 方法中则要求</font></div><div><span style="background-color: initial;"><font color="#007fff"><b>当前消费者消费进度不大于依赖进度才能继续执行处理否则会自旋等待</b>依赖进度满足条件</font></span><br></div><div><span style="background-color: initial;"><font color="#007fff">原理较简单不展示详细流程了</font></span></div>" style="text;html=1;align=left;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" vertex="1" parent="1">
<mxGeometry x="250" y="2120" width="560" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-92" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>EventHandlerGroup&lt;T&gt;</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><b><font color="#007fff">消费者分组,用于实现消费者编排</font></b></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p><p style="margin: 0px 0px 0px 4px;">private final Disruptor&lt;T&gt; disruptor;</p><p style="margin: 0px 0px 0px 4px;">private final ConsumerRepository consumerRepository;</p><p style="margin: 0px 0px 0px 4px;"><b><font color="#007fff">// 消费者组中所有消费者的消费进度,用于实现消费者编排,由于有些消费者可能依赖多个消费者,所以设置成数组;比如菱形消费</font></b></p><p style="margin: 0px 0px 0px 4px;">private final Sequence[] sequences;</p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;">重要方法:</font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;fillColor=#dae8fc;strokeColor=#6c8ebf;" vertex="1" parent="1">
<mxGeometry x="-440" y="1280" width="400" height="200" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-93" value="<b>多消费者菱形消费模式<br></b>" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" vertex="1" parent="1">
<mxGeometry x="40" y="2240" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-94" value="<div><font color="#007fff"><b>属于广播和顺序消费的组合,</b></font></div><div><font color="#007fff">理解了前面的模式原理,这个就很简单了</font></div><div><font color="#007fff">不过貌似<b>不支持复杂的编排</b>,比如右边这种就支持不了</font></div>" style="text;html=1;align=left;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" vertex="1" parent="1">
<mxGeometry x="250" y="2240" width="310" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-101" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-95" target="ZpwxWgh1Q2fkPOIGbowT-96">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-102" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-95" target="ZpwxWgh1Q2fkPOIGbowT-97">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-103" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-95" target="ZpwxWgh1Q2fkPOIGbowT-98">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-95" value="事件" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="560" y="2260" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-104" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-96" target="ZpwxWgh1Q2fkPOIGbowT-99">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-96" value="消费者11" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="680" y="2200" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-105" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-97" target="ZpwxWgh1Q2fkPOIGbowT-99">
<mxGeometry relative="1" as="geometry">
<mxPoint x="800" y="2240" as="targetPoint" />
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-97" value="消费者12" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="680" y="2260" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-106" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-98" target="ZpwxWgh1Q2fkPOIGbowT-100">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-98" value="事件13" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="680" y="2320" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-108" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-99" target="ZpwxWgh1Q2fkPOIGbowT-107">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-112" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-99" target="ZpwxWgh1Q2fkPOIGbowT-111">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-99" value="消费者21" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="800" y="2230" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-109" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-100" target="ZpwxWgh1Q2fkPOIGbowT-110">
<mxGeometry relative="1" as="geometry">
<Array as="points">
<mxPoint x="1000" y="2340" />
</Array>
</mxGeometry>
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-100" value="消费者22" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="800" y="2320" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-113" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-107" target="ZpwxWgh1Q2fkPOIGbowT-110">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-107" value="消费者31" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="920" y="2200" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-110" value="消费者41" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="1040" y="2260" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-114" style="rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="ZpwxWgh1Q2fkPOIGbowT-111" target="ZpwxWgh1Q2fkPOIGbowT-110">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-111" value="消费者32" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="920" y="2260" width="80" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-115" value="<b>多消费者链式并行消费模式<br></b>" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" vertex="1" parent="1">
<mxGeometry x="40" y="2360" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-116" value="<div><font color="#007fff"><b>就是设置多组顺序消费者</b></font></div><div><font color="#007fff">理解了前面的模式原理,这个也很简单</font></div>" style="text;html=1;align=left;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" vertex="1" parent="1">
<mxGeometry x="250" y="2370" width="230" height="40" as="geometry" />
</mxCell>
<mxCell id="ZpwxWgh1Q2fkPOIGbowT-117" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>TryRewindHandler</b><br style="font-size: 11px;"></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><br></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff">// 批量倒带策略,三种实现 <b>SimpleBatchRewindStrategy </b>出现 RewindableException&nbsp;</font><font color="#007fff">总是倒带,<b>EventuallyGiveUpBatchRewindStrategy</b> 先尝试倒带,超过一定重试次数还是异常就放弃倒带抛异常,</font><span style="background-color: initial;"><font color="#007fff"><b>NanosecondPauseBatchRewindStrategy</b> 先等待一段时间再倒带</font><br>private final BatchRewindStrategy </span><b style="background-color: initial;">batchRewindStrategy</b><span style="background-color: initial;">;</span></p><hr style="font-size: 11px;"><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;">重要方法:</font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;">// 根据倒带策略返回某个索引重新处理(REWIND)或者直接抛出异常(THROW</font><font color="#007fff" style="font-size: 11px;">)&nbsp;</font></p><p style="margin: 0px 0px 0px 4px; font-size: 11px;"><font color="#007fff" style="font-size: 11px;">// <b>startOfBatchSequence</b> 是每批(每次被唤醒)事件的最小索引</font></p><p style="margin: 0px 0px 0px 4px;">public long <b>attemptRewindGetNextSequence</b>(final RewindableException e, final long startOfBatchSequence) throws RewindableException<br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=11;fontFamily=Helvetica;html=1;whiteSpace=wrap;" vertex="1" parent="1">
<mxGeometry x="-2200" y="1680" width="400" height="200" as="geometry" />
</mxCell>
</root>
</mxGraphModel>
</diagram>
</mxfile>