Aria2移植utp


简介

先来了解下打洞的基础知识:

点对点穿透 需要实现的是对NAT的穿透。想实现NAT的穿透,当然要先了解NAT到底是什么,以及NAT是用来干什么的。

NAT全称Network Address Translation,意思是网络地址转换,在1994年提出。它可以对不同的IP及端口进行映射,将一个网络地址转换为另一个。NAT的主要用途,大家可以看路由器。
路由器具有一个WAN口及多个LAN口;WAN口对外,连接因特网,拥有公网IP;LAN口对内,构建本地网络,分配的是私网IP。当处于LAN网下的本地主机想要访问因特网的时候,路由器就会通过NAT技术,将LAN 口的私网IP映射到WAN口的公网IP,实现网络地址的转换,这样本地主机就可以访问因特网了。

NAT 工作的需要

NAT 的工作需要LAN网下的本地设备主动发起网络连接,然后NAT服务才会将这个连接映射到WAN口的公网IP完成转换。也就是说,如果本地主机没有主动发起连接,那么这个映射就不会存在,
那么公网上的机器就无法访问到私网上的机器。也就是说,只能是私网机器主动连接公网机器,而不能是公网机器主动连接私网机器。而为了实现公网机器主动连接私网机器,我们就需要穿透NAT,
这就是NAT穿透的由来。

何为打洞以及为什么要会用udp

目前比较好实现NAT穿透的方式是采用UDP连接对NAT进行打洞,然后完成连接。何为打洞呢?就是为了使NAT产生一个可用的映射。具体步骤就是在私网机器上用UDP向某台公网机器发起连接,
使得NAT产生一个可以使用的映射(洞)。然后通过这个映射(洞),就可以穿透NAT。至于为什么要用UDP,这是由于UDP的某些特性。UDP通信需要先绑定本地机器的端口,完成后就可以从这个端口收发数据,
至于从哪里收,发到哪里,可以在收发数据的时候再决定,这也就意味着我可以用这一个端口同时和多个对象通信,只要我收发数据的时候指定不同的对象即可。当本地机器用UDP向英特网上的某个服务器,发送数据的时候,这个映射不但能用来和这个服务器进行数据交互,也能用来接收其他主机发来的数据。NAT穿透就是本地主机向公网上的某台服务器发送数据,这时服务器就可以获得NAT对这台主机的映射,
在之前举得例子中就是55.66.77.88:5000这个地址。由于NAT会将55.66.77.88:5000收到的数据转发至本地主机,所以公网上的其他机器可以从服务器获取到55.66.77.88:5000这个网络地址,
然后通过这个网络地址向私网下的机器发出数据。而至于为什么不用TCP,也是由于TCP的某些特性。TCP通信的步骤与UDP不同,它需要先在两个对象之间建立一个专用通道,再用这个通道收发数据。也就是说外人无法插手。这样一来,虽然其他机器可以通过服务器获取到NAT的映射对象,也没办法利用它向私网下的机器发出数据。

由于我们这个采用的是Aria2这个开源的p2p项目,当然如果人家本身就支持的化,那当然是最好的,当然也有人去提问,但是看情况的化,近期之内是不会添加的

结果显示
结果显示

utp的使用

utp的下载及配置

可以看出来,Aria2在2015年就已经提出了,但是直到现在都没有任何的进展,当然我们不能等他支持,现在我要尝试的引进这个utp的协议,utp这个库本质就是udp的协议,只是他做了一层封装,对于使用者来说,不用去管理丢包等问题,接下来先看看utp的使用 utp下载源码地址

https://github.com/bittorrent/libutp

将源码下载下来,放在linux下面是可以直接编译的,这里为了看代码的快速,这里推荐使用Eclipse for c++,这个专门为c/c++定制的Eclipse还是挺好用的,由于他本身就包含了Makefile文件的,所以
在使用Eclipse的时候,直接File 选项 选择Makefile project with Exiting Code 导进项目,点击make就可以编译了,编译完成之后,进入到源码有一个ucat可执行程序,这个就是提供的demo

utp的使用

结果显示
服务端监听端口
结果显示
客户端连接
结果显示
utp连接上互相发送消息
结果显示

utp的源码分析

首先是解析传递过来的参数

int main(int argc, char *argv[])
{
    int i;

    o_local_address = "0.0.0.0";

    while (1) {
        int c = getopt (argc, argv, "hdlp:B:s:n");
        if (c == -1) break;
        switch(c) {
            case 'h': usage(argv[0]);                break;
            case 'd': o_debug++;                    break;
            case 'l': o_listen++;                    break;//标识是否是服务端
            case 'p': o_local_port = optarg;        break;//服务端监听的端口号
            case 'B': o_buf_size = atoi(optarg);    break;
            case 's': o_local_address = optarg;        break;
            case 'n': o_numeric++;                    break;
            //case 'w': break;    // timeout for connects and final net reads
            default:
                die("Unhandled argument: %c\n", c);
        }
    }
    ...
    setup();
    ...
}

void setup(void)
{
    //首先客户端跟服务端都要绑定一个端口号,要不然怎么能收到数据
    if (o_numeric)
        hints.ai_flags |= AI_NUMERICHOST;

    if ((error = getaddrinfo(o_local_address, o_local_port, &hints, &res)))
        die("getaddrinfo: %s\n", gai_strerror(error));

    //udp监听端口
    if (bind(fd, res->ai_addr, res->ai_addrlen) != 0)
        pdie("bind");

    freeaddrinfo(res);
    ..
    //执行utp的初始化
    ctx = utp_init(2);

    //配置utp的函数指针
    utp_set_callback(ctx, UTP_LOG,                &callback_log); //utp日志回调
    utp_set_callback(ctx, UTP_SENDTO,            &callback_sendto);//utp发送数据的回调
    utp_set_callback(ctx, UTP_ON_ERROR,            &callback_on_error);//utp错误的回调
    utp_set_callback(ctx, UTP_ON_STATE_CHANGE,    &callback_on_state_change);//utp 状态的回调
    utp_set_callback(ctx, UTP_ON_READ,            &callback_on_read);//utp接收到数据之后,解析返回原始数据的回调
    utp_set_callback(ctx, UTP_ON_FIREWALL,        &callback_on_firewall);
    utp_set_callback(ctx, UTP_ON_ACCEPT,        &callback_on_accept);//服务端收到utp连接的回调
    ...

    //o_listen 是解析传递过来的参数来确定是否是服务端,这里!代表是客户端,也即是客户端此时应该要触发连接操作
    if (! o_listen) {
        //客户端触发连接操作
        s = utp_create_socket(ctx);

        if ((error = getaddrinfo(o_remote_address, o_remote_port, &hints, &res)))
            die("getaddrinfo: %s\n", gai_strerror(error));

        utp_connect(s, res->ai_addr, res->ai_addrlen);
        freeaddrinfo(res);
    }
}

由于utp不管你数据的接收跟发送,所以当你发送数据的之前,你先将数据丢给utp,当utp处理完之后,其实就是添加数据头,然后他会调用 callback_sendto 函数,来完成真正的发送数据

uint64 callback_sendto(utp_callback_arguments *a)
{
    struct sockaddr_in *sin = (struct sockaddr_in *) a->address;
    //真正的发送数据
    sendto(fd, a->buf, a->len, 0, a->address, a->address_len);
    return 0;
}
//客户端监听标准输入,也即是命令行,获取到用户输入的内容
if ((p[0].revents & POLLIN) == POLLIN) {
    //读取用户输入的内容
    len = read(STDIN_FILENO, buf+buf_len, o_buf_size-buf_len);
    if (len < 0 && errno != EINTR)
        pdie("read stdin");
    if (len == 0) {
        debug("EOF from file\n");
        eof_flag = 1;
        close(STDIN_FILENO);
    }
    else {
        //将当前要发送的内容存储到缓冲区中,至于为什么要有缓冲区,是因为当utp还没有连接或者处于UTP_STATE_WRITABLE的时候,是不能发送数据的,这时候,就要将内容缓存起来
        buf_len += len;
        debug("Read %d bytes, buffer now %d bytes long\n", len, buf_len);
    }
    //发送用户的内容
    write_data();
}
//发送内容
void write_data(void)
{
    if (! s)
        goto out;

    while (p < buf+buf_len) {
        size_t sent;

        //返回值代表写入了多少内容,如果为0可能utp还没有连接上,或者对方处于UTP_STATE_WRITABLE 状态,此时应该退出,下次再来尝试的写
        sent = utp_write(s, p, buf+buf_len-p);
        if (sent == 0) {
            debug("socket no longer writable\n");
            return;
        }

        //如果写成功,将当前写的内容从缓存中移除
        p += sent;

        if (p == buf+buf_len) {
                debug("wrote %zd bytes; buffer now empty\n", sent);
                p = buf;
                buf_len = 0;
            }
        else
            debug("wrote %zd bytes; %d bytes left in buffer\n", sent, buf+buf_len-p);
    }
    ...
}

//接下来分析下utp状态改变的回调函数,
uint64 callback_on_state_change(utp_callback_arguments *a)
{
    debug("state %d: %s\n", a->state, utp_state_names[a->state]);
    utp_socket_stats *stats;

    switch (a->state) {
        case UTP_STATE_CONNECT://utp连接上可以发送数据了
        case UTP_STATE_WRITABLE://出现这种状态是当utp发现对方的滑动窗口太小,就会处于这种状态
            write_data();
            break;

        case UTP_STATE_EOF://这是当收到了另一方的断开连接的状态,要关闭连接
            debug("Received EOF from socket\n");
            utp_eof_flag = 1;
            if (utp_shutdown_flag) {
                utp_close(a->socket);
            }
            break;
        ....
    }

    return 0;
}

下面看看服务端的接收数据
if ((p[1].revents & POLLIN) == POLLIN) {
    while (1) {
        //接收数据
        len = recvfrom(fd, socket_data, sizeof(socket_data), MSG_DONTWAIT, (struct sockaddr *)&src_addr, &addrlen);
        //没有读取到内容
        if (len < 0) {
            //要注意这个,当没有收到内容的时候,要调用这个utp_issue_deferred_acks 方法,这是发送ack,这样对方才能知道你这边的情况,到底是丢包了还是收到了等
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                utp_issue_deferred_acks(ctx);
                break;
            }
            else
            pdie("recv");
        }

        //读取到了内容
        debug("Received %zd byte UDP packet from %s:%d\n", len, inet_ntoa(src_addr.sin_addr), ntohs(src_addr.sin_port));

        //交给utp处理
        if (! utp_process_udp(ctx, socket_data, len, (struct sockaddr *)&src_addr, addrlen))
            debug("UDP packet not handled by UTP.  Ignoring.\n");
    }
}

当utp解析客户度发送的内容之后,如果发现 客户度发送的连接为utp连接,utp库会内部自动的创建一个utpSocket,并且回调执行,通知服务端可以发送数据了
uint64 callback_on_accept(utp_callback_arguments *a)
{
    assert(!s);
    s = a->socket;
    debug("Accepted inbound socket %p\n", s);
    write_data();
    return 0;
}

当utp解析客户度发送的内容之后,如果发现 客户度发送的内容不为连接请求,那么将会回调执行  callback_on_read ,那么我们就能得到了对方发送的原始的数据了
uint64 callback_on_read(utp_callback_arguments *a)
{
    const unsigned char *p;
    ssize_t len, left;

    left = a->len;
    p = a->buf;

    //将内容写到标准的输出控制台
    while (left) {
        len = write(STDOUT_FILENO, p, left);
        left -= len;
        p += len;
        debug("Wrote %d bytes, %d left\n", len, left);
    }
    utp_read_drained(a->socket);
    return 0;
}

utp的使用总结

下面说下大概的实现细节:

首先调用下面的这个方法,得到一个UtpContext对象,这个方法只能调用一次,UtpContext对象非常重要,基本上所有的api都要使用这个对象
ctx = utp_init(2);

创建UtpSocket对象,要注意的是可以创建多个UtpSocket对象,但是UtpContext对象只能有一个,UtpContext实现是内部会有一个集合,会根据对应的ip和端口号,找到对应的UtpSocket对象
而UtpSocket对象就相当是一个连接
s = utp_create_socket(ctx);

对应的这些回调的设置也只需要设置一次就好了,即使存在多个UtpSocket对象,内部也会找到当前对应的UtpSocket对象返回给你
utp_set_callback(ctx, UTP_LOG,                &callback_log);
utp_set_callback(ctx, UTP_SENDTO,            &callback_sendto);
utp_set_callback(ctx, UTP_ON_ERROR,            &callback_on_error);
utp_set_callback(ctx, UTP_ON_STATE_CHANGE,    &callback_on_state_change);
utp_set_callback(ctx, UTP_ON_READ,            &callback_on_read);
utp_set_callback(ctx, UTP_ON_FIREWALL,        &callback_on_firewall);
utp_set_callback(ctx, UTP_ON_ACCEPT,        &callback_on_accept);

给UtpContext对象设置userData,这相当于是设置公有的配置
void*            utp_context_set_userdata        (utp_context *ctx, void *userdata);
获取到UtpConext对象设置的userData
void*            utp_context_get_userdata        (utp_context *ctx);

给当前的UtpSocket设置Userdata。这相当于是给每一个UtpSocket设置自己的UserData,这相当于是自己独有的
void*            utp_set_userdata                (utp_socket *s, void *userdata);
获取到UtpSocket对象设置的userData
void*            utp_get_userdata                (utp_socket *s);

Aria2 移植utp

当然一开始我并不知道怎么去移植utp库,我是参考transmission的做法,只是transmission使用的utp库不是官网提供的,估计是早先的版本,他有做了部分的修改,但是只要了解他为什么要这样做
就其实也没有多大的关系,所以最终我采用的是系统的utp库,并且到了最后面也没有动到这个库的一行代码,关于transmission的源码分析,前面一篇文章有介绍,下面开始移植

1.首先找到初始化UtpContext的对象,而且要考虑这个对象只能初始化一次,也即是单例的存在,最终放到了Dht的初始化部分
DHTSetup::setup(DownloadEngine* e, int family)
{
    ...
    //这里要注意,  DHTRegistry::isInitialized() 这个取的是静态对象值,所以对于多个种子来说,如果之前dht已经初始化过了,就不会再继续往下执行了,直接return 处理,那这样也可以保证我们的
    //utpContext  的创建也是只会有一份 ,也即是后面的这些对象关于Dht的创建都是只有一份的,也即是所有的种子文件公用的
    if ((family != AF_INET && family != AF_INET6) || (family == AF_INET && DHTRegistry::isInitialized()) || (family == AF_INET6 && DHTRegistry::isInitialized6())) {
        return {};
    }

    //创建Utp的 context对象,下面的这个选项是自己创建的,让他变成一个可控的形式
    if(e->getOption()->getAsBool(PREF_ENABLE_UTP))
    {
        utp_context* utpContext = utp_init(2);
        assert(utpContext);
        A2_LOG_DEBUG(fmt("UTP context create"));
        //构建一个公有的对象记得回收处理
        UtpContextUserData * utpContextUserData = new UtpContextUserData();
        //赋值UserData
        utpContextUserData->connection = connection.get();
        utpContextUserData->downloadEngine = e;
        //设置公有的回调函数执行的时候,传递的参数
        utp_context_set_userdata(utpContext,utpContextUserData);
        //保存utp Context对象,这个对象是全局的唯一的
        e->getBtRegistry()->setUtpContext(utpContext);

        //添加对应的回调函数
        utp_set_callback(utpContext, UTP_SENDTO,            &callback_sendto);
        utp_set_callback(utpContext, UTP_ON_ERROR,            &callback_on_error);
        utp_set_callback(utpContext, UTP_ON_STATE_CHANGE,    &callback_on_state_change);
        utp_set_callback(utpContext, UTP_ON_READ,            &callback_on_read);
        utp_set_callback(utpContext, UTP_ON_ACCEPT,            &callback_on_accept);
    }    
    ...
}

transmission中utp和dht都是使用同一个端口号,所以这边也是参考transmission的做法,所以在上边设置UtpContext的UserData的时候设置了一个成员为connnection对象,这个connection对象用来
后面真正的发送消息

//Utp 发送的触发函数
uint64 callback_sendto(utp_callback_arguments *a)
{
    struct sockaddr_in *sin = (struct sockaddr_in *) a->address;
    //获取到UtpContext 公有的UserData
    UtpContextUserData *contextUserData = static_cast<UtpContextUserData *>(utp_context_get_userdata(a->context));
    //发送消息
    size_t port = ntohs(sin->sin_port);
    //最终都是使用socketfd发送的,所以可以这样写
    contextUserData->connection->sendMessage(a->buf, a->len,inet_ntoa(sin->sin_addr), port);
    return 0;
}

//看下utp的消息的接收
bool DHTInteractionCommand::execute()
{
    ...
    while (1) {
      //尝试的读取是否接受到了内容
      ssize_t length = connection_->receiveMessage(data.data(), data.size(), remoteAddr, remotePort);
      //如果没有接受到内容
      if (length <= 0) {
          utp_context * utpContext = e_->getBtRegistry()->getUtpContext();
          if(utpContext)
          {
              //发送ack
              if (errno == EAGAIN || errno == EWOULDBLOCK) {
                  utp_issue_deferred_acks(utpContext);
              }
          }
          break;
      }

      if (data[0] == 'd') {  //这是dht接收的内容
        // udp tracker response does not start with 'd', so assume
        // this message belongs to DHT. nothrow.
        receiver_->receiveMessage(remoteAddr, remotePort, data.data(), length);
        //LOGD("dht receiveMessage");
      }
      else if (length >= 8 && data.data()[0] == 0 && data.data()[1] == 0 && data.data()[2] == 0 && data.data()[3] <= 3) {//udp tracker
          std::shared_ptr<UDPTrackerRequest> req;
          if (udpTrackerClient_->receiveReply(req, data.data(), length, remoteAddr, remotePort, global::wallclock()) == 0) {
              if (req->action == UDPT_ACT_ANNOUNCE) {
                  auto c = static_cast<TrackerWatcherCommand *>(req->user_data);
                  if (c) {
                      c->setStatus(Command::STATUS_ONESHOT_REALTIME);
                      e_->setNoWait(true);
                  }
              }
          }
      }
      else {
         //utp  消息的转发处理
         utp_context * utpContext = e_->getBtRegistry()->getUtpContext();
         if(utpContext)
         {
             int error;
             struct addrinfo hints,*res;
             memset(&hints, 0, sizeof(hints));
             hints.ai_family = AF_INET;
             hints.ai_socktype = SOCK_DGRAM;
             hints.ai_protocol = IPPROTO_UDP;
             //转成地址
             if((error = getaddrinfo(remoteAddr.c_str(), util::uitos(remotePort).c_str(),&hints, &res)))
             {
                 //出现异常
                 if (error) {
                     LOGE("DHTInteractionCommand::execute getaddrinfo error break");
                     throw DL_ABORT_EX(fmt(EX_RESOLVE_HOSTNAME, remoteAddr.c_str(), gai_strerror(error)));
                 }
             }

             //将接受的内容转到utp
             if (!utp_process_udp(utpContext, data.data(), length, res->ai_addr, res->ai_addrlen))
             {
                 LOGD("UDP packet not handled by UTP.  Ignoring.\n");
             }
             //释放res
             freeaddrinfo(res);
         }
      }
    }
    ...
}
在原有的消息类型的基础上,添加了一个utp类型的消息接收,这个也是参照transmission的做法

创建UtpSocket执行连接操作
bool PeerInitiateConnectionCommand::executeInternal()
{
    ...
    //每个peer都要创建对应的Utp_Sokcet 对象
    utp_context * utpContext = getDownloadEngine()->getBtRegistry()->getUtpContext();
    //当前peer 如果么有创建对应的utpSocket对象,则构建一个
    utp_socket * utpSocket = getPeer()->getUtpSocket();
    if(utpContext && !utpSocket)
    {
        A2_LOG_DEBUG(fmt(MSG_CONNECTING_TO_SERVER, getCuid(), getPeer()->getIPAddress().c_str(), getPeer()->getPort()));
        LOGD(MSG_CONNECTING_TO_SERVER, getCuid(), getPeer()->getIPAddress().c_str(), getPeer()->getPort());
        int error;
        struct addrinfo hints,*res;
        memset(&hints, 0, sizeof(hints));
        hints.ai_family = AF_INET;
        hints.ai_socktype = SOCK_DGRAM;
        hints.ai_protocol = IPPROTO_UDP;
        //创建utp
        utpSocket = utp_create_socket(utpContext);
        assert(utpSocket);
        A2_LOG_DEBUG(fmt("CUID#%" PRId64 " UTP socket create Success",getCuid()));
        LOGD("CUID#%" PRId64 " UTP socket create Success",getCuid());

        //保存当前种子对象 对应的utpSocket对象
        getPeer()->setUtpSocket(utpSocket);
        //转成地址
        if((error = getaddrinfo(getPeer()->getIPAddress().c_str(), util::uitos(getPeer()->getPort()).c_str(),&hints, &res)))
        {
            //出现异常,抛出
            if (error) {
                A2_LOG_DEBUG(fmt("utp_connect getAddrinof error"));
                LOGD("utp_connect getAddrinof error");
                throw DL_ABORT_EX(fmt(EX_RESOLVE_HOSTNAME, getPeer()->getIPAddress().c_str(), gai_strerror(error)));
            }
        }

        if(utpSocket)
        {
            //这里也会重置掉之前的设置的userData
            utp_set_userdata(utpSocket,this);
        }
        //执行utp的连接
        utp_connect(utpSocket, res->ai_addr, res->ai_addrlen);
        //释放res
        freeaddrinfo(res);
    }
    ...
}

这里要注意对于原有的Aria2的源码来说,都是使用command来执行对应的类的,对应原有的tcp的化,他每个都会创建一个连接,但是对于utp的化,我们因为是在同一个地方执行数据的发送,同一个对方执行数据的接收,所以我们就要设计到对应的分发操作,所以设置了 utp_set_userdata(utpSocket,this); 这样当utp接收到了数据,触发callback_on_read 的时候,就可以分发数据了

//utp 将接受的数据包,处理之后执行的回调函数
uint64 callback_on_read(utp_callback_arguments *a)
{
    //LOGD("utp callback_on_read len %d",a->len);
    //对应的utpSocket对象
    utp_socket * utpSocket = a->socket;
    //获取到UtpSocket对应的UserData对象
    void* utp_socket_userData = utp_get_userdata(utpSocket);
    //我们在超时的时候,手动的置为了NULL
    if(utp_socket_userData)
    {
        PeerAbstractCommand * peerAbstractCommand = static_cast<PeerAbstractCommand *>(utp_socket_userData);
        if(peerAbstractCommand)
        {
            //对应的子类实现 分发消息
            peerAbstractCommand->dispatchUtpMessage(a->buf,a->len);
        }
    }
    utp_read_drained(a->socket);
    return 0;
}

由于这些command都是PeerAbstractCommand的子类,所以子类只要重写了这个方法,就能将消息转发过来,比如
//utp分发消息
void PeerReceiveHandshakeCommand::dispatchUtpMessage(const unsigned char *buff,size_t buffLen)
{
    A2_LOG_DEBUG(fmt("CUID#%" PRId64 " PeerReceiveHandshakeCommand dispatchUtpMessage buffLen %d",getCuid(),buffLen));
    LOGD("CUID#%" PRId64 " PeerReceiveHandshakeCommand dispatchUtpMessage buffLen %d",getCuid(),buffLen);
    //第三个参数默认为true
    peerConnection_->PeerUtpRecvDataProcess(buff,buffLen);
}

我们采用的一个思想就是尽量的少改动他tcp原有的逻辑,让utp也走tcp一样的逻辑,所以对应utp来说的,当把消息接受过来之后,我们就将消息的内容填充到对应的buff中,剩余的逻辑就走他原本tcp
的逻辑,我们可以实时的改变对应的UtpSocket对象的UserData,然后将对应的数据转到对应的对象来处理

utp的连接以及对应的超时重试处理

如果当没有收到utp的状态变成UTP_STATE_CONNECT时,就执行数据的发送,此时utp的处理是直接抛去这个数据,参照他的demo也可知道,此时正确的做法是应该等他处于连接状态才能允许发送数据,还有后面由于打洞的需要,我们要将utp能实现超时重连的机制,增大打洞的机制,这些操作,我们都可以直接在第一个Command 也即是PeerInitiateConnectionCommand 中处理

我们在utp状态改变回调的时候,通知这个对象,让他做相应的原本的逻辑
//utp 状态的变化回调
uint64 callback_on_state_change(utp_callback_arguments *a)
{
  ...
  case UTP_STATE_CONNECT://通知utp连接上了,可以往下走了
        if(utp_socket_userData)
        {
            PeerAbstractCommand * peerAbstractCommand = static_cast<PeerAbstractCommand *>(utp_socket_userData);
            if(peerAbstractCommand)
            {
                peerAbstractCommand->utpConnected();
            }
        }
  break;
  case UTP_STATE_EOF://收到了对方的断开连接的消息,比如下载完成的时候,或者其他的错误的时候,此时应该要关闭掉utp的连接,
        A2_LOG_DEBUG(fmt("Received EOF from socket"));
        if(utp_socket_userData)
        {
            PeerAbstractCommand * peerAbstractCommand = static_cast<PeerAbstractCommand *>(utp_socket_userData);
            if(peerAbstractCommand)
            {
                //这里简单,直接传递一个utp错误
                peerAbstractCommand->setUtpErrorCode(1);
            }
        }
   break;
     ...
}

对应utp的连接通知,只有第一个command才需要知道,对应后面的command都是在当前处于连接状态才往后面走,所以在PeerAbstractCommand中可以添加默认的虚函数实现 
//新增utp连接上的回调,默认不处理
virtual void utpConnected(){};

然后让PeerInitiateConnectionCommand 实现这个方法,这样消息就能转发到这里
//第一个command应该要确保utp连接上之后,才让继续往下走,如果没有收到当前的回调函数,如果采用utp的化,应该要继续重试连接,直到peer的超时为止
void PeerInitiateConnectionCommand::utpConnected()
{
    A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - utpConnected",getCuid()));
    LOGD("CUID#%" PRId64 " - utpConnected",getCuid());
    //返回true,代表当前这个command可以回收内存了
    isUtpConnected = true;
    //获取到当前peer保存的utpSocket对象,设置对应的SocketCore对象,然后继续往下走
    utp_socket * utpSocket = getPeer()->getUtpSocket();
    UtpContextUserData *contextUserData = static_cast<UtpContextUserData *>(utp_context_get_userdata(getDownloadEngine()->getBtRegistry()->getUtpContext()));
    //为每一个peer的连接都创建对应的SocketCore对象,但是由于是utp,所以这里的fd要共用一个 socketCore = contextUserData->connection->getSocket();
    auto socketCore = std::make_shared<SocketCore>(contextUserData->connection->getSocket()->getSockfd(),contextUserData->connection->getSocket()->getSocketType());
    //赋值utpSocket
    socketCore->setUtpSocket(utpSocket);
    //mseHandshakeEnabled_ 默认为true
    if (mseHandshakeEnabled_) {
        //构建一个InitiatorMSEHandshakeCommand 对象,调用对应的构造函数完成初始化,注意这里传递了socket对象 getSocket为对应的tcp 对象
        auto c = make_unique<InitiatorMSEHandshakeCommand>(getCuid(), requestGroup_, getPeer(), getDownloadEngine(), btRuntime_, socketCore);
        c->setPeerStorage(peerStorage_);
        c->setPieceStorage(pieceStorage_);
        getDownloadEngine()->addCommand(std::move(c));
    }
    else {
        //为false代表当前peer发送的交换秘钥的消息没有回应,直接走这里
        A2_LOG_DEBUG(fmt("PeerInitiateConnectionCommand mseHandshakeEnabled_ false"));
        LOGD("PeerInitiateConnectionCommand mseHandshakeEnabled_ false");
        //状态为INITIATOR_SEND_HANDSHAKE 代表客户端
        auto c = make_unique<PeerInteractionCommand>(getCuid(), requestGroup_, getPeer(), getDownloadEngine(), btRuntime_, pieceStorage_, peerStorage_, socketCore,
                                                     PeerInteractionCommand::INITIATOR_SEND_HANDSHAKE);
        getDownloadEngine()->addCommand(std::move(c));
    }
}

当前这个command收到了utp连接的状态才会往下执行构建其他的command执行剩余的操作,还有一点就是由于我们utp跟dht都是采用了同一个socket的fd,做到了消息的统一发送,消息的统一接受
但是每一个command内部都有自己的数据,如果不区分的化,那么数据就混在一起了,对应的也即是SocketCore 对象,我们这里采用的是保持他原本的逻辑只是我们构建SocketCore的时候,都是使用
同一个socket的fd,这样每一个command都有自己的socketCore,就可以将数据区分开来


//为每一个peer的连接都创建对应的SocketCore对象,但是由于是utp,所以这里的fd要共用一个 socketCore = contextUserData->connection->getSocket();
auto socketCore = std::make_shared<SocketCore>(contextUserData->connection->getSocket()->getSockfd(),contextUserData->connection->getSocket()->getSocketType());

当然对应SocketCore原有的写数据的方法,要换成我们utp的写方法,由于我们前面处理了utp处于连接状态的问题,所以后面就可以随便的发送数据,不用再考虑这个问题
ssize_t SocketCore::writeVector(a2iovec* iov, size_t iovcnt)
{
  ssize_t ret = 0;
  wantRead_ = false;
  wantWrite_ = false;
  if (!secure_) {
#ifdef __MINGW32__
    DWORD nsent;
    int rv = WSASend(sockfd_, iov, iovcnt, &nsent, 0, 0, 0);
    if (rv == 0) {
      ret = nsent;
    }
    else {
      ret = -1;
    }
#else  // !__MINGW32__
    //转成utp
    if(sockType_ == SOCK_DGRAM && utpSocket_)
    {
        //这里要增加返回值,要不然外层会抛出异常,0表示套接字不再可写,这个要注意utp的这个状态不是错误
        ret = utp_writev(utpSocket_,(struct utp_iovec *)iov,iovcnt);
        int errNum = SOCKET_ERRNO;
        if (ret == -1) {
            if (!A2_WOULDBLOCK(errNum)) {
                LOGD(EX_SOCKET_SEND, errorMsg(errNum).c_str());
                throw DL_RETRY_EX(fmt(EX_SOCKET_SEND, errorMsg(errNum).c_str()));
            }
            //出现了错误,如果为-1的化
            wantWrite_ = true;
            LOGD("utp_writev return -1");
            ret = 0;
        }
    }else{
        while ((ret = writev(sockfd_, iov, iovcnt)) == -1 && SOCKET_ERRNO == A2_EINTR);
#endif // !__MINGW32__
        int errNum = SOCKET_ERRNO;
        if (ret == -1) {
            if (!A2_WOULDBLOCK(errNum)) {
                throw DL_RETRY_EX(fmt(EX_SOCKET_SEND, errorMsg(errNum).c_str()));
            }
            wantWrite_ = true;
            ret = 0;
        }
    }
  }
  else {
    // For SSL/TLS, we could not use writev, so just iterate vector
    // and write the data in normal way.
    for (size_t i = 0; i < iovcnt; ++i) {
      ssize_t rv = writeData(iov[i].A2IOVEC_BASE, iov[i].A2IOVEC_LEN);
      if (rv == 0) {
        break;
      }
      ret += rv;
    }
  }
  return ret;
}

当然对应SocketCore原本的逻辑,当这个对象被销毁的时候,会关闭掉这个socket的fd,我们由于是共用的,不能关闭

//关闭连接,这里要处理,由于当前的utp共有一个fd,但是含有多个SocketCore对象,所以对应的虚构函数,就不能关闭掉这个fd,要不然会导致所有的都会失败
//最终关闭的地方应该交给最原始的对象DHTConnectionImpl 这个对象中的socketCore来关闭
void SocketCore::closeConnection()
{
  ...
  //tcp 不干预处理
  if(isNeedCloseFd || sockType_ != SOCK_DGRAM)
  {
    if (sockfd_ != (sock_t)-1) {
        shutdown(sockfd_, SHUT_WR);
        CLOSE(sockfd_);
        sockfd_ = -1;
        LOGD("SocketCore isNeedCloseFd shutDown sockfd");
    }
  }
}

而utp的超时回调会在错误的回调中告知,相应的我们也要在这个方法中告知对应的消息
//utp 错误回调,超时也是一个错误回调,所以这里要区分
uint64 callback_on_error(utp_callback_arguments *a)
{
    utp_socket * utpSocket = a->socket;
    void* utp_socket_userData = utp_get_userdata(utpSocket);
    //我们在超时的时候,手动的置为了NULL
    if(utp_socket_userData)
    {
        PeerAbstractCommand * peerAbstractCommand = static_cast<PeerAbstractCommand *>(utp_socket_userData);
        if(peerAbstractCommand)
        {
            A2_LOG_DEBUG(fmt("CUID#%" PRId64 " -- Error: %s Close Utp ", peerAbstractCommand->getCuid(),utp_error_code_names[a->error_code]));
            //设置对应的command为禁止写的,对于握手的command,有可能出现第一步utp的连接件就失败了,对于现有的逻辑握手的消息只有等utp连接上才会执行发送数据,导致
            //缓冲区中有数据,让command一直的在执行,不会销毁这个连接,而其实这边会收到当前utp的连接错误,如果对方不支持utp的化,那么就要这边来触发对应的command超时
            peerAbstractCommand->setUtpErrorCode(a->error_code);
        }
    }
    return 0;
}

对应utp的超时,其实也只要在第一个command也即是PeerInitateConnectionCommand中处理就好了,这个超时的处理是写在父类的 PeerAbstractCommand中的,下面新增这个方法
void PeerAbstractCommand::setUtpErrorCode(int utpErrorCode)
{
    A2_LOG_DEBUG(fmt("PeerAbstractCommand setError Code %d",utpErrorCode));
    utpErrorCode_ = utpErrorCode;
}

这样当这个command执行的时候,就要事实的去判断是否处于超时,是否有必要重新连接
bool PeerAbstractCommand::execute()
{
  //exitBeforeExecute() 由具体的子类来实现, 可以用来检查种子是否下载完成,如果完成,回收对象使用,btRuntime_->isHalt(); 默认为false,当下载完成的时候由SeedCheckCommand::execute()触发,设置为true
  if (exitBeforeExecute()) {
    //下载完成的时候也会触发onAbort函数
    onAbort();
    return true;
  }
  try {
    //readEventEnabled() 是检查socketfd 的是否可以读的结果       writeEventEnabled() 是检查socketfd 是否可以写的结果
    if (noCheck_ || (checkSocketIsReadable_ && readEventEnabled()) || (checkSocketIsWritable_ && writeEventEnabled()) || hupEventEnabled()) {
       checkPoint_ = global::wallclock();
    }
    else if (errorEventEnabled()) {
      A2_LOG_DEBUG(fmt(MSG_NETWORK_PROBLEM, socket_->getSocketError().c_str()));
      throw DL_ABORT_EX(fmt(MSG_NETWORK_PROBLEM, socket_->getSocketError().c_str()));
    }
    //检查是否超时了,如果超时了抛出异常
    if (checkPoint_.difference(global::wallclock()) >= timeout_) {
       LOGD(EX_TIME_OUT);
       throw DL_ABORT_EX(EX_TIME_OUT);
    }

    //如果当前utp的错误码为超时
    if(utpErrorCode_ == UTP_ETIMEDOUT)
    {
       A2_LOG_DEBUG(fmt("CUID#%" PRId64 " utp timeout error retry Connect",getCuid()));
       //utp超时了,当peer还没有超时,utp 应该再次触发连接,这个utp的超时重新连接应该发生在第一个command上,默认为空实现
       bool isNeedProcess = onUtpTimeOutNextConnect();
       if(isNeedProcess)
       {
           A2_LOG_DEBUG(fmt("CUID#%" PRId64 " utp timeout error recycle data",getCuid()));
           throw DL_ABORT_EX(EX_TIME_OUT);
       }
    } else if(utpErrorCode_ != -1){//默认为-1
        //如果是其他的错误,就直接抛出异常终止连接
        A2_LOG_DEBUG(fmt("CUID#%" PRId64 " utp other error recycle data",getCuid()));
        throw DL_ABORT_EX(EX_TIME_OUT);
    }
    //变为默认值
    utpErrorCode_ = -1;
    //执行子类的executeInternal函数 ,相应的也即是执行每一个commamand的execute方法
    return executeInternal();
  }
  catch (DownloadFailureException& err) {//下载异常会导致返回true,这样当前对应的对象就会被释放掉了
    //A2_LOG_ERROR_EX(EX_DOWNLOAD_ABORTED, err);
    onAbort();
    onFailure(err);
    A2_LOG_DEBUG(fmt("CUID#%" PRId64 " PeerAbstractCommand Download Failure return true ",getCuid()));
    return true;
  }
  catch (RecoverableException& err) {//超时异常的响应
    A2_LOG_DEBUG_EX(fmt(MSG_TORRENT_DOWNLOAD_ABORTED, getCuid()), err);
    A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - PeerAbstractCommand 超时或者出现了异常了或者下载完成了..... ",getCuid()));
    //LOGD(MSG_PEER_BANNED, getCuid(), peer_->getIPAddress().c_str(), peer_->getPort());
    onAbort();
    //子类实现
    return prepareForNextPeer(0);
  }
}

我们的utp超时是跟peer超时关联在一起的,也即是当peer还没有超时,而utp超时的化,此时utp应该要重试连接,所以我们把超时的逻辑放到了peer检查超时的后面,这样对应peer的超时我们就不要处理任何的逻辑,走原本的逻辑就好,超时的时候会触发onUtpTimeOutNextConnect 操作,这个函数默认是返回true的,返回true的化,就代表不用执行重新连接

//当前收到了utp的超时回调,默认返回true,代表是否需要中断连接,执行销毁操作
bool PeerAbstractCommand::onUtpTimeOutNextConnect()
{
    return true;
}

除了我们前面所的第一个command  PeerInitiateConnectionCommand,要重写这个方法,实现重新连接,由于返回了false,就不会抛出异常
//utp的超时重连实现,只针对第一个command才要增加,如果utp连接上了,后面就不用管了
bool PeerInitiateConnectionCommand::onUtpTimeOutNextConnect()
{
    A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - onUtpTimeOutNextConnect",getCuid()));
    LOGD("CUID#%" PRId64 " - onUtpTimeOutNextConnect",getCuid());
    //显示的调用父类的函数,完成utp销毁操作
    PeerAbstractCommand::onAbort();
    return false;
}

PeerAbstractCommand::onAbort(); 用来统一的销毁,所以写在父类 异常终止的时候会触发这个方法,或者下载完成也会触发这个方法,在这里执行对应的utp的清理操作
void PeerAbstractCommand::onAbort()
{
    //如果这里返回了true,DownLoadEngine就会执行释放当前的对象,那么我们绑定的userData就为非法的所以再返回true之前,应该手动的清理userData
    utp_socket * utpSocket = getPeer()->getUtpSocket();
    if(utpSocket)
    {
        //我们在清除utpSocket的UserData的时候,要清除掉用户的数据,还有这里要手动的释放关闭掉utp的连接操作,要不然可能会一直收到对方peer的信息,但是我们是不需要的,比如收到了对方peer数据不完整的时候,此时
        //我们是需要断开的,
        //utp的关闭操作处理,由于有些地方我们手动的调用了close操作,所以这里可能会导致二次close会出现异常,下面的代码来自utp
        if(utp_get_status(utpSocket) != 0 && utp_get_status(utpSocket) != 7)
        {
            utp_close(utpSocket);
        }
        utp_set_userdata(utpSocket,NULL);
        //清除peer的 utpSocket对象引用
        getPeer()->clearUtpSocket();
    }
    A2_LOG_DEBUG(fmt("CUID#%" PRId64 " -- PeerAbstractCommand::onAbort clear utp",getCuid()));
}

//command命令的执行,由于在超时的方法中清空掉了内容所以 getPeer()->getUtpSocket();就为空,所以会再创建一个utpSocket执行连接
bool PeerInitiateConnectionCommand::executeInternal()
{
   if(requestGroup_->getOption()->getAsBool(PREF_ENABLE_UTP))//utp 的处理,默认为true
   {
       //如果当前utp连接上了,可以销毁这个对象了
       if(isUtpConnected)
       {
           return true;
       }
       //每个peer都要创建对应的Utp_Sokcet 对象
       utp_context * utpContext = getDownloadEngine()->getBtRegistry()->getUtpContext();
       //当前peer 如果么有创建对应的utpSocket对象,则构建一个
       utp_socket * utpSocket = getPeer()->getUtpSocket();
       if(utpContext && !utpSocket)
       {
           A2_LOG_DEBUG(fmt(MSG_CONNECTING_TO_SERVER, getCuid(), getPeer()->getIPAddress().c_str(), getPeer()->getPort()));
           LOGD(MSG_CONNECTING_TO_SERVER, getCuid(), getPeer()->getIPAddress().c_str(), getPeer()->getPort());
           int error;
           struct addrinfo hints,*res;
           memset(&hints, 0, sizeof(hints));
           hints.ai_family = AF_INET;
           hints.ai_socktype = SOCK_DGRAM;
           hints.ai_protocol = IPPROTO_UDP;
           //创建utp
           utpSocket = utp_create_socket(utpContext);
           assert(utpSocket);
           A2_LOG_DEBUG(fmt("CUID#%" PRId64 " UTP socket create Success",getCuid()));
           LOGD("CUID#%" PRId64 " UTP socket create Success",getCuid());

           //保存当前种子对象 对应的utpSocket对象
           getPeer()->setUtpSocket(utpSocket);
           //转成地址
           if((error = getaddrinfo(getPeer()->getIPAddress().c_str(), util::uitos(getPeer()->getPort()).c_str(),&hints, &res)))
           {
               //出现异常,抛出
               if (error) {
                   A2_LOG_DEBUG(fmt("utp_connect getAddrinof error"));
                   LOGD("utp_connect getAddrinof error");
                   throw DL_ABORT_EX(fmt(EX_RESOLVE_HOSTNAME, getPeer()->getIPAddress().c_str(), gai_strerror(error)));
               }
           }

           if(utpSocket)
           {
               //这里也会重置掉之前的设置的userData
               utp_set_userdata(utpSocket,this);
           }
           //执行utp的连接
           utp_connect(utpSocket, res->ai_addr, res->ai_addrlen);
           //释放res
           freeaddrinfo(res);
       }
       //添加自己到commands集合中 ,下次继续的检查
       addCommandSelf();
       return false;
   }
}

//服务端收到了utp的消息,服务端进行后续的操作,比如初始化 握手对象等,这里就能看出来为什么我们将engine存储在userData中
uint64 callback_on_accept(utp_callback_arguments *a)
{
    struct sockaddr_in *sin = (struct sockaddr_in *) a->address;
    //构建一个Peer 对象,调用对应的构造函数完成初始化,注意这里的最后一个参数为true
    size_t port = ntohs(sin->sin_port);
    auto peer = std::make_shared<Peer>(inet_ntoa(sin->sin_addr), port, true);

    //获取到utpContext对象中的userData
    UtpContextUserData *contextUserData = static_cast<UtpContextUserData *>(utp_context_get_userdata(a->context));
    DownloadEngine * engine = contextUserData->downloadEngine;
    cuid_t cuid = engine->newCUID();

    //为每一个peer的连接都创建对应的SocketCore对象,但是由于是utp,所以这里的fd要共用一个 socketCore = contextUserData->connection->getSocket();
    //注意这里不能使用contextUserData中的connection中的socketCore,那个SocketCore是公有的,只负责utp的发数据,其他的都应该重新的构建一个SokcetCore,只是共用一个fd
    auto socketCore = std::make_shared<SocketCore>(contextUserData->connection->getSocket()->getSockfd(),contextUserData->connection->getSocket()->getSocketType());
    //保存当前的UtpSockt,方便后面获取到更改对应的userData
    peer->setUtpSocket(a->socket);

    LOGD("CUID#%" PRId64 " service receive  the connection from %s:%u.", cuid,peer->getIPAddress().c_str(), peer->getPort());

    //设置当前的socket对应的utp_socket对象,后面要用到
    socketCore->setUtpSocket(a->socket);
    //构建服务端的握手对象
    auto c = make_unique<ReceiverMSEHandshakeCommand>(cuid, peer, engine, socketCore);
    //服务端 构建一个ReceiverMSEHandshakeCommand 对象,调用对应的构造函数完成初始化,也即是服务端要初始化对应的捂手对象等,用于和客户端的连接,这里还传递了当前连接创建的peerSocket对象
    engine->addCommand(std::move(c));

    A2_LOG_DEBUG(fmt("CUID#%" PRId64 " service receive  the connection from %s:%u.", cuid,peer->getIPAddress().c_str(), peer->getPort()));
    return 0;
}

下面再说几个细节问题,对应buff的接受,不是直接将内容接受过来,就好了,由于采用了utp 他会将多个数据包同时返回给你,所以对于一些细节就要注意:比如
//utp 接受分发消息
void InitiatorMSEHandshakeCommand::dispatchUtpMessage(const unsigned char *buff,size_t buffLen)
{
    //这里要注意,如果处于当前的这个状态的化,对于原有的tcp接收数据,socket_->readData(rbuf_ + rbufLength_, len); 这里是有最大的长度限制的,如果传递的数据大于当前可以接收的buff
    //会留到下一次再来接受,所以我们采用utp也要有这样的逻辑,要不然会出现的偶现失败  !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    mseHandshake_->utpRecvDataProcess(buff,buffLen);
}

最后说下Utp的销毁操作,由于BtRegister是跟DownloadEngine共生死的,而我们的UtpContext也是类似的,所以将UtpContext放到了BtRegister中,所以在此处来执行销毁的操作
//增加一个虚构函数,移除utpContext对象
BtRegistry::~BtRegistry()
{
    if(utpContext_)
    {
        //释放之前,先释放我们设置的对象也即是 UtpContextUserData
        UtpContextUserData * userData = static_cast<UtpContextUserData *>(utp_context_get_userdata(utpContext_));
        if(userData)
        {
            delete userData;
            utp_context_set_userdata(utpContext_,NULL);
        }
        utp_destroy(utpContext_);
        utpContext_ = NULL;
        LOGD("BtRegistry 虚构函数执行,清理UtpContext 以及 UtpContextUserData内存");
    }
}

文章作者: AheadSnail
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 AheadSnail !
评论
  目录