Table of Contents
  1. channel->sink simple flow
  2. Sink analysis
  3. Source analysis
  4. Transactions
  5. Channel analysis
  6. Lifecycle
  7. Startup

This is the first post in a series dissecting the Flume source code. It focuses on Flume's overall design and main flow; finer details will be explored in later posts.

channel->sink simple flow

From TestKafkaSink.java:

public void testDefaultTopic() {
  Sink kafkaSink = new KafkaSink();
  Context context = prepareDefaultContext();
  Configurables.configure(kafkaSink, context);
  Channel memoryChannel = new MemoryChannel();
  Configurables.configure(memoryChannel, context);
  kafkaSink.setChannel(memoryChannel);
  kafkaSink.start();

  String msg = "default-topic-test";
  Transaction tx = memoryChannel.getTransaction();
  tx.begin();
  Event event = EventBuilder.withBody(msg.getBytes());
  memoryChannel.put(event);
  tx.commit();
  tx.close();

  try {
    Sink.Status status = kafkaSink.process();
    if (status == Sink.Status.BACKOFF) {
      fail("Error Occurred");
    }
  } catch (EventDeliveryException ex) {
    // ignore
  }

  String fetchedMsg = new String((byte[])
      testUtil.getNextMessageFromConsumer(KafkaSinkConstants.DEFAULT_TOPIC)
          .message());
  assertEquals(msg, fetchedMsg);
}

Sink analysis

Take KafkaSink as an example.
Three key elements: Properties (configuration); Producer, which performs the actual writes once data is fetched; Counter, which records send latency and the volume of data sent.
start() and stop() merely start/stop the Producer and Counter and update the lifecycle state.
The actual data handling happens in process(): taking events and sending them are both done inside a Channel transaction.

// excerpt from KafkaSink.process()
try {
  long processedEvents = 0;

  transaction = channel.getTransaction();
  transaction.begin();

  messageList.clear();
  for (; processedEvents < batchSize; processedEvents += 1) {
    event = channel.take();

    if (event == null) {
      // no events available in channel
      break;
    }

    byte[] eventBody = event.getBody();
    Map<String, String> headers = event.getHeaders();

    if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
      eventTopic = topic;
    }

    eventKey = headers.get(KEY_HDR);

    // create a message and add to buffer
    KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
        (eventTopic, eventKey, eventBody);
    messageList.add(data);
  }

  // publish batch and commit.
  if (processedEvents > 0) {
    long startTime = System.nanoTime();
    producer.send(messageList);
    long endTime = System.nanoTime();
    counter.addToKafkaEventSendTimer((endTime - startTime) / (1000 * 1000));
    counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size()));
  }

  transaction.commit();

}

Source analysis

Take KafkaSource as an example.
Three key elements: properties (configuration); consumer, which subscribes to data from the source (Kafka); counter, for status counting.
start() and stop() merely start the consumer and counter and update the lifecycle state.
The actual data handling happens in process(); more precisely, in the ChannelProcessor step, where each subscribed record is put into the Channel inside a Channel transaction:

if (eventList.size() > 0) {
  getChannelProcessor().processEventBatch(eventList);
  counter.addToEventAcceptedCount(eventList.size());
  eventList.clear();
}
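What processEventBatch does with that batch can be sketched as follows. This is a simplified stand-in, not the real Flume API: the interfaces below are minimal, and Flume's actual ChannelProcessor additionally handles interceptors and required vs. optional channels. The point it illustrates is that all puts for a batch happen inside one channel transaction, so the batch commits or rolls back as a unit.

```java
import java.util.ArrayList;
import java.util.List;

interface Event { byte[] getBody(); }

interface Transaction { void begin(); void commit(); void rollback(); void close(); }

interface Channel {
    Transaction getTransaction();
    void put(Event event);
}

/** Toy in-memory channel: commit moves buffered events into the main queue. */
class InMemoryChannel implements Channel {
    private final List<Event> queue = new ArrayList<>();
    private final List<Event> putList = new ArrayList<>();

    public Transaction getTransaction() {
        return new Transaction() {
            public void begin() { putList.clear(); }
            public void commit() { queue.addAll(putList); putList.clear(); }
            public void rollback() { putList.clear(); }
            public void close() { }
        };
    }
    public void put(Event event) { putList.add(event); }
    public int size() { return queue.size(); }
}

class SimpleChannelProcessor {
    private final Channel channel;

    SimpleChannelProcessor(Channel channel) { this.channel = channel; }

    void processEventBatch(List<Event> events) {
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            for (Event e : events) {
                channel.put(e);  // buffered in the transaction, not yet visible
            }
            tx.commit();         // the whole batch becomes visible at once
        } catch (RuntimeException ex) {
            tx.rollback();       // the whole batch is discarded together
            throw ex;
        } finally {
            tx.close();
        }
    }
}
```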

Transactions

enum TransactionState {Started, Committed, RolledBack, Closed };

Transactions must be thread-safe.
Typical usage:

Channel ch = ...
Transaction tx = ch.getTransaction();
try {
  tx.begin();
  ...
  // ch.put(event) or ch.take()
  ...
  tx.commit();
} catch (ChannelException ex) {
  tx.rollback();
  ...
} finally {
  tx.close();
}

State diagram:
open -> {put}
open -> {take}
new -> {begin} -> open
open -> {commit} -> completed
open -> {rollback} -> completed
new|completed -> {close} -> closed
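The transitions above can be encoded as a small state machine. This sketch only loosely mirrors Flume's BasicTransactionSemantics (which additionally tracks the owning thread); it simply rejects any call made from the wrong state, which is what the diagram specifies.

```java
// Minimal state machine for the transaction lifecycle shown in the diagram:
// new -{begin}-> open -{commit|rollback}-> completed -{close}-> closed,
// with put/take only legal while open.
class TxStateMachine {
    enum State { NEW, OPEN, COMPLETED, CLOSED }

    private State state = State.NEW;

    void begin()    { require(State.OPEN != null ? State.NEW : State.NEW); state = State.OPEN; }
    void put()      { require(State.OPEN); }
    void take()     { require(State.OPEN); }
    void commit()   { require(State.OPEN); state = State.COMPLETED; }
    void rollback() { require(State.OPEN); state = State.COMPLETED; }

    void close() {
        // close is legal from new or completed, per the diagram
        if (state != State.NEW && state != State.COMPLETED) {
            throw new IllegalStateException("close() from " + state);
        }
        state = State.CLOSED;
    }

    State state() { return state; }

    private void require(State expected) {
        if (state != expected) {
            throw new IllegalStateException(
                "expected " + expected + " but was " + state);
        }
    }
}
```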

Channel analysis

The channel is the central component; take MemoryChannel as an example.
Main methods implemented: doPut (source side), doTake (sink side), doCommit (source & sink), doRollback (source & sink).
The implementation relies on three queues: putList (doPut), takeList (doTake), and queue (the main channel).
doPut simply appends the event to putList.
doTake takes the first event off queue, adds it to takeList, then returns the event.
doCommit first compares takeList.size() < putList.size() to see whether queue needs extra capacity, then appends the events in putList to the tail of queue, clears takeList and putList, and finally updates the counters.
doRollback only needs to put the events in takeList back into queue; putList is simply cleared.
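The three-queue design described above can be sketched like this. It is a single-threaded illustration with hypothetical names; the real MemoryChannel guards the shared queue with locks and semaphores, enforces capacity limits, and keeps per-transaction putList/takeList rather than channel-wide ones.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of MemoryChannel's putList/takeList/queue mechanics. */
class ThreeQueueChannel {
    private final Deque<byte[]> queue = new ArrayDeque<>();    // main channel
    private final Deque<byte[]> putList = new ArrayDeque<>();  // staged puts
    private final Deque<byte[]> takeList = new ArrayDeque<>(); // staged takes

    void doPut(byte[] event) {
        putList.addLast(event);                  // staged, not yet in queue
    }

    byte[] doTake() {
        byte[] event = queue.pollFirst();        // take from the head of queue
        if (event != null) takeList.addLast(event);
        return event;
    }

    void doCommit() {
        // real impl: compare takeList.size() with putList.size() here to
        // decide whether extra queue capacity must be reserved first
        queue.addAll(putList);                   // append staged puts to the tail
        putList.clear();
        takeList.clear();                        // taken events are gone for good
    }

    void doRollback() {
        while (!takeList.isEmpty()) {
            queue.addFirst(takeList.pollLast()); // restore taken events, in order
        }
        putList.clear();                         // staged puts are simply dropped
    }

    int depth() { return queue.size(); }
}
```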

Lifecycle

enum LifecycleState {IDLE, START, STOP, ERROR;}

Source, channel and sink all implement the LifecycleAware interface.
LifecycleSupervisor:

public synchronized void supervise(LifecycleAware lifecycleAware,
    SupervisorPolicy policy, LifecycleState desiredState) {}

The details of how supervision works are omitted here.
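Still, the contract the supervisor relies on is small enough to sketch. The interface below is a simplified stand-in for org.apache.flume.lifecycle.LifecycleAware; any component implementing it can be handed to supervise(), after which the supervisor periodically compares getLifecycleState() against the desired state and, under a policy like AlwaysRestartPolicy, calls start() again when they drift apart.

```java
// Simplified stand-in for Flume's LifecycleAware contract: a component
// exposes start/stop plus its current state, and the supervisor drives it
// toward a desired state.
enum LifecycleState { IDLE, START, STOP, ERROR }

interface LifecycleAware {
    void start();
    void stop();
    LifecycleState getLifecycleState();
}

class MyComponent implements LifecycleAware {
    private volatile LifecycleState state = LifecycleState.IDLE;

    public void start() {
        // open resources here (producers, consumers, counters, ...)
        state = LifecycleState.START;
    }

    public void stop() {
        // release resources here
        state = LifecycleState.STOP;
    }

    public LifecycleState getLifecycleState() { return state; }
}
```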

Startup

flume.node.Application.java

public synchronized void start() {
  for (LifecycleAware component : components) {
    supervisor.supervise(component,
        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  }
}

Since every component in Flume implements the LifecycleAware interface, each of them can be placed under the supervisor.
Storing the configuration in ZooKeeper is omitted here.
