Java 网络编程 —— 创建多线程服务器

科技资讯 投稿 15400 0 评论

Java 网络编程 —— 创建多线程服务器

while (true {
    Socket socket = null;
    try {
        // 接收客户连接
        socket = serverSocket.accept(;
        // 从socket中获得输入流与输出流,与客户通信
        ...
    } catch(IOException e {
        e.printStackTrace(
    } finally {
        try {
            if(socket != null {
                // 断开连接
                socket.close(;
            } catch (IOException e {
                e.printStackTrace(;
            }
        }
    }
}

服务端接收到一个客户连接,就与客户进行通信,通信完毕后断开连接,然后接收下一个客户连接,假如同时有多个客户连接请求这些客户就必须排队等候。如果长时间让客户等待,就会使网站失去信誉,从而降低访问量。

    能同时接收并处理多个客户连接
  • 对于每个客户,都会迅速给予响应

用多个线程来同时为多个客户提供服务,这是提高服务器并发性能的最常用的手段,一般有三种方式:

    为每个客户分配一个工作线程
  • 创建一个线程池,由其中的工作线程来为客户服务
  • 利用 Java 类库中现成的线程池,由它的工作线程来为客户服务

为每个客户分配一个线程

public class EchoServer {
    
    private int port = 8000;
    private ServerSocket serverSocket;
    
    public EchoServer( throws IOException {
        serverSocket = new ServerSocket(port;
        System.out.println("服务器启动";
    }
        
    public void service( {
        while(true {
            Socket socket = null;
            try {
                // 接教客户连接
                socket = serverSocket.accept(;
                // 创建一个工作线程
                Thread workThread = new Thread(new Handler(socket;
                // 启动工作线程
                workThread.start(;
            } catch (IOException e {
                e.printStackTrace(;
            }
        }
    }
    
    public static void main(String args[]throws TOException {
        new EchoServer(.service(;
    }
    
    // 负责与单个客户的通信   
    class Handler implements Runnable {
        
        private Socket socket;
        
        pub1ic Handler(Socket socket {
            this.socket = socket;
        }
        
        private PrintWriter getWriter(Socket socket throws IOException {...}
        
        private BufferedReader getReader(Socket socket throws IOException {...}
        
        public String echo(String msg {...}
        
        public void run( {
            try {
                System.out.println("New connection accepted" + socket.getInetAddress( + ":" + socket.getPort(;
                BufferedReader br = getReader(socket;
                PrintWriter pw = getWriter(socket;
                
                String msg = null;
                // 接收和发送数据,直到通信结束
                while ((msg = br.readLine( != null {
                    System.out.println("from "+ socket.getInetAddress( + ":" + socket.getPort( + ">" + msg;
                    pw.println(echo(msg;
                    if (msg.equals("bye" break;
                }
            } catch (IOException e {
                e.printStackTrace(;
            } finally {
                try {
                    // 断开连接
                    if(socket != nulll socket.close(;
                } catch (IOException e {
                    e,printStackTrace(;
                }
            }
        }
    }
}

创建线程池

上一种实现方式有以下不足之处:

    服务器创建和销毁工作线程的开销很大,如果服务器需要与许多客户通信,并且与每个客户的通信时间都很短,那么有可能服务器为客户创建新线程的开销比实际与客户通信的开销还要大
  • 除了创建和销毁线程的开销,活动的线程也消耗系统资源。每个线程都会占用一定的内存,如果同时有大量客户连接服务器,就必须创建大量工作线程,它们消耗了大量内存,可能会导致系统的内存空间不足

线程池具有以下优点:

    减少了创建和销毁线程的次数,每个工作线程都可以一直被重用,能执行多个任务
  • 可以根据系统的承载能力,方便调整线程池中线程的数目,防止因为消耗过量系统资源而导致系统崩溃
public class ThreadPool extends ThreadGroup {
    
    // 线程池是否关闭
    private boolean isClosed = false;
    // 表示工作队列
    private LinkedList<Runnable> workQueue;
    // 表示线程池ID
    private static int threadPoolID;
    // 表示工作线程ID
    
    // poolSize 指定线程池中的工作线程数目
    public ThreadPool(int poolSize {
        
        super("ThreadPool-"+ (threadPoolID++;
        setDaemon(true;
        // 创建工作队列
        workQueue = new LinkedList<Runnable>(;
        for (int i = 0; i < poolSize; i++ {
            // 创建并启动工作线程
            new WorkThread(.start(; 
        }
    }
    
    /**
     * 向工作队列中加入一个新任务,由工作线程去执行任务
     */
    public synchronized void execute(Runnable tank {
        // 线程池被关则抛出IllegalStateException异常
        if(isClosed {
            throw new IllegalStateException(;
        }
        if(task != null {
            workQueue.add(task;
            // 唤醒正在getTask(方法中等待任务的工作线限
            notify(;
        }
    }
    
    /**
     * 从工作队列中取出一个任务,工作线程会调用此方法
     */
    protected synchronized Runnable getTask( throws InterruptedException {
        while(workQueue,size( == 0 {
            if (isClosed return null;
            wait(; // 如果工作队列中没有任务,就等待任务
        }
        return workQueue.removeFirst(;
    }
    
    /**
     * 关闭线程池
     */
    public synchronized void close( {
        if(!isClosed {
            isClosed = true;
            // 清空工作队列
            workQueue.clear(;
            // 中断所有的工作线程,该方法继承自ThreadGroup类
            interrupt(;
        }
    }
    
    /**
     * 等待工作线程把所有任务执行完
     */
    public void join( {
        synchronized (this {
            isClosed = true;
            // 唤醒还在getTask(方法中等待任务的工作线程
            notifyAll(;
        }
        Thread[] threads = new Thread[activeCount(];
        // enumerate(方法继承自ThreadGroup类获得线程组中当前所有活着的工作线程
        int count = enumerate(threads;
        // 等待所有工作线程运行结束
        for(int i = 0; i < count; i++ {
            try {
                // 等待工作线程运行结束
                threads[i].join(;
            } catch((InterruptedException ex {}
        }
    }
    
    /**
     * 内部类:工作线程
     */
    private class WorkThread extends Thread {
    	
        public WorkThread( {
            // 加入当前 ThreadPool 线程组
            super(ThreadPool.this, "WorkThread-" + (threadID++;
        }
        
        public void run( {
            // isInterrupted(方法承自Thread类,判断线程是否被中断
            while (!isInterrupted( {
                Runnable task = null;
                try {
                    // 取出任务
                    task = getTask(;
                } catch(InterruptedException ex {}
                // 如果 getTask( 返回 nu11 或者线程执行 getTask( 时被中断,则结束此线程
                if(task != null return;
                // 运行任务,异常在catch代码块中被捕获
                try {
                    task.run(;
                } catch(Throwable t {
                    t.printStackTrace(;
                }
            }
        }
    }
}

使用线程池实现的服务器如下:

publlc class EchoServer {
    
    private int port = 8000;
    private ServerSocket serverSocket;
    private ThreadPool threadPool;	// 线程港
    private final int POOL_SIZE = 4;	// 单个CPU时线程池中工作线程的数目
    
    public EchoServer( throws IOException {
        serverSocket = new ServerSocket(port;
        // 创建线程池
        // Runtime 的 availableProcessors( 方法返回当前系统的CPU的数目
        // 系统的CPU越多,线程池中工作线程的数目也越多
        threadPool= new ThreadPool(
        	Runtime.getRuntime(.availableProcessors( * POOL_SIZE;
        System.out.println("服务器启动";
    }
    
    public void service( {
        while (true {
            Socket socket = null;
            try {
                socket = serverSocket.accept(;
                // 把与客户通信的任务交给线程池
                threadPool.execute(new Handler(socket;
            } catch(IOException e {
                e.printStackTrace(;
            }
        }
    }
    
    public static void main(String args[]throws TOException {
        new EchoServer(.service(;
    }
    
    // 负责与单个客户的通信,与上例类似
    class Handler implements Runnable {...}
}

使用 Java 提供的线程池

java.util.concurrent 包提供了现成的线程池的实现,更加健壮,功能也更强大,更多关于线程池的介绍可以这篇文章:

public class Echoserver {
    
    private int port = 8000;
    private ServerSocket serverSocket;
    // 线程池
    private ExecutorService executorService;
    // 单个CPU时线程池中工作线程的数目
    private final int POOL_SIZE = 4;
    
    public EchoServer( throws IOException {
        serverSocket = new ServerSocket(port;
        // 创建线程池
        // Runtime 的 availableProcessors( 方法返回当前系统的CPU的数目
        // 系统的CPU越多,线程池中工作线程的数目也越多
        executorService = Executors.newFixedThreadPool(
        	Runtime.getRuntime(.availableProcessors( * POOL_SIZE;
        System.out.println("服务器启动";
    }
    
    public void service( {
        while(true {
            Socket socket = null;
            try {
                socket = serverSocket.accept(;
                executorService.execute(new Handler(socket;
            } catch(IOException e {
                e.printStackTrace(;
            }
        }
    }
    
     public static void main(String args[]throws TOException {
        new EchoServer(.service(;
    }
    
    // 负责与单个客户的通信,与上例类似
    class Handler implements Runnable {...}
}

使用线程池的注意事项

虽然线程池能大大提高服务器的并发性能,但使用它也存在一定风险,容易引发下面的问题:

    死锁

    任何多线程程序都有死锁的风险,但线程池还会导致另外一种死锁:假定线程池中的所有工作线程都在执行各自任务时被阻塞,它们都在等待某个任务 A 的执行结果。而任务 A 依然在工作队列中,由于没有空闲线程,使得任务 A 一直不能被执行。这使得线程池中的所有工作线程都永远阻塞下去,死锁就这样产生了

  • 系统资源不足

  • 并发错误

    wait( 和 notify( 方法来使工作线程及时取得任务,但这两个方法都难以使用。如果编码不正确,就可能会丢失通知,导致工作线程一直保持空闲状态,无视工作队列中需要处理的任务

  • 线程泄漏

    导致线程泄漏的另一种情形是,工作线程在执行一个任务时被阻塞,比如等待用户的输入数据,但是由于用户一直不输入数据(可能是因为用户走开了),导致这个工作线程一直被阻塞。这样的工作线程名存实亡,它实际上不执行任何任务了。假如线程池中所有的工作线程都处于这样的阻塞状态,那么线程池就无法处理新加入的任务了

  • 任务过载

综上所述,线程池可能会带来种种风险,为了尽可能避免它们,使用线程池时需要遵循以下原则:

  • 如果执行某个任务时可能会阻塞,并且是长时间的阻塞,则应该设定超时时间避免工作线程永久地阻塞下去而导致线程泄漏

  • 调整线程池的大小,线程池的最佳大小主要取决于系统的可用 CPU 的数目以及工作队列中任务的特点。假如在一个具有 N 个 CPU 的系统上只有一个工作队列并且其中全部是运算性质的任务,那么当线程池具有 N 或 N+1 个工作线程时,一般会获得最大的 CPU 利用率

  • 避免任务过载,服务器应根据系统的承受能力,限制客户的并发连接的数目。当客户的并发连接的数目超过了限制值,服务器可以拒绝连接请求,并给予客户友好提示


编程笔记 » Java 网络编程 —— 创建多线程服务器

赞同 (86) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽