louzin
6 months ago
2 changed files with 174 additions and 0 deletions
@ -0,0 +1,71 @@ |
|||||||
|
# Selector |
||||||
|
多路复用器,用来检查一个或多个NIO Channel是否处于可读、可写 |
||||||
|
|
||||||
|
如此实现单线程管理多个channels |
||||||
|
## 可选通道 SelectableChannel |
||||||
|
不是所有的Channel都可被Selector复用如,FileChannel |
||||||
|
|
||||||
|
取决于是否继承了抽象类SelectableChannel |
||||||
|
|
||||||
|
SelectableChannel可选择监指定通道是可读或者可写 |
||||||
|
|
||||||
|
## 注册 Channel Selector |
||||||
|
`Channel.register(Selector sel,int ops)` |
||||||
|
第一个参数指定要注册的选择器,第二个参数指定选择器需要查询的通道操作 |
||||||
|
- 可读:SelectionKey.OP_READ |
||||||
|
- 可写:SelectionKey.OP_WRITE |
||||||
|
- 连接:SelectionKey.OP_CONNECT |
||||||
|
- 接收:SelectionKey.OP_ACCEPT |
||||||
|
|
||||||
|
若Selector对一个通道的多种操作感兴趣,可使用: |
||||||
|
`int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE;` |
||||||
|
## SelectionKey |
||||||
|
Channel注册后,若通道处于以上几种状态,就可以被选择器查询到,使用Selector中的select(),对感兴趣的通道进行操作 |
||||||
|
|
||||||
|
Selector可以不断查询Channel的状态,一旦有Channel进入Selector监听的就绪状态,就会被Selector选中放入选择键集合中 |
||||||
|
|
||||||
|
一个选择键包含注册Selector的通道操作类型,如绑定SelectionKey.OP_READ,也包含特定Channel与Selector之间的注册关系 |
||||||
|
|
||||||
|
## 使用方法 |
||||||
|
|
||||||
|
### 创建 |
||||||
|
```java |
||||||
|
//Channel与Selector使用时必须属于非阻塞模式 |
||||||
|
//这意味着FileChannel不能和Selector使用 |
||||||
|
//create selector |
||||||
|
Selector selector = Selector.open(); |
||||||
|
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); |
||||||
|
serverSocketChannel.configureBlocking(false); |
||||||
|
//bind |
||||||
|
serverSocketChannel.bind(new InetSocketAddress(9999)); |
||||||
|
//register channel and listener Accept |
||||||
|
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); |
||||||
|
System.out.println(serverSocketChannel.validOps()==SelectionKey.OP_WRITE); |
||||||
|
``` |
||||||
|
### 轮询查询就绪操作 |
||||||
|
- select():阻塞到至少有一个通道在你注册的事件上生效了 |
||||||
|
- select(long timeout):与select一致,但最长阻塞事件为timeout毫秒 |
||||||
|
- selectNow():非阻塞,只要有通道准备就绪立即返回 |
||||||
|
|
||||||
|
select返回int,表示自前一次select到这个之间有多少个通道变为就绪状态 |
||||||
|
```java |
||||||
|
Selector open = Selector.open(); |
||||||
|
Set<SelectionKey> selectionKeys = open.selectedKeys(); |
||||||
|
Iterator<SelectionKey> iterator = selectionKeys.iterator(); |
||||||
|
while (iterator.hasNext()){ |
||||||
|
SelectionKey next = iterator.next(); |
||||||
|
if(next.isAcceptable()){ |
||||||
|
|
||||||
|
}else if (next.isConnectable()){ |
||||||
|
|
||||||
|
} |
||||||
|
//... |
||||||
|
} |
||||||
|
``` |
||||||
|
### 停止选择的方法 |
||||||
|
选择器执行选择过程中,系统底层会依次询问每个通道是否已经准备就绪,这个过程可能会造调用线程进入阻塞状态 |
||||||
|
|
||||||
|
有几种方式唤醒在select()方法中阻塞的线程 |
||||||
|
|
||||||
|
- wakeup():通过Selector中的方法让处于阻塞状态中的select()方法立即返回;若本次未生效,则下一次select时立即返回 |
||||||
|
- close():通过close关闭Selector,能使唤醒,但会让这个Selector注销,所有的键被唤醒 |
@ -0,0 +1,103 @@ |
|||||||
|
package com.louzin.niodemo.selecter; |
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test; |
||||||
|
|
||||||
|
import java.io.IOException; |
||||||
|
import java.net.InetSocketAddress; |
||||||
|
import java.nio.ByteBuffer; |
||||||
|
import java.nio.channels.*; |
||||||
|
import java.nio.charset.StandardCharsets; |
||||||
|
import java.util.Date; |
||||||
|
import java.util.Iterator; |
||||||
|
import java.util.Set; |
||||||
|
|
||||||
|
public class selectorcrud { |
||||||
|
//Channel与Selector使用时必须属于非阻塞模式
|
||||||
|
//这意味着FileChannel不能和Selector使用
|
||||||
|
|
||||||
|
@Test |
||||||
|
public void createSelector() throws IOException { |
||||||
|
//create selector
|
||||||
|
Selector selector = Selector.open(); |
||||||
|
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); |
||||||
|
serverSocketChannel.configureBlocking(false); |
||||||
|
//bind
|
||||||
|
serverSocketChannel.bind(new InetSocketAddress(9999)); |
||||||
|
//register channel and listener Accept
|
||||||
|
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); |
||||||
|
System.out.println(serverSocketChannel.validOps()==SelectionKey.OP_WRITE); |
||||||
|
serverSocketChannel.close(); |
||||||
|
selector.close(); |
||||||
|
|
||||||
|
} |
||||||
|
@Test |
||||||
|
public void cycleSelector() throws IOException { |
||||||
|
Selector open = Selector.open(); |
||||||
|
Set<SelectionKey> selectionKeys = open.selectedKeys(); |
||||||
|
Iterator<SelectionKey> iterator = selectionKeys.iterator(); |
||||||
|
while (iterator.hasNext()){ |
||||||
|
SelectionKey next = iterator.next(); |
||||||
|
if(next.isAcceptable()){ |
||||||
|
|
||||||
|
}else if (next.isConnectable()){ |
||||||
|
|
||||||
|
} |
||||||
|
//...
|
||||||
|
} |
||||||
|
} |
||||||
|
@Test |
||||||
|
public void serverDemo() throws IOException { |
||||||
|
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); |
||||||
|
serverSocketChannel.bind(new InetSocketAddress("localhost",8000)); |
||||||
|
serverSocketChannel.configureBlocking(false); |
||||||
|
|
||||||
|
Selector selector = Selector.open(); |
||||||
|
//register channel and listen accept
|
||||||
|
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); |
||||||
|
while (selector.select()>0){ |
||||||
|
Set<SelectionKey> selectionKeys = selector.selectedKeys(); |
||||||
|
Iterator<SelectionKey> iterator = selectionKeys.iterator(); |
||||||
|
while (iterator.hasNext()){ |
||||||
|
SelectionKey next = iterator.next(); |
||||||
|
//judgement operation
|
||||||
|
if(next.isAcceptable()){ |
||||||
|
//get connect
|
||||||
|
SocketChannel accept = serverSocketChannel.accept(); |
||||||
|
//change to nio
|
||||||
|
accept.configureBlocking(false); |
||||||
|
//register
|
||||||
|
accept.register(selector,SelectionKey.OP_READ); |
||||||
|
}else if(next.isReadable()){ |
||||||
|
SocketChannel channel = (SocketChannel) next.channel(); |
||||||
|
ByteBuffer allocateinno = ByteBuffer.allocate(1024); |
||||||
|
int length = 0; |
||||||
|
while ((length = channel.read(allocateinno))>0){ |
||||||
|
allocateinno.flip(); |
||||||
|
System.out.println(new String(allocateinno.array(),0,length)); |
||||||
|
allocateinno.clear(); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
iterator.remove(); |
||||||
|
} |
||||||
|
} |
||||||
|
@Test |
||||||
|
public void clientDemo() throws IOException { |
||||||
|
//get channel and bind port
|
||||||
|
SocketChannel so = SocketChannel.open(new InetSocketAddress("localhost",8000)); |
||||||
|
//change to nio
|
||||||
|
so.configureBlocking(false); |
||||||
|
//create buffer
|
||||||
|
ByteBuffer allocate = ByteBuffer.allocate(64); |
||||||
|
//write buffer
|
||||||
|
allocate.put(new Date().toString().getBytes(StandardCharsets.UTF_8)); |
||||||
|
//mode change
|
||||||
|
allocate.flip(); |
||||||
|
//write into channel
|
||||||
|
so.write(allocate); |
||||||
|
//clear buffer and close connect
|
||||||
|
allocate.clear(); |
||||||
|
} |
||||||
|
} |
Loading…
Reference in new issue