首先来看一些 Channel 的基础定义:

1
2
3
4
5
public interface Channel extends Closeable {
public boolean isOpen();

public void close() throws IOException;
}

与 Buffer 不同,Channel 的 API 主要是通过接口来定义,不同的操作系统上 Channel 的实现会有根本差别,所以 API 只会描述可以做什么。

InterruptibleChannel 是一个标记接口,表示该 Channel 是可以中断的。

从 Channel 引申出来的接口都是面向字节的子接口, WritableByteChannel 和 ReadableByteChannel。Channel 只能在 ByteBuffer 上操作。

打开 Channel

I/O 可以分为广义的两大类别:File I/O 和 Stream I/O。发现的 Channel 有如下:

  • FileChannel
  • SocketChannel,ServerSocketChannel,DatagramChannel

Socket 通道可以通过工厂来创建,但是 FileChannel 只能通过 RandomAccessFile、FileInputStream 或 FileOutputStream 对象上调用 getChannel( )方法来获取。

1
2
3
4
5
6
7
8
9
10
SocketChannel sc = SocketChannel.open( );
sc.connect (new InetSocketAddress ("somehost", someport));

ServerSocketChannel ssc = ServerSocketChannel.open( );
ssc.socket( ).bind (new InetSocketAddress (somelocalport));

DatagramChannel dc = DatagramChannel.open( );

RandomAccessFile raf = new RandomAccessFile ("somefile", "r");
FileChannel fc = raf.getChannel( );

使用 Channel

Channel 把数据传输给 ByteBuffer 对象或者从 ByteBuffer 对象当中获取数据。

1
2
3
4
5
6
7
8
9
10
11
12
public interface ReadableByteChannel extends Channel {
public int read(ByteBuffer dst) throws IOException;
}

public interface WritableByteChannel extends Channel {
public int write(ByteBuffer src) throws IOException;
}


public interface ByteChannel
extends ReadableByteChannel, WritableByteChannel
{}

通道可以是单向(unidirectional)或者双向的(bidirectional)。一个 channel 类可能实现定义 read( )方法的 ReadableByteChannel 接口,而另一个 channel 类也许实现 WritableByteChannel 接口以 供 write( )方法。实现这两种接口其中之一的类都是单向的,只能在一个方向上传输数据。如果 一个类同时实现这两个接口,那么它是双向的,可以双向传输数据。

SocketChannel 一直都是双向的,但是 FileChannel 可不全是,从 FileInputStream 获取的 FileChannel 是可读的,从 FileOutputStream 获取的 FileChannel 是可写的。

ByteChannel 的 read() 和 write() 方法使用 ByteBuffer 对象作为参数。两种方法均返回已传输的字节数,可能比缓冲区的字节数少甚至可能为零。缓冲区的位置也会发生与已传输字节相同数量的前移。如果只进行了部分传输,缓冲区可以被重新 交给通道并从上次中断的地方继续传输。该过程重复进行直到缓冲区的 hasRemaining() 方法返回 false 值。

如下是从一个 Channel 复制数据到另一个 Channel:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package nio.test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

public class ChannelCopy {

public static void main(String[] argv) throws IOException {
ReadableByteChannel source = Channels.newChannel(System.in);
WritableByteChannel dest = Channels.newChannel(System.out);
channelCopy1(source, dest);
// alternatively, call channelCopy2 (source, dest);
source.close();
dest.close();
}

private static void channelCopy1(ReadableByteChannel src, WritableByteChannel dest)
throws IOException {
ByteBuffer buffer = ByteBuffer.allocateDirect(8);
while (src.read(buffer) != -1) {
buffer.flip();
dest.write(buffer);
buffer.compact();//如果没写完的话,下次继续写
System.out.println();
}

buffer.flip();
while (buffer.hasRemaining()) {
dest.write(buffer);
}
}

private static void channelCopy2(ReadableByteChannel src, WritableByteChannel dest)
throws IOException {
ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);
while (src.read(buffer) != -1) {
buffer.flip();
while (buffer.hasRemaining()) {
dest.write(buffer);
}
buffer.clear();
}
}
}

与 Buffer 不同, Channel 不能被重复使用。一个打开的通道即代表与一个特定 I/O 服务的特定连接并封装该连接的状态。当通道关闭时,那个连接会丢失,然后通道将不再连接任何东西。

调用通道的 close()方法时,可能会导致在通道关闭底层 I/O 服务时线程暂时阻塞,哪怕该通道处于非阻塞模式。

如果一个 Channel 实现 InterruptibleChannel 接口,那么一个线程在一个 Channel 上被阻塞并且同时被中断时,这个 Channel 会被关闭,该线程也会产生一个 ClosedByInterruptException 异常。

Scatter/Gather

Scatter/Gather 的概念是指在多个缓冲区上实现一个简单的 I/O 操作。

对于一个 write 操作而言,数据是从几个缓冲区按顺序抽取(称为 gather)并沿着 Channel 发送的。

对于 read 操作而言,从通道读取的数据会按顺序被散布(称为 scatter)到多个缓冲区,将每个缓冲区填满直至 Channel 中的数据或者缓冲区的最大空间被消耗完。

1
2
3
4
5
6
7
8
9
10
11
public interface ScatteringByteChannel extends ReadableByteChannel {
public long read(ByteBuffer[] dsts) throws IOException;

public long read(ByteBuffer[] dsts, int offset, int length) throws IOException;
}

public interface GatheringByteChannel extends WritableByteChannel {
public long write(ByteBuffer[] srcs) throws IOException;

public long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
}

通过例子来理解概念:

channel 连接到一个有 48 字节数据等待读取的 socket 上:

1
2
3
4
ByteBuffer header = ByteBuffer.allocateDirect (10);
ByteBuffer body = ByteBuffer.allocateDirect (80);
ByteBuffer [] buffers = { header, body };
int bytesRead = channel.read (buffers);

一旦 read( )方法返回,bytesRead 就被赋予值 48,header 缓冲区将包含前 10 个从通道读取的字节而 body 缓冲区则包含接下来的 38 个字节。

用一个 gather 操作将多个缓冲区的数据组合并发送出去。

1
2
3
4
5
body.clear( );
body.put("FOO".getBytes()).flip( ); // "FOO" as bytes
header.clear( );
header.putShort (TYPE_FILE).putLong(body.limit()).flip( );
long bytesWritten = channel.write (buffers);

带 offset 和 length 参数版本的 read( ) 和 write( )方法使得我们可以使用缓冲区 列的子集 缓冲区。

这里的 offset 值指哪个缓冲区将开始被使用,而不是指数据的 offset。

举个例子,假设我们有一个五元素的 fiveBuffers 列,它已经被 初始化并引用了五个缓冲区,下面的代码将会写第二个、第三个和第四个缓冲区的内容:

1
int bytesRead = channel.write (fiveBuffers, 1, 3);

【参考资料】

  1. Java Nio

—EOF—