概述
OkHttp个人理解
简介
OkHttp做为一种很牛逼的网络框架,目前使用的人数越来越多,在github上面项目的start数也达到了28k的存在,可谓是一种很牛逼的网络框架,所以做为一个小菜鸡的我,就有必要的去学习
下人家的牛逼所在,下面分几个点来分析
1.OkHttp 的简单使用
2.OkHttp 提交请求
3.OkHttp 传说中的责任链的模式
OkHttp的简单使用
我们可以在项目的build.gradle 中简单的添加一句 implementation 'com.squareup.okhttp3:okhttp:3.10.0' 就可以将okhttp包含进来
当然要网络请求,还要配置相应的网络权限
下面是简单的提交一个同步的请求
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("http://www.baidu.com")
.build();
Response response = null;
try
{
response = client.newCall(request).execute();
response.body().string();
}
catch (IOException e)
{
e.printStackTrace();
}
提交一个异步的请求
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("http://www.baidu.com")
.build();
Response response = null;
try
{
client.newCall(request).enqueue(new Callback()
{
@Override
public void onFailure(Call call, IOException e)
{
}
@Override
public void onResponse(Call call, Response response) throws IOException
{
response.body().string();
}
});
}
catch (Exception e)
{
e.printStackTrace();
}
OkHttp提交任务
1.创建OkHttpClient对象
OkHttpClient client = new OkHttpClient();
public OkHttpClient() {
this(new Builder());
}
原来是方便我们使用,提供了一个“快捷操作”,全部使用了默认的配置。OkHttpClient.Builder类成员很多,后面我们再慢慢分析,这里先暂时略过:
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
proxySelector = ProxySelector.getDefault();
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
}
2.创建Request对象
Request request = new Request.Builder()
.url("http://www.baidu.com")
.build();
首先是Builder 构造函数的执行
public Builder() {
this.method = "GET";
this.headers = new Headers.Builder();
}
public Request build() {
if (url == null) throw new IllegalStateException("url == null");
return new Request(this);
}
创建Requeest内部也是通过Build模式来解析传递进来的url来构建一个请求对象
3.发起 HTTP 请求
Response response = client.newCall(request).execute();
response.body().string();
OkHttpClient实现了Call.Factory,负责根据请求创建新的Call。
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
....
}
那我们现在就来看看它是如何创建 Call 的:
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
@Override public Call newCall(Request request) {
return new RealCall(this, request);
}
如此看来功劳全在RealCall类了,下面我们一边分析同步网络请求的过程,一边了解RealCall的具体内容。 首先来看RealCall 的构造函数的创建
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
同步请求的执行 接着分析 RealCall#execute:
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed"); // (1)
executed = true;
}
try {
client.dispatcher().executed(this); // (2)
Response result = getResponseWithInterceptorChain(); // (3)
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this); // (4)
}
}
这里我们做了 4 件事:
检查这个 call 是否已经被执行了,每个 call 只能被执行一次,如果想要一个完全一样的 call,可以利用call#clone方法进行克隆。
利用client.dispatcher().executed(this)来进行实际执行dispatcher是刚才看到的OkHttpClient.Builder的成员之一,它的文档说自己是异步 HTTP 请求的执行策略,现在看来,
同步请求它也有掺和。调用getResponseWithInterceptorChain()函数获取 HTTP 返回结果,从函数名可以看出,这一步还会进行一系列“拦截”操作。这个会在后面进行分析
最后还要通知dispatcher自己已经执行完毕。
首先分析 client.dispatcher().executed(this) 实现,下面是对应的函数的实现
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
而runningSyncCalls 的定义为 private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); 代表一个正在进行请求的同步队列,
而真正的执行网络请求的部分是在这部分的代码 getResponseWithInterceptorChain();所以对应同步的请求,这里只是创建一个RealCall然后添加到runningAsyncCalls 队列中
这里接下来分析 对于异步请求的添加
client.newCall(request).enqueue(new Callback());函数的实现为
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
首先执行 new AsyncCall(responseCallback) 构建一个一个AsyncCall对象,AsyncCall本质是实现了Runnbale接口,同时将回调函数保存到responseCallback 成员变量中
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
...
}
接着执行 client.dispatcher()会执行到默认的Dispatcher类,也即是Dispatcher类中的对应方法
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
runningAsyncCalls 定义为
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
上面的判断也即是如果当前正在运行的异步队列的大小,小于最大的同时请求的大小,对于同一个host的请求不能超过5个,才会将当前的请求,添加到runningAsyncCalls,
否则会添加到readyAsyncCalls 也即是一个异步的等待队列,当请求添加到了正在运行的异步队列的时候,执行 executorService().execute(call);
executorService()函数的实现:
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
大致的理解下这个线程池的配置,这个线程池核心线程数为0,但是最大的线程数量没有限制,60, TimeUnit.SECONDS 代表这些线程 空闲如果超过了60秒,就会被回收掉,
这里的队列为 new SynchronousQueue<Runnable>() 内部没有任何容量的阻塞队列。在它内部没有任何的缓存空间。也即是只要提交一个任务就会执行,不会添加到队列中
而由于当前的call 为 AsyncCall 所以当这个任务执行的时候,会执行对应的execute函数,下面是这个函数的关键实现
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
...
}
...
}
可以看出,最后执行的还是getResponseWithInterceptorChain()函数,这个才会真正的执行网络的请求等操作,对于同步跟异步的区别就是,同步直接在当前线程中跑这个函数,
异步的化,会通过任务的方式提交给线程池执行,当这个任务执行的时候,再来执行这个函数,这就是唯一的区别
所以对于异步的请求的添加也即是构建一个Runnable对象,然后通过线程池的形式,execute(call)的形式来添加任务,在添加任务的时候,会判断当前是否允许立刻执行,
如果不允许就放在等待队列中,如果当前的正在运行的请求队列执行完毕了,怎么样将正在等待的队列转移到执行的队列,会在分析完了责任链模式之后分析
OkHttp责任链模式(真正执行请求的地方)
前面分析了执行同步请求,跟执行异步请求的区别,最终都会执行execute函数,下面是这俩种exeuute函数的实现
执行异步请求的方法
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
执行同步请求的方法
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
可以看到对于异步还是同步的请求,关键都是通过 Response result = getResponseWithInterceptorChain(); 来获取到Resonse结果。下面是函数的具体实现:
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
首先是构建一个集合用来存储Interceptor对象 List<Interceptor> interceptors = new ArrayList<>();,对于Interceptor本质是是一个接口,接口的定义为
public interface Interceptor {
Response intercept(Chain chain) throws IOException;
}
接下来就是往集合中添加元素了,第一个 interceptors.addAll(client.interceptors()); 获取的是用户自定义的拦截器集合,Okhttp是允许自定义拦截器的,
接下来添加的是Okhttp内部的拦截器对象对应的拦截器功能为:
负责失败重试以及重定向的RetryAndFollowUpInterceptor;
负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转换为用户友好的响应的BridgeInterceptor;
负责读取缓存直接返回、更新缓存的CacheInterceptor;
负责和服务器建立连接的ConnectInterceptor;
配置OkHttpClient时设置的networkInterceptors;
负责向服务器发送请求数据、从服务器读取响应数据CallServerInterceptor
添加完之后,然后构建一个RealInterceptorChain对象,这里有几个是比较关键的参数 比如第一个interceptors 代表拦截器的集合,index 代表当前拦截器的索引
也即是当前正要处理的拦截器相应的在集合中的索引,对应的构造方法的实现为:
public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
this.interceptors = interceptors;
this.connection = connection;
this.streamAllocation = streamAllocation;
this.httpCodec = httpCodec;
this.index = index;
this.request = request;
this.call = call;
this.eventListener = eventListener;
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
this.writeTimeout = writeTimeout;
}
然后执行 chain.proceed(originalRequest);定义的函数实现为
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError(); index代表当前正在执行的拦截器在集合中的索引,所以这个所以不能大于拦截器集合的大小值
...
// Call the next interceptor in the chain. 这里又构建一个RealInterceptorChain 对象,要注意这里的索引为index+1,而且其他的参数都为同一个对象
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);然后从集合中获取到当前所有的Interceptor对象
Response response = interceptor.intercept(next);//然后执行对象的对应的方法,同时将刚构建的RealInterceptorChain对象传递进去,注意这里不会往下执行,跳到了另一边了
...
return response;
}
假设我们这里采用默认的配置,也即是我上面写的那样,就是没有配置自定义的拦截器的,所以第一个获取到的拦截器为retryAndFollowUpInterceptor 对象,代表重试的拦截器对象,
所以执行对应的方法
@Override public Response intercept(Chain chain) throws IOException {
//获取到Chain中的Request对象
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
//获取到Chain中的Call对象
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
//构建一个StreamAllocation对象
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
int followUpCount = 0;
Response priorResponse = null;
//要注意这里是while(true)
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try {
//又调用了proceed函数
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
...
}
....
//当重试的此时大于了最大的限制之后,就会抛出一个异常,这样while(true)就会中断,支持重试终止
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
...
request = followUp;
priorResponse = response;
}
}
当调用到 response = realChain.proceed(request, streamAllocation, null, null);的时候,此时,第二个参数streamAllocation有值了,第一次的时候,后面的三个参数都为null
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError(); index代表当前正在执行的拦截器在集合中的索引,所以这个所以不能大于拦截器集合的大小值
...
// Call the next interceptor in the chain. 这里又构建一个RealInterceptorChain 对象,要注意这里的索引为index+1,而且其他的参数都为同一个对象
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);然后从集合中获取到当前所有的Interceptor对象
Response response = interceptor.intercept(next);//然后执行对象的对应的方法,同时将刚构建的RealInterceptorChain对象传递进去,注意这里不会往下执行,跳到了另一边了
...
return response;
}
所以这次获取的Interceptor对象为BridgeInterceptor 对象,所以执行对应的方法 ,这个拦截器主要进行的操作就是用来拼接请求的
@Override public Response intercept(Chain chain) throws IOException {
//获取到chain中的Request对象
Request userRequest = chain.request();
//构建一个Request对象
Request.Builder requestBuilder = userRequest.newBuilder();
//拼接一些参数
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
//又执行到了chain中的proceed函数,继而转到对应的方法实现
Response networkResponse = chain.proceed(requestBuilder.build());
//下面不会再往下面执行,要等待上面的方式执行完毕之后,才会往下执行,也即是后面是请求结果的处理了
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
执行到 chain.proceed(requestBuilder.build());
public Request build() {
if (url == null) throw new IllegalStateException("url == null");
return new Request(this);
}
构建一个Request对象,存储Http的请求地址,请求头,请求体等
Request(Builder builder) {
this.url = builder.url;
this.method = builder.method;
this.headers = builder.headers.build();
this.body = builder.body;
this.tag = builder.tag != null ? builder.tag : this;
}
之后继续执行 chain.proceed()函数
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
只是这个时候执行到了这里的时候requet已经构建完成了,第二个参数也因为重新构建RealInterceptorChain的时候,传递了进来,所以得以保存
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError(); index代表当前正在执行的拦截器在集合中的索引,所以这个所以不能大于拦截器集合的大小值
...
// Call the next interceptor in the chain. 这里又构建一个RealInterceptorChain 对象,要注意这里的索引为index+1,而且其他的参数都为同一个对象
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);然后从集合中获取到当前所有的Interceptor对象
Response response = interceptor.intercept(next);//然后执行对象的对应的方法,同时将刚构建的RealInterceptorChain对象传递进去,注意这里不会往下执行,跳到了另一边了
...
return response;
}
当执行interceptors.get(index)的时候,这里获取到的Interceptor对象为CacheInterceptor 对象,执行对应的方法
@Override public Response intercept(Chain chain) throws IOException {
...
// If we don't need the network, we're done. 如果有缓存,就直接从缓存中获取
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
//否则执行网络请求
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
...
}
这里假设没有缓存,也即是第一次执行的时候,所以会执行 networkResponse = chain.proceed(networkRequest)
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
这里将原先的Request变成了newWorkRequest,然后存储到request参数中,至于其他的参数也有保存起来在构建RealInterceptorChain的时候
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError(); index代表当前正在执行的拦截器在集合中的索引,所以这个所以不能大于拦截器集合的大小值
...
// Call the next interceptor in the chain. 这里又构建一个RealInterceptorChain 对象,要注意这里的索引为index+1,而且其他的参数都为同一个对象
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);然后从集合中获取到当前所有的Interceptor对象
Response response = interceptor.intercept(next);//然后执行对象的对应的方法,同时将刚构建的RealInterceptorChain对象传递进去,注意这里不会往下执行,跳到了另一边了
...
return response;
}
当执行interceptors.get(index)的时候,这里获取到的Interceptor对象为ConnectInterceptor 对象,执行对应的方法
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
//获取到Request对象,也即是获取到第三个拦截器中传递的Request对象
Request request = realChain.request();
//获取到StreamAllocation 对象,也即是第一个拦截器中构建的对象
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//得到一个HttpCodec对象
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
//得到RealConnection对象
RealConnection connection = streamAllocation.connection();
//执行下一个拦截器
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
首先分析 HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);的实现
public HttpCodec newStream(OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
//RealConnection 对Socket连接的封装
//TPC/IP协议是传输层协议,主要解决数据如何在网络中传输
//Socket则是对TCP/IP协议的封装和应用(程序员层面上)。
//Http 应用层协议,解决如何包装数据
//使用Http协议封装数据,借助TCP/IP协议的实现:Socket 进行数据传输
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled,
doExtensiveHealthChecks);
//HttpCodec 处理解析请求与响应的工具类
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
//查找连接并在健康状况下返回。 如果不健康,则重复该过程直到找到健康的连接一直没找到,最终创建新的socket连接。
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis,
boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
//TODO 真正找连接的方法
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good.
// If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
findConnection 函数的实现为:
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
....
//TODO 从连接池中获取连接 (先看put加入连接池)
if (result == null) {
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
...
//TODO 连接可用就返回 否则需要创建新的连接
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;
}
// If we need a route selection, make one. This is a blocking operation.
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a
// connection from
// the pool. This could match due to connection coalescing.
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);//从connectionPool中获取可用的连接
//如果获取到了可用的连接,标识 foundPooledConnection 已经找到了连接,将找到的连接赋值给 result ,break程序
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
//如果到了这里就说明没有找到可以用的连接
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes
// it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
// 创建新的连接
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// 执行连接
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
//加入连接池
// Pool the connection.
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
这里先分析下 Internal.instance.get(connectionPool, address, this, route); 函数的实现为
//静态的对象
static {
Internal.instance = new Internal() {
....
@Override
public RealConnection get(ConnectionPool pool, Address address,StreamAllocation streamAllocation, Route route) {
return pool.get(address, streamAllocation, route);
}
...
}
}
pool.get(address, streamAllocation, route);函数实现为
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
//TODO 遍历
for (RealConnection connection : connections) {
//TODO 检查连接是否复用
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
connections本质为 private final Deque<RealConnection> connections = new ArrayDeque<>();
connection.isEligible(address, route)函数实现为:
public boolean isEligible(Address address, @Nullable Route route) {
// If this connection is not accepting new streams, we're done.
if (allocations.size() >= allocationLimit || noNewStreams) return false;
// If the non-host fields of the address don't overlap, we're done.
if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
// If the host exactly matches, we're done: this connection can carry the address.
if (address.url().host().equals(this.route().address().url().host())) {
return true; // This connection is a perfect match.
}
...
return true; // The caller's address can be carried by this connection.
}
可以看出复用的条件是host要为一样才能实现复用
假设程序第一次使用,这里就不能够从缓存池中获取到可用的连接,所以 程序就会继续的往下执行 当执行到
//TODO 创建新的连接 ,创建一个RealConnection对象
result = new RealConnection(connectionPool, selectedRoute);
//接着 执行连接
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,connectionRetryEnabled, call, eventListener);
//连接操作
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
EventListener eventListener) {
if (protocol != null) throw new IllegalStateException("already connected");
...
//连接socket
connectSocket(connectTimeout, readTimeout, call, eventListener);
...
}
connectSocket() 函数的实现为:
/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
private void connectSocket(int connectTimeout, int readTimeout, Call call,
EventListener eventListener) throws IOException {
/**
* TODO ProxySelector.getDefault().select(new URI("path"));
*/
Proxy proxy = route.proxy();
Address address = route.address();
//创建socket对象,这里会直接的创建一个Sokcet对象,通过 new Socket(proxy)的方式
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
//回调通知连接开始
eventListener.connectStart(call, route.socketAddress(), proxy);
//设置连接socket超时时间
rawSocket.setSoTimeout(readTimeout);
try {
//执行socket的连接 连接的地址为route.socketAddress() 具体的实现类 AndroidPlatform
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
// The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
// More details:
// https://github.com/square/okhttp/issues/3245
// https://android-review.googlesource.com/#/c/271775/
try {
//连接完socket之后,获取到对应的socket的InputStream 以及 OutputStream
//Okio.source 主要是获取到socket.getInputStream()
source = Okio.buffer(Okio.source(rawSocket));
//Okio.sink 主要是获取到socket.getOutputStream()
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);函数的实现为:
//执行socket的连接
@Override public void connectSocket(Socket socket, InetSocketAddress address,int connectTimeout) throws IOException {
try {
//socket的连接操作,最终还是调用了socket的connect函数完成连接的操作
socket.connect(address, connectTimeout);
} catch (AssertionError e) {
if (Util.isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
} catch (SecurityException e) {
// Before android 4.3, socket.connect could throw a SecurityException
// if opening a socket resulted in an EACCES error.
IOException ioException = new IOException("Exception in connect");
ioException.initCause(e);
throw ioException;
} catch (ClassCastException e) {
// On android 8.0, socket.connect throws a ClassCastException due to a bug
// see https://issuetracker.google.com/issues/63649622
if (Build.VERSION.SDK_INT == 26) {
IOException ioException = new IOException("Exception in connect");
ioException.initCause(e);
throw ioException;
} else {
throw e;
}
}
}
程序继续往下面执行
source = Okio.buffer(Okio.source(rawSocket));
//Okio.sink 主要是获取到socket.getOutputStream()
sink = Okio.buffer(Okio.sink(rawSocket));
Okio.source(rawSocket)函数的实现为
public static Source source(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
if (socket.getInputStream() == null) throw new IOException("socket's input stream == null");
AsyncTimeout timeout = timeout(socket);
Source source = source(socket.getInputStream(), timeout);
return timeout.source(source);
}
可以看出最终是获取到socket中的inputStream封装在Source对象中
sink = Okio.buffer(Okio.sink(rawSocket));函数的实现为:
public static Sink sink(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
if (socket.getOutputStream() == null) throw new IOException("socket's output stream == null");
AsyncTimeout timeout = timeout(socket);
Sink sink = sink(socket.getOutputStream(), timeout);
return timeout.sink(sink);
}
可以看出最终是获取到socket中的OutputStream封装在Sink对象中
findConnection函数继续往下面执行,当执行到了这里
synchronized (connectionPool) {
reportedAcquired = true;
//TODO 加入连接池
// Pool the connection.
Internal.instance.put(connectionPool, result);
...
}
最终的函数实现会进入到ConnectPool中对应的函数
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
//TODO 每次加入缓存 先执行检测清理
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
//最终将这个连接保存到了连接池的队列中
connections.add(connection);
}
下面来分析下连接池中的清理操作: 连接池里面有一个线程是用来清理连接的,清理当一个连接在连接池中超过了最大的存活的时间之后,就会被清理掉
cleanupRunning为一个标识 boolean cleanupRunning; 标识当前的清理线程是否正在运行
//构建一个线程池 ,同时Util.threadFactory("OkHttp ConnectionPool", true) 后面的参数设置为true,代表他是一个守护线程
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
嗯,这个清理的线程池跟异步任务执行的线程池配置是一样的,除了最后一个参数 Util.threadFactory("OkHttp ConnectionPool", true)
public static ThreadFactory threadFactory(final String name, final boolean daemon) {
return new ThreadFactory() {
@Override public Thread newThread(Runnable runnable) {
Thread result = new Thread(runnable, name);
//TODO 是否设置为守护线程 ,设置为守护线程化,当一个应用程序当没有任何的线程正在运行的化,此时守护线程会退出,如果不设置为守护线程的化,
//那么这个应用程序就不会退出
result.setDaemon(daemon);
return result;
}
};
}
//最大闲置连接数量
private final int maxIdleConnections;
//每个连接的最大存活时间
private final long keepAliveDurationNs;
//清理线程 Runnable任务实现
private final Runnable cleanupRunnable = new Runnable() {
@Override
public void run() {
while (true) {
//获取下一个需要的清理的时间
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
//等待多久达到最大闲置
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
long waitNanos = cleanup(System.nanoTime());函数的实现为:
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
//遍历当前所有连接
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
//TODO 正在使用 小于0表示闲置
// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
//TODO 获得的最长闲置时间的连接
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
//TODO 闲置时间大于最大保存时间 或者连接闲置数量大于最大允许闲置数 则移除最长闲置的连接
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it
// below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
//TODO 闲置数量大于0(存在闲置连接) 获得等待时间(等待多久达到最大闲置)
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
//TODO 存在正在使用连接
// All connections are in use. It'll be at least the keep alive duration 'til we
// run again.
return keepAliveDurationNs;
} else {
//TODO 连接池中不存在连接
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
回到前面 函数继续执行
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
//TODO RealConnection 对Socket连接的封装
//TODO TPC/IP协议是传输层协议,主要解决数据如何在网络中传输
//TODO Socket则是对TCP/IP协议的封装和应用(程序员层面上)。
//TODO Http 应用层协议,解决如何包装数据
//TODO 使用Http协议封装数据,借助TCP/IP协议的实现:Socket 进行数据传输
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled,
doExtensiveHealthChecks);
//TODO HttpCodec 处理解析请求与响应的工具类
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
当程序执行到 HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
//得到一个HttpCodec对象
public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, chain, streamAllocation, http2Connection);
} else {
//设置socket的超时时间
socket.setSoTimeout(chain.readTimeoutMillis());
//设置socket的读取超时时间
source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
//设置socket的写超时时间
sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
//然后构建一个Http1Codec对象
return new Http1Codec(client, streamAllocation, source, sink);
}
}
这里我们会构建一个Http1Codec对象 ,对应的构造函数为:
//构建一个Http1Codec对象,source 为OutPutStream的封装 sink为InputStream的封装
public Http1Codec(OkHttpClient client, StreamAllocation streamAllocation, BufferedSource source,BufferedSink sink) {
this.client = client;
this.streamAllocation = streamAllocation;
this.source = source;
this.sink = sink;
}
函数继续执行到了ConnectInterceptor中的intercept中
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//TODO 连接服务器/复用socket
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
//得到RealConnection对象
RealConnection connection = streamAllocation.connection();
//执行下一个拦截器
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}
当执行 realChain.proceed(request, streamAllocation, httpCodec, connection);的时候,此时四个参数都有值了
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError(); index代表当前正在执行的拦截器在集合中的索引,所以这个所以不能大于拦截器集合的大小值
...
// Call the next interceptor in the chain. 这里又构建一个RealInterceptorChain 对象,要注意这里的索引为index+1,而且其他的参数都为同一个对象
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);然后从集合中获取到当前所有的Interceptor对象
Response response = interceptor.intercept(next);//然后执行对象的对应的方法,同时将刚构建的RealInterceptorChain对象传递进去,注意这里不会往下执行,跳到了另一边了
...
return response;
}
这个时候获取到的是拦截器对象为CallServerInterceptor对象,执行对应的方法
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
//回调写请求头开始
realChain.eventListener().requestHeadersStart(realChain.call());
//写请求头
httpCodec.writeRequestHeaders(request);
//回调写请求头结束
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
Response.Builder responseBuilder = null;
....
//完成请求,
httpCodec.finishRequest();
//构建ResonseBuilder对象
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
//获取返回的状态码
int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
responseBuilder = httpCodec.readResponseHeaders(false);
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
...
//返回读取的结果
return response;
}
首先分析
//写请求头
httpCodec.writeRequestHeaders(request); 函数的实现为:
@Override public void writeRequestHeaders(Request request) throws IOException {
String requestLine = RequestLine.get(request, streamAllocation.connection().route().proxy().type());
writeRequest(request.headers(), requestLine);
}
/** Returns bytes of a request header for sending on an HTTP transport. */
//写请求头,最终是调用sink的write方法,sink为sokcet的OutputStream的封装,所以最终是将结果写到了sokcet的OutputStream中
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}
最终是调用sink的write方法,sink为sokcet的OutputStream的封装,所以最终是将结果写到了sokcet的OutputStream中
之后执行
//构建ResonseBuilder对象
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);
}
responseBuilder = httpCodec.readResponseHeaders(false); 函数的实现为:
//获取Response Headers
@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
throw new IllegalStateException("state: " + state);
}
try {
//通过socket中的来获取返回的结果,这里读取状态栏,将读取的结果封装成一个StatusLine对象
StatusLine statusLine = StatusLine.parse(readHeaderLine());
//将读取的状态栏的结果,封装成Response.Builder对象
Response.Builder responseBuilder = new Response.Builder()
.protocol(statusLine.protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(readHeaders());
if (expectContinue && statusLine.code == HTTP_CONTINUE) {
return null;
} else if (statusLine.code == HTTP_CONTINUE) {
state = STATE_READ_RESPONSE_HEADERS;
return responseBuilder;
}
state = STATE_OPEN_RESPONSE_BODY;
return responseBuilder;
} catch (EOFException e) {
// Provide more context if the server ends the stream before sending a response.
IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
exception.initCause(e);
throw exception;
}
}
StatusLine statusLine = StatusLine.parse(readHeaderLine());函数的实现为:
//读取请求结果的头部内容
private String readHeaderLine() throws IOException {
//source 为 socket的InputStream的封装,所以一旦连接上,就可以获取返回的结果了
String line = source.readUtf8LineStrict(headerLimit);
headerLimit -= line.length();
return line;
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
可以看出最终是调用了 source 为 socket的InputStream的封装,所以一旦连接上,就可以获取返回的结果了,接下来就是解析返回的请求头了,之后封装成一个response对象返回
继而回到了ConnectInterceptor 中的Intercept函数
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//TODO 连接服务器/复用socket
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
//得到RealConnection对象
RealConnection connection = streamAllocation.connection();
//执行下一个拦截器
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
发现他没有任何的处理,是直接的返回,所以又回到了另一个拦截器那
@Override public Response intercept(Chain chain) throws IOException {
.....
//TODO 执行下一个拦截器
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// If we have a cache response too, then we're doing a conditional get.
//TODO 如果存在缓存 更新
if (cacheResponse != null) {
//TODO 304响应码 自从上次请求后,请求需要响应的内容未发生改变
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
//TODO 缓存Response
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
//如果当前是有设置cache对象的
if (cache != null) {
//判断当前的respon有body部分,并且当前的请求是允许缓存的
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
//缓存当前的结果,缓存是使用DiskLru算法来实现的
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
//如果当前的缓存是非法的缓存,就移除掉
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
如果我们这里给了cache 并且给的是Okhttp中的InternalCache,那么 cache.put(response); 就会执行到对应的方法
final InternalCache internalCache = new InternalCache() {
@Override public Response get(Request request) throws IOException {
return Cache.this.get(request);
}
@Override public CacheRequest put(Response response) throws IOException {
return Cache.this.put(response);
}
@Override public void remove(Request request) throws IOException {
Cache.this.remove(request);
}
@Override public void update(Response cached, Response network) {
Cache.this.update(cached, network);
}
@Override public void trackConditionalCacheHit() {
Cache.this.trackConditionalCacheHit();
}
@Override public void trackResponse(CacheStrategy cacheStrategy) {
Cache.this.trackResponse(cacheStrategy);
}
};
Cache.this.put(response); 函数的实现
@Nullable CacheRequest put(Response response) {
....
//最终使用了DiskLru来缓存文件的
Entry entry = new Entry(response);
DiskLruCache.Editor editor = null;
try {
editor = cache.edit(key(response.request().url()));
if (editor == null) {
return null;
}
entry.writeTo(editor);
return new CacheRequestImpl(editor);
} catch (IOException e) {
abortQuietly(editor);
return null;
}
}
当程序继续执行到cacheWritingResponse(cacheRequest, response); 这里又会返回一个新建的Response对象,返回到上一个拦截器
@Override public Response intercept(Chain chain) throws IOException {
...
//TODO 执行下一个拦截器 ,上一个拦截器处理之后的结果,如果当前有设置缓存的化,并且缓存是可以用的化,这里返回的为 cacheWritingResponse 返回的请求结果
Response networkResponse = chain.proceed(requestBuilder.build());
//保存用户的请求头部信息,如果当前有设置Cookie的化
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder().request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
上边在得到上一个拦截器返回的数据之后,这边又根据是否有设置CookieJar来保存请求的头,然后又封装一个Response对象返回给上一个拦截器,最终又返回给我们自定义的拦截器
通过拦截器的时候,我们可以看到我们自定义的拦截器是第一个先执行的,当获取到结果之后,我们的自定义的拦截器是最后一个执行的,每一个拦截器都负责值的内容,各司其职
拦截器的总结
等待队列的执行
前面说过等待的队列什么时候会执行,这里解决这个疑问
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
当进行请求的时候,不管结果怎么样,最后都会执行到了client.dispatcher().finished(this);
/**
* Used by {@code Call#execute} to signal completion.
*/
void finished(RealCall call) {
finished(runningSyncCalls, call, true);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//TODO 移除队列
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//TODO 检查执行 readyAsyncCalls 中的请求
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
//闲置调用
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
由于 promoteCalls 为true,所以会执行 promoteCall() ,promoteCall()函数的实现为:
private void promoteCalls() {
//如果当前正在运行的队列数量还是超过了最大的限制,直接返回
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
//如果当前等待的队列为空,也直接返回
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
//如果到了这里,就说明,正在运行的队列没有达到最大的限制,等待的队列也不为空,下面将等待队列的任务移出添加到运行的队列中
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
//TODO 相同host的请求没有达到最大
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}