本文共 5028 字,大约阅读时间需要 16 分钟。
日志无处不在,它作为记录世间万物变化的载体,在运维、研发、运营、安全、BI、审计等领域有着广泛的应用场景。
是日志类数据的一站式服务平台,其核心组件 LogHub 凭借着高吞吐、低延迟、可自动伸缩等特性,逐渐成为大数据处理领域特别是实时数据处理场景下的基础设施。那些运行在 、、 等大数据计算引擎中的任务往往会将数据处理结果或中间结果实时写入 LogHub,下游系统基于 LogHub 中的数据提供查询分析、监控告警、机器学习、迭代计算等能力。下图展示了面向 LogHub 的大数据处理系统架构图。
要让整个系统稳定地运行,提供便捷高效的数据写入手段是前提。直接使用 API 或 SDK 往往无法满足大数据场景下对数据写入能力的要求,在这样的背景下 应运而生。
Aliyun LOG Java Producer 是一个易于使用且高度可配置的 Java 类库,它有如下功能特点:
使用 producer 相对于直接通过 API 或 SDK 向 LogHub 写数据会有如下优势。
在海量数据、资源有限的前提下,写入端要达到目标吞吐量需要实现复杂的控制逻辑,包括多线程、缓存策略、批量发送等,另外还要充分考虑失败重试的场景。Producer 实现了上述功能,在为您带来性能优势的同时简化了程序开发步骤。
在可用内存充足的前提下,producer 会对发往 LogHub 的数据进行缓存,因此用户调用 send 方法时能够立即返回,不会阻塞,达到计算与 I/O 逻辑分离的目的。稍后,用户可以通过返回的 future 对象或传入的 callback 获得数据发送的结果。
可以通过参数控制 producer 用于缓存待发送数据的内存大小,同时还可以配置用于执行数据发送任务的线程数量。这样做一方面避免了 producer 无限制地消耗资源,另一方面可以让您根据实际情况平衡资源消耗和写入吞吐量。
综上所述,producer 在给您带来诸多优势的同时只暴露了简单的接口,为您屏蔽了复杂的底层细节;另外,您也无须担心它会影响到上层业务的正常运行,大大降低了数据接入门槛。
Producer 的实现比较复杂,但使用起来却非常简单。想了解 producer 的正确打开方式请参考文章 。
为了让您更好地理解 producer 的表现行为,本章将带您探究它的实现原理,包括数据写入逻辑、核心组件的实现方式以及如何优雅地关闭 producer 中的各个组件。Producer 的整体架构如下图所示。
Producer 的数据写入逻辑如下:
用户调用producer.send()
方法发送数据,数据会被加入到 LogAccumulator 中的某个 ProducerBatch 里。通常情况下 send 方法会立即返回,但如果该 producer 实例没有足够空间容纳当前数据,此方法会被阻塞直到下列任意一个条件被满足。
如果数据发送失败,且满足下列任意一个条件,会将该 ProducerBatch 写入失败队列。
Producer 包含 、、、、、 等核心组件。
为了提高吞吐量,一个常见的做法是将若干个小包合并成大包批量发送,本小节介绍的 LogAccumulator 的主要作用便是合并待发送的数据。由于服务端要求具有相同 project、logstore、topic、source、shardHash 的数据才能组装成一个大包,LogAccumulator 会根据数据的这些属性将其缓存到内部 map 的不同位置。这个 map 的 key 为上述五元组,value 为 ProducerBatch。为了保证线程安全同时支持高并发,这里选用 ConcurrentMap 作为 map 的实现。
LogAccumulator 的另一个作用是控制缓存数据的总大小,这里选用 Semaphore 实现控制逻辑。Semaphore 是基于 AQS 实现的高性能同步工具,它会首先尝试通过自旋的方式获取共享资源,减少线程上下文切换的开销。
RetryQueue 用于存放发送失败待重试的 ProducerBatch,每个 batch 有一个字段用于标识下次计划发送时间。为了高效地获取超时 batch,内部选用 DelayQueue 存放这些 batch。DelayQueue 是一种按时间排序的优先队列,最先超时的 batch 会被优先取出,同时它也是线程安全的。
Mover 是一个独立的线程,它会循环地将 LogAccumulator 和 RetryQueue 中的超时 batch 投递到 IOThreadPool 里。为了避免空转占用宝贵的 CPU 资源,当 Mover 发现 LogAccumulator 和 RetryQueue 里没有满足发送条件的 batch 时,会在 RetryQueue 的 expiredBatches 方法上等待用户配置的数据最长缓存时间 lingerMs。
IOThreadPool 中的工作线程用于真正执行数据发送任务,该线程池的大小可通过参数 ioThreadCount 指定,默认为可用处理器个数乘以 2。
SendProducerBatchTask 封装了 batch 发送逻辑。为了避免阻塞 IO 线程,不论当前 batch 发送成功与否都会将其投递到队列中交由独立线程去执行回调。另外,如果某个发送失败的 batch 满足重试条件,不会在当前 IO 线程中立即重试(立即重试通常也会失败),而是根据指数退避策略将其投递到 RetryQueue 中。
Producer 会启动一个 SuccessBatchHandler 和一个 FailureBatchHandler 分别用来处理发送成功或失败的 batch。Handler 在执行完 batch 的 callback、设置好 batch 的 future 后便会“释放”该 batch 占用的内存空间,供新的数据使用。分开处理的原因是为了隔离发送成功和发送失败的 batch,保持 producer 整体的流动性。
要实现优雅关闭,需要做到以下几点:
为了达到上述目标,producer 的关闭逻辑设计如下:
可以看到,这里按照数据流动方向依次关闭队列和线程来达到优雅关闭、安全退出的目的。
Aliyun LOG Java Producer 是对老版 producer 的全面升级,解决了上一版存在的多个问题,包括网络异常情况下 CPU 占用率过高、关闭 producer 可能出现少量数据丢失等问题。另外,在容错方面也进行了加强,就算用户使用不合理,在资源、吞吐、隔离上都有较好的保证。
转载地址:http://hcgul.baihongyu.com/