介绍
- 进程间的通信在许多科技论坛的帖子上都已经说的很详细,主要有:共享内存,管道,信号量,信号,socket等通信方式
- 但题主又觉得他们说的不够详细,在什么场景下使用,如何使用,这是许多帖子没有说清楚的
目录
共享内存
- 说明
- java中没有专门的共享内存方法。 MappedByteBuffer 是为了文件映射,加快大文件读写速度。 共享内存,有许多种实现方法,在java中可以使用文件映射来实现共享内存,缺点是文件映射必须有文件, 同时有其他开销。 文件映射的方式读写文件其实是对内存的操作,所以速度与读写内存是一致的, 多余的开销在内存数据还是会同步到硬盘的,这个开销是异步的,影响不大。
- MappedByteBuffer其实是利用了操作系统底层的虚拟内存技术,将需要的磁盘文件(空间),映射到内存中, 内存与磁盘的同步由操作系统负责,所以这种技术下,数据在多个进程之间有一定的同步延迟。
- 特点
- 可被多个进程打开访问;
- 读写操作的进程在执行读写操作时,其他进程不能进行写操作,这个需要在管理程序中自己实现;
- 多个进程可以交替对某一共享内存执行写操作;
- 一个进程执行内存写操作后,不影响其他进程对该内存的访问,同时其他进程对更新后的内存具有可见性;
- 说明 的言外之意是MappedByteBuffer并不能保证多个进程共享内存的强一致性
- 不要经常调用MappedByteBuffer.force()方法,这个方法强制操作系统将内存中的内容写入硬盘, 所以如果你在每次写内存映射文件后都调用force()(同步刷磁盘)方法,你就不能真正从内存映射文件中获益, 而是跟disk IO差不多
- 共享内存管理类MemShared.class
import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; public class MemShared { //第0-3个字节控制版本 // 4-7个字节为size,表示内存可写大小 // 第8-11个字节为position 当前读取内存获取写内存的位置 // 第12-15个字节为limit 当前内存最大可读取位置 // 以后为数据控制 //createMem(File file,int size)用于初始化共享内存,对应主要方法为initSharedMem() //getMem(File file)用于加载已经初始化共享内存,对应主要方法为setSharedMem() private MemShared(){ } private File file; private RandomAccessFile randomAccessFile; private FileChannel fc; private int off=16;//数据偏移量 private MappedByteBuffer map;//实际的操作内存 public File getFile() { return file; } public static int byteArrayToInt(byte[] bytes) {//字节转int int value=0; //由高位到低位 for (int i = 0; i < bytes.length; i++) { value |= bytes[i]; if(i<bytes.length-1) value <<= 8;//往高位游 } return value; } public int getSize() { byte[] bytes = new byte[4]; for (int i = 4; i <= 7; i++) { bytes[i-4]=this.map.get(i); } return byteArrayToInt(bytes); } public void setSize(int size) { for (int i = 7; i >= 4; i--) { int val = size & 0xFF; this.map.put(i,(byte)val); size>>=8; } } public int getLimit(){ byte[] bytes = new byte[4]; for (int i = 12; i <= 15; i++) { bytes[i-12]=this.map.get(i); } return byteArrayToInt(bytes); } public void setLimit(int limit){ for (int i = 15; i >= 12; i--) { int val = limit & 0xFF; this.map.put(i,(byte)val); limit>>=8; } } public int getVersion() { byte[] bytes = new byte[4]; for (int i = 0; i <= 3; i++) { bytes[i]=this.map.get(i); } return byteArrayToInt(bytes); } private void setVersion(int version){ for (int i = 3; i >= 0; i--) { int val = version & 0xFF; this.map.put(i,(byte)val); version>>=8; } } public int getPosition(){ byte[] bytes = new byte[4]; for (int i = 8; i <=11; i++) { bytes[i-8]=this.map.get(i); } return byteArrayToInt(bytes); } public void setPosition(int position){ for (int i = 11; i >= 8; i--) { int val = position & 0xFF; this.map.put(i,(byte)val); position>>=8; } } static MemShared createMem(File file,int size,int version) throws IOException { MemShared memShared = new MemShared(); memShared.file=file; memShared.initSharedMem(size,version); return memShared; } static MemShared createMem(File file,int size) throws IOException { return createMem(file,size,0); } static MemShared getMem(File file) throws IOException { MemShared memShared = new MemShared(); memShared.file=file; memShared.setSharedMem(); return memShared; } private void setSharedMem() throws IOException { //获取随机存取文件对象,建立文件和内存的映射,即时双向同步 this.randomAccessFile=new RandomAccessFile(file,"rw"); this.fc = this.randomAccessFile.getChannel(); byte[] bytes = new byte[4]; this.fc.read(ByteBuffer.wrap(bytes),4); int size=byteArrayToInt(bytes); this.map = fc.map(FileChannel.MapMode.READ_WRITE, 0, size); } private void initSharedMem(int size,int version) throws IOException { //获取随机存取文件对象,建立文件和内存的映射,即时双向同步 this.randomAccessFile=new RandomAccessFile(file,"rw"); this.fc = this.randomAccessFile.getChannel(); this.map = this.fc.map(FileChannel.MapMode.READ_WRITE, 0, this.off+size); this.map.put((byte) 0); this.setVersion(version); this.setPosition(0); this.setSize(size); } public void flip(){ this.setPosition(0); } public void write(byte[] bytes) throws Exception { FileLock lock = fc.lock(); if (bytes.length>getSize()) throw new Exception("bytes.length>getSize()"); this.map.put(bytes); this.setLimit(bytes.length); lock.release(); } public void write(byte b) throws Exception { FileLock lock = fc.lock(); int position = getPosition(); if (position>=getSize()-1) throw new Exception("positoin超出内存大小"); this.map.put(++position+this.off,b); this.setPosition(position); lock.release(); } public byte get(int position) throws Exception { if (position>=getLimit())throw new Exception("position>=getLimit()"); return this.map.get(position+this.off); } public byte get(){ return this.map.get(getPosition()+this.off); } public void getBytes(byte[] bytes){ this.map.get(bytes); } }
- 共享内存创建NIOWrite.class
import java.io.File; public class NIOWrite { public static void main(String[] args) throws Exception { File file = new File("/home/wangwen/Memshared"); MemShared mem = MemShared.createMem(file, 2048); int cur=0; for (int i = 0; i < 10; i++) { mem.write((byte) cur); int position = mem.getPosition(); byte b = mem.get(position); System.out.println("index:"+position+" "+b); cur=b+1; Thread.sleep(1000); } } } /*结果 index:1 0 index:2 1 index:3 2 index:4 3 index:5 4 index:7 5 index:9 6 index:11 7 index:13 8 index:15 9*/
- 共享内存读取NIORead.class
import java.io.File; public class NIORead { public static void main(String[] args) throws Exception { File file = new File("/home/wangwen/Memshared"); MemShared mem = MemShared.getMem(file); int cur; for (int i = 0; i < 10; i++) { int position = mem.getPosition(); byte b = mem.get(position); System.out.println("index:"+position+" "+b); cur=b+1; mem.write((byte) cur); Thread.sleep(1000); } } } /*结果 index:5 4 index:7 5 index:9 6 index:11 7 index:13 8 index:15 9 index:16 10 index:17 11 index:18 12 index:19 13 */
- 备注 先运行NIOWrite.class,再运行NIORead.class
管道
- 说明
管道是进程间通信的一种重要方式,linux的bash shell中“|”就是管道的意思, 管道分为无名管道和有名管道,前者只能用于具有亲缘关系的进程之间使用 从管道读数据是一次性操作,数据一旦被读,它就从管道中被抛 弃,释放空间以便写更多的数据 - Linux管道的实现机制
- 本质上说,管道也是一种虚拟的文件,在程序中表现为文件句柄。它是一个固定大小的缓冲区 在写管道时可能变满,当这种情况发生时,随后对管道的write()调用将默认地被阻塞,等待 某些数据被读取,以便腾出足够的空间供write()调用写;读取进程也可能工作得比写进程快。当所有当前进程数据已被读取时, 管道变空。当这种情况发生时,一个随后的read()调用将默认地被阻塞。
- 无名管道:主要用于父进程与子进程之间,或者两个兄弟进程之间。在linux系统中可以通过系统调用建立起一个单向的通信管道,且这种关系只能由父进程来建立。
- 命名管道:命名管道是建立在实际的磁盘介质或文件系统(而不是只存在于内存中)上有自己名字的文件,任何进程可以在任何时间通过文件名或路径名与该文件建立联系。 为了实现命名管道,引入了一种新的文件类型——FIFO文件(遵循先进先出的原则)。实现一个命名管道实际上就是实现一个FIFO文件。 命名管道一旦建立,之后它的读、写以及关闭操作都与普通管道完全相同。虽然FIFO文件的inode节点在磁盘上,但是仅是一个节点而已,文件的数据还是存在于内存缓冲页面中,和普通管道相同。
- 管道是由内核管理的一个缓冲区,相当于我们放入内存中的一个纸条。管道的一端连接一个进程的输出。这个进程会向管道中放入信息。 管道的另一端连接一个进程的输入,这个进程取出被放入管道的信息。一个缓冲区不需要很大一般为4K大小,它被设计成为环形的数据结构,以便管道可以被循环利用。 当管道中没有信息的话,从管道中读取的进程会等待,直到另一端的进程放入信息。当管道被放满信息的时候,尝试放入信息的进程会等待,直到另一端的进程取出信息。 当两个进程都终结的时候,管道也自动消失。
- Java使用管道通讯
- 很不幸Java没有提供管道通信API。。。。。
- 使用FileInputStream和FileOutStream可以模拟实现有名管道,配合MapedByteBuffer和队列结构, 可以实现管道的功能,但是该功能借助真实磁盘文件同步信息,信息的同步会有延迟
- 下边使用Runtime,ProcessBuilder模拟实现匿名管道
Runtime rt = Runtime.getRuntime(); Process process = rt.exec("cmd"); ProcessBuilder pb = new ProcessBuilder("exeProcess", "param"); Process p = pb.start();
- 以Main.java作为父进程
package com.laowng.msg.pipe; import java.io.*; public class Pipe { public static void main(String[] args) throws IOException, InterruptedException { Runtime rt = Runtime.getRuntime(); Process a = rt.exec(new String[]{"java","-classpath","/home/laowng/IdeaProjects/algrithm/out/production/algrithm","com.laowng.msg.pipe.A"}); //ProcessBuilder A = new ProcessBuilder("java", "com.laowng.msg.pipe.A"); //Process a = A.start(); System.out.println("启动主类"); InputStream aPipInStream = a.getInputStream(); OutputStream aPipOutStream = a.getOutputStream(); InputStream aerrorStream = a.getErrorStream(); BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(aPipOutStream)); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(aPipInStream)); //检查错误流 StringBuilder res= new StringBuilder(); byte[] bytes = new byte[1024]; int read; Thread.sleep(100); int available = aerrorStream.available(); if(available>0) { while ((read = aerrorStream.read(bytes)) != -1) res.append(new String(bytes, 0, read)); System.out.println(res); }else {//脚本正常运行 System.out.println("[a]接受消息<------:"+bufferedReader.readLine()); for (int i = 0; i < 10; i++) { String msg=""+i; System.out.println("[a]发送消息------>:"+msg); bufferedWriter.write(msg); bufferedWriter.newLine(); bufferedWriter.flush(); String s = bufferedReader.readLine(); System.out.println("[a]接受消息<------:"+s); } } } }
- A.class作为子进程
package com.laowng.msg.pipe; import java.io.*; public class A { public static void main(String[] args) throws IOException { System.out.println("[a]被调用成功!"); BufferedReader fatherPipInStream = new BufferedReader(new InputStreamReader(System.in)); BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(System.out)); //fatherPipOutStream使用sout实现 for (int i = 0; i < 10; i++) { String s = fatherPipInStream.readLine(); bufferedWriter.write("[a]收到了"+s); bufferedWriter.newLine(); bufferedWriter.flush(); } } }
- Main的执行结果
启动主类 [a]接受消息<------:[a]被调用成功! [a]发送消息------>:0 [a]接受消息<------:[a]收到了0 [a]发送消息------>:1 [a]接受消息<------:[a]收到了1 [a]发送消息------>:2 [a]接受消息<------:[a]收到了2 [a]发送消息------>:3 [a]接受消息<------:[a]收到了3 [a]发送消息------>:4 [a]接受消息<------:[a]收到了4 [a]发送消息------>:5 [a]接受消息<------:[a]收到了5 [a]发送消息------>:6 [a]接受消息<------:[a]收到了6 [a]发送消息------>:7 [a]接受消息<------:[a]收到了7 [a]发送消息------>:8 [a]接受消息<------:[a]收到了8 [a]发送消息------>:9 [a]接受消息<------:[a]收到了9
- 这种捕获子进程输入输出流的方式可以模拟实现匿名管道,并且如果把父进程作为管道管理进程(种转输入输出流), 可以实现兄弟进程间的管道流传输。
- A.class作为子进程
信号量
- 信号量(英语:semaphore)又称为信号标,是一个同步对象,用于保持在0至指定最大值之间的一个计数值。 当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一;当线程完成一次对semaphore对象的释放(release)时, 计数值加一。当计数值为0,则线程等待该semaphore对象不再能成功直至该semaphore对象变成signaled状态。 semaphore对象的计数值大于0,为signaled状态;计数值等于0,为nonsignaled状态。
-
信号量的概念是由荷兰计算机科学家艾兹赫尔·戴克斯特拉(Edsger W. Dijkstra)发明的, 广泛的应用于不同的操作系统中。在系统中,给予每一个进程一个信号量,代表每个进程目前的状态, 未得到控制权的进程会在特定地方被强迫停下来,等待可以继续进行的信号到来。如果信号量是一个任意的整数, 通常被称为计数信号量(Counting semaphore),或一般信号量(general semaphore);如果信号量只有二进制的0或1, 称为二进制信号量(binary semaphore)。
-
java线程的状态
状态 说明 初始(NEW) 新创建了一个线程对象,但还没有调用start()方法。 运行(RUNNABLE) Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得CPU时间片后变为运行中状态(running)。 阻塞(BLOCKED) 表示线程阻塞于锁。 等待(WAITING) 进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。 超时等待(TIMED_WAITING) 该状态不同于WAITING,它可以在指定的时间后自行返回。 终止(TERMINATED) 表示该线程已经执行完毕。 - Thread.sleep(long millis),一定是当前线程调用此方法,当前线程进入TIMED_WAITING状态,但不释放对象锁,millis后线程自动苏醒进入就绪状态。作用:给其它线程执行机会的最佳方式。
- Thread.yield(),一定是当前线程调用此方法,当前线程放弃获取的CPU时间片,但不释放锁资源,由运行状态变为就绪状态,让OS再次选择线程。作用:让相同优先级的线程轮流执行,但并不保证一定会轮流执行。实际中无法保证yield()达到让步目的,因为让步的线程还有可能被线程调度程序再次选中。Thread.yield()不会导致阻塞。该方法与sleep()类似,只是不能由用户指定暂停多长时间。
- t.join()/t.join(long millis),当前线程里调用其它线程t的join方法,当前线程进入WAITING/TIMED_WAITING状态,当前线程不会释放已经持有的对象锁。线程t执行完毕或者millis时间到,当前线程进入就绪状态。
- obj.wait(),当前线程调用对象的wait()方法,当前线程释放对象锁,进入等待队列。依靠notify()/notifyAll()唤醒或者wait(long timeout) timeout时间到自动唤醒。
- obj.notify()唤醒在此对象监视器上等待的单个线程,选择是任意性的。notifyAll()唤醒在此对象监视器上等待的所有线程。
- java进程的状态 与操作系统相关,但一般与线程状态一致,区别是信号量由操作系统管理,线程的信号量由JVM进程管理。
信号
- 对于 Linux来说,实际信号是软中断,许多重要的程序都需要处理信号。信号,为 Linux 提供了一种处理异步事件的方法。比如,终端用户输入了 ctrl+c 来中断程序,会通过信号机制停止一个程序。
- 每个信号都有一个名字和编号,这些名字都以“SIG”开头
laowng@laowng-ThinkCentre-M8600t-N000:~$ kill -l 1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL 5) SIGTRAP 6) SIGABRT 7) SIGBUS 8) SIGFPE 9) SIGKILL 10) SIGUSR1 11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM 16) SIGSTKFLT 17) SIGCHLD 18) SIGCONT 19) SIGSTOP 20) SIGTSTP 21) SIGTTIN 22) SIGTTOU 23) SIGURG 24) SIGXCPU 25) SIGXFSZ 26) SIGVTALRM 27) SIGPROF 28) SIGWINCH 29) SIGIO 30) SIGPWR 31) SIGSYS 34) SIGRTMIN 35) SIGRTMIN+1 36) SIGRTMIN+2 37) SIGRTMIN+3 38) SIGRTMIN+4 39) SIGRTMIN+5 40) SIGRTMIN+6 41) SIGRTMIN+7 42) SIGRTMIN+8 43) SIGRTMIN+9 44) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12 47) SIGRTMIN+13 48) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14 51) SIGRTMAX-13 52) SIGRTMAX-12 53) SIGRTMAX-11 54) SIGRTMAX-10 55) SIGRTMAX-9 56) SIGRTMAX-8 57) SIGRTMAX-7 58) SIGRTMAX-6 59) SIGRTMAX-5 60) SIGRTMAX-4 61) SIGRTMAX-3 62) SIGRTMAX-2 63) SIGRTMAX-1 64) SIGRTMAX
3.信号的处理: 信号的处理有三种方法,分别是:忽略、捕捉和默认动作
- 忽略信号,大多数信号可以使用这个方式来处理,但是有两种信号不能被忽略(分别是 SIGKILL和SIGSTOP)。因为他们向内核和超级用户提供了进程终止和停止的可靠方法,如果忽略了,那么这个进程就变成了没人能管理的的进程,显然是内核设计者不希望看到的场景
- 捕捉信号,需要告诉内核,用户希望如何处理某一种信号,说白了就是写一个信号处理函数,然后将这个函数告诉内核。当该信号产生时,由内核来调用用户自定义的函数,以此来实现某种信号的处理。
- 系统默认动作,对于每个信号来说,系统都对应由默认的处理动作,当发生了该信号,系统会自动执行。
First the signals described in the original POSIX.1-1990 standard. Signal Value Action Comment ────────────────────────────────────────────────────────────────────── SIGHUP 1 Term Hangup detected on controlling terminal or death of controlling process SIGINT 2 Term Interrupt from keyboard SIGQUIT 3 Core Quit from keyboard SIGILL 4 Core Illegal Instruction SIGABRT 6 Core Abort signal from abort(3) SIGFPE 8 Core Floating-point exception SIGKILL 9 Term Kill signal SIGSEGV 11 Core Invalid memory reference SIGPIPE 13 Term Broken pipe: write to pipe with no readers; see pipe(7) SIGALRM 14 Term Timer signal from alarm(2) SIGTERM 15 Term Termination signal SIGUSR1 30,10,16 Term User-defined signal 1 SIGUSR2 31,12,17 Term User-defined signal 2 SIGCHLD 20,17,18 Ign Child stopped or terminated SIGCONT 19,18,25 Cont Continue if stopped SIGSTOP 17,19,23 Stop Stop process SIGTSTP 18,20,24 Stop Stop typed at terminal SIGTTIN 21,21,26 Stop Terminal input for background process SIGTTOU 22,22,27 Stop Terminal output for background process The signals SIGKILL and SIGSTOP cannot be caught, blocked, or ignored.
socket通信
- Socket可以看成在两个程序进行通讯连接中的一个端点(endpoint),一个程序将一段信息写入Socket中,该Socket将这段信息发送给另外一个Socket中,使这段信息能传送到其他程序中。一般一个server服务器对应很多客户端client连接,服务器必须维护一张客户连接列表,每增加一个客户端连接服务器端都要新建一个套接字负责与新增客户端进行对话通信。
- 传输套接字主要有两类:流式套接字(SOCK_STREAM)和数据报套接字(SOCK_DGRAM)。流类型的套接字是为需要可靠连接的应用程序设计的。这些程序通常使用连续的数据流。用于这种类型套接字的协议是TCP,适合FTP这类实现。流套接字是最常用的,一些众所周知的协议如HTTP、TCP、SMTP、POP3等都是基于面向流的协议。
-
数据报套接字使用UDP做为下层协议,是无连接的,有一个最大缓冲区大小(数据包大小的最大值)。它是为那些需要发送小数据包,并且对可靠性要求不高的应用程序设计的。与流式套接字不同,数据报套接字并不保证数据会到达终端,也不保证它是以正确的顺序到来的。数据报套接字的传输效率相当高,它经常用于音频或视频应用程序。对这些程序来说,速度比可靠性更加重要。
- 下边的案例是copy的,写的挺好
- IP地址使用InetAddress类来表示,获取InetAddress实例的两个方法为:
- getByName(String host) 根据主机获取对应的InetAddress对象
- getByAddress(byte[] addr)根据IP地址获取InetAddress对象
- InetAddress提供了三个方法来获取InetAddress实例对应的IP地址和主机名
- String getCanonicalHostName()获取此IP地址的权限定域名
- String getHostAddress()获取InetAddress实例对应的IP地址
- String getHostName()获取此IP地址的主机名
- 服务端代码
import java.net.*; import java.io.*; /** 手机端代码 手机端作为服务器,获取自己的ip地址,并显示以供客户端连接 */ public class phone_Server { public static void main(String[] args) throws IOException { //打印本机的IP地址 InetAddress address=InetAddress.getLocalHost(); System.out.println("本机的IP地址是"+address.getHostAddress()); // 创建一个ServerSocket,用于监听客户端Socket的连接请求 ServerSocket ss = new ServerSocket(30000); // 采用循环不断接受来自客户端的请求 while (true) { // 每当接受到客户端Socket的请求,服务器端也对应产生一个Socket Socket s = ss.accept(); // 将Socket对应的输出流包装成PrintStream PrintStream ps = new PrintStream(s.getOutputStream()); // 进行普通IO操作 ps.println("您好,您收到了服务器的新年祝福!"); // 关闭输出流,关闭Socket ps.close(); s.close(); } } }
- 客户端代码
/** PC端代码 PC作为客户端,根据服务器的IP地址和端口号连接服务器 */ import java.net.*; import java.io.*; public class PC_Client { public static void main(String[] args) throws IOException { //Socket socket = new Socket("127.0.0.1" , 30000); Socket socket = new Socket("192.168.47.1" , 30000);//这里的IP地址填写手机端服务器的IP地址 // 将Socket对应的输入流包装成BufferedReader BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 进行普通IO操作 String line = br.readLine(); System.out.println("来自服务器的数据:" + line); // 关闭输入流、socket br.close(); socket.close(); } }
- socket大数据发送
//发送 DataOutputStream out = new DataOutputStream(socket.getOutputStream()); int start=0; while((start+1024)<data.length) { out.write(data, start,1024); start=start+1024; } if(start<data.length) { out.write(data, start,(data.length-start+1)); } //String str = new String(data); //out.writeUTF(str); }catch (Exception e) { Log.d(TAG, "文件传输异常"); }
- socket大数据接受
//接收 DataInputStream input = new DataInputStream(socket.getInputStream()); byte []buf=new byte[1024]; int readnum=0; while(true) { readnum=input.read(buf); if(readnum>0) { System.out.println(Arrays.toString(buf)); while((readnum=input.read(buf))>0) { System.out.println(Arrays.toString(buf)); } } }
- IP地址使用InetAddress类来表示,获取InetAddress实例的两个方法为: