Okhttp源码解析

之前用到 Okhttp,但是一直没有深入了解,前段时间有时间看了下 Okhttp 的源码,这里记录分析下。主要从以下几方面分析吧:

  1. Okhttp 的简单使用
  2. Okhttp 访问网络的整体流程
  3. 一些重要类的介绍
  4. 一些总结

Okhttp 的简单使用

这里简单介绍下使用,然后从使用方法入手进行分析,详细的使用方法可以参考官方文档或者一些博客介绍,Okhttp使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//创建okHttpClient对象
OkHttpClient mOkHttpClient = new OkHttpClient();
//创建一个Request
final Request request = new Request.Builder()
.url("https://github.com/hongyangAndroid")
.build();
//new call
Call call = mOkHttpClient.newCall(request);
//请求加入调度
call.enqueue(new Callback() {
@Override
public void onFailure(Request request, IOException e)
{
}
@Override
public void onResponse(final Response response) throws IOException
{
//String htmlStr = response.body().string();
}
});

Okhttp 整体流程

从上面的使用我们可以看到,首先先 new 一个 OkhttpClient(),之后通过 OkhttpClient#newCall() 返回一个 Call 对象,通过 Call#enqueue()(异步) 或者 Call#execute()(同步) 返回访问结果。

OkhttpClient 的构造函数就是对一些设置进行初始化,所以直接从 OkhttpClient#newCall() 进行分析了。

1
2
3
4
5
OkhttpClient#newCall()
@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}

可以看到,返回的是 Call 的实现类 RealCall,之后会调用 RealCall#enqueue() 和 RealCall#execute(),先看 RealCall#execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
RealCall#execute()
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
////////////////////////////////////////////////////
client.dispatcher().executed(this); // 就是把请求加到队列中
Response result = getResponseWithInterceptorChain(); // 真正执行请求
////////////////////////////////////////////////////
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
Dispatcher#executed(RealCall)
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}

其中主要逻辑就是把请求加入队列中,之后通过 getResponseWithInterceptorChain() 执行请求,接下来就看这个函数的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
OkhttpClient#getResponseWithInterceptorChain()
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
// 把用户请求构建成网络请求,包括添加了 header 字段等
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client)); // 建立 sokcet 连接,设置了 http 版本
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
// 向服务器发送 reqeust 并且解析返回的 Response
// 首先是根据 request 和 http 规范构建请求并写到流中,随后解析流里返回的数据并构建 response
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}

可以说这个方法是最核心的方法了,使用了链式处理,对请求的处理都封装成 Interceptor 并一层层调用,其中内置的有 BridgeInterceptor,CacheInterceptor(处理缓存),ConnectInterceptor(建立连接),CallServerInterceptor(处理具体请求和返回),同时我们也可以自己添加拦截器,是通过client.interceptors()添加进去的,接下来先看下拦截器是如何链式调用的,之后再具体看拦截器的内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
RealInterceptorChain#proceed()
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
Connection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// 省略...
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// 省略...
return response;
}

从上面代码可以看出,先构建下一个 interceptor 的 RealInterceptorChain,然后获取到当前的 interceptors,执行 Interceptor#intercept(),并在其中用相同方法调用下一个 interceptor,这样层层调用下去,最后返回的就是被所有 Interceptor 处理过的 Response。

接下来我们看下 Interceptor#intercept() 中是如何处理请求的。重点分析 ConnectInterceptor 和 CallServerInterceptor。

介绍 ConnectInterceptor 之前先简单说下 RetryAndFollowUpInterceptor,下面看到的StreamAllocation 就是这里构建的。

先看 ConnectInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ConnectInterceptor#intercept()
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
boolean doExtensiveHealthChecks = !request.method().equals("GET");
// 这个里面是 socket 连接
// HttpCodec 中封装了连接以后的 buffer source 等
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection(); // 在 newStream() 中会对 connection 赋值
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

首先获取到前面创建的 StreamAllocatoin(这个类封装了连接,请求,流等),接着会调用 StreamAllocation#newStream(),这个方法里调用了 socket 连接到服务器,跟进看下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
StreamAllocation#newStream()
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
// 省略...
// findHealthyConnection 会一直循环直到找到 healthyConnection
// healthyConnection 指 socket没有关闭或者 stream 正常等等, RealConnection#isHealthy()里面有
// 这个里有真正的 socket 连接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec;
// 省略...
resultCodec = new Http1Codec(
client, this, resultConnection.source, resultConnection.sink);
// 省略...
return resultCodec;
}

再跟进 findHealthyConnection()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
StreamAllocation#newStream()
-- StreamAllocation#findHealthyConnection()
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) { // 会一直循环直到建立连接
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
StreamAllocation#newStream()
-- StreamAllocation#findHealthyConnection()
-- StreamAllocation#findConnection()
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
// 省略...
// 查看连接池里有没有相同的连接,有就直接拿来用
// Internal.instance 在 OkHttpClient 中赋值, get() 方法直接调用 pool.get
RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);
if (pooledConnection != null) {
this.connection = pooledConnection;
return pooledConnection;
}
selectedRoute = route;
}
// 省略 route 的选择过程
RealConnection newConnection = new RealConnection(selectedRoute);
synchronized (connectionPool) {
acquire(newConnection); // 加入 allocations list 中
Internal.instance.put(connectionPool, newConnection); // 其实就是调用 pool.put()
this.connection = newConnection;
if (canceled) throw new IOException("Canceled");
}
// 真正连接过程
newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(),
connectionRetryEnabled);
routeDatabase().connected(newConnection.route());
return newConnection;
}

真正的连接过程在 newConnection#connect() 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
StreamAllocation#newStream()
-- StreamAllocation#findHealthyConnection()
-- StreamAllocation#findConnection()
-- RealConnectoin#connect()
public void connect(...) {
// 省略...
// 这里是 循环建立连接,直到建好为止
while (protocol == null) {
try {
if (route.requiresTunnel()) { // 是否是 https
buildTunneledConnection(connectTimeout, readTimeout, writeTimeout,
connectionSpecSelector);
} else {
// 通过 socket http 连接
buildConnection(connectTimeout, readTimeout, writeTimeout, connectionSpecSelector);
}
} catch (IOException e) {
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
StreamAllocation#newStream()
-- StreamAllocation#findHealthyConnection()
-- StreamAllocation#findConnection()
-- RealConnectoin#connect()
-- RealConnection#buildConnection()
private void buildConnection(...) throws IOException {
// 主要调用 socket.connect()
connectSocket(connectTimeout, readTimeout);
// 设置 http 协议版本
establishProtocol(readTimeout, writeTimeout, connectionSpecSelector);
}
private void connectSocket(...) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
// 下面的socketFactory 默认使用的是 DefaultSocketFactory,通过SocketFactory.getDefault() 获取
// DefaultSocketFactory 中的 createSocket() 直接返回了 new Socket()
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
rawSocket.setSoTimeout(readTimeout);
try {
// 调用 rawSocket.connect()
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
}
// Okio 是另外一个项目,没有直接加到 okhttp 里面? 不合理啊
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
}
1
2
3
4
5
6
7
8
9
10
11
StreamAllocation#newStream()
-- StreamAllocation#findHealthyConnection()
-- StreamAllocation#findConnection()
-- RealConnectoin#connect()
-- RealConnection#buildConnection()
-- Platform#connectSocket()
public void connectSocket(Socket socket, InetSocketAddress address,
int connectTimeout) throws IOException {
socket.connect(address, connectTimeout);
}

经过这么多函数调用,终于调到了 socket,之后再往回看 newStream()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
StreamAllocation#newStream()
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
// 省略...
// findHealthyConnection 会一直循环直到找到 healthyConnection
// healthyConnection 指 socket没有关闭或者 stream 正常等等, RealConnection#isHealthy()里面有
// 这个里有真正的 socket 连接
// 上面已经分析过这个函数了
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec;
// 省略...
// 根据 建立的连接构建 HttpCodec
resultCodec = new Http1Codec(
client, this, resultConnection.source, resultConnection.sink);
// 省略...
return resultCodec;
}

在建立连接以后会构建 HttpCodec,Http1Codec 是对流的封装,这里用的 okio 里的 BufferedSource 和 BufferSink,分别可以对应到 IntputStream 和 OutputStream。

之后就返回到了 ConnectInterceptor#intercept(),他自己的已经处理完成,接着会调用下一个 Interceptor,也就是 CallServerInterceptor。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
CallServerInterceptor#intercept()
@Override public Response intercept(Chain chain) throws IOException {
// 上一个 Interceptror / ConnectInterceptor 传入的,
HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
Request request = chain.request();
long sentRequestMillis = System.currentTimeMillis();
httpCodec.writeRequestHeaders(request); // 把 headers 写到 sink 中
// 写入 body
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// sink 相当于 outputstream
// source 相当于 inputstream
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
httpCodec.finishRequest(); // 调用 sink.flush()
Response response = httpCodec.readResponseHeaders() // 读取流里的数据, source.readUtf8LineStrict()
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
// 省略错误处理...
return response;
}

可以看到 CallServerIntercept 的主要作用就是把 request 数据写到输出流中发送到服务器,并从输入流里获取数据构建 response 返回。

整体流程就是这样,放一张流程图

热评文章