Aria2性能优化上篇


简介

最近俩个多月都在研究Aria2性能方面的问题,这里是Aria2的开源地址 https://github.com/aria2/aria2,
原本的Aria2是tcp的协议,我们在原本的基础上,添加了utp的支持,这里是utp的开源地址,https://github.com/bittorrent/libutp
也即是utorrent Bt下载软件开源出来的一种协议,本质是一种udp的协议,只是帮我们做了内部的处理,使我们使用起来跟tcp一样,下面分析下性能的情况

性能的表现

测试环境Ubuntu18.04 ,放在阿里云服务器上, 测试下载文件1.8G,测试方法模拟多个用户真实下载

测试我们之前移植utp到Aria2性能情况,
结果显示

上面的情况是单线程utp的情况,可以看到utp的cpu几乎占满,接下来我们通过捉包分析差异
结果显示
utp包的大小
结果显示
可以看到一个tcp包的大小为utp的64倍,加上utp为了tcp的逻辑实现的丢包重发的逻辑,通过测试下载同一个内容,utp发的包数量为tcp的200倍,通过捉包分析utorrent发包的数量比tcp的数量也为200倍,查看utorrent线程的数量既然达到了38个线程
结果显示

工具分析

首先使用pidstat 命令分析,可以看出用户态和内核态几乎各一半,当然还可以通过top,vmstat等工具来分析,甚至可以通过gcc增加-pg选项运行生成性能分析文件gmon.out.
结果显示

代码分析

首先看整体的框架

error_code::Value main(int argc, char** argv)
{
  error_code::Value exitStatus = error_code::FINISHED;
  if (context.reqinfo) {
    exitStatus = context.reqinfo->execute();
  }
  return exitStatus;
}
error_code::Value MultiUrlRequestInfo::execute()
{
  ...
  try {
    e_->run();
  }
  catch (RecoverableException& e) {
    A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, e);
  }
  ...
  return returnValue;
}

这里关键是这个run方法

//引擎开始运行 这边参数为true
int DownloadEngine::run(bool oneshot)
{
  GlobalHaltRequestedFinalizer ghrf;
  //如果  commands_ 或者 routineCommands_ 队列不为空,commands_一开始就有添加了一个保持事件响应的引用对象KeepRunningCommand  所以不为空
  while (!commands_.empty() || !routineCommands_.empty()) {
    //如果  commands_ 不为空,等待数据
    if (!commands_.empty()) {
       waitData();
    }
    noWait_ = false;
    //重置时钟
    global::wallclock().reset();
    //计算下载的速度状态等,用来显示在输出控制台上
    calculateStatistics();

    //判断是否达到了刷新界面的时间,constexpr auto A2_DELTA_MILLIS = std::chrono::milliseconds(10);
    if (lastRefresh_.difference(global::wallclock()) + A2_DELTA_MILLIS >= refreshInterval_) {
      //刷新的间隔为1秒
      refreshInterval_ = DEFAULT_REFRESH_INTERVAL;
      //保存上一次刷新的时间
      lastRefresh_ = global::wallclock();
      //执行命令 ,状态为 Command::STATUS_ALL
      executeCommand(commands_, Command::STATUS_ALL);
    }
    else {
      //如果还没有到刷新的时间,执行命令,状态为  Command::STATUS_ACTIVE
      executeCommand(commands_, Command::STATUS_ACTIVE);
    }
    //执行命令
    executeCommand(routineCommands_, Command::STATUS_ALL);
    afterEachIteration();
    if (!noWait_ && oneshot) {
        return 1;
    }
  }
  //如果到了这里,就说明引擎已经开始停止
  onEndOfRun();
  return 0;
}

首先大概的讲解下Aria2的框架,Aria2是单线程模式,但是可以做到单cpu 时间片的效果, 内部通过执行一个一个的command来实现,其中有俩个command数组,一个是commands为普通的command,比如请求,下载类似的,还有一种为routineCommands 常规的command,也即是用来执行一些比较重要的command,比如进度保存的command等,而且每个command是有分状态的,这些状态的改变是通过epoll来改变的,Epoll通过监听对应的事件,来改变command的状态,然后决定是否要执行对应的command,当然常规的command是每次都要执行的

//引擎等待数据
void DownloadEngine::waitData()
{
  struct timeval tv;
  if (noWait_) {
    tv.tv_sec = tv.tv_usec = 0;
  }
  else {
    //求要等待的时间
    auto t = std::chrono::duration_cast<std::chrono::microseconds>(refreshInterval_);
    tv.tv_sec = t.count() / 1000000;
    tv.tv_usec = t.count() % 1000000;
  }
  //调用具体类的实现
  eventPoll_->poll(tv);
}

//tv是有超时时间的
void EpollEventPoll::poll(const struct timeval& tv)
{
  // timeout is millisec
  int timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000;

  int res;
  //等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents表示每次能处理的最大事件数
  while ((res = epoll_wait(epfd_, epEvents_.get(), EPOLL_EVENTS_MAX, timeout)) == -1 && errno == EINTR);

  //如果有事件到来,处理这些事件
  if (res > 0) {
    for (int i = 0; i < res; ++i) {
      KSocketEntry* p = reinterpret_cast<KSocketEntry*>(epEvents_[i].data.ptr);
      p->processEvents(epEvents_[i].events);
    }
  }
  else if (res == -1) {
    int errNum = errno;
    A2_LOG_INFO(
        fmt("epoll_wait error: %s", util::safeStrerror(errNum).c_str()));
  }
  ...
}

p->processEvents(epEvents_[i].events); 实现为
virtual void processEvents(int events)
{
    if ((events_ & events) || ((EventPoll::IEV_ERROR | EventPoll::IEV_HUP) & events)) {
      command_->setStatusActive();
    }
    if (EventPoll::IEV_READ & events) {//有数据到来
      command_->readEventReceived();
    }
    if (EventPoll::IEV_WRITE & events) {//可以写缓冲区
      command_->writeEventReceived();
    }
    if (EventPoll::IEV_ERROR & events) {
      command_->errorEventReceived();
    }
    if (EventPoll::IEV_HUP & events) {
      command_->hupEventReceived();
    }
}

上面大体的思路是这样的,每次都会执行waitData来判断是否有数据到来,判断数据到来内部的实现是通过Epoll机制来实现的,这里EpollEventPoll 的逻辑是通过监听这些事件,实时的判断是否有数据的变更,如果有相应的事件到来,就改变command对应的状态,当然如果没有相应的事件到来,epoll是有设置超时时间的,所以当没有时间到来的时候,这个线程是可以休息的但是当有时间到来的时候,则会立刻的处理对应的事件,比如有数据到来当前command的状态就会变为 command_->setStatusActive();,接下来继续看

//判断是否达到了刷新界面的时间,constexpr auto A2_DELTA_MILLIS = std::chrono::milliseconds(10);
if (lastRefresh_.difference(global::wallclock()) + A2_DELTA_MILLIS >= refreshInterval_) {
    //刷新的间隔为1秒
    refreshInterval_ = DEFAULT_REFRESH_INTERVAL;
    //保存上一次刷新的时间
    lastRefresh_ = global::wallclock();
    //执行命令 ,状态为 Command::STATUS_ALL
    executeCommand(commands_, Command::STATUS_ALL);
    }
    else {
    //如果还没有到刷新的时间,执行命令,状态为  Command::STATUS_ACTIVE
    executeCommand(commands_, Command::STATUS_ACTIVE);
}
//执行命令
executeCommand(routineCommands_, Command::STATUS_ALL);
这里就是通过epoll将有数据到来改变状态的command分别处理,如果为STATUS_ACTIVE 状态的则立刻处理,否则要等一段时间才有机会执行

结果

这个框架采用单线程的方式来处理,内部通过epoll来处理,当没有数据到来则通过epoll设置的超时时间,来休眠当前的线程,如果有数据到来则立刻处理,再回到我们前面引进utp之后性能为什么会变的这么差,前面分析过utp发包的总量为tcp发包总量的200倍,也即是epoll触发有数据到会更多,从而cpu少了更多的休眠时间,导致这个while循环一直循环导致cpu很高,也即是说引进utp的程序,在网络不好的情况下,可以简单的类比为下面的情况
while(1){
….
}
而且由于是单线程的方式,当command的数量起来之后,cpu也是很吃力的,一个个command排队执行也是非常低效率的,下篇文章介绍怎么优化这个问题


文章作者: AheadSnail
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 AheadSnail !
评论
  目录