Flume Source Code Analysis: Key Data Structures
This is the first post in a series dissecting the Flume source code. It focuses on Flume's overall design and main data flows; finer details will be explored in later posts.
A simple channel -> sink flow
from TestKafkaSink.java (only the method signature was shown in the original excerpt):

```java
public void testDefaultTopic() { /* ... */ }
```
Sink walkthrough
Taking KafkaSink as the example.
Three key elements: Properties — configuration; Producer — performs the actual write once events are taken; Counter — records send latency and the number of events sent.
start() and stop() do little more than start/stop the Producer and Counter and update the lifecycle state;
The actual data handling happens in process(): both taking events from the channel and sending them are done inside a Channel transaction.

```java
try {
    long processedEvents = 0;
    transaction = channel.getTransaction();
    transaction.begin();
    messageList.clear();
    for (; processedEvents < batchSize; processedEvents += 1) {
        event = channel.take();
        if (event == null) {
            // no events available in channel
            break;
        }
        byte[] eventBody = event.getBody();
        Map<String, String> headers = event.getHeaders();
        if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
            eventTopic = topic;
        }
        eventKey = headers.get(KEY_HDR);
        // create a message and add to buffer
        KeyedMessage<String, byte[]> data =
            new KeyedMessage<String, byte[]>(eventTopic, eventKey, eventBody);
        messageList.add(data);
    }
    // publish batch and commit.
    if (processedEvents > 0) {
        long startTime = System.nanoTime();
        producer.send(messageList);
        long endTime = System.nanoTime();
        counter.addToKafkaEventSendTimer((endTime - startTime) / (1000 * 1000));
        counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size()));
    }
    transaction.commit();
} // catch (rollback) and finally (transaction.close()) are elided from this excerpt
```
Source walkthrough
Taking KafkaSource as the example.
Three key elements: properties — configuration; consumer — subscribes to data from the source (Kafka); counter — status counters.
start() and stop() do little more than start/stop the consumer and counter and update the lifecycle state;
The actual data handling is again process(); more precisely, it delegates to the ChannelProcessor, which puts the subscribed events into the Channel inside a Channel transaction:

```java
if (eventList.size() > 0) {
    getChannelProcessor().processEventBatch(eventList);
    counter.addToEventAcceptedCount(eventList.size());
    eventList.clear();
}
```
Transactions

```java
enum TransactionState { Started, Committed, RolledBack, Closed };
```

Transactions must be thread-safe;
Typical usage (from the Transaction interface's javadoc):

```java
Channel ch = ...
Transaction tx = ch.getTransaction();
try {
    tx.begin();
    ...
    // ch.put(event) or ch.take()
    ...
    tx.commit();
} catch (ChannelException ex) {
    tx.rollback();
    ...
} finally {
    tx.close();
}
```
State diagram:
open -> {put}
open -> {take}
new -> {begin} -> open
open -> {commit} -> completed
open -> {rollback} -> completed
new|completed -> {close} -> closed
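The diagram above can be sketched as a small state machine. This is an illustrative toy under assumed names (ToyTransaction, ToyTxState are made up for this sketch, not Flume classes); in Flume the analogous checks sit in the channel transaction base class:

```java
// Toy transaction that enforces the transitions in the state diagram above.
enum ToyTxState { NEW, OPEN, COMPLETED, CLOSED }

class ToyTransaction {
    private ToyTxState state = ToyTxState.NEW;

    void begin() {                      // new -> {begin} -> open
        require(state == ToyTxState.NEW);
        state = ToyTxState.OPEN;
    }

    void commit() {                     // open -> {commit} -> completed
        require(state == ToyTxState.OPEN);
        state = ToyTxState.COMPLETED;
    }

    void rollback() {                   // open -> {rollback} -> completed
        require(state == ToyTxState.OPEN);
        state = ToyTxState.COMPLETED;
    }

    void close() {                      // new|completed -> {close} -> closed
        require(state == ToyTxState.NEW || state == ToyTxState.COMPLETED);
        state = ToyTxState.CLOSED;
    }

    ToyTxState state() { return state; }

    private void require(boolean legal) {
        if (!legal) throw new IllegalStateException("illegal transition from " + state);
    }
}

public class Main {
    public static void main(String[] args) {
        ToyTransaction tx = new ToyTransaction();
        tx.begin();     // NEW -> OPEN; put/take would be legal while open
        tx.commit();    // OPEN -> COMPLETED
        tx.close();     // COMPLETED -> CLOSED
        System.out.println(tx.state()); // prints "CLOSED"
    }
}
```

Calling, say, commit() before begin() throws IllegalStateException, which mirrors why the usage pattern above always pairs begin/commit inside try and close in finally.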
Channel walkthrough
The Channel is the core component; MemoryChannel serves as the example.
Main methods implemented: doPut (source side), doTake (sink side), doCommit (source & sink), doRollback (source & sink).
The implementation relies on three queues: putList (for doPut), takeList (for doTake), and queue (the main channel).
doPut mainly places the event onto putList;
doTake first removes the head event from queue, records it in takeList, and returns it;
doCommit first compares takeList.size() < putList.size() to decide whether queue needs more capacity, then appends the events from putList to the tail of queue, clears both takeList and putList, and finally updates the counters;
doRollback only needs to put the events from takeList back into queue; putList is simply cleared.
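The three-queue mechanics described above can be sketched as follows. This is a deliberately simplified toy (the real MemoryChannel also manages capacity semaphores, byte accounting, and per-transaction isolation, and its queue sizing check is omitted here):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of MemoryChannel's three queues: putList, takeList, and the main queue.
class ToyMemoryChannel {
    private final Deque<String> queue = new ArrayDeque<>();     // main channel
    private final Deque<String> putList = new ArrayDeque<>();   // staged puts
    private final Deque<String> takeList = new ArrayDeque<>();  // staged takes

    void doPut(String event) {
        putList.addLast(event);                     // stage the event; not yet visible
    }

    String doTake() {
        String event = queue.pollFirst();           // take from the head of the main queue
        if (event != null) {
            takeList.addLast(event);                // remember it in case of rollback
        }
        return event;
    }

    void doCommit() {
        // the real MemoryChannel compares takeList.size() and putList.size() here
        // to adjust remaining capacity; omitted in this toy
        while (!putList.isEmpty()) {
            queue.addLast(putList.pollFirst());     // publish staged puts to the tail
        }
        takeList.clear();                           // taken events become permanent
    }

    void doRollback() {
        while (!takeList.isEmpty()) {
            queue.addFirst(takeList.pollLast());    // restore takes to the head, in order
        }
        putList.clear();                            // discard staged puts
    }

    int size() { return queue.size(); }
}

public class Main {
    public static void main(String[] args) {
        ToyMemoryChannel ch = new ToyMemoryChannel();
        ch.doPut("e1");
        ch.doPut("e2");
        ch.doCommit();                  // queue = [e1, e2]
        String e = ch.doTake();         // e = "e1", takeList = [e1]
        ch.doRollback();                // e1 returns to the head: queue = [e1, e2]
        System.out.println(e + " " + ch.size());  // prints "e1 2"
    }
}
```

Note the asymmetry: committed puts append to the tail, while rolled-back takes are pushed back onto the head so that event order is preserved for the next taker.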
Lifecycle

```java
enum LifecycleState { IDLE, START, STOP, ERROR; }
```

source, channel, and sink all implement the LifecycleAware interface;
LifecycleSupervisor:

```java
public synchronized void supervise(LifecycleAware lifecycleAware,
    SupervisorPolicy policy, LifecycleState desiredState) {}
```

How the monitoring works in detail: omitted here.
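The core idea of supervision can still be sketched even with the details omitted: periodically compare each component's actual LifecycleState with its desired state and reconcile. This toy (ToySupervisor is a made-up name; the real LifecycleSupervisor schedules such checks on an executor and consults the SupervisorPolicy) shows one reconciliation pass:

```java
// Minimal sketch of supervision: drive a component toward its desired state.
enum LifecycleState { IDLE, START, STOP, ERROR }

interface LifecycleAware {
    void start();
    void stop();
    LifecycleState getLifecycleState();
}

class ToySupervisor {
    // One reconciliation pass for a single component; the real supervisor
    // runs this kind of check repeatedly on a schedule.
    static void check(LifecycleAware component, LifecycleState desired) {
        if (component.getLifecycleState() != desired) {
            if (desired == LifecycleState.START) {
                component.start();      // e.g. restart a crashed component
            } else if (desired == LifecycleState.STOP) {
                component.stop();
            }
        }
    }
}

public class Main {
    public static void main(String[] args) {
        LifecycleAware sink = new LifecycleAware() {
            private LifecycleState state = LifecycleState.IDLE;
            public void start() { state = LifecycleState.START; }
            public void stop() { state = LifecycleState.STOP; }
            public LifecycleState getLifecycleState() { return state; }
        };
        ToySupervisor.check(sink, LifecycleState.START);   // IDLE != START -> start()
        System.out.println(sink.getLifecycleState());      // prints "START"
    }
}
```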
Startup
flume.node.Application.java:

```java
public synchronized void start() {
    for (LifecycleAware component : components) {
        supervisor.supervise(component,
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    }
}
```

Because every component in Flume implements the LifecycleAware interface, each of them can be placed under the supervisor.
Storing configuration in ZooKeeper: omitted here.