java 从零开始手写 RPC (07)-timeout 超时处理

2021年11月26日 阅读数:14
这篇文章主要向大家介绍java 从零开始手写 RPC (07)-timeout 超时处理,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

《过期不候》java

最漫长的莫过于等待git

咱们不可能永远等一我的github

就像请求dom

永远等待响应socket

在这里插入图片描述

超时处理

java 从零开始手写 RPC (01) 基于 socket 实现ide

java 从零开始手写 RPC (02)-netty4 实现客户端和服务端oop

java 从零开始手写 RPC (03) 如何实现客户端调用服务端?性能

java 从零开始手写 RPC (04) 序列化学习

java 从零开始手写 RPC (05) 基于反射的通用化实现测试

必要性

前面咱们实现了通用的 rpc,可是存在一个问题,同步获取响应的时候没有超时处理。

若是 server 挂掉了,或者处理太慢,客户端也不可能一直傻傻的等。

当外部的调用超过指定的时间后,就直接报错,避免无心义的资源消耗。

思路

调用的时候,将开始时间保留。

获取的时候检测是否超时。

同时建立一个线程,用来检测是否有超时的请求。

实现

思路

调用的时候,将开始时间保留。

获取的时候检测是否超时。

同时建立一个线程,用来检测是否有超时的请求。

超时检测线程

为了避免影响正常业务的性能,咱们另起一个线程检测调用是否已经超时。

package com.github.houbb.rpc.client.invoke.impl;

import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import com.github.houbb.rpc.common.rpc.domain.impl.RpcResponseFactory;
import com.github.houbb.rpc.common.support.time.impl.Times;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 超时检测线程
 * @author binbin.hou
 * @since 0.0.7
 */
public class TimeoutCheckThread implements Runnable{

    /**
     * 请求信息
     * @since 0.0.7
     */
    private final ConcurrentHashMap<String, Long> requestMap;

    /**
     * 请求信息
     * @since 0.0.7
     */
    private final ConcurrentHashMap<String, RpcResponse> responseMap;

    /**
     * 新建
     * @param requestMap  请求 Map
     * @param responseMap 结果 map
     * @since 0.0.7
     */
    public TimeoutCheckThread(ConcurrentHashMap<String, Long> requestMap,
                              ConcurrentHashMap<String, RpcResponse> responseMap) {
        ArgUtil.notNull(requestMap, "requestMap");
        this.requestMap = requestMap;
        this.responseMap = responseMap;
    }

    @Override
    public void run() {
        for(Map.Entry<String, Long> entry : requestMap.entrySet()) {
            long expireTime = entry.getValue();
            long currentTime = Times.time();

            if(currentTime > expireTime) {
                final String key = entry.getKey();
                // 结果设置为超时,从请求 map 中移除
                responseMap.putIfAbsent(key, RpcResponseFactory.timeout());
                requestMap.remove(key);
            }
        }
    }

}

这里主要存储请求,响应的时间,若是超时,则移除对应的请求。

线程启动

在 DefaultInvokeService 初始化时启动:

final Runnable timeoutThread = new TimeoutCheckThread(requestMap, responseMap);
Executors.newScheduledThreadPool(1)
                .scheduleAtFixedRate(timeoutThread,60, 60, TimeUnit.SECONDS);

DefaultInvokeService

原来的设置结果,获取结果是没有考虑时间的,这里加一下对应的判断。

设置请求时间

  • 添加请求 addRequest

会将过期的时间直接放入 map 中。

由于放入是一次操做,查询多是屡次。

因此时间在放入的时候计算完成。

@Override
public InvokeService addRequest(String seqId, long timeoutMills) {
    LOG.info("[Client] start add request for seqId: {}, timeoutMills: {}", seqId,
            timeoutMills);
    final long expireTime = Times.time()+timeoutMills;
    requestMap.putIfAbsent(seqId, expireTime);
    return this;
}

设置请求结果

  • 添加响应 addResponse
  1. 若是 requestMap 中已经不存在这个请求信息,则说明可能超时,直接忽略存入结果。

  2. 此时检测是否出现超时,超时直接返回超时信息。

  3. 放入信息后,通知其余等待的全部进程。

@Override
public InvokeService addResponse(String seqId, RpcResponse rpcResponse) {
    // 1. 判断是否有效
    Long expireTime = this.requestMap.get(seqId);
    // 若是为空,多是这个结果已经超时了,被定时 job 移除以后,响应结果才过来。直接忽略
    if(ObjectUtil.isNull(expireTime)) {
        return this;
    }

    //2. 判断是否超时
    if(Times.time() > expireTime) {
        LOG.info("[Client] seqId:{} 信息已超时,直接返回超时结果。", seqId);
        rpcResponse = RpcResponseFactory.timeout();
    }

    // 这里放入以前,能够添加判断。
    // 若是 seqId 必须处理请求集合中,才容许放入。或者直接忽略丢弃。
    // 通知全部等待方
    responseMap.putIfAbsent(seqId, rpcResponse);
    LOG.info("[Client] 获取结果信息,seqId: {}, rpcResponse: {}", seqId, rpcResponse);
    LOG.info("[Client] seqId:{} 信息已经放入,通知全部等待方", seqId);
    // 移除对应的 requestMap
    requestMap.remove(seqId);
    LOG.info("[Client] seqId:{} remove from request map", seqId);
    synchronized (this) {
        this.notifyAll();
    }
    return this;
}

获取请求结果

  • 获取相应 getResponse
  1. 若是结果存在,直接返回响应结果

  2. 不然进入等待。

  3. 等待结束后获取结果。

@Override
public RpcResponse getResponse(String seqId) {
    try {
        RpcResponse rpcResponse = this.responseMap.get(seqId);
        if(ObjectUtil.isNotNull(rpcResponse)) {
            LOG.info("[Client] seq {} 对应结果已经获取: {}", seqId, rpcResponse);
            return rpcResponse;
        }
        // 进入等待
        while (rpcResponse == null) {
            LOG.info("[Client] seq {} 对应结果为空,进入等待", seqId);
            // 同步等待锁
            synchronized (this) {
                this.wait();
            }
            rpcResponse = this.responseMap.get(seqId);
            LOG.info("[Client] seq {} 对应结果已经获取: {}", seqId, rpcResponse);
        }
        return rpcResponse;
    } catch (InterruptedException e) {
        throw new RpcRuntimeException(e);
    }
}

能够发现获取部分的逻辑没变,由于超时会返回一个超时对象:RpcResponseFactory.timeout();

这是一个很是简单的实现,以下:

package com.github.houbb.rpc.common.rpc.domain.impl;

import com.github.houbb.rpc.common.exception.RpcTimeoutException;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;

/**
 * 响应工厂类
 * @author binbin.hou
 * @since 0.0.7
 */
public final class RpcResponseFactory {

    private RpcResponseFactory(){}

    /**
     * 超时异常信息
     * @since 0.0.7
     */
    private static final DefaultRpcResponse TIMEOUT;

    static {
        TIMEOUT = new DefaultRpcResponse();
        TIMEOUT.error(new RpcTimeoutException());
    }

    /**
     * 获取超时响应结果
     * @return 响应结果
     * @since 0.0.7
     */
    public static RpcResponse timeout() {
        return TIMEOUT;
    }

}

响应结果指定一个超时异常,这个异常会在代理处理结果时抛出:

RpcResponse rpcResponse = proxyContext.invokeService().getResponse(seqId);
Throwable error = rpcResponse.error();
if(ObjectUtil.isNotNull(error)) {
    throw error;
}
return rpcResponse.result();

测试代码

服务端

咱们故意把服务端的实现添加沉睡,其余保持不变。

public class CalculatorServiceImpl implements CalculatorService {

    public CalculateResponse sum(CalculateRequest request) {
        int sum = request.getOne()+request.getTwo();

        // 故意沉睡 3s
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return new CalculateResponse(true, sum);
    }

}

客户端

设置对应的超时时间为 1S,其余不变:

public static void main(String[] args) {
    // 服务配置信息
    ReferenceConfig<CalculatorService> config = new DefaultReferenceConfig<CalculatorService>();
    config.serviceId(ServiceIdConst.CALC);
    config.serviceInterface(CalculatorService.class);
    config.addresses("localhost:9527");
    // 设置超时时间为1S
    config.timeout(1000);

    CalculatorService calculatorService = config.reference();
    CalculateRequest request = new CalculateRequest();
    request.setOne(10);
    request.setTwo(20);

    CalculateResponse response = calculatorService.sum(request);
    System.out.println(response);
}

日志以下:

.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 14:59:40.974] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务开始启动客户端
...
[INFO] [2021-10-05 14:59:42.504] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务启动客户端完成,监听地址 localhost:9527
[INFO] [2021-10-05 14:59:42.533] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call remote with request: DefaultRpcRequest{seqId='62e126d9a0334399904509acf8dfe0bb', createTime=1633417182525, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2021-10-05 14:59:42.534] [main] [c.g.h.r.c.i.i.DefaultInvokeService.addRequest] - [Client] start add request for seqId: 62e126d9a0334399904509acf8dfe0bb, timeoutMills: 1000
[INFO] [2021-10-05 14:59:42.535] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call channel id: 00e04cfffe360988-000004bc-00000000-1178e1265e903c4c-7975626f
...
Exception in thread "main" com.github.houbb.rpc.common.exception.RpcTimeoutException
	at com.github.houbb.rpc.common.rpc.domain.impl.RpcResponseFactory.<clinit>(RpcResponseFactory.java:23)
	at com.github.houbb.rpc.client.invoke.impl.DefaultInvokeService.addResponse(DefaultInvokeService.java:72)
	at com.github.houbb.rpc.client.handler.RpcClientHandler.channelRead0(RpcClientHandler.java:43)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
	at java.lang.Thread.run(Thread.java:748)
...
[INFO] [2021-10-05 14:59:45.615] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:62e126d9a0334399904509acf8dfe0bb 信息已超时,直接返回超时结果。
[INFO] [2021-10-05 14:59:45.617] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] 获取结果信息,seqId: 62e126d9a0334399904509acf8dfe0bb, rpcResponse: DefaultRpcResponse{seqId='null', error=com.github.houbb.rpc.common.exception.RpcTimeoutException, result=null}
[INFO] [2021-10-05 14:59:45.617] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:62e126d9a0334399904509acf8dfe0bb 信息已经放入,通知全部等待方
[INFO] [2021-10-05 14:59:45.618] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:62e126d9a0334399904509acf8dfe0bb remove from request map
[INFO] [2021-10-05 14:59:45.618] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :DefaultRpcResponse{seqId='62e126d9a0334399904509acf8dfe0bb', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2021-10-05 14:59:45.619] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 62e126d9a0334399904509acf8dfe0bb 对应结果已经获取: DefaultRpcResponse{seqId='null', error=com.github.houbb.rpc.common.exception.RpcTimeoutException, result=null}
...

能够发现,超时异常。

不足之处

对于超时的处理能够拓展为双向的,好比服务端也能够指定超时限制,避免资源的浪费。

小结

为了便于你们学习,以上源码已经开源:

https://github.com/houbb/rpc

但愿本文对你有所帮助,若是喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

在这里插入图片描述