Java Socket学习笔记(三)- TCP服务端线程池_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > Java Socket学习笔记(三)- TCP服务端线程池

Java Socket学习笔记(三)- TCP服务端线程池

 2014/7/31 21:17:47  kingxss  程序员俱乐部  我要评论(0)
  • 摘要:一、服务端回传服务类:import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.util.logging.Level; import java.util.logging.Logger; public class EchoProtocol implements Runnable { private static final int BUFSIZE = 32
  • 标签:笔记 学习 服务端 Java 服务 socket 学习笔记 线程

一、服务端回传服务类:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Echo service for one client connection: every byte read from the client
 * socket is written straight back to it until the client closes its side.
 *
 * <p>Can be used either as a {@link Runnable} (one instance per connection)
 * or through the static {@link #handleEchoClient(Socket, Logger)} helper.
 */
public class EchoProtocol implements Runnable {
    private static final int BUFSIZE = 32; // Size (in bytes) of I/O buffer
    private final Socket clientSocket; // Socket connected to the client
    private final Logger logger; // Server logger

    /**
     * @param clientSocket socket of the client to serve; closed when {@link #run()} finishes
     * @param logger       logger that receives the per-connection messages
     */
    public EchoProtocol(Socket clientSocket, Logger logger) {
        this.clientSocket = clientSocket;
        this.logger = logger;
    }

    /**
     * Echoes everything received on {@code clientSocket} back to the client and
     * then closes the socket.
     *
     * <p>I/O failures are not propagated: they are logged at WARNING level. A
     * failure while closing the socket is logged at FINE level instead of being
     * silently dropped.
     *
     * @param clientSocket socket of the client to serve; always closed before returning
     * @param logger       logger that receives the summary and any failures
     */
    public static void handleEchoClient(Socket clientSocket, Logger logger) {
        try {
            // Get the input and output I/O streams from socket
            InputStream in = clientSocket.getInputStream();
            OutputStream out = clientSocket.getOutputStream();

            int recvMsgSize; // Size of received message
            int totalBytesEchoed = 0; // Bytes received from client
            byte[] echoBuffer = new byte[BUFSIZE]; // Receive buffer
            // Receive until client closes connection, indicated by -1
            while ((recvMsgSize = in.read(echoBuffer)) != -1) {
                // Only the first recvMsgSize bytes of the buffer are valid for this read
                out.write(echoBuffer, 0, recvMsgSize);
                totalBytesEchoed += recvMsgSize;
            }

            logger.info("Client " + clientSocket.getRemoteSocketAddress() + ", echoed " + totalBytesEchoed + " bytes.");

        } catch (IOException ex) {
            logger.log(Level.WARNING, "Exception in echo protocol", ex);
        } finally {
            try {
                clientSocket.close();
            } catch (IOException closeFailure) {
                // Nothing more can be done with this connection, but leave a trace
                // for diagnostics rather than discarding the failure.
                logger.log(Level.FINE, "Failed to close client socket", closeFailure);
            }
        }
    }

    /** Serves the client this instance was created for. */
    @Override
    public void run() {
        handleEchoClient(this.clientSocket, this.logger);
    }
}

?

?

二、每个客户端请求都新启一个线程的Tcp服务端:

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.logging.Logger;

/**
 * TCP echo server that starts a new thread for every accepted client
 * connection. Listens on port 5500.
 */
public class TCPEchoServerThread {

    /**
     * Accepts connections forever and hands each one to its own thread.
     *
     * @param args ignored
     * @throws IOException if the server socket cannot be opened or accepting a connection fails
     */
    public static void main(String[] args) throws IOException {
        // Create a server socket to accept client connection requests
        ServerSocket servSock = new ServerSocket(5500);

        Logger logger = Logger.getLogger("practical");

        try {
            // Run forever, accepting and spawning a thread for each connection
            while (true) {
                Socket clntSock = servSock.accept(); // Block waiting for connection
                // Spawn thread to handle new connection
                Thread thread = new Thread(new EchoProtocol(clntSock, logger));
                thread.start();
                logger.info("Created and started Thread " + thread.getName());
            }
            /* NOT REACHED */
        } finally {
            // Only reached when accept() (or thread creation) fails. Handler threads
            // that are still running can keep the JVM alive, so release the listening
            // port explicitly instead of leaving it bound with nobody accepting.
            try {
                servSock.close();
            } catch (IOException closeFailure) {
                // Do not let a close failure mask the exception that ended the loop
                logger.warning("Failed to close server socket: " + closeFailure);
            }
        }
    }
}

?

三、固定线程数的Tcp服务端:

?

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * TCP echo server backed by a fixed number of worker threads. All workers
 * share one server socket on port 5500; each worker accepts a connection,
 * serves it to completion, and then goes back to accepting.
 */
public class TCPEchoServerPool {
    public static void main(String[] args) throws IOException {
        final int workerCount = 3; // Number of worker threads to start

        final ServerSocket listener = new ServerSocket(5500);
        final Logger log = Logger.getLogger("practical");

        // Start the workers one by one; each runs its accept/serve loop forever
        int started = 0;
        while (started < workerCount) {
            Thread worker = new Thread() {
                @Override
                public void run() {
                    for (;;) {
                        try {
                            // Block until a client connects, then echo on this same thread
                            Socket client = listener.accept();
                            EchoProtocol.handleEchoClient(client, log);
                        } catch (IOException acceptFailure) {
                            log.log(Level.WARNING, "Client accept failed", acceptFailure);
                        }
                    }
                }
            };
            worker.start();
            log.info("Created and started Thread = " + worker.getName());
            started++;
        }
    }
}

?

?

四、使用线程池(使用Spring的线程池会有队列、最大线程数、最小线程数和超时时间的概念):

1.线程池工具类:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Task executor: runs tasks on a shared cached thread pool and blocks the
 * caller, up to a timeout, until the submitted task has finished.
 *
 * <p>A timeout only ends the caller's wait; the task itself is not cancelled
 * and keeps running on its pool thread.
 *
 * @author Watson Xu
 * @since 1.0.0 <p>2013-6-8 10:33:09 AM</p>
 */
public class ThreadPoolTaskExecutor {

    private ThreadPoolTaskExecutor() {
        // Static utility class; not meant to be instantiated.
    }

    private static final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
        // Tasks can be submitted from several threads at once, so the counter
        // used for thread names must tolerate concurrent increments.
        private final AtomicInteger count = new AtomicInteger();

        /*
         * The executor calls this when it needs to run a task and there is no
         * thread available in the pool. Callable tasks are wrapped and handed
         * over here as a Runnable.
         */
        @Override
        public Thread newThread(Runnable r) {
            Thread invokeThread = new Thread(r);
            invokeThread.setName("Courser Thread-" + count.incrementAndGet());
            // Explicitly non-daemon, whatever kind of thread triggers the creation
            invokeThread.setDaemon(false);

            return invokeThread;
        }
    });

    /**
     * Runs {@code task} on the pool and waits for it to finish.
     *
     * @param task    work to execute
     * @param unit    unit of {@code timeout}
     * @param timeout maximum time to wait for completion
     * @throws TimeoutException if the task has not finished within the timeout
     * @throws RuntimeException if the task failed or the wait was interrupted
     */
    public static void invoke(Runnable task, TimeUnit unit, long timeout) throws TimeoutException, RuntimeException {
        invoke(task, null, unit, timeout);
    }

    /**
     * Runs {@code task} on the pool, waits for it to finish and returns {@code result}.
     *
     * @param task    work to execute
     * @param result  value to return once the task has completed
     * @param unit    unit of {@code timeout}
     * @param timeout maximum time to wait for completion
     * @return {@code result}
     * @throws TimeoutException if the task has not finished within the timeout
     * @throws RuntimeException if the task failed or the wait was interrupted
     */
    public static <T> T invoke(Runnable task, T result, TimeUnit unit, long timeout) throws TimeoutException,
            RuntimeException {
        Future<T> future = executor.submit(task, result);
        return await(future, unit, timeout);
    }

    /**
     * Runs {@code task} on the pool, waits for it to finish and returns its value.
     *
     * @param task    computation to execute
     * @param unit    unit of {@code timeout}
     * @param timeout maximum time to wait for completion
     * @return the value produced by {@code task}
     * @throws TimeoutException if the task has not finished within the timeout
     * @throws RuntimeException if the task failed or the wait was interrupted
     */
    public static <T> T invoke(Callable<T> task, TimeUnit unit, long timeout) throws TimeoutException, RuntimeException {
        // Submitting hands the task to the executor; from here on it runs asynchronously.
        Future<T> future = executor.submit(task);
        /*
         * Waiting on the future confirms that the task has completed, which means
         * 1) the thread calling invoke() blocks until the task is done, and
         * 2) the calling thread receives the result computed by the pool thread.
         */
        return await(future, unit, timeout);
    }

    /** Waits for {@code future} and maps its failures onto the exceptions documented by {@code invoke}. */
    private static <T> T await(Future<T> future, TimeUnit unit, long timeout) throws TimeoutException {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            TimeoutException timedOut = new TimeoutException("Thread invoke timeout ...");
            timedOut.initCause(e); // keep the original stack trace reachable
            throw timedOut;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still see it
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

?

2.具有伸缩性的Tcp服务端:

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import demo.callable.ThreadPoolTaskExecutor;


/**
 * TCP echo server that hands every accepted connection to
 * {@code ThreadPoolTaskExecutor} and waits up to 3 seconds for the echo task
 * before accepting the next client. Listens on port 5500.
 */
public class TCPEchoServerExecutor {

    /**
     * Accepts connections forever and submits one echo task per connection.
     *
     * @param args ignored
     * @throws IOException if the server socket cannot be opened or accepting a connection fails
     */
    public static void main(String[] args) throws IOException {
        // Create a server socket to accept client connection requests
        ServerSocket servSock = new ServerSocket(5500);

        Logger logger = Logger.getLogger("practical");

        try {
            // Run forever, accepting and spawning threads to service each connection
            while (true) {
                Socket clntSock = servSock.accept(); // Block waiting for connection
                try {
                    // Waits at most 3 seconds for the echo task before moving on
                    ThreadPoolTaskExecutor.invoke(new EchoProtocol(clntSock, logger), TimeUnit.SECONDS, 3);
                } catch (Exception e) {
                    // Covers both the wait timing out and the task failing. Keep serving
                    // other clients, but record what happened instead of hiding it.
                    logger.warning("Echo task for client " + clntSock.getRemoteSocketAddress()
                            + " failed or timed out: " + e);
                }
            }
            /* NOT REACHED */
        } finally {
            // Only reached when accept() fails: release the listening port so that it
            // does not stay bound while pool threads keep the JVM alive.
            try {
                servSock.close();
            } catch (IOException closeFailure) {
                // Do not let a close failure mask the exception that ended the loop
                logger.warning("Failed to close server socket: " + closeFailure);
            }
        }
    }
}

?

参考:

1.《Java TCP/IP Socket编程(原书第2版)》

发表评论
用户名: 匿名