IO-InputStream
read
抽象类,提供了一个 read 方法来读取字节。这个方法是一个抽象方法
交由具体的子类来实现。
1 | public abstract class InputStream implements Closeable { |
Reads the next byte of data from the input stream. The value byte is returned as an int in the range 0 to 255.
If no byte is available because the end of the stream has been reached, the value -1 is returned.
This method blocks until input data is available, the end of the stream is detected, or an exception is thrown.
对于 read 方法来说,每调用一次,其返回数据中的下一个字节,所以就像流水一样。数据从某个地方(数据的源头,例如:磁盘文件,内存,java序列化后的对象)不断地调用 read 方法,将数据取出来:
1 | +---------+ |
这个函数会一直阻塞,直到:
- input data is available
- the end of the stream is detected
- an exception is thrown
If no byte is available because the end of the stream has been reached, the value -1 is returned.
当 read 到数据源的 EOF 时,这个函数将返回 -1.
由于 read 方法并没有任何参数,所以,对于数据源的数据读取,只能是顺序的读取,其数据源内部的下一个返回字节的offset则由具体的实现来维护。例如 FileInputStream中,打开的文件,其默认 offset 就是第一个字节,随着不断 read , offset 由 ReadFile (win32) 和 read (linux) 的实现来维护。
同时,从 InputStream 这个抽象来说,其设计的目的就是提供数据的流式访问,而不是随机访问。所以,InputStream 中并没有提供类似 lseek 等,可以直接设置,下次读取数据位置的方法。
mark
虽然,InputStream 是流式(顺序)读取数据,但是其仍然,提供了一些可以跳跃式读取数据的方法
1 |
|
long skip(long n)
Skips over and discards n bytes of data from this input stream. 从 stream 的当前位置起 skip n 个字节,所以在 skip 调用之后,stream 的 offset 自然也就偏移了 n 个字节,所以 read 调用,将返回 skip 之后的字节。
boolean markSupported()
Tests if this input stream supports the mark and reset methods. 如果 stream 支持 mark 和 reset 操作,则这个方法返回 true, 否则返回 false.
void mark(int readlimit)
Marks the current position in this input stream.当这个方法被调用的时候,mark 会将当前的 stream 的读取的 position 记录下来,注意,mark的stream的位置和参数 readlimit 没有任何关系。
mark 下来的当前位置是有一个有效期的,在这个有效期内,调用 reset 方法可以恢复到 mark 的 position,否则,reset 将抛出异常。这个有效期就是 mark 的参数 readlimit:
The readlimit arguments tells this input stream to allow that many bytes to be read before the mark position gets invalidated.
也就是说在 mark 之后,如果有 readlimit 个字节,已经被读取,那么 mark 调用记录下来的那个 position 将变成 invalidated. 所以 reset 可能抛出 IOException
void reset()
Repositions this stream to the position at the time the mark method was last called on this input stream.
注意上面的描述是来自 InputStream 的文档,也就是说这相当于接口的约定。而具体的行为,则需要参考具体的实现。
例如:InputStream 的一个实现类 ByteArrayInputStream 是支持 mark & reset 操作的,但是其在实现 mark 方法时,并没有使用参数 readlimit. 也就是说这个参数并没有干预到 reset 方法的实现。而在 InputStream 对接口的 reset 的描述中,readlimit 是起作用的。
所以,对一个 stream 的 mark & reset 操作,一定要参考其具体的实现的方式。而不是 InputStream 中的描述。
InputStream 对于 mark & reset 的实现是:
- markSupported return false
- mark does nothing
- reset does nothing except throw an IOException.
所以,如果InputStream的子类没有 override 这几个方法,则这个stream 就不支持 mark & reset.
InputStream 子类
InputStream 子类, 以数据源为分类有
字节数组,文件,java对象序列化文件流,管道,等等。
ByteArrayInputStream
构造过程
1 | public |
1 | pos, mark count |
1 | // 当使用下面的构造函数来构造,ByteArrayInputStream 时 |
mark & reset
这个类支持 mark & reset
1 | public boolean markSupported() { |
notes
close
调用 close 方法并没有任何作用。close 方法是空实现。
内部直接引用的,参数传递过来的数组。如果这个字节数组在其它被修改了,对应 的流内部读取到的数据,也是被修改后的数据。
mark的位置,不会因为 readlimt 参数而失效,其内部没有使用 readlimt.
SequenceInputStream
1 | public class SequenceInputStream extends InputStream { |
这个类的作用是:将流串连起来。
1 | InputStream in; |
内部使用 Enumeration 来存储这些流的引用。
将流串连起来的核心方法:
1 | /** |
PipedInputStream
这个类和 PipedOutputStream 在不同的线程中配合使用。表示管道。
流从 PipedOutputStream 中流入,从 PipedInputStream 流出。
1 | PipedInputStream 将在 out 处读取数据 |
PipedInputStream 的初始状态。
- in 表示 PipedOutputStream.write 方法写入buffer中下一个字节的索引。
- out 表示 PipedInputStream.read 方法读取buffer中下一个字节的索引。
初始状态下:
buffer 中没有数据,所以 in = -1 表示流是空的。
out = 0, 表示可以向 buffer[0] 处写入数据。
此时,如果从 pipe 中 read 数据,则 reader 线程将进入循环等待状态。直到 in > 0,也就是 pipe 中有数据。如果从 pipe 中 write 数据,则 write 完成之后, out = 0, in = 1. 表示,此时若写入数据则应该写入 buffer[in] == buffer[1], 如果读取数据则应该读取,buffer[out] == buffer[0]。并且此后,将保持, out < in, 直到 out 读到 in 的位置,也就是 out == in, 此时,相当于 buffer 中的数据已经全部读完。所以,重置 in = -1.
PipedOutputStream.write
PipedOutputStream.write 调用 PipedInputStream.receive 完成向 Pipe 中写入数据的功能。
1 | protected synchronized void receive(int b) throws IOException { |
PipedInputStream.read
1 | public synchronized int read() throws IOException { |
connect
PipedInputStream 和 PipedOutputStream 如果连接(connect)起来。
1 | // PipedInputStream.connect |
对于 每一个 PipedOutputStream 对象,内部持有一个 PipedInputStream 对象
1 | private PipedInputStream sink; |
表示这个 OutputStream 的输出的目的地。
由上面的 connect 的实现,可知,PipedOutputStream 和 PipedInputStream 永远是 1:1 的关系,一个 PipedOutputStream 对象只能和一个 PipedInputStream 进行 connect,反之亦然。否则,直接 IOException(“Already connected”);
所以,这里的 pipe 就像管道一样,一个出口,一个入口。而不是像消息系统,那样,可以有多个消费者(PipedInputStream),多个生产者(PipedOutputStream)。
使用场景
Typically, data is written to a PipedOutputStream object by one thread and data is read from the connected PipedInputStream by some other thread
Typically, data is read from a PipedInputStream object by one thread and data is written to the corresponding PipedOutputStream by some other thread.
所以通常是使用两个线程来使用 PipedOutputStream 和 PipedInputStream。对于使用 PipedOutputStream 向 pipe 中写入数据的线程可以认为是生产者,使用 PipedInputStream 读取数据的线程可以认为是 消费者。
pipe 是线程安全的:
尽管 pipe 是 1:1 的 PipedOutputStream 和 PipedInputStream 互相连接,但是 PipedOutputStream 和 PipedInputStream 的对象,可以由多个线程,同时使用。也就是说 PipedOutputStream 和 PipedInputStream 两个对象,构成了一个线程安全的 pipe , 然后多个线程可以通过这个 pipe 来生产和消费数据。
1 | +-----------------------------------------------------+ |
如上图所示,ThreadA, ThreadB,ThreadC,ThreadD 持有PipedOutputStream 可以向 pipe 中写入数据,而 Thread1,Thread2,Thread3 持有 PipedInputStream 可以从 pipe 中读取数据。
虽然,我们可以这样使用pipe, 但是这里存在一个问题:
例如 对于 Thread2 这个 reader ,如果它没有调用了 read 之后,没有调用 close 方法,然后线程 Thread2 终止了。则此时:
1 | public synchronized int read() throws IOException { |
其实,造成这种状况的根本,原因,在于:
1 | public class PipedInputStream extends InputStream { |
PipedInputStream 只跟踪了最近一次在 pipe 上执行 read 和 write 操作的线程。所以,也就无法,在其中一个 reader 或者, writer 出现问题的时候,通知,其它的线程。
再考虑,如果 Thread2 在终止之前调用了 close 方法呢,其实也还是有问题,Thread2 在没有通过其它 reader 的情况下,就直接将 pipe 给关闭了,这是否对其它正在工作的 reader 造成影响。所以,对于在同一个 pipe 上进行 read 和 write 的线程,维护 pipe 的状态,是一个大的问题。
所以,pipe 最好,还是一个线程进行 write 操作,另一个线程进行 read 操作,比较好。或者 一个线程 write , 并执行 close 操作,多个线程 read. 或者 多个线程 write, 然后一个线程 read, 由这个 read 线程来维护 pipe 的 close 操作.
对于 pipe 来说,是有吞吐量的,其吞吐量就是 创建 PipedInputStream 时的参数:
1 | public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException { |
就是 pipeSize, 默认是 1024 个字节。一旦,向这个 pipe 写入 1024 个字节之后,如果不进行消费,则 PipedOutputStream.write 操作将进入 wait 状态,直到,有数据被消费,write 才有空间,继续写入数据。
同理,如果 pipe 中没有数据,没有进行 write 操作,或者 pipe 中的 byte 被消费完毕,则 PipedInputStream.read 操作将进入 wait 状态。直到,有数据被写入,read 才可以读取到数据。
PipedInputStream 不支持,mark & reset.
pipe的关闭:
PipedOutputStream 关闭:
当 PipedOutputStream 调用 close 时,使得 PipedInputStream.closedByWriter = true.
表明,pipe 被 writer 线程关闭了,也就是 write 线程调用了 PipedOutputStream.close 关闭了 pipe. 如果是这样,说明流,再不会产生数据了,则当前 PipedInputStream,直到数据读完之后。再次调用 read时, 将得到 -1,说明流已经读完毕了。对于 PipedOutputStream 如果在 close 之后,调用,write, 则会产生: IOException(“Pipe closed”);
PipedInputStream 关闭
设置 closedByReader = true; ,表示 pipe 被 read 线程关闭了。此时,如果 PipedInputStream 调用 read 将直接:IOException(“Pipe closed”); 而不管流是否被读完。
当然,对于 PipedOutputStream 写线程来说,其调用
write 也会是 IOException(“Pipe closed”);