flume-ng 源码分析 - 核心组件分析

本贴最后更新于 1919 天前,其中的信息可能已经时移世异

从第一篇分析可知,flume 中所有的组件都会实现 LifecycleAware 接口。该接口定义如下:

public interface LifecycleAware {
  public void start();
  public void stop();
  public LifecycleState getLifecycleState();
}

在组件启动的时候会调用 start 方法,当有异常时调用 stop 方法。getLifecycleState 方法返回该组件的状态。包含 ** IDLE, START, STOP, ERROR; **

当在组件开发中需要配置一些属性的时候可以实现 ** Configurable ** 接口

public interface Configurable {
  public void configure(Context context);

}

下面开始分析 Agent 中各个组件的实现

source 实现

source 定义

public interface Source extends LifecycleAware, NamedComponent {
 public void setChannelProcessor(ChannelProcessor channelProcessor);
 public ChannelProcessor getChannelProcessor();
}

可以看到 Source 继承了 LifecycleAware 接口,并且提供了 ** ChannelProcessor ** 的 getter 和 setter 方法,channelProcessor 在** 常用架构篇中 **降到提供了日志过滤链,和 channel 选择的功能。所以 Source 的逻辑应该都在 LifecycleAware 中的 start,stop 方法中.

source 创建

启动篇中,我们讲到了 flume 是如何启动的。大致流程就是读取配置文件,生成 flume 的各种组件,执行各个组件的 start()方法。在** getConfiguration() 方法中调用了 loadSources() **方法。

可以看到在 loadSources 方法中如何创建 Source 的

private void loadSources(AgentConfiguration agentConf,
      Map<String, ChannelComponent> channelComponentMap,
      Map<String, SourceRunner> sourceRunnerMap)
      throws InstantiationException {

    Set<String> sourceNames = agentConf.getSourceSet();//获取所有的Source
    Map<String, ComponentConfiguration> compMap =agentConf.getSourceConfigMap(); //获取所有Source对应的配置
    for (String sourceName : sourceNames) {
      ComponentConfiguration comp = compMap.get(sourceName);//获取该source对应的配置
      if(comp != null) {
        SourceConfiguration config = (SourceConfiguration) comp; //转化为source配置

        Source source = sourceFactory.create(comp.getComponentName(),
            comp.getType());//通过sourceFactory 创建source
        try {
          Configurables.configure(source, config);//配置组件的其他属性
          Set<String> channelNames = config.getChannels()//获取该source的所有channel名称
          List<Channel> sourceChannels = new ArrayList<Channel>();
          for (String chName : channelNames) {//遍历所有的额channle,如果该channel已经实例化过了并且在channelComponentMap中已经存储了,那么将该channel放入sourceChannels
            ChannelComponent channelComponent = channelComponentMap.get(chName);
            if(channelComponent != null) {
              sourceChannels.add(channelComponent.channel);
            }
          }
          if(sourceChannels.isEmpty()) {//如果这个source没有关联到任何channel那么直接抛出异常
            String msg = String.format("Source %s is not connected to a " +
                "channel",  sourceName);
            throw new IllegalStateException(msg);
          }
          //以下创建出ChannelProcessor并且配置其他属性
          ChannelSelectorConfiguration selectorConfig =
              config.getSelectorConfiguration();

          ChannelSelector selector = ChannelSelectorFactory.create(
              sourceChannels, selectorConfig);

          ChannelProcessor channelProcessor = new ChannelProcessor(selector);
          Configurables.configure(channelProcessor, config);

          source.setChannelProcessor(channelProcessor);//设置channelSelector
          sourceRunnerMap.put(comp.getComponentName(),
              SourceRunner.forSource(source));//将改source,以及对应的SourceRunner放入SourceRunnerMap中
          for(Channel channel : sourceChannels) {//遍历改source所有的channel并且将改source添加到该channel的Component中
            ChannelComponent channelComponent = Preconditions.
                checkNotNull(channelComponentMap.get(channel.getName()),
                    String.format("Channel %s", channel.getName()));
            channelComponent.components.add(sourceName);
          }
        } catch (Exception e) {
          String msg = String.format("Source %s has been removed due to an " +
              "error during configuration", sourceName);
          LOGGER.error(msg, e);
        }
      }
    }
    ......

从上面的分析中可以看出,Source 是后 SourceFactory 创建的,创建之后绑定到 SourceRunner 中,并且在 SourceRunner 中启动了 Source。
SourceFactory 只有一个实现 DefaultSourceFactory。创建 Source 过程如下:

  
  public Source create(String name, String type) throws FlumeException {
    Preconditions.checkNotNull(name, "name");
    Preconditions.checkNotNull(type, "type");
    logger.info("Creating instance of source {}, type {}", name, type);
    Class<? extends Source> sourceClass = getClass(type);//通过对应的类型找到对应的class
    try {
      Source source = sourceClass.newInstance();//直接创建实例
      source.setName(name);
      return source;
    } catch (Exception ex) {
      throw new FlumeException("Unable to create source: " + name
          +", type: " + type + ", class: " + sourceClass.getName(), ex);
    }
  }
  

在创建重,通过 type 来或者 source 类的 class。在 getClass 方法中,首先会去找 type 对应类型的 class。在 SourceType 中定义的。如果没有找到,则直接获得配置的类全路径。最后通过 Class.forName(String)获取 class 对象。

source 提供了两种方式类获取数据:轮训拉去和事件驱动

14961359396630.jpg

PollableSource 提供的默认实现如下:
14962066716627.jpg

比如 KafkaSource 利用 Kafka 的 ConsumerApi,主动去拉去数据。

EventDrivenSource 提供的默认实现如下

14962068696930.jpg

如 HttpSource,NetcatSource 就是事件驱动的,所谓事件驱动也就是被动等待。在 HttpSource 中内置了一个 Jetty server,并且设置 FlumeHTTPServlet 作为 handler 去处理数据。

source 的启动

从上面的分析中知道,在启动 flume 读取配置文件时,会将所有的组件封装好,然后再启动。对于 Source 而言,封装成了 SourceRunner,通过 SourceRunner 间接启动 Source。

public static SourceRunner forSource(Source source) {
    SourceRunner runner = null;

    if (source instanceof PollableSource) {//判断该source是否为PollableSource
      runner = new PollableSourceRunner();
      ((PollableSourceRunner) runner).setSource((PollableSource) source);
    } else if (source instanceof EventDrivenSource) {//判断该source是否为EventDrivenSource
      runner = new EventDrivenSourceRunner();
      ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
    } else {//否则抛出异常
      throw new IllegalArgumentException("No known runner type for source "
          + source);
    }
    return runner;
  }

从上面可以看出 SourceRunner 默认提供两种实现,PollableSourceRunner,EventDrivenSource.分别对应 PollableSource 和 EventDrivenSource。

查看 PollableSourceRunner 是如何启动的

@Override
  public void start() {
    PollableSource source = (PollableSource) getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();//初始化ChannelProcessor
    source.start();//启动source组件

    runner = new PollingRunner();//单独启动一个线程去轮询source

    runner.source = source;
    runner.counterGroup = counterGroup;
    runner.shouldStop = shouldStop;

    runnerThread = new Thread(runner);
    runnerThread.setName(getClass().getSimpleName() + "-" + 
        source.getClass().getSimpleName() + "-" + source.getName());
    runnerThread.start();

    lifecycleState = LifecycleState.START;//设置状态为START
  }

在 PollableSourceRunner 中我们看到单独启动一个线程去执行 PollingRunner,这个线程的作用就是不断的去轮询。查看 PollingRunner 的实现

@Override
    public void run() {
      logger.debug("Polling runner starting. Source:{}", source);

      while (!shouldStop.get()) {//没有停止,那就继续吧
        counterGroup.incrementAndGet("runner.polls");

        try {
        //真正的拉去逻辑在source process()方法中,调用该方法进行拉去数据,并且
        //判断返回状态是否为BACKOFF(失败补偿),如果是那么等待時間超时之后就会重试
          if (source.process().equals(PollableSource.Status.BACKOFF)) {
            counterGroup.incrementAndGet("runner.backoffs");

            Thread.sleep(Math.min(
                counterGroup.incrementAndGet("runner.backoffs.consecutive")
                * backoffSleepIncrement, maxBackoffSleep));
          } else {
            counterGroup.set("runner.backoffs.consecutive", 0L);
          }
        } catch (InterruptedException e) {
          logger.info("Source runner interrupted. Exiting");
          counterGroup.incrementAndGet("runner.interruptions");
        } ......
      }

      logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
    }

比如 KafkaSource ,它的逻辑就在 process 方法中,

          // get next message
          MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
          kafkaMessage = messageAndMetadata.message();
          kafkaKey = messageAndMetadata.key();

          // Add headers to event (topic, timestamp, and key)
          headers = new HashMap<String, String>();
          headers.put(KafkaSourceConstants.TIMESTAMP,
                  String.valueOf(System.currentTimeMillis()));
          headers.put(KafkaSourceConstants.TOPIC, topic);
          

EventDrivenSourceRunner

@Override
  public void start() {
    Source source = getSource();//获取source
    ChannelProcessor cp = source.getChannelProcessor();//获取source对应的ChannelProcessor
    cp.initialize();//初始化channelprocessor
    source.start();//启动source
    lifecycleState = LifecycleState.START;//标记状态为START
  }

可以看到 EventDrivenSourceRunner 和 PollableSourceRunnner 启动流程大致相同,只是 PollableSourceRunner 会额外启动一个线程去轮询 source。

channel 的实现

source 获取到数据后,会交给 channelProcessor 处理,发送到 channel。最后由 sink 消费掉。
所以 channel 是 source,sink 实现异步化的关键。

channelProcessor 中两格重要的成员

  private final ChannelSelector selector;//channel选择器
  private final InterceptorChain interceptorChain; //过滤链

InterceptorChain 是有多个 Interceptor 组成,并且实现了 Interceptor 接口

public class InterceptorChain implements Interceptor {
     private List<Interceptor> interceptors;
}

Interceptor.java

public interface Interceptor {
  public void initialize();// 做一些处理话工作
  public Event intercept(Event event);//拦截单个event并且返回
  public List<Event> intercept(List<Event> events);//批量拦截event
  public void close();
    public interface Builder extends Configurable {
    public Interceptor build();
  }//用来创建特定的Interceptor
}

Interceptor 定义了一些处理 Event 的接口,再 Event 处理之后都会返回改 Envent

从 source 的分析中我们可以知道,如果是 PollableSourceRunner 会调用 source 中的 process()方法。如果是 EventDrivenSourceRunner,就会用特定的方法来获取 source,比如 httpSource 利用 FlumeHTTPServlet 来接受消息

try {
        events = handler.getEvents(request);//从请求中获取event
      ...
            try {
        getChannelProcessor().processEventBatch(events);//通过ChanneProcess进行的processEventBatch方法进行批量处理
      } catch (ChannelException ex) {

比如 KafkaSource 是 PollableSourceRunner 那么会调用 KafkaSource 中的 process()方法。

public Status process() throws EventDeliveryException {
   ...
   
   // get next message
          MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
          kafkaMessage = messageAndMetadata.message();
          kafkaKey = messageAndMetadata.key();

          // Add headers to event (topic, timestamp, and key)
          headers = new HashMap<String, String>();
          headers.put(KafkaSourceConstants.TIMESTAMP,
                  String.valueOf(System.currentTimeMillis()));
          headers.put(KafkaSourceConstants.TOPIC, topic);
          if (kafkaKey != null) {
            headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
          }
          ......
          event = EventBuilder.withBody(kafkaMessage, headers);
          eventList.add(event);
          ......
          
        if()......  
        getChannelProcessor().processEventBatch(eventList);//交给channelProcessor处理
        counter.addToEventAcceptedCount(eventList.size());
        ...
      }
}

从以上分析不管 source 是轮询还是事件驱动的,都会触发 ChannelProcessor 中的 processEvent 或者 ProcesEventBatch 方法

public void processEventBatch(List<Event> events) {
    events = interceptorChain.intercept(events);//调用Interceptor处理events
    
    List<Channel> reqChannels = selector.getRequiredChannels(event);//获取必须成功处理的Channel ,写失败了必须回滚source
    List<Channel> optChannels = selector.getOptionalChannels(event);//获取非必须成功处理的channel,写失败了就忽略
    
    
    // 這裡分析處理必須成功channel的情況。非必須的channel處理情況一樣
    for (Channel reqChannel : reqChannelQueue.keySet()) {
      Transaction tx = reqChannel.getTransaction();//获取该channel上的事务
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();//开始事务

        List<Event> batch = reqChannelQueue.get(reqChannel);//获取events

        for (Event event : batch) {
          reqChannel.put(event);//处理Channel
        }

        tx.commit();//提交事务
      } catch (Throwable t) {
        tx.rollback();//发生异常回滚
        if (t instanceof Error) {
          LOG.error("Error while writing to required channel: " +
              reqChannel, t);
          throw (Error) t;
        } else {
          throw new ChannelException("Unable to put batch on required " +
              "channel: " + reqChannel, t);
        }
      } finally {
        if (tx != null) {
          tx.close();//关闭事务
        }
      }
    }
}

最后就是 ChannelSelector ,flume 默认提供两种实现多路复用和复制。多路复用选择器可以根据 header 中的值而选择不同的 channel,复制就会把 event 复制到多个 channel 中。flume 默认是复制选择器。
14962112541107.jpg

同样 Selector 的创建也是通过 ChannelSelectorFactory 创建的.

 
 public static ChannelSelector create(List<Channel> channels,
      ChannelSelectorConfiguration conf) {
    String type = ChannelSelectorType.REPLICATING.toString();
    if (conf != null){
      type = conf.getType();
    }
    ChannelSelector selector = getSelectorForType(type);
    selector.setChannels(channels);
    Configurables.configure(selector, conf);
    return selector;
  }

默认提供复制选择器,如果配置文件中配置了选择器那么就从配置文件中获取。

上面看到在 processEventBatch 方法中调用 channel 的 put 方法。channel 中提供了基本的
put 和 take 方法来实现 Event 的流转。


 public interface Channel extends LifecycleAware, NamedComponent {
  public void put(Event event) throws ChannelException;//向channel中存放
  public Event take() throws ChannelException;//消费event
  public Transaction getTransaction();//获取事务
}

flume 提供的默认 channel 如下图所示:

14962117376319.jpg

sink 的实现

sink 定义:

public interface Sink extends LifecycleAware, NamedComponent {
  public void setChannel(Channel channel);
  public Channel getChannel();
  public Status process() throws EventDeliveryException;
  public static enum Status {
    READY, BACKOFF
  }
}

提供了 channel 的 setter,getter 方法。process 方法用来消费。并返回状态 READY,BACKOFF

sink 的创建

SinkConfiguration config = (SinkConfiguration) comp;
 Sink sink = sinkFactory.create(comp.getComponentName(),
 comp.getType());

sink 的创建也是通过 sinkFactory

public Sink create(String name, String type) throws FlumeException {
    Preconditions.checkNotNull(name, "name");
    Preconditions.checkNotNull(type, "type");
    logger.info("Creating instance of sink: {}, type: {}", name, type);
    Class<? extends Sink> sinkClass = getClass(type);//获取sink对应的类型的Class
    try {
      Sink sink = sinkClass.newInstance();//创建实例
      sink.setName(name);
      return sink;
    } catch (Exception ex) {
      throw new FlumeException("Unable to create sink: " + name
          + ", type: " + type + ", class: " + sinkClass.getName(), ex);
    }
  }

通过传入的 type 找到对应的 Class 要是没有找到则直接通过 Class.forNamae(String name)来创建

sink 还提供了分组功能。该功能由 SinkGroup 实现。在 SinkGroup 内部如何调度多个 Sink,则交给 SinkProcessor 完成。

sink 的启动

和 Source 一样,flume 也为 Sink 提供了 SinkRunner 来流转 Sink
在 sinkRunner 中

public void start() {
    SinkProcessor policy = getPolicy();
    policy.start();//启动SinkProcessor
    runner = new PollingRunner();//单独启动一个线程,从channel中消费数据
    runner.policy = policy;
    runner.counterGroup = counterGroup;
    runner.shouldStop = new AtomicBoolean();
    runnerThread = new Thread(runner);
    runnerThread.setName("SinkRunner-PollingRunner-" +
        policy.getClass().getSimpleName());
    runnerThread.start();
    lifecycleState = LifecycleState.START;
  }

sinkRunner 中通过启动 SinkProcessor 间接启动 Sink,并且单独启动一个线程,不停地调用 process()方法从 channel 中消费数据
在 SinkProcessor 中,如果是 DefaultSinkProcessor 那么直接调用 sink.start()方法启动 sink。如果是 LoadBalancingSinkProcessor,FailoverSinkProcessor 由于这两种处理器中包含多个 Sink,所以会依次遍历 sink 调用 start()方法启动

public void run() {
      logger.debug("Polling sink runner starting");

      while (!shouldStop.get()) {//判断是否停止
        try {
          if (policy.process().equals(Sink.Status.BACKOFF)) {//调用SinkProcessor的proces()方法进行处理
            counterGroup.incrementAndGet("runner.backoffs");
            Thread.sleep(Math.min(
                counterGroup.incrementAndGet("runner.backoffs.consecutive")
                * backoffSleepIncrement, maxBackoffSleep));
          } else {
            counterGroup.set("runner.backoffs.consecutive", 0L);
          }
        }
      }
    }

该线程会不停的执行 SinkProcessor 的 process()方法,而 SinkProcessor 的 process()方法会调用对应的 Sink 的 process()方法。然后判断处理状态如果是失败补偿,那么等待超时时间后重试

SinkGroup

public class SinkGroup implements Configurable, ConfigurableComponent {
  List<Sink> sinks;
  SinkProcessor processor;
  SinkGroupConfiguration conf;
  ......  
}

SinkGroup 中包含多个 Sink,并且提供一个 SinkProcessor 来处理 SinkGroup 内部调度

SinkProcessor

SinkProcessor 默认提供三种实现。DefaultSinkProcessor,LoadBalancingSinkProcessor,FailoverSinkProcessor

14962397506282.jpg

DefaultSinkProcessor:默认实现,适用于单个 sink
LoadBalancingSinkProcessor:提供负载均衡
FailoverSinkProcessor:提供故障转移

DefaultSinkProcessor

public class DefaultSinkProcessor implements SinkProcessor,
ConfigurableComponent {
  private Sink sink;
  private LifecycleState lifecycleState;

  @Override
  public void start() {
    Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
    sink.start();//启动sink
    lifecycleState = LifecycleState.START;
  }

  @Override
  public Status process() throws EventDeliveryException {
    return sink.process();
  }

  @Override
  public void setSinks(List<Sink> sinks) {
    Preconditions.checkNotNull(sinks);
    Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
        + "only handle one sink, "
        + "try using a policy that supports multiple sinks");
    sink = sinks.get(0);
  }
}

从上面可以看出 DefaultSinkProcessor 只能处理一个 Sink。在 process 方法中调用 sink 的方法。具体到某个具体的 Sink,比如 HDFSEventSink,那么就执行该 sink 的 process 方法

接下来分析 SinkProcessor 中负载均衡和故障转移 是如何具体实现的。

FailOverSinkProcessor 实现分析

** FailOverSinkProcessor ** 中 process()方法实现如下:

@Override
  public Status process() throws EventDeliveryException {
    Long now = System.currentTimeMillis();
    while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {//检查失败队列是否有sink,并且队列中第一个sink过了失败补偿时间
      FailedSink cur = failedSinks.poll();//从失败队列中获取第一个sink,并且在队列中删除
      Status s;
      try {
        s = cur.getSink().process();//调用sink的process()方法进行处理
        if (s  == Status.READY) {//如果状态是就绪
          liveSinks.put(cur.getPriority(), cur.getSink());//将该sink放入存活队列
          activeSink = liveSinks.get(liveSinks.lastKey());//重新赋值给activeSink
          logger.debug("Sink {} was recovered from the fail list",
                  cur.getSink().getName());
        } else {//sink 处理失败
          failedSinks.add(cur);//加入失败队列
        }
        return s;
      } catch (Exception e) {
        cur.incFails();//发生异常,增加失败次数
        failedSinks.add(cur);//放入失败队列
      }
    }

    //如果失败队列为空,或者失败队列中所有的sink都没有达到失败补偿时间,那么交给activeSink进行处理,
    Status ret = null;
    while(activeSink != null) {
      try {
        ret = activeSink.process();//交给activeSink 处理
        return ret;
      } catch (Exception e) {
        logger.warn("Sink {} failed and has been sent to failover list",
                activeSink.getName(), e);
        activeSink = moveActiveToDeadAndGetNext();//如果activeSink处理失败,则把activeSink从存活队列中移动到失败队列中
      }
    }
    throw new EventDeliveryException("All sinks failed to process, " +
        "nothing left to failover to");
  }
  • 存活队列是一个 SortMap<Key,Value> 其中 key 是 sink 的优先级。activeSink 默认取存活队列中的最后一个,存活队列是根据配置的 sink 优先级来排序的
  • 失败队列是一个优先队列,按照 FailSink 的 refresh 属性进行排序
  @Override
    public int compareTo(FailedSink arg0) {
      return refresh.compareTo(arg0.refresh);
    }

refresh 属性,在 FailSink 创建时和 sink 处理发生异常时 会触发调整
refresh 调整策略 如下:

  private void adjustRefresh() {
      refresh = System.currentTimeMillis()
              + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);
    }

refresh 等于系统当前的毫秒加上最大等待时间(默认 30s)和失败次数指数级增长值中最小的一个。FAILURE_PENALTY 等 1s;(1 << sequentialFailures) * FAILURE_PENALTY)用于实现根据失败次数等待时间指数级递增。

一个配置的 failOver 具体的例子:

  host1.sinkgroups = group1
  host1.sinkgroups.group1.sinks = sink1 sink2
  host1.sinkgroups.group1.processor.type = failover
  host1.sinkgroups.group1.processor.priority.sink1 = 5
  host1.sinkgroups.group1.processor.priority.sink2 = 10
  host1.sinkgroups.group1.processor.maxpenalty = 10000

LoadBalancingSinkProcessor 实现分析

loadBalaneingSinkProcessor 用于实现 sink 的负载均衡,其功能通过 SinkSelector 实现。类似于 ChannelSelector 和 Channel 的关系

14965813235880.jpg

14965813925723.jpg

SinkSelector 中模式有三种实现
1.固定顺序
2.轮询
3.随机

LoadBalancingSinkProcessor 中使用均衡负载的方式

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;
    Iterator<Sink> sinkIterator = selector.createSinkIterator();//使用sinkSelector创建Sink迭代器。三种方式有各自不同的实现
    while (sinkIterator.hasNext()) {//遍历Sink
      Sink sink = sinkIterator.next();//获取sink
      try {
        status = sink.process();//调用sink处理
        break;//如果处理成功那么本次负载均衡就算完成
      } catch (Exception ex) {
        selector.informSinkFailed(sink);//如果发生异常则通知SinkSelector,采用相应的补偿算法进行处理
        LOGGER.warn("Sink failed to consume event. "
            + "Attempting next sink if available.", ex);
      }
    }
    if (status == null) {
      throw new EventDeliveryException("All configured sinks have failed");
    }
    return status;
  }

在上面的解释中,最大的两个疑惑就是

  • 这个 Sink 迭代器也就是 createSinkIterator() 是如何实现的
  • 发生异常后 SinkSelector 的处理是如何实现的

先来看 createSinkIterator 的实现。首先看 RoundRobinSinkSelector 的实现

14965819912139.jpg

如上图所示 RoundRobinSinkSelector 内部包含一个 OrderSelector 的属性。

  private OrderSelector<Sink> selector;

    RoundRobinSinkSelector(boolean backoff){
      selector = new RoundRobinOrderSelector<Sink>(backoff);
    }
    
    @Override
    public Iterator<Sink> createSinkIterator() {
      return selector.createIterator();
    }

内部通过一个 RoundRobinOrderSelector 来实现。查看起 createIterator 实现

  @Override
  public Iterator<T> createIterator() {
    List<Integer> activeIndices = getIndexList();//获取存活sink的索引
    int size = activeIndices.size();//存活sink的個數
    //如果下一個sink的位置超過了存活sin的個數,重新指向头
    if (nextHead >= size) {
      nextHead = 0;
    }
    int begin = nextHead++; //获取起始位置
    if (nextHead == activeIndices.size()) {//检查是否超过范围,超过了从头开始
      nextHead = 0;
    }
    int[] indexOrder = new int[size];//创建一个数组,来存放访问的顺序
    for (int i = 0; i < size; i++) {
      indexOrder[i] = activeIndices.get((begin + i) % size);//用取模的方法实现轮询,每次都从上一个sink的下一个sink 索引开始,由begin控制
    }
    //indexOrder 是访问顺序,getObjects返回相关所有的sink
    return new SpecificOrderIterator<T>(indexOrder, getObjects());
  }

接下来看一下 getIndexList 的实现

  protected List<Integer> getIndexList() {
    long now = System.currentTimeMillis();//当前时间

    List<Integer> indexList = new ArrayList<Integer>();//用来存放sink的索引

    int i = 0;
    for (T obj : stateMap.keySet()) {//获取所有sink
      if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) {
         //如果没有开启退避,或者该sink 到失败补偿的时间,那么将改sink的索引放入IndexList
        indexList.add(i);
      }
      i++;
    }
    return indexList;
  }

stateMap 是一个 LinkedHashMap<T,FailState> 其中 T 在这里指的是 Sink。
如果没有开启了退避算法,那么会认为每个 sink 都是存活的,所有的 sink 都加到 IndexList。否则等到了失败补偿时间才会加入到 IndexList。可以通过 processor.backoff = true 配置开启

最后分析一下当 sink 处理失败 SinkSelector 是如何处理的

  public void informFailure(T failedObject) {
    if (!shouldBackOff) {//如果没有开启退避算法,当然就不做任何处理
      return;
    }
    FailureState state = stateMap.get(failedObject);//获取当前失败sink的状态对象
    long now = System.currentTimeMillis();//当前时间
    long delta = now - state.lastFail;//自从上次失败的经过的时间
    long lastBackoffLength = Math.min(maxTimeout, 1000 * (1 << state.sequentialFails));//计算上一次退避等待的时间
    long allowableDiff = lastBackoffLength + CONSIDER_SEQUENTIAL_RANGE;
    if (allowableDiff > delta) {//如果上次失败到现在最后退避时间后的一个小时内,并且是失败次数小于期望的退避次数限制,那么就增加state.sequentialFails 实际上就增加了退避的等待时间
      if (state.sequentialFails < EXP_BACKOFF_COUNTER_LIMIT) {
        state.sequentialFails++;
      }
    } else {
      state.sequentialFails = 1;//否则就不再增加退避等待时间
    }
    state.lastFail = now;//更新最后失败时间
    state.restoreTime = now + Math.min(maxTimeout, 1000 * (1 << state.sequentialFails));//更新退避等待时间
  }

CONSIDER_SEQUENTIAL_RANGE 是一个常量 只为 1 小时,EXP_BACKOFF_COUNTER_LIMIT 为期望最大的退避次数 值为 16.如果上次失败到现在的是哪在上次退避等待时间超过一个小时后 或者 退避次数超过了 EXP_BACKOFF_COUNTER_LIMIT 那么退避的等待时间将不再增加。

  • Flume

    Flume 是一套分布式的、可靠的,可用于有效地收集、聚合和搬运大量日志数据的服务架构。

    9 引用 • 6 回帖 • 594 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...