本文共 11638 字,大约阅读时间需要 38 分钟。
心跳检测的概念
在分布式架构中,比如Hadoop集群,Storm集群等,或多或少都涉及到Master/Slave的概念,往往是一个或者多个Master和N个Slave之间进行通信。那么通常Master应该需要知道Slave的状态,Slave会定时的向Master进行发送消息,相当于告知Master:“我还活着,我现在在做什么,什么进度,我的CPU/内存情况如何”等,这就是所谓的心跳。Master根据Slave的心跳,进行协调,比如Slave的CPU/内存消耗很大,那么Master可以将任务分配给其他负载小的Slave进行处理;比如Slave一段时间没有发送心跳过来,那么Master可能会将可服务列表中暂时删除该Slave,并可能发出报警,告知运维/开发人员进行处理.如下图所示。
Netty实现心跳检测代码实例
心跳信息对象
主要储存Slave的IP,通信PORT,时间,内存,CPU信息等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 | package day4; import java.io.Serializable; import java.util.Date; import java.util.HashMap; import java.util.Map; /** * Created by zhangfengzhe on 2017/2/4. */ public class HeartInfo implements Serializable{ private String ip; private int port; private Date lasttime; private Map<String , String> cpuInfo = new HashMap<String,String>(); private Map<String , String> memInfo = new HashMap<String, String>(); public String getIp() { return ip; } public void setIp(String ip) { this .ip = ip; } public int getPort() { return port; } public void setPort( int port) { this .port = port; } public Date getLasttime() { return lasttime; } public void setLasttime(Date lasttime) { this .lasttime = lasttime; } public Map<String, String> getCpuInfo() { return cpuInfo; } public void setCpuInfo(Map<String, String> cpuInfo) { this .cpuInfo = cpuInfo; } public Map<String, String> getMemInfo() { return memInfo; } public void setMemInfo(Map<String, String> memInfo) { this .memInfo = memInfo; } @Override public String toString() { return "HeartInfo{" + "ip='" + ip + '\ '' + ", port=" + port + ", lasttime=" + lasttime + ", cpuInfo=" + cpuInfo + ", memInfo=" + memInfo + '}' ; } } |
JBoss Marshalling编解码处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | package day3; import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; /** * Marshalling工厂 */ public final class MarshallingCodeCFactory { /** * 创建Jboss Marshalling解码器MarshallingDecoder * @return MarshallingDecoder */ public static MarshallingDecoder buildMarshallingDecoder() { //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory( "serial" ); //创建了MarshallingConfiguration对象,配置了版本号为5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion( 5 ); //根据marshallerFactory和configuration创建provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 ); return decoder; } /** * 创建Jboss Marshalling编码器MarshallingEncoder * @return MarshallingEncoder */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory( "serial" ); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion( 5 ); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } } |
Client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | package day4; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class Client { public static void main(String[] args) throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); final int port = 8765 ; final String serverIP = "127.0.0.1" ; b.group(group) .channel(NioSocketChannel. class ) .handler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast( new ClientHandler(port)); } }); ChannelFuture cf = b.connect(serverIP, port).sync(); cf.channel().closeFuture().sync(); group.shutdownGracefully(); } } |
Client Handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 | package day4; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** * Created by zhangfengzhe on 2017/2/4. */ public class ClientHandler extends ChannelHandlerAdapter { private String ip; private int port; private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool( 1 ); private ScheduledFuture<?> scheduledFuture; private static final String SUCCESS = "OK" ; public ClientHandler(){} public ClientHandler( int port) { this .port = port; //获取本机IP try { this .ip = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { e.printStackTrace(); } } //通道建立初始化时 发送信息 准备握手验证 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String authInfo = this .ip + ":" + this .port; ctx.writeAndFlush(authInfo); } //当服务器发送认证信息后,开始启动心跳发送 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof String){ //认证成功 if (SUCCESS.equals((String)msg)){ this .scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay( new HeartTask(ctx,ip,port), 2 , 3 , TimeUnit.SECONDS); } else { System.out.println( "服务器发来消息:" + msg); } } ReferenceCountUtil.release(msg); } //如果出现异常 取消定时 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); if ( this .scheduledFuture != null ){ this .scheduledFuture.cancel( true ); this .scheduledFuture = null ; } } } |
Client和Server建立通道初始化的时候,Client会向服务器发送信息用于认证。在实际开发中,Client在发送心跳前,需要和Server端进行握手验证,会涉及到加解密,这里为了简单起见,省去了这些过程。从上面的代码也可以看到,如果服务端认证成功,那么Client会开始启动定时线程去执行任务,那么接下来,我们看看这个心跳任务。
心跳任务HeartTask
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | package day4; import io.netty.channel.ChannelHandlerContext; import org.hyperic.sigar.CpuPerc; import org.hyperic.sigar.Mem; import org.hyperic.sigar.Sigar; import java.util.Date; import java.util.Random; /** * Created by zhangfengzhe on 2017/2/4. */ public class HeartTask implements Runnable{ //持有引用,方便读写操作 private ChannelHandlerContext ctx; private HeartInfo heartInfo = new HeartInfo(); public HeartTask(ChannelHandlerContext ctx, String ip, int port) { this .ctx = ctx; heartInfo.setIp(ip); heartInfo.setPort(port); } @Override public void run() { try { //利用sigar获取 内存/CPU方面的信息 ; 利用CTX给服务器端发送消息 Sigar sigar = new Sigar(); //内存使用信息memory Mem mem = sigar.getMem(); heartInfo.getMemInfo().put( "total" ,String.valueOf(mem.getTotal())); heartInfo.getMemInfo().put( "used" ,String.valueOf(mem.getUsed())); heartInfo.getMemInfo().put( "free" ,String.valueOf(mem.getFree())); //CPU使用信息 CpuPerc cpuPerc = sigar.getCpuPerc(); heartInfo.getCpuInfo().put( "user" ,String.valueOf(cpuPerc.getUser())); heartInfo.getCpuInfo().put( "sys" ,String.valueOf(cpuPerc.getSys())); heartInfo.getCpuInfo().put( "wait" ,String.valueOf(cpuPerc.getWait())); heartInfo.getCpuInfo().put( "idle" ,String.valueOf(cpuPerc.getIdle())); heartInfo.setLasttime( new Date()); ctx.writeAndFlush(heartInfo); } catch (Exception e){ e.printStackTrace(); } } } |
首先,为了方便在心跳任务中进行读写操作,HeartTask持有ChannelHandlerContext的引用。其次,为了方便收集系统的内存、CPU信息,这里使用了Sigar,也是在实际中引用非常广泛的一个工具。
Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | package day4; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel. class ) .option(ChannelOption.SO_BACKLOG, 1024 ) //设置日志 .handler( new LoggingHandler(LogLevel.INFO)) .childHandler( new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast( new ServerHandler()); } }); ChannelFuture cf = b.bind( 8765 ).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } } |
Server Handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | package day4; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Created by zhangfengzhe on 2017/2/4. */ public class ServerHandler extends ChannelHandlerAdapter { //KEY: ip:port VALUE: HeartInfo private Map<String,HeartInfo> heartInfoMap = new HashMap<String, HeartInfo>(); private static final List<String> authList = new ArrayList<String>(); static { //从其他地方加载出来的IP列表 authList.add( "192.168.99.219:8765" ); } //服务器会接收到2种消息 一个是客户端初始化时发送过来的认证信息 第二个是心跳信息 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof String){ if (authList.contains(msg)){ //验证通过 ctx.writeAndFlush( "OK" ); } else { ctx.writeAndFlush( "不在认证列表中..." ); } } else if (msg instanceof HeartInfo){ System.out.println((HeartInfo)msg); ctx.writeAndFlush( "心跳接收成功!" ); HeartInfo heartInfo = (HeartInfo)msg; heartInfoMap.put(heartInfo.getIp() + ":" + heartInfo.getPort(),heartInfo); } } } |
运行结果
Client端
Server端
到这里,心跳检测就实现了,就这么简单,你会了么,See U~
本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1895031,如需转载请自行联系原作者