通过Socket,实现多线程数据接收_.NET_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > .NET > 通过Socket,实现多线程数据接收

通过Socket,实现多线程数据接收

 2010/12/12 15:05:16  jonsion  http://jonsion.javaeye.com  我要评论(0)
  • 摘要:网络上的两个程序通过一个双向的通讯连接实现数据的交换,这个双向链路的一端称为一个Socket。Socket通常用来实现客户方和服务方的连接。Socket是TCP/IP协议的一个十分流行的编程界面,一个Socket由一个IP地址和一个端口号唯一确定。ps:在传统的UNIX环境下可以操作TCP/IP协议的接口不止Socket一个,Socket所支持的协议种类也不光TCP/IP一种,因此两者之间是没有必然联系的。在Java环境下,Socket编程主要是指基于TCP/IP协议的网络编程
  • 标签:实现 多线程 数据 socket

网络上的两个程序通过一个双向的通讯连接实现数据的交换,这个双向链路的一端称为一个Socket。Socket通常用来实现客户方和服务方的连接。Socket是TCP/IP协议的一个十分流行的编程界面,一个Socket由一个IP地址和一个端口号唯一确定。ps: 在传统的UNIX环境下可以操作TCP/IP协议的接口不止Socket一个,Socket所支持的协议种类也不光TCP/IP一种,因此两者之间是没有必然联系的。在Java环境下,Socket编程主要是指基于TCP/IP协议的网络编程。使用Socket进行Client/Server程序设计的一般连接过程是这样的:Server端Listen(监听)某个端口是否有连接请求,Client端向Server端发出Connect(连接)请求,Server端向Client端发回Accept(接受)消息。一个连接就建立起来了。Server端和Client端都可以通过Send,Write等方法与对方通信。对于一个功能齐全的Socket,都要包含以下基本结构,其工作过程包含以下四个基本的步骤:
  1. 创建Socket;
  2. 打开连接到Socket的输入/出流;
  3. 按照一定的协议对Socket进行读/写操作;
  4. 关闭Socket.

Socket普通应用

最普通的Socket应用在C/S上,服务端和客户端双方启动Socket相互通信,完成后关闭Socket。下面给出一个小例子:Server端:
CODE:

import java.io.*;
import java.net.*;public class Server {
public static void main(String args[]) {
try {
ServerSocket server = null;
try {
//创建一个ServerSocket在端口8888监听客户请求
server = new ServerSocket(8888);?
} catch (Exception e) {
System.out.println("can not listen to:" + e);?
}Socket socket = null;
try {
//使用accept()阻塞等待客户请求,有客户
//请求到来则产生一个Socket对象,并继续执行
socket = server.accept();?
} catch (Exception e) {
System.out.println("Error." + e);?
}String line;
BufferedReader is = new BufferedReader(new InputStreamReader(socket
.getInputStream())); //由系统标准输入设备构造BufferedReader对象
BufferedReader sin = new BufferedReader(new InputStreamReader(
System.in));//在标准输出上打印从客户端读入的字符串
System.out.println("Client:" + is.readLine());//从标准输入读入一字符串
line = sin.readLine();// 如果该字符串为 "bye",则停止循环
while (!line.equals("bye")) {
// 从Client读入一字符串,并打印到标准输出上
System.out.println("Client:" + is.readLine());line = sin.readLine();
}
is.close(); // 关闭Socket输入流
socket.close(); // 关闭Socket
server.close(); // 关闭ServerSocket
} catch (Exception e) {
System.out.println("Error:" + e);
}
}
}

Client端

CODE: import java.io.*;
import java.net.*;public class Client {
public static void main(String args[]) {
try {
//向172.16.199.153的8888端口发出客户请求
Socket socket = new Socket("172.16.199.153", 8888);//由系统标准输入设备构造BufferedReader对象
BufferedReader sin = new BufferedReader(new InputStreamReader(
System.in));//由Socket对象得到输出流,并构造PrintWriter对象
PrintWriter os = new PrintWriter(socket.getOutputStream());//由Socket对象得到输入流,并构造相应的BufferedReader对象
BufferedReader is = new BufferedReader(new InputStreamReader(socket
.getInputStream()));String readline;// 从系统标准输入读入一字符串
readline = sin.readLine(); //若从标准输入读入的字符串为 "bye"则停止循环
while (!readline.equals("bye")) {//将从系统标准输入读入的字符串输出到Server
os.println(readline);//刷新输出流,使Server马上收到该字符串
os.flush();//在系统标准输出上打印读入的字符串
System.out.println("Client:" + readline);//从Server读入一字符串,并打印到标准输出上
System.out.println("Server:" + is.readLine());//从系统标准输入读入一字符串
readline = sin.readLine();?
} //关闭Socket输出流
os.close();?
//关闭Socket输入流
is.close();?
//关闭Socket
socket.close();?
} catch (Exception e) {
System.out.println("Error" + e);?
}
}

以上的例子服务端只能和一个客户端进行通信,而实际应用的情况下可能是一个服务端运行一个长久运行的程序,可以接收不同的客户端发送的数据。应用线程池的实线,在服务端运行一个监听程序并管理一个线程池,一旦有客户端发送数据的请求,则往线程池中放入一个任务,由线程池中负责运行任务的线程去执行这个任务。

先给出线程池的管理类
CODE: package com.webex.globalwatch.mdp.common;import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;public class ThreadPoolManager {?
???
private static int DEFAULT_POOL_SIZE = 4;?
private List<WorkThread> threadPool;?
private Queue<Task> taskQueue;?
private int poolSize;?
???
public ThreadPoolManager(){?
?? ??? this(DEFAULT_POOL_SIZE);?
}?
???
public ThreadPoolManager(int poolSize){?
?? ??? if(poolSize <= 0){?
?? ?? ?? this.poolSize = DEFAULT_POOL_SIZE;?
?? ??? }else{?
?? ?? ?? this.poolSize = poolSize;?
?? ??? }?
?? ?? ???
?? ??? threadPool = new ArrayList<WorkThread>(this.poolSize);?
?? ??? taskQueue = new ConcurrentLinkedQueue<Task>();?
?? ?? ???
?? ??? startup();?
}?
???
/**??
??? * 启动线程池 开始处理任务??
??? */??
private void startup(){?
?? ??? System.out.println("启动工作线程。。。");?
?? ??? synchronized(taskQueue){?
?? ?? ?? for(int i = 0; i < DEFAULT_POOL_SIZE; i++){?
?? ?? ?? ?? WorkThread workThread = new WorkThread(taskQueue);?
?? ?? ?? ?? threadPool.add( workThread );?
?? ?? ?? ?? workThread.start();?
?? ?? ?? }?
?? ??? }?
?? ?? ???
}?
???
/**??
??? * 停止工作线程。工作线程不一定立即停止,只有在线程处于运行状态时会立即停止??
??? */??
public void shutdown(){?
?? ??? System.out.println("停止工作线程.");?
?? ??? synchronized(taskQueue){?
?? ?? ?? for(int i = 0; i < DEFAULT_POOL_SIZE; i++){?
?? ?? ?? ?? threadPool.get(i).shutdown();?
?? ?? ?? }?
?? ??? }?
}?
???
/**??
??? * 添加消息到队尾,?
??? */??
public void addTask(Task task){?
?? ??? synchronized(taskQueue){?
?? ?? ?? taskQueue.add(task);?
?? ?? ?? taskQueue.notifyAll(); ???
?? ??? }?
} ???
}?
线程池构造函数中,创建了一个WorkThread对象WorkThread 类
CODE: package com.webex.globalwatch.mdp.common;import java.util.Queue;public class WorkThread extends Thread{?
private boolean shutdown = false;?
private Queue<Task> queue;?
???
public WorkThread(Queue<Task> queue){?
?? ??? this.queue = queue;?
}?
???
public void run(){?
?? ??? while(!shutdown){?
?? ?? ?? synchronized(queue){ //获得对象锁 禁止其他线程访问?
?? ?? ?? ?? if(!queue.isEmpty()){?
?? ?? ?? ?? ?? ??? //处理任务?
?? ?? ?? ?? ?? ??? Task task = queue.poll();?
?? ?? ?? ?? ?? ??? task.execute();?
?? ?? ?? ?? }else{?
?? ?? ?? ?? ?? ??? try {?
?? ?? ?? ?? ?? ?? ?? queue.wait(); //释放锁 线程处于阻赛状态 等待notify唤醒?
?? ?? ?? ?? ?? ??? } catch (InterruptedException e) {?
?? ?? ?? ?? ?? ??? }?
?? ?? ?? ?? }?
?? ?? ?? }?
?? ??? }//end while?
}?
???
/**??
??? * 调用该方法后不一定会立即结束线程, 只有在线程处于运行状态且处理完当前任务后才结束??
??? */??
public void shutdown(){?
?? ??? shutdown = true;?
}?
}?
Task接口,很简单
CODE: package com.webex.globalwatch.mdp.common;public interface Task {?
void execute();?
}?
Task接口的一个实现,ReceiveTask CODE: package com.webex.globalwatch.mdp.meetinglogprocess;import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.Socket;import com.webex.globalwatch.mdp.common.Task;public class ReceiveTask implements Task {private Socket socket = null;public ReceiveTask(Socket socket){
this.socket=socket;
}@Override
public void execute() {
// TODO Auto-generated method stub
try {
while (!socket.isClosed()) {
ObjectInputStream inStream = null;
try{
?? ??? inStream = new ObjectInputStream(socket.getInputStream());
?? ??? String temp = (String)inStream.readObject();
System.out.println(temp);?
}catch(IOException e){
socket.close();
} }
} catch (Throwable t) {
//t.printStackTrace();
System.out.println("has some error.");
//System.exit(0);
}
}
}?
调用的代码调用的代码(有些地方用到了Spring的BeanFactory)
CODE: ServerSocket sc = new ServerSocket(5600);?
ThreadPoolManager pool = (ThreadPoolManager)MDPBeanFactory.getBeanByName("ThreadPoolManager");
while(true){
Socket socket = null;
socket = sc.accept();
pool.addTask(new ReceiveTask(socket));
}?
启动主程序之后while(true)表示一直在监听。当有客户端通过Socket通过5600端口向Server端发送数据时,则往ThreadPool里放入这个Task输出如下: QUOTE: MDP System has been run.
启动工作线程。。。?
现在,写一个Client程序测试一下
CODE: import java.io.*;
import java.net.*;public class Client {
public static void main(String args[]) {
try {
//向172.16.199.153的5600端口发出客户请求
Socket socket = new Socket("172.16.199.153", 5600);for(int i=0; i<100; i++){
ObjectOutputStream os = new ObjectOutputStream(socket.getOutputStream());
os.writeObject("this is Client's "+ i + " data");
os.flush();?
//等待一秒钟再发下一条数据
Thread.sleep(1000);?
}
//关闭Socket
socket.close();?
} catch (Exception e) {
System.out.println("Error" + e);?
}
}
}?
把Client部署到几台电脑上测试一下先运行一个Client,Server端开始响应,输出:
。。。。。。
this is Client's 61 data
this is Client's 62 data
this is Client's 63 data
。。。。。。
再从另一台Client运行,则在第一个Client完成输出之后,才会做出响应,输出:
。。。。。。
this is Client's 99 data
a new client access data!
this is Client's 0 data
this is Client's 1 data
。。。。。。?
注意事项!Client通过ObjectOutputStream发送数据,Server端通过ObjectInputStream接收数据,ObjectOutputStream每次发送必须创建一个ObjectOutputStream对象,如
ObjectOutputStream os = new ObjectOutputStream(socket.getOutputStream());
?? ?? ?? ?? ?? ?? ?? ?? ?? ??? os.writeObject("this is Client's "+ i + " data");
?? ?? ?? ?? ?? ?? ?? ?? ?? ??? os.flush(); ?? ???
如果只创建一个ObjectOutputStream 对象,在服务端接收时,就会出现一个Invalid Stream head的Error. 这个问题困扰我昨天大半天的时间!!!

发表评论
用户名: 匿名