实现 Client 端到服务端之间的固定服务 pojo 调用。
我们希望实现一个计算功能。
public interface Calculator { /** * 计算加法 * @param request 请求入参 * @return 返回结果 */ CalculateResponse sum(final CalculateRequest request); } 服务端实现 public class CalculatorService implements Calculator { @Override public CalculateResponse sum(CalculateRequest request) { int sum = request.getOne()+request.getTwo(); return new CalculateResponse(true, sum); } }入参和出参如下:
CalculateRequest.java public class CalculateRequest implements Serializable { private static final long serialVersionUID = 6420751004355300996L; /** * 参数一 */ private int one; /** * 参数二 */ private int two; //Getter & Setter } CalculateResponse.java public class CalculateResponse implements Serializable { private static final long serialVersionUID = -1972014736222511341L; /** * 是否成功 */ private boolean success; /** * 二者的和 */ private int sum; //Getter & Setter }主要处理客户端的调用请求。
import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.rpc.common.model.CalculateRequest; import com.github.houbb.rpc.common.model.CalculateResponse; import com.github.houbb.rpc.common.service.Calculator; import com.github.houbb.rpc.server.service.CalculatorService; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * @author binbin.hou * @since 0.0.1 */ public class RpcServerHandler extends SimpleChannelInboundHandler { private static final Log log = LogFactory.getLog(RpcServerHandler.class); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { final String id = ctx.channel().id().asLongText(); log.info("[Server] channel {} connected " + id); } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { final String id = ctx.channel().id().asLongText(); CalculateRequest request = (CalculateRequest)msg; log.info("[Server] receive channel {} request: {} from ", id, request); Calculator calculator = new CalculatorService(); CalculateResponse response = calculator.sum(request); // 回写到 client 端 ctx.writeAndFlush(response); log.info("[Server] channel {} response {}", id, response); } }可以使得我们 handler 中直接操作对象即可。
CalculateResponseEncoder.java import com.github.houbb.rpc.common.model.CalculateResponse; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; /** * @author binbin.hou * @since 0.0.3 */ public class CalculateResponseEncoder extends MessageToByteEncoder<CalculateResponse> { @Override protected void encode(ChannelHandlerContext ctx, CalculateResponse msg, ByteBuf out) throws Exception { boolean success = msg.isSuccess(); int result = msg.getSum(); out.writeBoolean(success); out.writeInt(result); } } CalculateRequestDecoder.java import com.github.houbb.rpc.common.model.CalculateRequest; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * 请求参数解码 * @author binbin.hou * @since 0.0.3 */ public class CalculateRequestDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int one = in.readInt(); int two = in.readInt(); CalculateRequest request = new CalculateRequest(one, two); out.add(request); } }向客户端发送请求,并且处理服务端响应结果。
import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.rpc.client.core.RpcClient; import com.github.houbb.rpc.common.model.CalculateRequest; import com.github.houbb.rpc.common.model.CalculateResponse; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * <p> 客户端处理类 </p> * * <pre> Created: 2019/10/16 11:30 下午 </pre> * <pre> Project: rpc </pre> * * @author houbinbin * @since 0.0.2 */ public class RpcClientHandler extends SimpleChannelInboundHandler { private static final Log log = LogFactory.getLog(RpcClient.class); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { CalculateRequest request = new CalculateRequest(1, 2); ctx.writeAndFlush(request); log.info("[Client] request is :{}", request); } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { CalculateResponse response = (CalculateResponse)msg; log.info("[Client] response is :{}", response); } }