OkHttp3的源码解析

李虎

OkHttp是一款非常常用的网络框架,本文试图对其源码进行解析,为方便大家阅读,先列目录如下:
1.基本用法;
2.Dispatcher解析
3.拦截器执行流程
4.RetryAndFollowUpInterceptor、BridgeInterceptor和CacheInterceptor
5.ConnectInterceptor
6.CallServerInterceptor

流程图如下:

首先构造RealCall对象,若是异步请求则由Dispatcher分发,同步则直接进行请求,最后依次执行五个拦截器的intercept方法完成请求。

1. 基本用法

首先是创建OkHttpClient对象。

OkHttpClient client = new OkHttpClient.Builder().readTimeout(5, TimeUnit.SECONDS).build();  

这里采用建造者模式,我们可以很方便的设置各个参数。下面我详细看下OkHttpClient.Builder()这个内部类的构造方法里有哪些参数。

public Builder() {  
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
    }  

这里有两个参数比较重要,一个是dispatcher,由它来决定异步请求是直接执行还是放入等待队列;另一个是connectionPool,OkHttp把请求都抽象成Connection,而connectionPool就是来管理这些Connection的。这两者后文都会详细讲解。

第二步就是创建Request对象。

Request request = new Request.Builder().url("http://www.baidu.com").get().build();  

同样采用了建造者模式。这里封装了请求的URL、请求方式等信息。

第三步是创建Call对象。

Call call = client.newCall(request);  

由于Call是个接口,具体的操作都是在其实现类RealCall里完成的,来看RealCall的构造方法。

RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {  
    final EventListener.Factory eventListenerFactory = client.eventListenerFactory();
    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
// TODO(jwilson): this is unsafe publication and not threadsafe.
    this.eventListener = eventListenerFactory.create(this);
  }   

可以看到它将前面创建的OkHttpClient和Request对象都传给了RealCall。

第四步,根据是同步请求还是异步请求调用不同的方法。

同步请求:

Response response = call.execute();  

我们来具体看下execute()方法。

 @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  }  

我们直接看重点代码,首先看client.dispatcher().executed(this);client.dispatcher()是获取到Dispatcher对象,然后执行其executed()方法,我们看下这个方法的实现。

/** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }  

就是将当前的请求call添加到runningSyncCalls这个队列中去,而runningSyncCalls就是同步请求的队列。
下面一行Response result = getResponseWithInterceptorChain();就是具体的网络请求操作,这个后文会细讲。然后我们再来看finally里面的代码client.dispatcher().finished(this);finished()方法代码如下:

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {  
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  } 

重点看这一行if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");,将call从runningSyncCalls移除了,如果不能移除就抛出异常。所以总结一下,同步方法所做的工作就是先将call添加到Dispatcher的runningSyncCalls队列中去,完成请求后再将其移除。

异步请求:

call.enqueue(new Callback() {  
            @Override
            public void onFailure(Call call, IOException e) {
                System.out.println("Response:onFailure");
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                System.out.println("Response:" + response.body().string());
            }
        });  

这里enqueue最终会调用Dispatcher的enqueue方法,代码如下:

synchronized void enqueue(AsyncCall call) {  
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }  

判断进行中的异步请求队列runningAsyncCalls的size是否小于最大请求数及进行中的请求数是否小于每个主机最大请求数,若都满足,则将call添加到异步请求队列runningAsyncCalls,并放入线程池中执行;若不满足,则将call添加到就绪异步请求队列readyAsyncCalls中。
接下来我们具体看下executorService().execute(call);,也就是将call放入线程池中执行时具体做了哪些操作。

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }  

首先在Response response = getResponseWithInterceptorChain();这一行进行实际的网络请求,这与同步请求是一样的,在finally中执行client.dispatcher().finished(this);,也与同步请求一样。通过判断retryAndFollowUpInterceptor.isCanceled()分别回调onFailure和onResponse,这两个回调方法正是我们传入的callback。

2. Dispatcher解析

上面我们已经反复提到Dispatcher了,现在我们来具体分析Dispatcher这个类,总的来说,它的作用就是管理请求队列,并用线程池执行请求。

/** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();  

首先Dispatcher里维护了这三个队列,readyAsyncCalls是异步请求就绪队列,也就是等待执行的异步请求队列。runningAsyncCalls是进行中的异步请求队列。runningSyncCalls是进行中的同步请求队列。当执行同步请求时比较简单,首先调用synchronized void executed(RealCall call) {runningSyncCalls.add(call);}添加到同步请求队列中,执行完后调用if (!calls.remove(call))将call移除。

当执行异步请求时,方法如下

synchronized void enqueue(AsyncCall call) {  
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }  

如上文所述,通过判断runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost是否满足来决定将call添加到runningAsyncCalls还是readyAsyncCalls,那么问题来了,如果条件不满足,call被添加到readyAsyncCalls中等待了,待条件线程池中有空余的线程时,如何执行call呢。上文也有提到,上述方法中的executorService().execute(call);执行时,最终会调用client.dispatcher().finished(this);方法,方法如下

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {  
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }  

if (!calls.remove(call))runningAsyncCalls进行中的异步请求队列从中将call移除掉,然后执行if (promoteCalls) promoteCalls();,这里promoteCalls这个标志位,同步为false,即不执行promoteCalls(),异步为true,会执行,下面我们来看下这个方法的具体实现。

private void promoteCalls() {  
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }  

对就绪异步请求队列进行了遍历,只要满足if (runningCallsForHost(call) < maxRequestsPerHost),就将其从就绪队列中remove掉,然后添加到进行中的队列,并放入线程池中执行。
最后我们看下线程池的实例化,

public synchronized ExecutorService executorService() {  
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }  

核心线程数为0,这意味着线程空闲超过等待时间时,所以线程都会被杀掉;最大线程数为int最大值,但实际上Dispatcher对最大请求数做了限制为64个,所以最多只会有64个线程;等待时间为60s,即线程空闲一分钟后会被杀掉。

3. 拦截器执行流程

上文我们提到过,不管同步还是异步,最终进行网络请求的都是这一行Response response = getResponseWithInterceptorChain();,来看其具体实现,

Response getResponseWithInterceptorChain() throws IOException {  
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }  

OkHttp进行网络请求的过程就是五个拦截器依次执行的过程,这五个拦截器分别是:
a. 负责失败重试以及重定向的RetryAndFollowUpInterceptor
b. 负责把封装一些头部信息的BridgeInterceptor
c. 负责读取缓存直接返回、更新缓存的CacheInterceptor
d. 负责和服务器建立连接的ConnectInterceptor
e. 负责向服务器发送请求数据、从服务器读取响应数据的CallServerInterceptor.

下面我们理一下这些拦截器是如何依次执行的,上面的代码首先将这些拦截器存入一个list保存起来,然后传入RealInterceptorChain的构造方法里,然后调用RealInterceptorChainproceed方法,在proceed里有如下代码

// Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);  

这里调用了拦截器的intercept方法,注意,这里将index+1了,而在每个intercept又会调用RealInterceptorChainproceed方法,而每次index都会+1,这样就将list里的拦截器的intercept方法都执行了。接下来我们来看这五个拦截器。

4. RetryAndFollowUpInterceptor、BridgeInterceptor和CacheInterceptor

RetryAndFollowUpInterceptor是用来进行重试和重定向的,在这个拦截器中创建了StreamAllocation对象,但并未使用,一直传到ConnectInterceptor。
BridgeInterceptor是用来封装一些请求头部信息的。
CacheInterceptor是用于处理缓存的,这三个拦截器这里不做过多延伸,重点来看后两个拦截器。

5. ConnectInterceptor

这个拦截器主要的作用是

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

首先获取在RetryAndFollowUpInterceptor中创建的StreamAllocation对象,然后获取HttpCodec对象和RealConnection对象,HttpCodec用于编码Request和解码Response,RealConnection用于实际进行网络IO传输。然后将这两个对象传递给下一个拦截器。接下来我们进入streamAllocation.newStream(client, doExtensiveHealthChecks);方法详细看看是如何获取HttpCodec对象的。

RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,  
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, this);  

继续跟进到findHealthyConnection里;

  * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
   * until a healthy connection is found.
   */
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
  }. 

这是个死循环,获取到candidate后首先判断if (candidate.successCount == 0),满足即代表这是个全新的RealConnection,直接返回;不满足则检查是否健康可用,不可用则跳出此次循环继续获取下个RealConnection对象,直到找到可用的。
findConnection里有如下核心代码,

// Attempt to use an already-allocated connection.
      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      // Attempt to get a connection from the pool.
      Internal.instance.get(connectionPool, address, this, null);
      if (connection != null) {
        return connection;
      }  

首先尝试复用RealConnection,如果不为空则直接将其返回;若为空则从connectionPool里获取一个。如果获取不到,就new一个并放入connectionPool。最终调用connect方法进行打开一个socket链接。

6. CallServerInterceptor

具体的网络请求就是在这个拦截器的intercept方法中执行的,重点的代码有以下几步:
a. httpCodec.writeRequestHeaders(request);向socket中写入头部信息。
b. request.body().writeTo(bufferedRequestBody);向socket中写入body信息。
c. httpCodec.finishRequest();写入完毕。
d. responseBuilder = httpCodec.readResponseHeaders(false);读取头部信息。
e. response = response.newBuilder().body(httpCodec.openResponseBody(response)).build();读取body信息。

至此本文对OkHttp的分析已全部完毕,限于篇幅,部分环节的深度还显不够,待日后再补充。如有错漏之处还请指出。通过分析源码我们弄清楚了网络请求的执行流程,以后若有网络方面的问题可以很方便的断点调试。另外,这个框架的设计还有很多值得我们学习的地方,比如Builder模式的使用,在OkHttpClient和Request的初始化中都用到了,我们也可以在自己的项目中使用,当初始化一个类时有大量参数需要传递,就可以使用Builder模式,很方便的通过链式调用传递参数。Dispatcher中线程池的使用也很巧妙,将核心线程数设为0,这样超过等待时间后所有线程都会被杀掉,节省了资源;同时将最大进行中请求数设置为64,这样最多只会创建64个线程,多余的请求则进入等待队列;这些管理子线程的做法都可以在我们项目中借鉴。