博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty实践(四):心跳检测实现
阅读量:6241 次
发布时间:2019-06-22

本文共 11638 字,大约阅读时间需要 38 分钟。

心跳检测的概念

在分布式架构中,比如Hadoop集群,Storm集群等,或多或少都涉及到Master/Slave的概念,往往是一个或者多个Master和N个Slave之间进行通信。那么通常Master应该需要知道Slave的状态,Slave会定时的向Master进行发送消息,相当于告知Master:“我还活着,我现在在做什么,什么进度,我的CPU/内存情况如何”等,这就是所谓的心跳。Master根据Slave的心跳,进行协调,比如Slave的CPU/内存消耗很大,那么Master可以将任务分配给其他负载小的Slave进行处理;比如Slave一段时间没有发送心跳过来,那么Master可能会将可服务列表中暂时删除该Slave,并可能发出报警,告知运维/开发人员进行处理.如下图所示。

Netty实现心跳检测代码实例

心跳信息对象

主要储存Slave的IP,通信PORT,时间,内存,CPU信息等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package 
day4;
 
import 
java.io.Serializable;
import 
java.util.Date;
import 
java.util.HashMap;
import 
java.util.Map;
 
/**
 
* Created by zhangfengzhe on 2017/2/4.
 
*/
public 
class 
HeartInfo 
implements 
Serializable{
 
    
private 
String ip;
 
    
private 
int 
port;
 
    
private 
Date lasttime;
 
    
private 
Map<String , String> cpuInfo = 
new 
HashMap<String,String>();
 
    
private 
Map<String , String> memInfo = 
new 
HashMap<String, String>();
 
    
public 
String getIp() {
        
return 
ip;
    
}
 
    
public 
void 
setIp(String ip) {
        
this
.ip = ip;
    
}
 
    
public 
int 
getPort() {
        
return 
port;
    
}
 
    
public 
void 
setPort(
int 
port) {
        
this
.port = port;
    
}
 
    
public 
Date getLasttime() {
        
return 
lasttime;
    
}
 
    
public 
void 
setLasttime(Date lasttime) {
        
this
.lasttime = lasttime;
    
}
 
    
public 
Map<String, String> getCpuInfo() {
        
return 
cpuInfo;
    
}
 
    
public 
void 
setCpuInfo(Map<String, String> cpuInfo) {
        
this
.cpuInfo = cpuInfo;
    
}
 
    
public 
Map<String, String> getMemInfo() {
        
return 
memInfo;
    
}
 
    
public 
void 
setMemInfo(Map<String, String> memInfo) {
        
this
.memInfo = memInfo;
    
}
 
    
@Override
    
public 
String toString() {
        
return 
"HeartInfo{" 
+
                
"ip='" 
+ ip + '\
'' 
+
                
", port=" 
+ port +
                
", lasttime=" 
+ lasttime +
                
", cpuInfo=" 
+ cpuInfo +
                
", memInfo=" 
+ memInfo +
                
'}'
;
    
}
}

JBoss Marshalling编解码处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package 
day3;
 
import 
io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import 
io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import 
io.netty.handler.codec.marshalling.MarshallerProvider;
import 
io.netty.handler.codec.marshalling.MarshallingDecoder;
import 
io.netty.handler.codec.marshalling.MarshallingEncoder;
import 
io.netty.handler.codec.marshalling.UnmarshallerProvider;
 
import 
org.jboss.marshalling.MarshallerFactory;
import 
org.jboss.marshalling.Marshalling;
import 
org.jboss.marshalling.MarshallingConfiguration;
 
/**
 
* Marshalling工厂
 
*/
public 
final 
class 
MarshallingCodeCFactory {
 
    
/**
     
* 创建Jboss Marshalling解码器MarshallingDecoder
     
* @return MarshallingDecoder
     
*/
    
public 
static 
MarshallingDecoder buildMarshallingDecoder() {
       
//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
      
final 
MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory(
"serial"
);
      
//创建了MarshallingConfiguration对象,配置了版本号为5 
      
final 
MarshallingConfiguration configuration = 
new 
MarshallingConfiguration();
      
configuration.setVersion(
5
);
      
//根据marshallerFactory和configuration创建provider
      
UnmarshallerProvider provider = 
new 
DefaultUnmarshallerProvider(marshallerFactory, configuration);
      
//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
      
MarshallingDecoder decoder = 
new 
MarshallingDecoder(provider, 
1024
);
      
return 
decoder;
    
}
 
    
/**
     
* 创建Jboss Marshalling编码器MarshallingEncoder
     
* @return MarshallingEncoder
     
*/
    
public 
static 
MarshallingEncoder buildMarshallingEncoder() {
      
final 
MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory(
"serial"
);
      
final 
MarshallingConfiguration configuration = 
new 
MarshallingConfiguration();
      
configuration.setVersion(
5
);
      
MarshallerProvider provider = 
new 
DefaultMarshallerProvider(marshallerFactory, configuration);
      
//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
      
MarshallingEncoder encoder = 
new 
MarshallingEncoder(provider);
      
return 
encoder;
    
}
}

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package 
day4;
 
import 
io.netty.bootstrap.Bootstrap;
import 
io.netty.channel.ChannelFuture;
import 
io.netty.channel.ChannelInitializer;
import 
io.netty.channel.EventLoopGroup;
import 
io.netty.channel.nio.NioEventLoopGroup;
import 
io.netty.channel.socket.SocketChannel;
import 
io.netty.channel.socket.nio.NioSocketChannel;
 
public 
class 
Client {
 
    
   
public 
static 
void 
main(String[] args) 
throws 
Exception{
       
      
EventLoopGroup group = 
new 
NioEventLoopGroup();
      
Bootstrap b = 
new 
Bootstrap();
 
      
final 
int 
port = 
8765
;
      
final 
String serverIP = 
"127.0.0.1"
;
 
      
b.group(group)
       
.channel(NioSocketChannel.
class
)
       
.handler(
new 
ChannelInitializer<SocketChannel>() {
         
@Override
         
protected 
void 
initChannel(SocketChannel sc) 
throws 
Exception {
            
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            
sc.pipeline().addLast(
new 
ClientHandler(port));
         
}
      
});
       
      
ChannelFuture cf = b.connect(serverIP, port).sync();
 
      
cf.channel().closeFuture().sync();
      
group.shutdownGracefully();
   
}
}

Client Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package 
day4;
 
import 
io.netty.channel.ChannelHandlerAdapter;
import 
io.netty.channel.ChannelHandlerContext;
import 
io.netty.util.ReferenceCountUtil;
 
import 
java.net.InetAddress;
import 
java.net.UnknownHostException;
import 
java.util.concurrent.Executors;
import 
java.util.concurrent.ScheduledExecutorService;
import 
java.util.concurrent.ScheduledFuture;
import 
java.util.concurrent.TimeUnit;
 
/**
 
* Created by zhangfengzhe on 2017/2/4.
 
*/
public 
class 
ClientHandler 
extends 
ChannelHandlerAdapter {
 
    
private 
String ip;
    
private 
int 
port;
    
private 
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(
1
);
    
private 
ScheduledFuture<?> scheduledFuture;
 
    
private 
static 
final 
String SUCCESS = 
"OK"
;
 
    
public 
ClientHandler(){}
 
    
public 
ClientHandler(
int 
port) {
 
        
this
.port = port;
 
        
//获取本机IP
        
try 
{
            
this
.ip = InetAddress.getLocalHost().getHostAddress();
        
catch 
(UnknownHostException e) {
            
e.printStackTrace();
        
}
 
    
}
 
    
//通道建立初始化时  发送信息 准备握手验证
    
@Override
    
public 
void 
channelActive(ChannelHandlerContext ctx) 
throws 
Exception {
 
        
String authInfo = 
this
.ip + 
":" 
this
.port;
 
        
ctx.writeAndFlush(authInfo);
    
}
 
    
//当服务器发送认证信息后,开始启动心跳发送
    
@Override
    
public 
void 
channelRead(ChannelHandlerContext ctx, Object msg) 
throws 
Exception {
 
        
if
(msg 
instanceof 
String){
 
            
//认证成功
            
if
(SUCCESS.equals((String)msg)){
 
                
this
.scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(
new 
HeartTask(ctx,ip,port),
2
,
3
, TimeUnit.SECONDS);
 
            
}
else
{
 
                
System.out.println(
"服务器发来消息:" 
+ msg);
            
}
 
        
}
 
        
ReferenceCountUtil.release(msg);
 
    
}
 
    
//如果出现异常 取消定时
    
@Override
    
public 
void 
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws 
Exception {
 
        
cause.printStackTrace();
        
if
(
this
.scheduledFuture != 
null
){
            
this
.scheduledFuture.cancel(
true
);
            
this
.scheduledFuture = 
null
;
        
}
 
    
}
}

Client和Server建立通道初始化的时候,Client会向服务器发送信息用于认证。在实际开发中,Client在发送心跳前,需要和Server端进行握手验证,会涉及到加解密,这里为了简单起见,省去了这些过程。从上面的代码也可以看到,如果服务端认证成功,那么Client会开始启动定时线程去执行任务,那么接下来,我们看看这个心跳任务。

心跳任务HeartTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package 
day4;
 
import 
io.netty.channel.ChannelHandlerContext;
import 
org.hyperic.sigar.CpuPerc;
import 
org.hyperic.sigar.Mem;
import 
org.hyperic.sigar.Sigar;
 
import 
java.util.Date;
import 
java.util.Random;
 
/**
 
* Created by zhangfengzhe on 2017/2/4.
 
*/
public 
class 
HeartTask 
implements  
Runnable{
 
    
//持有引用,方便读写操作
    
private 
ChannelHandlerContext ctx;
 
    
private 
HeartInfo heartInfo = 
new 
HeartInfo();
 
    
public 
HeartTask(ChannelHandlerContext ctx, String ip, 
int 
port) {
 
        
this
.ctx = ctx;
 
        
heartInfo.setIp(ip);
        
heartInfo.setPort(port);
    
}
 
    
@Override
    
public 
void 
run() {
 
        
try
{
            
//利用sigar获取 内存/CPU方面的信息 ; 利用CTX给服务器端发送消息
            
Sigar sigar = 
new 
Sigar();
 
            
//内存使用信息memory
            
Mem mem = sigar.getMem();
            
heartInfo.getMemInfo().put(
"total"
,String.valueOf(mem.getTotal()));
            
heartInfo.getMemInfo().put(
"used"
,String.valueOf(mem.getUsed()));
            
heartInfo.getMemInfo().put(
"free"
,String.valueOf(mem.getFree()));
 
 
            
//CPU使用信息
            
CpuPerc cpuPerc = sigar.getCpuPerc();
            
heartInfo.getCpuInfo().put(
"user"
,String.valueOf(cpuPerc.getUser()));
            
heartInfo.getCpuInfo().put(
"sys"
,String.valueOf(cpuPerc.getSys()));
            
heartInfo.getCpuInfo().put(
"wait"
,String.valueOf(cpuPerc.getWait()));
            
heartInfo.getCpuInfo().put(
"idle"
,String.valueOf(cpuPerc.getIdle()));
 
            
heartInfo.setLasttime(
new 
Date());
 
            
ctx.writeAndFlush(heartInfo);
 
        
}
catch 
(Exception e){
            
e.printStackTrace();
        
}
    
}
}

首先,为了方便在心跳任务中进行读写操作,HeartTask持有ChannelHandlerContext的引用。其次,为了方便收集系统的内存、CPU信息,这里使用了Sigar,也是在实际中引用非常广泛的一个工具。

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package 
day4;
 
import 
io.netty.bootstrap.ServerBootstrap;
import 
io.netty.channel.ChannelFuture;
import 
io.netty.channel.ChannelInitializer;
import 
io.netty.channel.ChannelOption;
import 
io.netty.channel.EventLoopGroup;
import 
io.netty.channel.nio.NioEventLoopGroup;
import 
io.netty.channel.socket.SocketChannel;
import 
io.netty.channel.socket.nio.NioServerSocketChannel;
import 
io.netty.handler.logging.LogLevel;
import 
io.netty.handler.logging.LoggingHandler;
 
public 
class 
Server {
 
   
public 
static 
void 
main(String[] args) 
throws 
Exception{
       
      
EventLoopGroup pGroup = 
new 
NioEventLoopGroup();
      
EventLoopGroup cGroup = 
new 
NioEventLoopGroup();
       
      
ServerBootstrap b = 
new 
ServerBootstrap();
      
b.group(pGroup, cGroup)
       
.channel(NioServerSocketChannel.
class
)
       
.option(ChannelOption.SO_BACKLOG, 
1024
)
       
//设置日志
       
.handler(
new 
LoggingHandler(LogLevel.INFO))
       
.childHandler(
new 
ChannelInitializer<SocketChannel>() {
         
protected 
void 
initChannel(SocketChannel sc) 
throws 
Exception {
            
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            
sc.pipeline().addLast(
new 
ServerHandler());
         
}
      
});
       
      
ChannelFuture cf = b.bind(
8765
).sync();
       
      
cf.channel().closeFuture().sync();
      
pGroup.shutdownGracefully();
      
cGroup.shutdownGracefully();
       
   
}
}

Server Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package 
day4;
 
import 
io.netty.channel.ChannelHandlerAdapter;
import 
io.netty.channel.ChannelHandlerContext;
 
import 
java.util.ArrayList;
import 
java.util.HashMap;
import 
java.util.List;
import 
java.util.Map;
 
/**
 
* Created by zhangfengzhe on 2017/2/4.
 
*/
public 
class 
ServerHandler 
extends 
ChannelHandlerAdapter {
 
    
//KEY: ip:port VALUE: HeartInfo
    
private 
Map<String,HeartInfo> heartInfoMap = 
new 
HashMap<String, HeartInfo>();
 
    
private 
static 
final 
List<String> authList = 
new 
ArrayList<String>();
 
    
static 
{
        
//从其他地方加载出来的IP列表
        
authList.add(
"192.168.99.219:8765"
);
    
}
 
 
    
//服务器会接收到2种消息 一个是客户端初始化时发送过来的认证信息 第二个是心跳信息
    
@Override
    
public 
void 
channelRead(ChannelHandlerContext ctx, Object msg) 
throws 
Exception {
 
        
if
(msg 
instanceof 
String){
 
            
if
(authList.contains(msg)){ 
//验证通过
                
ctx.writeAndFlush(
"OK"
);
            
}
else
{
                
ctx.writeAndFlush(
"不在认证列表中..."
);
            
}
 
        
}
else 
if
(msg 
instanceof 
HeartInfo){
 
            
System.out.println((HeartInfo)msg);
 
            
ctx.writeAndFlush(
"心跳接收成功!"
);
 
            
HeartInfo heartInfo = (HeartInfo)msg;
            
heartInfoMap.put(heartInfo.getIp() + 
":" 
+ heartInfo.getPort(),heartInfo);
        
}
 
    
}
}

运行结果

Client端

Server端

到这里,心跳检测就实现了,就这么简单,你会了么,See U~

本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1895031,如需转载请自行联系原作者

你可能感兴趣的文章
Scrum 项目 4.0-5.0-约教网站开发(一)
查看>>
CSS3变形transform 2D初级了解
查看>>
uva 11806 Cheerleaders (容斥)
查看>>
[HAOI2012]音量调节
查看>>
week07 codelab02 C72
查看>>
ubuntu系统备份与还原
查看>>
人无股权不富
查看>>
JavaScript屏蔽Backspace键
查看>>
dom4j的安装
查看>>
graphical Layout调大一点
查看>>
Python中使用lambda函数
查看>>
句柄类的应用中减少重复编译的方法
查看>>
dj cookie与session 2
查看>>
协程和异步io
查看>>
Java流程控制
查看>>
去除重复的邮箱
查看>>
杭电1018-Big Number(大数)
查看>>
java调用com组件将office文件转换成pdf
查看>>
LINQ To SQL在N层应用程序中的CUD操作、批量删除、批量更新
查看>>
JQuery zTree v3.2和demo
查看>>