网络通信基础
Socket入门
什么是Socket?
Socket就是为网络服务提供的一种机制。 通讯的两端都有Sokcet. 网络通讯其实就是Sokcet间的通讯 数据在两个Sokcet间通过I0传输。
(可以跨语言传输)
socket编程网络模型
什么是网络模型?
应用层:Http:Http协议 传输层:tcp:Tcp协议 网络层:Ip:Ip协议 链路层:网络层–以太网
任何计算机语言通讯,底层都使用socket技术。 Java、C#、C Socket技术遵循一个规划二进制+IP+端口号
isocket网络编程中也服务器端、客户端
socket分为两个非常核心tcp——- udp(IO流传输) tcp与UDP区别区别:
- udp面向无连接——不会建立连接限制传输64k——不可靠协议(会丢包)
- tcp协议面向连接协议三次握手字节流传输 (udp效率比tcp协议高tcp协议比udp协议靠谐。
http协议的底层使用的时tcp协议进行连接
使用UDP协议完成客户端与服务器端进行通讯
一次完整的通信是有三种socket的(服务端有两种,客户端有一种)
//创建服务端连接
ServerSocket serverSocket = new ServerSocket(8080);
//接收客户端请求阻塞功能 具体的请求是通过它来实现的
Socket accept = serverSocket.accept();
//客户端的socket
Socket socket = new Socket("127.0.0.1", 8080);
Udp协议
package com.mumulx.socket.demo01;
/*
* Created by IntelliJ IDEA.
* User: 木木
* Date: 2020/5/28
* Time: 14:58
*/
/*先写服务器端*/
import javax.print.DocFlavor;
import java.io.IOException;
import java.net.*;
/*
* 在启动了服务端后是不能通过浏览器进行访问的,因为http协议的底层使用的时tcp协议进行连接 tcp与udp二者不能互通
* */
class UdpServer{
public static void main(String[] args) throws IOException {
//1. ip地址+端口号
System.out.println("udp服务器已经启动。。。。。8080");
//创建服务器端口,默认使用本机Ip地址
DatagramSocket ds = new DatagramSocket(8080);
//服务器接受客户端1024个字节
byte[] bytes = new byte[1024];
//定义数据包
DatagramPacket dp = new DatagramPacket(bytes, bytes.length);
//接受客户端请求,将数据封装给数据暴 如果客户端不往服务器端发送请求,就会一直阻塞着
ds.receive(dp);
System.out.format("来源IP地址:%s,端口号:%d %n",dp.getAddress(),dp.getPort());
String s = new String(dp.getData(), 0, dp.getLength());
System.out.println("服务器端接收到了数据: "+s);
//关闭连接
ds.close();
}
}
/*Udp协议 只管发送
* 速度较快
* */
public class UdpClient {
public static void main(String[] args) throws IOException {
System.out.println("Udp客户端启动连接.....");
//不传入端口号 作用客户端 创建一个socket客户端
DatagramSocket ds = new DatagramSocket();
String str = "test";
byte[] bytes = str.getBytes();
DatagramPacket dp = new DatagramPacket(bytes, bytes.length, InetAddress.getByName("127.0.0.1"), 8080);
ds.send(dp);
ds.close();
}
}
Tcp协议
TCP协议采用三次握手成功之后,才开始进行数据传输。 最好一次就会进行四次挥手。
TCP握手协议. 在TCP/IP协议中,TCP 协议采用三次握手建立一一个连接。
第一次握手:建立连接时,客户端发送SYN包(SYN=J)到服务器,并进入SYN_ SEND 状态,等待服务器确认;
第二次握手:服务器收到SYN包,必须确认客户的SYN (ACK=J+1) ,同时自己也发送一个SYN包(SYN=K) ,即SYN+ACK包,此时服务器V状态;
第三次握手:客户端收到服务器的SYN+ACK 包,向服务器发送确认包ACK(ACK=K+1),此包发送完毕,客户端和服务器进ESTABLISHED状态,完成三次握手。
完成三次握手,客户端与服务器开始传送数据,。
四次分手:
由于TCP连接是全双工的,因此每个方向都必须单独进行关闭。这个原则是当一方完成它的数据发送任务后就能发送一个FIN来终止这个方向的连接。收到一个FIN 只意味着这一方向上没有数据流动,一个TCP连接在收到一个FIN后仍能发送数据。首先进行关闭的一方将执行主动关闭,而另一方执行被动关闭。
(1)客户端A发送一个FIN,用来关闭客户A到服务器B的数据传送。
(2)服务器B收到这个FIN,它发回一个ACK, 确认序号为收到的序号加1。和SYN一样,一个FIN将占用一个序号。
(3)服务器B关闭与客户端A的连接,发送一个FIN给客户端A。
(4) 客户端A发回ACK报文确认,并将确认序号设置为收到序号加1。
1.为什么建立连接协议是三次握手,而关闭连接却是四次握手呢?
这是因为服务端的LISTEN状态下的SOCKET当收到SYN报文的建连请求后,它可以把ACK和SYN (ACK 起应答作用,而SYN起同步作用)放在一个报文里来发送。
但关闭连接时,当收到对方的FIN报文通知时,它仅仅表示对方没有数据发送给你了;但未必你所有的数据都全部发送给对方了,所以你可以未必会马上会关闭SOCKET,也即你可能还需要发送一些数据给对方之后,再发送FIN报文给对方来表示你同意现在可以关闭连接了,所以它这里的ACK报文和FIN报文多数情况下都是分开发送的.。
2.为什么TIME WAIT状态还需要等2MSL后才能返回到CLOSED状态? 这是因为虽然双方都同意关闭连接了,而且握手的4个报文也都协调和发送完毕,按理可以直接回到CLOSED状态(就好比从SYN SEND 状态到ESTABLISH状态那样) ;但是因为我们必须要假想网络是不可靠的,你无法保证你最后发送的ACK报文会一定被对方收到,因此对方处于LAST_ ACK 状态下的SOCKET可能会因为超时未收到ACK报文,而重发FIN报文,所以这个TIME _WAIT状态的作用就是用来重发可能丢失的ACK报文。。
TCP协议进行客户端与服务器端进行通讯.
tomcat使用的是http协议进行通信的
package com.mumulx.socket.demo01;
/*
* Created by IntelliJ IDEA.
* User: 木木
* Date: 2020/5/28
* Time: 15:47
*/
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.sql.SQLOutput;
/*可以通过浏览器进行通信
* 127.0.0.1:8080
* */
class TcpServer{
public static void main(String[] args) throws IOException {
System.out.println("Tcp协议服务器端启动。。");
//创建服务端连接
ServerSocket serverSocket = new ServerSocket(8080);
//接收客户端请求阻塞功能
Socket accept = serverSocket.accept();//accept也是阻塞的
InputStream inputStream = accept.getInputStream();
//将字节流转换成String类型
byte[] bytes = new byte[1024];
String result = new String(bytes, 0, inputStream.read(bytes));//read是阻塞的
System.out.println("服务端接收客户端内容" + result);
serverSocket.close();
}
}
/*没有启动服务端时 启动客户端会报错java.net.ConnectException: Connection refused: connect*/
public class TcpClient {
public static void main(String[] args) throws IOException {
System.out.println("socket tcp 客户端启动....");
Socket socket = new Socket("127.0.0.1", 8080);
OutputStream outputStream = socket.getOutputStream();
outputStream.write("hahahhaha".getBytes());
socket.close();
}
}
tcp支持,怎么解决tcp协议同时支持多个请求
同时支持多个请求使用线程池还是多线程好?线程池(可以节约内存)
/*多个连接*/
class TcpServer1 {
public static void main(String[] args) throws IOException {
System.out.println("Tcp协议服务器端启动。。");
ExecutorService executorService = Executors.newCachedThreadPool();
//创建服务端连接
ServerSocket serverSocket = new ServerSocket(8080);
try {
while (true) {
//接收客户端请求阻塞功能
Socket accept = serverSocket.accept();
executorService.execute(()->{
InputStream inputStream = null;
try {
inputStream = accept.getInputStream();
//将字节流转换成String类型
byte[] bytes = new byte[1024];
String result = new String(bytes, 0, inputStream.read(bytes));
System.out.println("服务端接收客户端内容" + result);
OutputStream outputStream = accept.getOutputStream();
outputStream.write("接收到请求".getBytes());
} catch (IOException e) {
e.printStackTrace();
}
});
}
} catch (IOException e) {
e.printStackTrace();
} finally {
serverSocket.close();
}
}
}
NIO
NIO编程jdk1 .4在IO基础之上进行改进(NIO中有一个非阻塞IO、面向缓冲区)
IO阻塞、面向流
NIO效率高
NIO结构图
什么是NIO?
Java NIO(New IO)是一个可以替代标准Java IO API的IO API (从Java 1.4开始),Java NIO提供了与标准IO 不同的IO 工作方式。 Java NIO: Channels and Buffers (通道和缓冲区)。
标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channe|) 和缓冲区(Buffer) 进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。
Java NIO: Non-blocking IO (非阻塞IO)。
Java NIO可以让你非阻塞的使用IO , 例如:当线程从通道读取数据到缓冲区时,线程还是可以进行其他事情。当数据被写入到缓冲区时,线程可以继续处理它。从缓冲区写入通道也类似。
Java NIO: Selectors (选择器)。
Java NIO引入了选择器的概念,选择器用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个的线程可以监听多个数据通道。
注意:传统IO是单向。NIO 类似。
区别
IO | NIO |
---|---|
面向流 | 面向缓冲区 |
阻塞IO | 非阻塞IO |
无 | 选择器 |
断点续传:多线程+文件拼接。将一个文件拆分成多个线程(多个请求分段下载) 最好拼接成一个文件
Buffer的数据存取
一个用于特定基本数据类行的容器。有java.nio 包定义的,所有缓冲区都是抽象类Buffer的子类。。
package com.mumulx.nio;
import org.junit.Test;
import java.nio.ByteBuffer;
/*
* Created by IntelliJ IDEA.
* User: 木木
* Date: 2020/5/28
* Time: 16:36
*/
/*缓冲区介绍*/
public class BuffTest {
/*Buffer类中含有着四个参数
* private int mark = -1;
private int position = 0;//缓冲区正在操作的位置 默认从0开始
private int limit;//界面(缓冲区可用大小)
private int capacity;//缓冲区最大的容量,一但声明不能改变
*
* 核心方法
* put 往buff中存放数据
* get 获取数据
*
* */
@Test
public void test() {
try {
//初始化ByteBuffer大小
ByteBuffer allocate = ByteBuffer.allocate(1024);
System.out.println(allocate.position());
System.out.println(allocate.limit());
System.out.println(allocate.capacity());
System.out.println("往buffer中存放数据");
allocate.put("hahah".getBytes());
System.out.println("----------------");
System.out.println(allocate.position());
System.out.println(allocate.limit());
System.out.println(allocate.capacity());
System.out.println("读取值");
/* byte[] bytes = new byte[allocate.limit()];//从5开始读
allocate.get(bytes);
System.out.println(new String(bytes, 0, bytes.length));
//会报错java.nio.BufferUnderflowException
*/
/*读取值时要开启读取模式*/
allocate.flip();//开启读取模式将position的值变为0 从0开始读
byte[] bytes = new byte[allocate.limit()];
allocate.get(bytes);
System.out.println(new String(bytes, 0, bytes.length));
System.out.println(allocate.position());
System.out.println(allocate.limit());//读取之后limit的值变为了5
System.out.println(allocate.capacity());
System.out.println("************重复读取");
/*
//读取值之后position的值又变成5了
byte[] bytes1 = new byte[allocate.limit()];//重复读取会报错java.nio.BufferUnderflowException
allocate.get(bytes);
System.out.println(new String(bytes1, 0, bytes.length));*/
/*可以设置重复读取*/
allocate.rewind();//设置重复读取 从上一次的下标的位置开始读取
byte[] bytes1 = new byte[allocate.limit()];//重复读取会报错java.nio.BufferUnderflowException
allocate.get(bytes1);
System.out.println(new String(bytes1, 0, bytes.length));
/*清空缓存区 只把下标还原了,数据被遗忘了*/
System.out.println("清空缓存区");
allocate.clear();
System.out.println(allocate.position());
System.out.println(allocate.limit());
System.out.println(allocate.capacity());
System.out.println((char) allocate.get());//清空之后 值仍然存在 我们还可以取出
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
/*缓冲区是NIO 提高给传输文件和通道一起配合使用,存储数据
* Buffer
* ByteBuffer
* LongBuffer
* IntegerBuffer
* FloatBuffer
* DoubleBuffer
*
* */
}
}
结果
0
1024
1024
往buffer中存放数据
----------------
5
1024
1024
读取值
hahah
5
5
1024
************重复读取
hahah
清空缓存区
0
1024
1024
h
make与rest用法
标记(mark) 与重置(reset) :标记是一个索引,通过Buffer中的mark()方法指定Buffer中一-个特定的position,之后可以通过调用reset()方法恢复到这个position。
package com.mumulx.nio;
import java.nio.ByteBuffer;
/*
* Created by IntelliJ IDEA.
* User: 木木
* Date: 2020/5/28
* Time: 20:44
*/
public class BuffTest02 {
public static void main(String[] args) {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
String str = "abcd";
byteBuffer.put(str.getBytes());
//开启读模式
byteBuffer.flip();
byte []bytes = new byte[byteBuffer.limit()];
byteBuffer.mark();//打印一个标记,标记位置为0
byteBuffer.get(bytes,0,2);//从position开始(0)读取3个字节存储到bytes 0-2 获取完毕后position为2
System.out.println(new String(bytes,0,2));
System.out.println(byteBuffer.position());
byteBuffer.reset();//重置 还原到mark标记的位置,position为0
byteBuffer.get(bytes,2,2);//从position(0)开始两个字节 读取到bytes的2-4中
System.out.println(new String(bytes,2,2));
System.out.println(byteBuffer.position());
}
}
结果
ab
2
ab
2
直接缓冲区与非直接缓冲的区别
非直接缓冲区:通过allocate() 方法分配缓冲区,将缓冲区建立在JVM的内存中。.
直接缓冲区:通过allocateDirect() 方法分配直接缓冲区,将缓冲区建立在物理内存中。可以提高效率。
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);//非直接缓冲区
ByteBuffer byteBuffer1 = ByteBuffer.allocateDirect(1024);//直接缓冲区
缓存区分为两种分别为:直接缓存区、非缓冲区
非直接缓冲区主要存放在jvm缓冲区中,来回拷贝。
直接缓冲区—存放物理内存,不需要来回拷贝
存放在物理内存比jvm缓冲区速度快
数据放在非直接缓冲区比直接缓冲区更加安全。
直接缓存区非常占内存
字节缓冲区要么是直接的,要么是非直接的。如果为直接字节缓冲区,则Java 虚拟机会尽最大努力直接在此缓冲区上执行本机I/O 操作。也就是说,在每次调用基础操作系统的一个本机I/O 操作之前(或之后),虛拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容)。
直接字节缓冲区可以通过调用此类的allocateDirect() 工厂方法来创建。此方法返回的缓冲区进行分配和取消分配所需成本通常高于非直接缓冲区。直接缓冲区的内容可以驻留在常规的垃圾回收堆之外,因此,它们对应用程序的内存需求量造成的影响可能并不明显。所以,建议将直接缓冲区主要分配给那些易受基础系统的本机I/O 操作影响的大型、持久的缓冲区。一般情况下,最好仅在直接缓冲区能在程序性能方面带来明显好处时分配它们。
直接字节缓冲区还可以通过FileChannel 的map() 方法将文件区域直接映射到内存中来创建。该方法返回MappedByteBuffer,。Java 平台的实现有助于通过JNI(本地方法接口) 从本机代码创建直接字节缓冲区。如果以上这些缓冲区中的某个缓冲区实例指的是不可访问的内存区域,则试图访问该区域不会更改该缓冲区的内容,并且将会在访问期间或稍后的某个时间导致抛出不确定的异常。
字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其isDirect() 方法来确定。提供此方法是为了能够在性能关键型代码 中执行显式缓冲区管理。。
IO中的缓冲区–非直接缓冲区
通道(Channel)的原理获取
通道表示打开到IO设备(例如:文件、套接字)的连接。若需要使用NIO系统,需要获取用于连接IO设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。Channel负 责传输,Buffer 负责存储。通道是由java.nio.channels包定义的。Channe| 表示IO源与目标打开的链接。Channel类似于传统的“流”。只不过Channel 本身不能直接访问数据,Channel只能与Buffer进行交互。
java.nio.channeIs.Channel接口:
FileChannel
SocketChannel
ServerSocketChannel
DatagramChannel
获取通道。 1. Java针对支持通道的类提供了getChannel() 方法。
本地IO:
FilelnputStream/File0utputStream.
RandomAccessFilew
网络IO:。
Socket
ServerSocket DatagramSocket
在JDK 1.7中的NIO.2针对各个通道提供了静态方法open()
在JDK1.7中的NIO.2的Files工具类的newByteChannel ()。
获取代码运行时间
1)、创建一个 Timer 工具类对象,执行一些操作然后调用 Timer 的 duration() 方法产生以毫秒为单位的运行时间。
// onjava/Timer.java
package onjava;
import static java.util.concurrent.TimeUnit.*;
public class Timer {
private long start = System.nanoTime();
public long duration() {
return NANOSECONDS.toMillis(System.nanoTime() - start);
}
public static long duration(Runnable test) {
Timer timer = new Timer();
test.run();
return timer.duration();
}
}
2)、是以毫秒为单位计算的。
//伪代码
long startTime=System.currentTimeMillis(); //获取开始时间
doSomeThing(); //测试的代码段
long endTime=System.currentTimeMillis(); //获取结束时间
System.out.println("程序运行时间: "+(end-start)+"ms");
3)、以纳秒为单位计算的。
//伪代码
long startTime=System.nanoTime(); //获取开始时间
doSomeThing(); //测试的代码段
long endTime=System.nanoTime(); //获取结束时间
System.out.println("程序运行时间: "+(end-start)+"ns");
源码
package com.mumulx.nio;
import org.junit.Test;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Timer;
/*
* Created by IntelliJ IDEA.
* User: 木木
* Date: 2020/5/28
* Time: 21:20
*/
public class BuffTest03 {
/*非直接缓冲区 读写操作*/
@Test
public static void test01() {
try (
//读入流
FileInputStream fileInputStream = new FileInputStream("1.png");
//写入流
FileOutputStream fileOutputStream = new FileOutputStream("2.png");
//创建通道
FileChannel inChannel = fileInputStream.getChannel();
FileChannel outChannel = fileOutputStream.getChannel();
) {
//分配指定大小缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
while (inChannel.read(buf) != -1) {
//开启读取模式
buf.flip();
//将数据写入到通道中
outChannel.write(buf);
buf.clear();
}
//关闭通道、关闭连接
} catch (IOException e) {
e.printStackTrace();
}
}
/*直接缓冲区读写操作*/
@Test
public static void test02() {
try (
//创建管道
FileChannel inChannel = FileChannel.open(Paths.get("1.png"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("2.png"),StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
) {
//定义映射文件
MappedByteBuffer inMap = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMap = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
//直接对缓冲区进行操作
byte[] bytes = new byte[inMap.limit()];
inMap.get(bytes);
outMap.put(bytes);
} catch (IOException e) {
e.printStackTrace();
}
}
}
分散读取与聚集写入
分散读取(scattering Reads): 将通道中的数据分散到多个缓冲区中
聚集写入(gathering Writes): 将多个缓冲区的数据聚集到通道中
package com.mumulx.nio;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/*
* Created by IntelliJ IDEA.
* User: 木木
* Date: 2020/5/28
* Time: 21:55
*/
public class BuffTest04 {
public static void main(String[] args) {
try(
/*随机访问*/
RandomAccessFile raf = new RandomAccessFile("test.txt", "rw");
RandomAccessFile raf2 = new RandomAccessFile("test2.txt", "wr");
) {
//获取通道
FileChannel channel = raf.getChannel();
//分配指定大小指定缓冲区
ByteBuffer buf1 = ByteBuffer.allocate(100);
ByteBuffer buf2 = ByteBuffer.allocate(1024);
//分散读取
ByteBuffer[] bufs={buf1, buf2};
channel.read(bufs);
for (ByteBuffer byteBuffer : bufs) {
//切换成读模式
byteBuffer.flip();
}
System.out.println(new String(bufs[0].array(), 0, bufs[0].limit()));
System.out.println("*********************************");
System.out.println(new String(bufs[1].array(), 1, bufs[1].limit()));
System.out.println("---------------聚集读取");
//获取通道
FileChannel channel1 = raf2.getChannel();
channel1.write(bufs);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
字符集Charset
编码:字符串->字节数组。
解码:字节数组->字符串。
package com.mumulx.nio;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
/*
* Created by IntelliJ IDEA.
* User: 木木
* Date: 2020/5/28
* Time: 22:23
*/
public class BuffTest05 {
public static void main(String[] args) throws CharacterCodingException {
//获取解码器
Charset charset = Charset.forName("GBK");
//获取加密器
CharsetEncoder ce = charset.newEncoder();
//获取解码器
CharsetDecoder cd = charset.newDecoder();
CharBuffer charBuffer = CharBuffer.allocate(1024);
charBuffer.put("hahahha");
charBuffer.flip();
//编码加密
ByteBuffer buBuff = ce.encode(charBuffer);
for (int i = 0; i <7 ; i++) {
System.out.println(buBuff.get());
}
buBuff.flip();
//编码解密
/* CharBuffer decode = cd.decode(buBuff);
System.out.println(decode.toString());*/
CharBuffer decode = Charset.forName("UTF-8").newDecoder().decode(buBuff);
System.out.println(decode.toString());
/*如果解码不是GBK的时候 会报错(中文会报错)*/
}
}
NIO是通过通道(Channel)和缓冲(Buffer)进行操作的
同步阻塞与同步非阻塞
NIO通道(传输数据) +缓冲区(存放数据) +直接缓冲区与非直接缓冲区
非阻塞IO与阻塞IO区别
IO (BIO)同步、阻塞IO
NIO (1dk1.7之前)同步、 非阻塞IO
JDK之后AIO (异步、非阻塞IO)
伪异步形式缺点:没有真正解决阻塞IO核心。 缺点:占CUU内存 线程池:解决频繁创建线程,可以服用。
/*多个连接*/
class TcpServer2 {
public static void main(String[] args) throws IOException {
System.out.println("Tcp协议服务器端启动。。");
//创建服务端连接
ServerSocket serverSocket = new ServerSocket(8080);
try {
while (true) {
//接收客户端请求阻塞功能
Socket accept = serverSocket.accept();
new Thread(()->{
InputStream inputStream = null;
try {
inputStream = accept.getInputStream();
//将字节流转换成String类型
byte[] bytes = new byte[1024];
String result = new String(bytes, 0, inputStream.read(bytes));
System.out.println("服务端接收客户端内容" + result);
} catch (IOException e) {
e.printStackTrace();
}
},"new thread");
}
} catch (IOException e) {
e.printStackTrace();
} finally {
serverSocket.close();
}
}
}
/*线程池*/
class TcpServer1 {
public static void main(String[] args) throws IOException {
System.out.println("Tcp协议服务器端启动。。");
ExecutorService executorService = Executors.newCachedThreadPool();
//创建服务端连接
ServerSocket serverSocket = new ServerSocket(8080);
try {
while (true) {
//接收客户端请求阻塞功能
Socket accept = serverSocket.accept();
executorService.execute(()->{
InputStream inputStream = null;
try {
inputStream = accept.getInputStream();
//将字节流转换成String类型
byte[] bytes = new byte[1024];
String result = new String(bytes, 0, inputStream.read(bytes));
System.out.println("服务端接收客户端内容" + result);
OutputStream outputStream = accept.getOutputStream();
outputStream.write("接收到请求".getBytes());
} catch (IOException e) {
e.printStackTrace();
}
});
}
} catch (IOException e) {
e.printStackTrace();
} finally {
serverSocket.close();
}
}
}
伪异步形式,不靠谱。线程池,没有真正核心解决阻塞IO问题
NIO——-同步非阻塞,用于网络相关
选择器是管理通道的,在服务端。服务端直接与选择器进行通信
IO(BIO()和NIO(区别:其本质就是阻塞和非阻塞的区别
阻塞概念:应用程序在获取网络数据的时候,如果网络传输数据很慢,就会一直等待,直到传输完毕为止。
非阻塞概念:应用程序直接可以获取已经准备就绪好的数据,无需等待。
IO为同步阻塞形式,NIO为同步非阻塞形式, NIO并没有实现异步,在JDK1.7后升级NIO库包,支持异步非阻塞。
模型NIO2. 0(AIO)。 BIO:同步阻塞式10,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。
NIO:同步非阻塞式IO,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。
AIO(NIO.2):异步非阻塞式IO,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理。
同步时,应用程序会直接参与IO读写操作,并且我们的应用程序会直接阻塞到某一个方法上,直到数据准备就绪: 或者采用轮训的策略实时检查数据的就绪状态,如果就绪则获取数据。
异步时,则所有的IO读写操作交给操作系统,与我们的应用程序没有直接关系,我们程序不需要关心IO读写,当操作系统完成了IO读写操作时,会给我们应用程序发送通知,我们的应用程序直接拿走数据极即可。
伪异步
由于BIO一个客户端需要一个线程去处理,因此我们进行优化,后端使用线程池来处理多个客户端的请求接入,形成客户端个数M:线程池最大的线程数N的比例关系,其中M可以远远大于N,通过线程池可以灵活的调配线程资源,设置线程的最大值,防止由于海量并发接入导致线程耗尽。
原理:
当有新的客户端接入时,将客户端的Socket封装成一个Task(该Task任务实现了java 的Runnable接口)投递到后端的线程池中进行处理,由 于线程池可以设置消息队列的大小以及线程池的最大值,因此,它的资源占用是可控的,无论多少个客户端的并发访问,都不会导致资源的耗尽或宕机。。
IO模型关系
同步阻塞I/O(BIO) | 伪异步I/O | 非阻塞I/O(BIO) | 异步I/O(AIO) | |
---|---|---|---|---|
客户端个数:I/O线程 | 1:1 | M:N(其中M可以大于N) | M:(1个I/O线程处理多个客户端连接) | M:0(不需要启动额外的I/O线程,被动回调) |
I/O类型(阻塞) | 阻塞I/O | 阻塞I/O | 非阻塞I/O | 非阻塞I/O |
选择器Key
SelectionKey.OP_ACCEPT;//可接收连接
SelectionKey.OP_CONNECT;//可连接
SelectionKey.OP_READ;//可读
SelectionKey.OP_WRITE;//可写
小结代码
package com.mumulx.nio;
/*
* Created by IntelliJ IDEA.
* User: 木木
* Date: 2020/5/29
* Time: 10:42
*/
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Date;
import java.util.Iterator;
/* Nio异步通信*/
class NioClient {
public static void main(String[] args) {
System.out.println("客户端已经启动");
try (
//1.创建socket通道
SocketChannel schannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8080));
){
//2.切换异步非阻塞
schannel.configureBlocking(false);//jdk1.7以上 客户端调用非阻塞是防止read方法不会阻塞
//3.指定缓冲区大小
ByteBuffer buff = ByteBuffer.allocate(1024);
buff.put(new Date().toString().getBytes());
//4.切换到读取模式
buff.flip();
schannel.write(buff);
//schannel.write(ByteBuffer.wrap(new Date().toString().getBytes()));
buff.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/*NIO服务端
*
* tcp协议:可以通过浏览器进行访问
* */
class NioServer{
public static void main(String[] args) {
System.out.println("服务端已经启动了。。。");
try {
//1.创建服务通道
ServerSocketChannel sChannel = ServerSocketChannel.open();
//2.切换异步通信
sChannel.configureBlocking(false);//jdk1.7以上
//3.绑定连接
sChannel.bind(new InetSocketAddress(8080));
//4.获取选择器
Selector selector = Selector.open();
//5.将通道注册到选择器中 并监听已经接收到了的时间
sChannel.register(selector, SelectionKey.OP_ACCEPT);
//6.轮询获取“已经准备就绪的事件”
while (selector.select() > 0) {
//7.获取当前选择器,有注册已经监听到事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
//8.获取准备就绪事件
SelectionKey sk = iterator.next();
//9.判断事件准备就绪
if (sk.isAcceptable()) {
//10.若接收就绪,可以获取客户端连接
SocketChannel socketChannel = sChannel.accept();
//11.设置为阻塞事件
socketChannel.configureBlocking(false);//异步非阻塞IO 设置Bio的Socket accept = serverSocket.accept();不阻塞
//12.将该通道注册到服务器上
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (sk.isReadable()) {
//13.获取当前选择“准备就绪”的通道
SocketChannel socketChannel = (SocketChannel)sk.channel();
//14.读取数据
int len = 0;
ByteBuffer buffer = ByteBuffer.allocate(1024);
while ((len = socketChannel.read(buffer)) > 0) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, len));
buffer.clear();
}
}
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class NIOTest {
}
NIO通过设置来实现非阻塞
客户端
//2.切换异步非阻塞
schannel.configureBlocking(false);//jdk1.7以上 客户端调用非阻塞是防止read方法不会阻塞
bio
inputStream = accept.getInputStream();
//将字节流转换成String类型
byte[] bytes = new byte[1024];
String result = new String(bytes, 0, inputStream.read(bytes));
服务端
//11.设置为阻塞事件
socketChannel.configureBlocking(false);//异步非阻塞IO 设置Bio的Socket accept = serverSocket.accept();不阻塞
bio
//接收客户端请求阻塞功能
Socket accept = serverSocket.accept();
一次请求做的事情
单线程模式
Netty
讲讲Netty的特点?
高并发
Netty是一 款基于NIO (Nonblocking I/O, 非阻塞I0)开发的网络通信框架,对比于BIO (Blocking I/O, 阻塞I0),他的并发性能得到了很大提高。
传输快
Netty的传输快其实也是依赖了NIO的一一个特性一零拷贝。
封装好 Netty封装了NI0操作的很多细节,提供易于使用的API.
什么是Netty:
Netty是一个基于JAVA NI0类库的异步通信框架,它的架构特点是:异步非阻塞、基于事件驱动、高性能、高可靠性和高可定制性。
为什么netty? 传统NIO非阻塞代码bug、 进行一些封装。事件驱动
netty框架是一个通讯框架,NIO框架也是通讯框架。
springcloud使用的http协议
Netty应用场景
- 分布式开源框架中dubbo、Zookeeper,RocketMQ底层rpc通讯使用就是netty。。
- 游戏开发中,底层使用netty通讯。。
netty特征是异步通讯框架、异步非阻塞、高可用、事件驱动。
inetty应用场景
rpc远程调用框架dubbo底层就是通过nettynetty用的底层优势nio。
zk、消息中间(roketmq) 、主流rpc、通讯都会采用netty。
游戏开发服务器端都是通过netty通讯。
为什么选择了netty
在本小节,我们总结下为什么不建议开发者直接使用JDK的NIO类库进行开发的原因:。 1)NIO的类库和API繁杂,使用麻烦,你需要熟练掌握Selector、ServerSocketChannel、 SocketChannel、ByteBuffer等;。 2)需要具备其它的额外技能做铺垫,例如熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序;。 3)可 靠性能力补齐,工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等,NIO 编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大;。 4) JDK NIO的BUG,例如臭名昭著的epollbug,它会导致Selector空轮询,最终导致CPU100%。官方声称在JDK1.6版本的update18修复了该问题,但是直到JDK1.7版本该问题仍旧存在,只不过该bug发生概变低
服务端
package com.mumulx.netty;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/*
* Created by IntelliJ IDEA.
* User: 木木
* Date: 2020/5/29
* Time: 11:35
*/
/*快速重写父类方法快捷键alt+shift+s
* ctrl+alt+r
* */
class ServerHanlder extends SimpleChannelHandler {
/*通道被关闭的时候会触发*/
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelClosed(ctx, e);
System.out.println("channelClosed");
}
/*必须要建立连接,关闭通道的时候才会触发*/
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelDisconnected(ctx, e);
System.out.println("channelDisconnected");
}
/*接受出现异常*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
super.exceptionCaught(ctx, e);
System.out.println("exceptionCaught");
}
/*接收客户端数据*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
/**/
super.messageReceived(ctx, e);
System.out.println("messageReceived :参数:");
System.out.println(e.getMessage());
ctx.getChannel().write("hello world");
}
}
public class NettyServer {
public static void main(String[] args) {
//1.创建服务对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
//2.创建两个线程池,第一个监听端口号Nio监听
ExecutorService boos = Executors.newCachedThreadPool();
ExecutorService wook = Executors.newCachedThreadPool();
//3.将线程池放到工程中
serverBootstrap.setFactory(new NioServerSocketChannelFactory(boos,wook));
//4.设置管道工程
serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override //设置管道
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
//传输数据的时候直接为string类型
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder",new StringEncoder());
//设置事件监听类
pipeline.addLast("serverHanlder", new ServerHanlder());
return pipeline;
}
});
//绑定端口号
serverBootstrap.bind(new InetSocketAddress(8080));
System.out.println("服务器端已经被启动....");
/*//是非阻塞的
while (true) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("每隔0.5秒打印......");
}*/
}
}
客户端
package com.mumulx.netty;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/*
* Created by IntelliJ IDEA.
* User: 木木
* Date: 2020/5/29
* Time: 16:46
*/
class ClientHanlder extends SimpleChannelHandler {
/*通道被关闭的时候会触发*/
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelClosed(ctx, e);
System.out.println("channelClosed");
}
/*必须要建立连接,关闭通道的时候才会触发*/
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelDisconnected(ctx, e);
System.out.println("channelDisconnected");
}
/*接受出现异常*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
super.exceptionCaught(ctx, e);
System.out.println("exceptionCaught");
}
/*接收客户端数据*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
/**/
super.messageReceived(ctx, e);
System.out.println("messageReceived");
System.out.println("服务端向客户端回复的内容");
System.out.println(e.getMessage());
}
}
public class NettyClient {
public static void main(String[] args) {
//1.创建服务对象
ClientBootstrap clientBootstrap = new ClientBootstrap();
//2.创建两个线程池,第一个监听端口号Nio监听
ExecutorService boos = Executors.newCachedThreadPool();
ExecutorService wook = Executors.newCachedThreadPool();
//3.将线程池放到工程中
clientBootstrap.setFactory(new NioClientSocketChannelFactory(boos,wook));
//4.设置管道工程
clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override //设置管道
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
//传输数据的时候直接为string类型
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder",new StringEncoder());
//设置事件监听类
pipeline.addLast("clientHanlder", new ClientHanlder());
return pipeline;
}
});
//绑定端口号
ChannelFuture connect = clientBootstrap.connect(new InetSocketAddress("127.0.0.1", 8080));
Channel channel = connect.getChannel();
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.println("请输入内容");
channel.write(scanner.next());
}
}
}
io与Nio的区别
最大区别:非阻塞、通道、缓冲区
长连接与短连接
粘包与拆包
netty5.0
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>network</artifactId>
<groupId>com.mumulx</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nettywork01</artifactId>
<dependencies>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling</artifactId>
<version>1.3.19.GA</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>1.3.19.GA</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
package com.mumulx;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
/*
* Created by IntelliJ IDEA.
* User: 木木
* Date: 2020/5/29
* Time: 19:30
*/
public class NettyServer01 {
public static void main(String[] args) {
System.out.println("服务端已经启动..........");
//1.创建两个线程池,一个负责接收客户端,一个进行传输
NioEventLoopGroup pGroup = new NioEventLoopGroup();
NioEventLoopGroup cGroup = new NioEventLoopGroup();
//2.创建辅助类
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup,cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,1024)
//3.设置缓冲区与发送区大小
.option(ChannelOption.SO_SNDBUF,3*1024).option(ChannelOption.SO_RCVBUF,32*1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringDecoder());//设置返回结果为String类型
socketChannel.pipeline().addLast(new ServerHandler());
}
});
try {
//启动
ChannelFuture cf = b.bind(8080).sync();
//关闭
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class ServerHandler extends ChannelHandlerAdapter {
/*当通道被调用,执行方法(拿到数据)*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String value = (String) msg;
System.out.println("服务器端收到客户端的msg:"+value);
//ctx.writeAndFlush("hao");
ctx.write("hahah ");
ctx.flush();
super.channelRead(ctx, msg);
}
}
package com.mumulx;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
/*
* Created by IntelliJ IDEA.
* User: 木木
* Date: 2020/5/29
* Time: 19:48
*/
/*长连接:与服务端连接后不断开
*
* 客户端端断开后客户端端会报异常io.netty.channel.DefaultChannelPipeline$TailContext exceptionCaught(关闭时没有四次挥手)
*
*
* */
class ClientHandler extends ChannelHandlerAdapter {
/*接受数据*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
String value = (String) msg;
System.out.println("client msg:" + value);
}
}
public class NettyClient01 {
public static void main(String[] args) {
System.out.println("客户端已经启动:");
//创建负责接受客户端连接
NioEventLoopGroup pGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(pGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new ClientHandler());
}
});
try {
ChannelFuture cf = b.connect("127.0.0.1", 8080).sync();
/*这是两次请求*/
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja01".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja02".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja03 ".getBytes()));
Thread.sleep(1000);
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja04 ".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja05 ".getBytes()));
//等待客户端端口号关闭
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
为什么时两次请求呢?底层会优化。自动粘包
一个完整的业务可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这个就是TCP的拆包和封包问题。。 下面可以看一张图,是客户端向服务端发送包:。
1.第一种情况,Data1和Data2都分开发送到了Server端,没有产生粘包和拆包的情况。 2.第二种情况,bata1和Data2数据粘在了一起,打成了一个大的包发送到Server端,这个情况就是粘包。 3.第三种情况,Data2被分离成Data2_ 1和Data2_ 2, 并且Data2 1在Data1之前到达了服务端,这种情况就产生了拆包。
由于网络的复杂性,可能数据会被分离成N多个复杂的拆包/粘包的情况,所以在做TCP服务器的时候就需要首先解决拆包
解决办法.
消息定长,报文大小固定长度,不够空格补全,发送和接收方遵循相同的约定,这样即使粘包了通过接收方编程实现获取定长报文也能区分。(很少使用)
sc.pipeline().addLast(new FixedLengthFrameDecoder(10));//达到10个长度就发
包尾添加特殊分隔符,例如每条报文结東都添加回车换行符(例如FTP协议)或者指定特殊字符作为报文分隔符,接收方通过特殊分隔符切分报文区分。
b.group(pGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
/*解决粘包 客户端服务端都需要添加*/
ByteBuf buf = Unpooled.copiedBuffer("_mumu".getBytes());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new ClientHandler());
}
});
将消息分为消息头和消息体,消息头中包含表示信息的总长度(或者消息体长度)的字段,
将消息分为消息头和消息体,消息头中包含表示信息的总长度(或者消息体长度)的字段,
package com.mumulx;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/*
* Created by IntelliJ IDEA.
* User: 木木
* Date: 2020/5/29
* Time: 19:48
*/
/*长连接:与服务端连接后不断开
*
* 客户端端断开后客户端端会报异常io.netty.channel.DefaultChannelPipeline$TailContext exceptionCaught(关闭时没有四次挥手)
*
*
* */
class ClientHandler extends ChannelHandlerAdapter {
/*接受数据*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
String value = (String) msg;
System.out.println("client msg:" + value);
}
}
public class NettyClient01 {
public static void main(String[] args) {
System.out.println("客户端已经启动:");
//创建负责接受客户端连接
NioEventLoopGroup pGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(pGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
/*解决粘包 客户端与服务端都需要添加 当数据中含有后缀时才接受,否则阻塞掉 */
ByteBuf buf = Unpooled.copiedBuffer("_mumu".getBytes());//
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new ClientHandler());
}
});
try {
ChannelFuture cf = b.connect("127.0.0.1", 8080).sync();
/*这是两次请求
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja01".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja02".getBytes()));
Thread.sleep(1000);
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja03 ".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja04 ".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja05 ".getBytes()));
*//*
这是一次请求
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja01".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja02".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja03 ".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja04 ".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja05 ".getBytes()));
*/
/*这是五次请求 通过后缀解决粘包问题
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja01_mumu".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja02_mumu".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja03_mumu".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja04_mumu".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja05_mumu".getBytes()));*/
//cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja06".getBytes()));//这样服务端是接受不到这个数据的 ,会阻塞掉
/*这样就发过去了 二者会拼接*/
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("hahja06".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("_mumu".getBytes()));
//等待客户端端口号关闭
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
TcP粘包拆包间题
netty当中的解码器
- FixedLengthFrameDecoder 基于固定长度的解码器
- LineBasedFrameDecoder基于行(\n,\r) 的解码器
- DelimiterBasedFrameDecoder 基于分隔符的解码器(二 or 三failFast属性判断是否抛出异常,必然成立一个将抛出常)
- 一:找到分隔符,超过最大长度,直接跳过本次要读取的数据(跳过到分隔符之前的数据),抛出异常
- 二: 没有找到分割符,超过最大长度,直接跳过整个buffer中的数据,设置开启丢弃模式,根据fai1Fast属性判断是否马上抛出异常
- 三:找到了分隔符,处于了丢弃模式,设置关闭丢弃模式,跳过本次要读取的数据(跳过到分隔符之前的数据),设置已经丢弃的字节长度为0,根据fai1Fast属性判断是否马上抛出异常
- 四:没有找到分割符,没有处于丢弃模式,累加丢弃的字节,丢弃整个buffer中的数据
- LengthFieldBasedFrameDecoder 基于长度的解码器
Netty 的线程模型是什么? 这个要看我们如何编码; NioEventLoopGroup默 认线程数为cpu核心数的两倍 如果用了两个NioEventloopGroup,且指定工作线程数不为一,则是主从多线程模型; 如果用了两个NioEventLoopGroup,且指定工作线程数为一,则是主从单线程模型 如果用了一个NioEventLoopGroup,且指定线程数不为一,则是多线程模型 如果用了一个NioEventLoopGroup,且指定线程数为一,则是单线程模型
netty bossGroup默认为CPU核 数的两陪的作用?
https://stackoverflow.com/guestions/34275138/why-do-we-really-need-multiple-netty-boss-threads
Java通过Executors提供四种线程池,分别为:
- newCachedThreadPool创建一个可 缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
- newFixedThreadPool创建一个定长线程池, 可控制线程最大并发数,超出的线程会在队列中等待。
- newScheduledThreadPool创建一个定长线程池,支持定时及周期性任务执行。
- newsingleThreadExecutor创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
序列化协议与自定义序列化协议
序列化定义
序列化(serialization)就是将对象序列化为二进制形式(字节数组),一般也将序列化称为编码(Encode) ,主要用于网络传输、数据持久化等;。
反序列化(deserialization) 则是将从网络、磁盘等读取的字节数组还原成原始对象,以便后续业务的进行, 一般也将反序列化称为解码(Decode),主要用于网絡传输对象的解码,以便完成远程调用。
序列化协议“鼻祖” 我知道的第一种序列化协议就是Java 默认提供的序列化机制,需要序列化的Java对象只需要实现Serializable / Externalizable接口并生成序列化ID,这个类就能够通过Objectlnput 和ObjectQutput 序列化和反序列化,若对Java默认的序列化协议不了解,或是遗忘了,请参考:序列化详解。
但是Java默认提供的序列化有很多问题,主要有以下几个缺点:。
无法跨语言:我认为这对于Java序列化的发展是致命的“失误”,因为Java序列化后的字节数组,其它语言无法进行反序列化;
序列化后的码流太大:相对于目前主流的序列化协议,Java 序列化后的码流太大;
序列化的性能差:由于Java序列化采用同步阻塞|0,相对于目前主流的序列化协议,它的效率非常差。
影响序列化性能的关键因素
- 序列化后的码流大小(网络带宽的占用) ;
- 序列化的性能(CPU资源占用)
- 是否支持跨语言(异构系统的对接和开发语言切换)。。
什么是序列化?
序列化:将对象序列化成二进制文件,保存在硬盘上。
反序列:将二进制文件(硬盘上),反序列化成对象。
序列化时要定义个final static:
远程调用(网络通讯)
hibernate
rpc远程调用的时候现在springcloud (微服务)、dubbo (序列化)接口调用 –服务器
public interface UserInterface {
public void add());
}
客户端
@auto
private UserInterface userInterface;
dubbo netty
序列化的协议:json、xml(数据交换格式)
几种流行的序列化协议比较.
XML
(1) 定义:
XML (Extensible Markup Language)是一种常用的序列化和反序列化协议,它历史悠久, 从1998年的1.0 版本被广泛使用至今。。
(2)优点
- 人机可读性好。
- 可指定元素或特性的名称
(3)缺点。
- 序列化数据只包含数据本身以及类的结构,不包括类型标识和程序集信息。
- 类必须有一个将由XmISerializer 序列化的默认构造函数。
- 只能序列化公共属性和字段,
- 不能序列化方法。
- 文件庞大,文件格式复杂,传输点带宽。
(4)使用场景。
- 当做配置文件存储数据。
- 实时数据转换。
JSON.
(1)定义:
JSON(JavaScript Object Notation, JS对象标记)是一种轻量级的数据交换格式。它基于ECMAScript (w3c制定的is规范)的一个子集,JSON 采用与编程语言无关的文本格式,但是也使用了类C语言(包括C,C++, C#, Java,JavaScript, Perl, Python 等)的习惯,简洁和清晰的层次结构使得JSON 成为理想的数据交换语言。
(2)优点
- 前后兼容性高v
- 数据格式比较简单,易于读写。
- 序列化后数据较小,可扩展性好,兼容性好。
- 与XML相比,其协议比较简单,解析速度比较快,
(3) 缺点。
- 数据的描述性比XML差,
- 不适合性能要求为ms级别的情况。
- 额外空间开销比较大。
(4)适用场景(可替代XML)
- 跨防火墙访问。
- 可调式性要求高的情况。
- 基于Web browser的Ajax请求,
- 传输数据量相对小,实时性要求相对低(例如秒級别)的服务。
Fastjson.
(1)定义
Fastison是一个Java语言编写的高性能功能完善的JSON库。它采用一-种“假定有序快速匹配”的算法,把JSON Parse的性能提升到极致。
(2)优点。
- 接口简单易用。
- 目前java语言中最快的json库。
(3)缺点
- 过于注重快,而偏离了“标准”及功能性。
- 代码质量不高,文档不全。
(4)适用场景。
- 协议交互。
- Web输出。
- Android客户端
Thrift
(1) 定义 Thrift并不仅仅是序列化协议,而是一个RPC框架。它可以让你选择客户端与服务端之间传输通信协议的类别,即文本(text)和二进制(binary)传输协议,为节约带宽,提供传输效率,- -般情况下使用二进制类型的传输协议。
(2)优点
- 序列化后的体积小,速度快。
- 支持多种语言和丰富的数据类型。
- 对于数据字段的增删具有较强的兼容性。
- 支持二进制压縮編码。
(3)缺点。
- 使用者较少。
- 跨防火墙访问时,不安全。
- 不具有可读性,调试代码时相对困难。
- 不能与其他传输层协议共同使用(例如HTTP)。
- 无法支持向持久层直接读写数据,即不适合做数据持久化序列化协议。
(4)适用场景。
分布式系统的RPC解决方案
Avro.
(1) 定义
Avro属于Apache Hadoop的一个子项目。Avro 提供两种序列化格式: JSON 格式或者Binary 格式。Binary格式在空间开销和解析性能方面可以和Protobuf媲美,Avro的产生解决了JSON的冗长和没有IDL的问题, (2)优点
- 支持丰富的数据类型。
- 简单的动态语言结合功能
- 具有自我描述属性
- 提高了数据解析速度w
- 快速可压縮的二进制数据形式
- 可以实现远程过程调用RPC
- 支持跨编程语言实现。
(3) 缺点。
- 对于习惯于静态类型语言的用户不直观。
(4)适用场景
在Hadoop.中做Hive、Pig 和MapReduce的持久化数据格式。
Protobuf.
(1)定义
protocol buffers由爸歌开源而来,在爸歌内部久经考验。它将数据结构以.proto文件进行描述,通过代码生成工具可以生成对应数据结构的POIO对象和Protobuf相关的方法和属性。
(2)优点
- 序列化后码流小,性能高+
- 结构化数据存储格式(XMLJSON等)。
- 通过标识字段的顺序,可以实现协议的前向兼容
- 结构化的文档更容易管理和维护。
(3) 缺点。
- 需要依赖于工具生成代码。
- 支持的语言相对较少,官方只支持Java、C++、Python+
(4)适用场景。
- 对性能要求高的RPC调用。
- 具有良好的跨防火墙的访问属性。
- 适合应用层对象的持久化。
其它
- protostuff基于protobuf 协议,但不需要配置proto文件,直接导包即
- Jboss marshaling可以直接序列化java类,无须 实java.io.Serializable接口。
- Message pack 一个高效的二进制序列化格式w
- Hessian采用二进制协议的轻量級remotingonhttp工具。
- kryo基于protobuf 协议,只支持java语言,需要注册(Registration),然后序列化(Output) ,反序列化(Input)。
性能比较
时间
空间
析上图知:
- XML序列化(Xstream)无论在性能和简洁性上比较差。
- Thrift与Protobuf相比在时空开销方面都有一定的劣势。
- Protobuf和Avro在两方面表现都非常优越。
9、选型建议
不同的场景适用的序列化协议:
对于公司间的系统调用,如果性能要求在100ms以上的服务,基于XML的SOAP协议是一个值得考虑的方案。
基于Web browser的Ajax,以及Mobile app与服务端之间的通讯,JSON协议是首选。对于性能要求不太高,或者以动态类型语言为主,或者传输数据载荷很小的的运用场景,JSON也是非常不错的选择。
对于调试环境比较恶劣的场景,采用JSON或XML能够极大的提高调试效率,降低系统开发成本。
当对性能和简洁性有极高要求的场景,Protobuf,Thrift,Avro之间具有一定的竞争关系。
对于T级别的数据的持久化应用场景,Protobuf和Avro是首要选择。如果持久化后的数据存储在Hadoop子项目里,Avro会是更好的选择。
由于Avro的设计理念偏向于动态类型语言,对于动态语言为主的应用场景,Avro是更好的选择。
对于持久层非Hadoop项目,以静态类型语言为主的应用场景,Protobuf会更符合静态类型语言工程师的开发习惯。
如果需要提供一个完整的RPC解决方案,Thrift是一个好的选择。
如果序列化之后需要支持不同的传输层协议,或者需要跨防火墙访问的高性能场景,Protobuf可以优先考虑。
何时接受客户端请求? 服务端(reactor线程)启动后就可以接收
何时注册接受Socket 并注册到对应的EventLoop管理的selector ? 在channel通道创建和初始化完毕后,会通过group. register ()方法将channel通道注册到EventLoop线程池中;从线程池中轮询获取一 个线程EventLoop并与之绑定;而EventLoop线 程池会绑定一 个selector选择器
客户端如何进行初始化?
何时创建的DefaultChannelPipeline ? 服务端channel被反射创建时被创建
ByteBuf的分类: Pooled和Unpooled–》Pooled从 已经分配好的内存取内存Unpooled新创建一块内存
Unsafe和非Unsafe–》Unsafe依赖于jdk底层Unsafe对象非Unsafe不依赖于jdk底层Unsafe对象
Heap和Direct–》Heap底层就是byte数组Direct依赖于Nio的ByteBuffer创建出DirectByteBuffer,堆外内存
Channel与Socket是什么 关系?
Socket:网络上的两个程序通过一一个双向的通信连接实现数据的交换,这个连接的一端称为一个socket,
Channel:一个链接,它提供了如下的功能。
- 获取当前链接的状态
- 配置当前链接参数
- 进行read, write, connect, bind等通道支持的操作。
- 该Channel关联的ChannelPipeLine处理所有的I0事件和绑定在这个channel的请求
channel与EventLoop是什么关系?
一个Channe1在它的生命周期内只注册于一个EventLoop;
一个EventLoop可能会被分配给一 个或多个Channel
Channel与ChannelPipeline是什么关系?
一个Channel包含了一个ChannelPipeline,而ChannelPipeline中又维护了一个由Channe lHandlercontext组成的双向链表, 这个链表的头是HeadContext,链表的尾是TailContext,并且么个ChannelHandlerContext中又关联着- - 个ChannelHandler; EventLoop与EventLoopGroup是什么关系? NioEventLoopGroup是NioEventLoop的组合,用于管理NioEventLoop
EventLoop接口用于处理连接的生命周期中所发生的事件。一个EventLoop在它的生命周期内只和一一个Thread绑定。所有由EventLoop处理的I/0事件都将在它专有的Thread上被处理