如果还没看过Flume-ng源码解析之启动流程,可以点击 查看
1 接口介绍
组件的分析顺序是按照上一篇中启动顺序来分析的,首先是Channel,然后是Sink,最后是Source,在开始看组件源码之前我们先来看一下两个重要的接口,一个是LifecycleAware ,另一个是NamedComponent
1.1 LifecycleAware
@InterfaceAudience.Public@InterfaceStability.Stablepublic interface LifecycleAware { public void start(); public void stop(); public LifecycleState getLifecycleState(); }
非常简单就是三个方法,start()、stop()和getLifecycleState,这个接口是flume好多类都要实现的接口,包括
所中提到PollingPropertiesFileConfigurationProvider(),只要涉及到生命周期的都会实现该接口,当然组件们也是要实现的!1.2 NamedComponent
@InterfaceAudience.Public@InterfaceStability.Stablepublic interface NamedComponent { public void setName(String name); public String getName(); }
这个没什么好讲的,就是用来设置名字的。
2 Channel
作为Flume三大核心组件之一的Channel,我们有必要来看看它的构成:
@InterfaceAudience.Public@InterfaceStability.Stablepublic interface Channel extends LifecycleAware, NamedComponent { public void put(Event event) throws ChannelException; public Event take() throws ChannelException; public Transaction getTransaction(); }
那么从上面的接口中我们可以看到Channel的主要功能就是put()和take(),那么我们就来看一下它的具体实现。这里我们选择MemoryChannel作为例子,但是MemoryChannel太长了,我们就截取一小段来看看
public class MemoryChannel extends BasicChannelSemantics { private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class); private static final Integer defaultCapacity = Integer.valueOf(100); private static final Integer defaultTransCapacity = Integer.valueOf(100); public MemoryChannel() { } ... }
我们又看到它继承了BasicChannelSemantics ,从名字我们可以看出它是一个基础的Channel,我们继续看看看它的实现
@InterfaceAudience.Public@InterfaceStability.Stablepublic abstract class BasicChannelSemantics extends AbstractChannel { private ThreadLocalcurrentTransaction = new ThreadLocal (); private boolean initialized = false; protected void initialize() {} protected abstract BasicTransactionSemantics createTransaction(); @Override public void put(Event event) throws ChannelException { BasicTransactionSemantics transaction = currentTransaction.get(); Preconditions.checkState(transaction != null, "No transaction exists for this thread"); transaction.put(event); } @Override public Event take() throws ChannelException { BasicTransactionSemantics transaction = currentTransaction.get(); Preconditions.checkState(transaction != null, "No transaction exists for this thread"); return transaction.take(); } @Override public Transaction getTransaction() { if (!initialized) { synchronized (this) { if (!initialized) { initialize(); initialized = true; } } } BasicTransactionSemantics transaction = currentTransaction.get(); if (transaction == null || transaction.getState().equals( BasicTransactionSemantics.State.CLOSED)) { transaction = createTransaction(); currentTransaction.set(transaction); } return transaction; } }
找了许久,终于发现了put()和take(),但是仔细一看,它们内部调用的是BasicTransactionSemantics 的put()和take(),有点失望,继续来看看BasicTransactionSemantics
public abstract class BasicTransactionSemantics implements Transaction { private State state; private long initialThreadId; protected void doBegin() throws InterruptedException {} protected abstract void doPut(Event event) throws InterruptedException; protected abstract Event doTake() throws InterruptedException; protected abstract void doCommit() throws InterruptedException; protected abstract void doRollback() throws InterruptedException; protected void doClose() {} protected BasicTransactionSemantics() { state = State.NEW; initialThreadId = Thread.currentThread().getId(); } protected void put(Event event) { Preconditions.checkState(Thread.currentThread().getId() == initialThreadId, "put() called from different thread than getTransaction()!"); Preconditions.checkState(state.equals(State.OPEN), "put() called when transaction is %s!", state); Preconditions.checkArgument(event != null, "put() called with null event!"); try { doPut(event); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new ChannelException(e.toString(), e); } } protected Event take() { Preconditions.checkState(Thread.currentThread().getId() == initialThreadId, "take() called from different thread than getTransaction()!"); Preconditions.checkState(state.equals(State.OPEN), "take() called when transaction is %s!", state); try { return doTake(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } } protected State getState() { return state; } ...//我们这里只是讨论put和take,所以一些暂时不涉及的方法就被我干掉,有兴趣恩典朋友可以自行阅读 protected static enum State { NEW, OPEN, COMPLETED, CLOSED } }
又是一个抽象类,put()和take()内部调用的还是抽象方法doPut()和doTake(),看到这里,我相信没有耐心的同学已经崩溃了,但是就差最后一步了,既然是抽象类,那么最终Channel所使用的肯定是它的一个实现类,这时候我们可以回到一开始使用的MemoryChannel,到里面找找有没有线索,一看,MemoryChannel中就藏着个内部类
private class MemoryTransaction extends BasicTransactionSemantics { private LinkedBlockingDequetakeList; private LinkedBlockingDeque putList; private final ChannelCounter channelCounter; private int putByteCounter = 0; private int takeByteCounter = 0; public MemoryTransaction(int transCapacity, ChannelCounter counter) { putList = new LinkedBlockingDeque (transCapacity); takeList = new LinkedBlockingDeque (transCapacity); channelCounter = counter; } @Override protected void doPut(Event event) throws InterruptedException { channelCounter.incrementEventPutAttemptCount(); int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize); if (!putList.offer(event)) { throw new ChannelException( "Put queue for MemoryTransaction of capacity " + putList.size() + " full, consider committing more frequently, " + "increasing capacity or increasing thread count"); } putByteCounter += eventByteSize; } @Override protected Event doTake() throws InterruptedException { channelCounter.incrementEventTakeAttemptCount(); if (takeList.remainingCapacity() == 0) { throw new ChannelException("Take list for MemoryTransaction, capacity " + takeList.size() + " full, consider committing more frequently, " + "increasing capacity, or increasing thread count"); } if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) { return null; } Event event; synchronized (queueLock) { event = queue.poll(); } Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " + "signalling existence of entry"); takeList.put(event); int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize); takeByteCounter += eventByteSize; return event; } //...依然删除暂时不需要的方法 }
在这个类中我们可以看到doPut()和doTake()的实现方法,也明白MemoryChannel的put()和take()最终调用的是MemoryTransaction 的doPut()和doTake()。
有朋友看到这里以为这次解析就要结束了,其实好戏还在后头,Channel中还有两个重要的类ChannelProcessor和ChannelSelector,耐心地听我慢慢道来。
3 ChannelProcessor
ChannelProcessor 的作用就是执行put操作,将数据放到channel里面。每个ChannelProcessor实例都会配备一个ChannelSelector来决定event要put到那个channl当中
public class ChannelProcessor implements Configurable { private static final Logger LOG = LoggerFactory.getLogger(ChannelProcessor.class); private final ChannelSelector selector; private final InterceptorChain interceptorChain; public ChannelProcessor(ChannelSelector selector) { this.selector = selector; this.interceptorChain = new InterceptorChain(); } public void initialize() { this.interceptorChain.initialize(); } public void close() { this.interceptorChain.close(); } public void configure(Context context) { this.configureInterceptors(context); } private void configureInterceptors(Context context) { //配置拦截器 } public ChannelSelector getSelector() { return this.selector; } public void processEventBatch(Listevents) { ... while(i$.hasNext()) { Event optChannel = (Event)i$.next(); List tx = this.selector.getRequiredChannels(optChannel); ...//将event放到Required队列 t1 = this.selector.getOptionalChannels(optChannel); Object eventQueue; ...//将event放到Optional队列 } ...//event的分配操作 } public void processEvent(Event event) { event = this.interceptorChain.intercept(event); if(event != null) { List requiredChannels = this.selector.getRequiredChannels(event); Iterator optionalChannels = requiredChannels.iterator(); ...//event的分配操作 List optionalChannels1 = this.selector.getOptionalChannels(event); Iterator i$1 = optionalChannels1.iterator(); ...//event的分配操作 } } }
为了简化代码,我进行了一些删除,只保留需要讲解的部分,说白了Channel中的两个写入方法,都是需要从作为参数传入的selector中获取对应的channel来执行event的put操作。接下来我们来看看ChannelSelector
4 ChannelSelector
ChannelSelector是一个接口,我们可以通过ChannelSelectorFactory来创建它的子类,Flume提供了两个实现类MultiplexingChannelSelector和ReplicatingChannelSelector。
public interface ChannelSelector extends NamedComponent, Configurable { void setChannels(Listvar1); List getRequiredChannels(Event var1); List getOptionalChannels(Event var1); List getAllChannels(); }
通过ChannelSelectorFactory 的create来创建,create中调用getSelectorForType来获得一个selector,通过配置文件中的type来创建相应的子类
public class ChannelSelectorFactory { private static final Logger LOGGER = LoggerFactory.getLogger( ChannelSelectorFactory.class); public static ChannelSelector create(Listchannels, Map config) { ... } public static ChannelSelector create(List channels, ChannelSelectorConfiguration conf) { String type = ChannelSelectorType.REPLICATING.toString(); if (conf != null) { type = conf.getType(); } ChannelSelector selector = getSelectorForType(type); selector.setChannels(channels); Configurables.configure(selector, conf); return selector; } private static ChannelSelector getSelectorForType(String type) { if (type == null || type.trim().length() == 0) { return new ReplicatingChannelSelector(); } String selectorClassName = type; ChannelSelectorType selectorType = ChannelSelectorType.OTHER; try { selectorType = ChannelSelectorType.valueOf(type.toUpperCase(Locale.ENGLISH)); } catch (IllegalArgumentException ex) { LOGGER.debug("Selector type {} is a custom type", type); } if (!selectorType.equals(ChannelSelectorType.OTHER)) { selectorClassName = selectorType.getChannelSelectorClassName(); } ChannelSelector selector = null; try { @SuppressWarnings("unchecked") Class selectorClass = (Class ) Class.forName(selectorClassName); selector = selectorClass.newInstance(); } catch (Exception ex) { throw new FlumeException("Unable to load selector type: " + type + ", class: " + selectorClassName, ex); } return selector; } }
对于这两种Selector简单说一下:
1)MultiplexingChannelSelector
下面是一个channel selector 配置文件agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexingagent_foo.sources.avro-AppSrv-source1.selector.header = Stateagent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2 agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2 agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2 agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
MultiplexingChannelSelector类中定义了三个属性,用于存储不同类型的channel
private Map> channelMapping; private Map > optionalChannels; private List defaultChannels;
那么具体分配原则如下:
- 如果设置了maping,那么会event肯定会给指定的channel,如果同时设置了optional,也会发送给optionalchannel
- 如果没有设置maping,设置default,那么event会发送给defaultchannel,如果还同时设置了optional,那么也会发送给optionalchannel
- 如果maping和default都没指定,如果有指定option,那么会发送给optionalchannel,但是发送给optionalchannel不会进行失败重试
2)ReplicatingChannelSelector
分配原则比较简单
- 如果是replicating的话,那么如果没有指定optional,那么全部channel都有,如果某个channel指定为option的话,那么就要从requiredChannel移除,只发送给optionalchannel
5 总结:
作为一个承上启下的组件,Channel的作用就是将source来的数据通过自己流向sink,那么ChannelProcessor就起到将event put到分配好的channel中,而分配的规则是由selector决定的,flume提供的selector有multiplexing和replicating两种。所以ChannelProcessor一般都是在Source中被调用。那么Channel的take()肯定是在Sink中调用的。