Aeron 是什么?
Aeron 是一个开源高性能消息传输机制(单向),支持高效可靠的 UDP 单播、UDP 多播和 IPC 消息传输。
1. 架构
Aeron 主要由三部分组成:Media Driver、Publications 和 Subscriptions
1.1 Media Driver
Media Driver 负责管理 publications 和 subscriptions 所使用的用来发送和接收数据的 Media(UDP 或 IPC)。
各组件作用:
-
Driver ConductorDriver Conductor 负责接收来自 Aeron 客户端的 publishers 和 subscribers 的指令,并编排 Media Driver 的操作。另外还负责域名解析任务
-
ReceiverReceiver 管理所有从 media 中接收到的数据,数据传输轮询器接收 UDP 数据,并使用 Java NIO 与操作系统的网络堆栈进行交互。除了从 media 接收数据外,接收器还管理接收到的 images,根据需要发送 NAK 和状态消息。
-
SenderSender 通过 media 管理数据的传输
-
Client ConductorClient Conductor 负责与 Driver Conductor 进行沟通
Media Driver 会在本地管理一个目录,在生产环境中这个目录应该放到 /dev/shm(共享内存)上。
Media Driver 线程模型:
-
DEDICATED(默认模式) 在专用线程模式中(ThreadingMode.DEDICATED)Media Driver 将使用 3 个线程,每个线程都有特定的空闲策略:Sender 使用 senderIdleStrategy;Receiver 使用 receiverIdleStrategy;driver-conductor 使用 conductorIdleStrategy
-
Shared Network 以共享网络模式(ThreadingMode.SHARED_NETWORK)运行将线程数减少到两个 Sender 和 Receiver 在一个复合代理中,使用 sharedNetworkIdleStrategydriver-conductor 使用 conductorIdleStrategy
-
Shared 以共享模式运行(ThreadingMode.SHARED),线程数减少到 1,使用 sharedIdleStrategy(适合资源较少的环境)
-
Invoker
1.2 Publications
Publications 是应用程序用来发送数据的主要 Aeron 对象,发送数据有两种方法:offer 和 tryClaim,这两种方法都是非阻塞的,具体有什么不同,在下面介绍。
Publications 有两种类型:
-
ConcurrentPublication 使用 Aeron.addPublication 方法创建 Publication 时,返回的就是 ConcurrentPublication,它是线程安全的,可以由多个 sender 共同使用
-
ExclusivePublicationExclusivePublication 是一个独占的 Publication,通过 Aeron.addExclusivePublication 来创建。ExclusivePublication 是线程不安全的,但是由于减少了很多内部锁,所以吞吐量会提高一些,如果想要使用一个初始 position 来重播的话,必须使用 ExclusivePublication 来实现。
Offer 使用 offer 方法发送数据时,应用程序可以提供一个 Byte Buffer 来发送给 Publication subscribers。offer 将数据从 Byte Buffer 复制到本地 Log Buffer 时,会发生数据拷贝,而使用 tryClaim 方法可以避免数据拷贝。
注意事项:
-
单条数据的大小不能超过 min(term buffer size / 8, 16M),term buffer size 默认是 16M;
-
单条数据大小超过 maxPayloadLength()(默认 1376 字节)时,Aeron 会自动对数据进行分段。(在 Subscriptions 端,Aeron 并不会自动组合数据,需要使用 FragmentAssembler 来实现)
TryClaimTryClaim 必须适合性能敏感的环境,它允许开发人员提前声明 Log Buffer 的一部分,然后直接写入数据到申请到的这段区域。这样就不需要在发送数据时复制数据了。TryClaim 写入数据后需要调用 commit 方法来完成。
注意事项:
-
消息的最大长度不能超过 maxPayloadLength()(默认 1376 字节);
-
tryClaim 不支持数据分段
1.3. Subscriptions
Subscriptions 用来接收消息,通过 addSubscription 来创建一个针对于指定 channel 和 stream 的 subscription,然后还需要提供一个实现了 FragmentHandler 接口的对象来处理接收到的消息。
final Subscription subscription = aeron.addSubscription(channel, streamId);
通过调用 subscription.poll(fragmentHandler, 1) 来接收数据。
2. 其他核心概念
2.1. Channels, Streams and Sessions
创建 Publication & Subscription 组合时,必须指定 Channel 和 Stream ID,Channel 就很像 TCP/IP 的配置:IP 地址 + 端口号;但是 Aeron 可以在相同的 Channel 中发送不同的数据流–通过指定 Stream ID。sessionId 是一个随机的整数,可以是负值。
2.2. Log Buffers
Log Buffers 是一个内存映射文件,由四部分构成:
-
三个大小相同的部分:term,每个 term 都有自己的 term id, term 用来保存 header 和消息数据
-
元数据部分,位于文件末尾
term 有三种逻辑状态:
-
clean term 尚未写入数据
-
active term 正在写入数据
-
dirty 保存不处于活动状态但暂时可用于重新传输的数据
Term Buffer Lengthsterm 大小设置(默认 16M):系统属性 aeron.term.buffer.length 或 aeron.ipc.term.buffer.length 也可以在 Media Driver context 中用 publicationTermBufferLength 和 ipcTermBufferLength 配置可以针对单个 publication 配置大小:aeron:ipc?term-length=128k 或 aeron:udp?endpoint=192.168.0.1:12345|term-length=2m
条件限制:
-
term buffer 的长度最小限制为 65536 字节(64k)
-
term buffer 的长度最大限制为 1,073,741,824 字节(1G)
-
大小必须是 2 的幂
-
最大消息长度是 term buffer length/8 和 16M 中的较小值
-
如何发送超过 8kb 的消息?
-
term buffer length 直接影响着最大消息长度,设置越大就可以发送更大的消息
-
tryClaim 不支持任何超出 maxPayloadLength(默认为 1376 字节)的内容
-
注意 /dev/shm 的大小,需要进行相应的配置
3. Demo
Publisher:
Subscriber:
-
MediaDriver 可以作为一个独立程序启动,也可以嵌入到应用程序中启动,这里是通过调用 MediaDriver.launch 方法在应用程序中启动。
-
定义 channle 和 streamId,这里是通过 IPC 来传输消息。
-
发送消息:发送端创建 publication,使用 tryClaim 方法发送消息
-
接收消息:接收端创建 subscription,实现了 FragmentHandler 接口来处理接收到的数据,通过调用 subscription.poll 来接收数据。
参考资料
https://github.com/real-logic/aeron/wiki
本文文字及图片出自 InfoQ
你也许感兴趣的:
- 【外评】电脑从哪里获取时间?
- 【外评】为什么 Stack Overflow 正在消失?
- Android 全力押注 Rust,Linux 却在原地踏步?谷歌:用 Rust 重写固件太简单了!
- 【外评】哪些开源项目被广泛使用,但仅由少数人维护?
- 【外评】好的重构与不好的重构
- C 语言老将从中作梗,Rust for Linux 项目内讧升级!核心维护者愤然离职:不受尊重、热情被消耗光
- 【外评】代码审查反模式
- 我受够了维护 AI 生成的代码
- 【外评】Linux 桌面市场份额升至 4.45
- 【外评】作为全栈开发人员如何跟上 AI/ML 的发展?
你对本文的反应是: