线程的基本操做

2021年11月22日 阅读数:3
这篇文章主要向大家介绍线程的基本操做,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

前言

高并发开发的基础即是看开发者如何很好地去掌控线程,线程安全,数据安全以及一次请求处理的效率优化,本篇文章为如何去简单地操做线程,同时引出下一篇关于线程池的文章的讲解。java

在下面的示例程序中,我会用到许多的工具类方法来避免代码的冗余以及耦合度太高。工具类方法过多便以伸缩代码块的形式呈现:node

工具类定义

public class ThreadUtil  {

    static public class CustomThreadFactory implements ThreadFactory {

        private AtomicInteger threadNo = new AtomicInteger(1);
        private String threadName;
        private String tmp;

        public CustomThreadFactory(String name){
            this.tmp = name;
        }

        @Override
        public Thread newThread(Runnable r) {
            this.threadName = this.tmp+threadNo.get();
            threadNo.incrementAndGet();
            Print.tco("新建了一个线程,名称为: "+this.threadName);
            Thread thread = new Thread(r,threadName);
            return thread;
        }
    }

    public static void shutdownThreadPoolGracefully(ExecutorService threadPool){

        //若已经关闭则返回
        if (!(threadPool instanceof ExecutorService) || threadPool.isTerminated()){
            return;
        }
        try {
            threadPool.shutdown();
        }catch (SecurityException e){
            return;
        }catch (NullPointerException e){
            return;
        }
        try {
            //等待60秒,等待线程池中的任务完成执行
            if (!threadPool.awaitTermination(60,TimeUnit.SECONDS)){
                threadPool.shutdownNow();
                if (threadPool.awaitTermination(60,TimeUnit.SECONDS)){
                    System.err.println("线程池任务未正常执行结束");
                }
            }
        } catch (InterruptedException e) {
            threadPool.shutdownNow();
        }
        try {
            if (!threadPool.isTerminated()) {
                for (int i = 0; i < 100; i++) {
                    if (threadPool.awaitTermination(10,TimeUnit.MILLISECONDS)){
                        break;
                    }
                    threadPool.shutdownNow();
                }
            }
        }catch (InterruptedException e){
            System.err.println(e.getMessage());
        }catch (Throwable e){
            System.err.println(e.getMessage());
        }
    }

    //懒汉式单例建立线程池:用于执行定时,顺序执行
    static class SeqOrScheduledTargetThreadPoolLazyHolder{
        static final ScheduledThreadPoolExecutor EXECUTOR = new ScheduledThreadPoolExecutor(1,
                new CustomThreadFactory("seq"));

        static {
            Runtime.getRuntime().addShutdownHook(
                    new ShutdownHookThread("定时和顺序任务线程池",()->{

                        shutdownThreadPoolGracefully(EXECUTOR);
                        return null;
                    })
            );
        }
    }

    //CPU核数
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

    //IO处理线程数
    private static final int IO_MAX = Math.max(2,CPU_COUNT*2);

    /**
     * 空闲保活时限,单位秒
     */
    private static final int KEEP_ALIVE_SECONDS = 30;

    /**
     * 有界队列size
     */
    private static final int QUEUE_SIZE = 128;

    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT;

    //懒汉式单例建立线程池:用于IO密集型任务
    private static class IoIntenseTargetThreadPoolLazyHolder{

        private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
                IO_MAX,
                IO_MAX,
                KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(QUEUE_SIZE),
                new CustomThreadFactory("io")
        );

        static {
            EXECUTOR.allowCoreThreadTimeOut(true);
            Runtime.getRuntime().addShutdownHook(
                    new ShutdownHookThread("IO密集型任务线程池", new Callable() {
                        @Override
                        public Object call() throws Exception {
                            shutdownThreadPoolGracefully(EXECUTOR);
                            return null;
                        }
                    }));
        }
    }

    //懒汉式单例建立线程池:用于CPU密集型任务
    private static class CpuIntenseTargetThreadPoolLazyHolder{

        private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
                MAXIMUM_POOL_SIZE,
                MAXIMUM_POOL_SIZE,
                KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(QUEUE_SIZE),
                new CustomThreadFactory("cpu")
        );

        static {
            EXECUTOR.allowCoreThreadTimeOut(true);
            Runtime.getRuntime().addShutdownHook(new ShutdownHookThread("CPU密集型任务线程池", new Callable() {
                @Override
                public Object call() throws Exception {
                    shutdownThreadPoolGracefully(EXECUTOR);
                    return null;
                }
            }));
        }
    }

    private static final int MIXED_MAX = 128;//最大线程

    private static final String MIXED_THREAD_AMOUNT = "thread.amount";

    private static class MixedTargetThreadPoolLazyHolder{
        //首先从环境变量 thread.amount中获取预配置的线程数
        //若是没有对thread.amount进行配置,就使用常量MIXED_MAX做为线程数
        private static final int max = (null != System.getProperty(MIXED_THREAD_AMOUNT) ?
                Integer.parseInt(System.getProperty(MIXED_THREAD_AMOUNT)) : MIXED_MAX);

        public static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
                max,
                max,
                KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(QUEUE_SIZE),
                new CustomThreadFactory("mixed")
        );

        public MixedTargetThreadPoolLazyHolder(){
            System.out.println("Mix类建立啦");
        }

        static {
            EXECUTOR.allowCoreThreadTimeOut(true);
            Runtime.getRuntime().addShutdownHook(
                    new ShutdownHookThread("混合型任务线程池", new Callable() {
                        @Override
                        public Object call() throws Exception {
                            shutdownThreadPoolGracefully(EXECUTOR);
                            return null;
                        }
                    })
            );
        }
    }

    public static ThreadPoolExecutor getMixedTargetThreadPool(){
       return ThreadUtil.MixedTargetThreadPoolLazyHolder.EXECUTOR;
    }

    public static String getCurThreadName(){
        return Thread.currentThread().getName();
    }


    public static void sleepMilliSeconds(int millisecond){
        LockSupport.parkNanos(millisecond*1000L*1000L);
    }


    public static void execute(String cft){
        synchronized (System.out){
            System.out.println(cft);
        }
    }

    public static Thread getCurThread(){
        return Thread.currentThread();
    }

    public static Boolean getCyrThreadDaemon(){
        return getCurThread().isDaemon();
    }

    public static void sleepSeconds(int second){
        LockSupport.parkNanos(second * 1000L * 1000L * 1000L);
    }
}


若是读者要以笔者的代码来演示一下各个功能的话,建议导包形式为:
import static *.ThreadUtil.*;redis

工具类中的ShutdownHookThread类:安全

package com.qzlnode.util;

import java.util.concurrent.Callable;

public class ShutdownHookThread extends Thread{

    private final Callable callable;

    private volatile boolean target = false;

    public ShutdownHookThread(String name, Callable callable) {
        super(name);
        this.callable = callable;

    }

    @Override
    public void run() {
        synchronized (this){
            try {
                if (!target){
                    target = true;
                    callable.call();
                }
            }catch (Exception e){
                System.err.println(e.getMessage());
            }
        }
    }
}

为线程设置名称

Thread类为设置线程的名称提供了三种方式:多线程

  • 在新建线程时,能够经过Thread类构造方法去初始化新建线程的名称。
  • 能够在新建线程后,在启动线程以前使用Thread类提供的setName方法为线程设置名称。
  • 在前面两种方法都不用的状况下,系统会自动为线程设置名称。默认是Thread-i。i为你顺序建立的线程的序号

在线程执行用户代码逻辑块的时候,代码逻辑块能够有setName()方法,容许在线程运行的过程当中,动态地为当前运行的线程设置名称。架构

不建议为线程取相同的名称,前面的文章提到线程的名称实际上是给用户看的,起相同的名称容易在用户使用jstack工具查看线程时迷糊。并发

简单线程名称操做的例子ide

public class ThreadNameDemo {

    private static int SLEEP_GAP = 500;

    static class RunTarget implements Runnable{



        @Override
        public void run() {
            Print.tco(getCurThreadName()+" doing~~~");
            sleepMilliSeconds(SLEEP_GAP);
            Print.tco(getCurThreadName()+" 执行完成.");
        }
    }

    public static void main(String[] args) {
        RunTarget target = new RunTarget();
        new Thread(target).start();
        new Thread(target).start();
        new Thread(target, "线程-A").start();
        new Thread(target,"线程-B").start();
        sleepMilliSeconds(Integer.MAX_VALUE);
    }
}

咱们能够来看一下结果:高并发

当咱们以构造方法的方式为新建线程设置名称时,咱们在运行线程获取当前线程名称,就会看到咱们本身设置的名称,但当咱们并无为新建线程作出设置名称的动做时,获取到的就是系统自动给咱们赋的名称:Thread-i。工具

让线程休息一会

若是你有接触过线程池的话,就知道每个用户代码逻辑块就是一个工做,我很喜欢把线程看作是流水线员工,它不知疲惫地处理着手中的工做,只有主管(调度程序)不给它继续工做下去(没有得到到CPU时间片)了,它才停下来但心里仍是期待着下一次的分配来处理手上的工做。线程那么地累,不如咱们人为地让它休息一会。

sleep的做用是让目前正在执行的线程休眠,就线程状态来讲就是从执行状态变为限时等待状态。咱们从Thread来查看sleep究竟有几种用法。

  • public static native void sleep(long millis) throws InterruptedException;
  • public static void sleep(long millis,int nanos) throws InterruptedException;

从Thread类中咱们能够看到有两个sleep重载方法,一个是让线程休息用户自定的毫秒时间,另外一个则是让线程休息用户自定的毫秒加纳秒时间。同时咱们能够看到每个方法都抛出了InterruptedException异常,这也要求咱们要对此异常进行捕获并处理。

线程在sleep的时候,线程状态处于限时等待状态,在等待状态中,操做系统并不会为线程分配CPU时间片,而时间结束,中途没有如interrupt唤醒操做的话,线程状态会自动变为就绪状态,等待操做系统为其分配CPU时间片。

接下来咱们来写个例子:

public class Sleep {

    static class SleepThread extends Thread{
        static int threadNo = 1;

        public SleepThread() {
            super("thread-"+threadNo);
            threadNo++;
        }

        @Override
        public void run() {
            Print.tco("这个线程要开始睡觉咯~~~"+getState());
            sleepSeconds(10);//每一个线程休息10秒
            Print.tco("这个线程睡起来了~~~"+getState());
        }
    }

    public static void main(String[] args) {
        Thread thread = null;
        for (int i = 0; i < 5; i++) {
            thread = new SleepThread();
            thread.start();
        }
        Print.tco(getCurThreadName()+" 执行结束.");
    }
}

在程序执行时,咱们使用Jstack工具查看JVM中的线程状态,从下面的第一张提咱们能够很明显地看到线程处于TIME_WAITING状态,随着程序运行结束,咱们发现输出在控制台中的结果显示每个线程都是Runnable状态。这是正常的,只有线程获取到CPU时间片的时候才会运行Print.tco()语句,此时的线程确定是运行状态。

但须要注意的是:当线程休息时间到了后,线程的状态应该是就绪状态,等到操做系统给了线程CPU时间片才会去输出这个线程睡起来了这个语句,此时线程的状态确定也是运行状态(Runnable)。

叫醒线程的美梦

忽视线程池技术,一个线程何时执行完成后退出呢?这个问题只有线程本身能知道,当咱们不当心未给线程设置睡醒的时间,线程何时才能退出WAITING状态呢?如今咱们来介绍一下interrupt方法,interrupt与stop方法不一样在于:

  • stop方法就像你写文章的时候把你电脑电源拔了,它会致使数据不安全,数据的不一致。
  • interrupt方法并非直接中断线程而是将线程设置为中断标志位,用户能够循环检查这个状态,而后作出相应的处理。

如今咱们来讲明一下两种场景:

(1) 当线程处于BLOACKED状态时,使用interrupt方法,会当即退出阻塞,并抛出InterruptedException异常,此时咱们能够捕获该异常而后作出一些处理,而后让线程退出。固然事先线程得准备好处理InterruptedException异常的准备。通常来讲当咱们调用无限时的sleep、wait、join操做时,线程会进入到阻塞状态。

为何使用interrupt方法只是设置中断标志位却可以使阻塞退出,而且抛出InterruptException异常,咱们从JDK源码解答:

    public void interrupt() {
        if (this != Thread.currentThread()) {
            checkAccess();

            // thread may be blocked in an I/O operation
            synchronized (blockerLock) {
                Interruptible b = blocker;
                if (b != null) {
                    interrupted = true;
                    interrupt0();  // inform VM of interrupt
                    b.interrupt(this);
                    return;
                }
            }
        }
        interrupted = true;
        // inform VM of interrupt
        interrupt0();
    }
  • 首先进入interrupt方法,会判断你要唤醒的线程对象是否是等于当前得到到CPU时间片正在运行的线程对象,若是不是则先获取安全管理文件检查你的权限。
    private volatile Interruptible blocker;
    private final Object blockerLock = new Object();
  • 后得到锁 (咱们能够看到blockerLock是被final修饰的,也就是说后面线程要访问blockerLock变量的话,都会进入阻塞状态),执行同步代码块,会给b赋值一个Interruptible类型的bloack。( 咱们也能够看到block是由volatile修饰的,它是轻量级锁,它保证了共享变量在多线程的可视性。)

什么是Interruptible类型,从JDK的注释中咱们知道它是能够中断有着可中断I/O操做的线程的对象。sleep、wait、join方法就是能够中断的I/O操做,所以这个b并非空的。

  • 因为运用无限时的sleep、wait、join等可中断I/O操做方法,所以b不是空,进入if代码块,首先将boolean类型的interrupted设置为ture,方便后面清除中断标志位。后调用interrupt0方法设置中断标志位,最后调用Interruptible接口的interrupt方法去真正中断线程,并接收一个InterruptedException异常,也就是说线程会抛出一个InterruptedException异常。

(2) 从上面的源码读下去,咱们得知当线程正常运行时,使用interrupt方法,仅仅会设置中断标志位,不会进入到里面的if代码块。因此用户程序能够在适当位置经过调用isInterrupted()方法来查看本身是否被中断,并退出线程。

这里咱们能够一个例子来演示:

    @Test
    @DisplayName("用户程序自动检测Interrupt属性来判断是否退出线程")
    public void testInterrupted2() throws InterruptedException {
        Thread thread = new Thread(){//匿名类

            @Override
            public void run() {
                Print.tco("我起了");
                //一直循环
                while (true){
                    Print.tco(isInterrupted());
                    sleepMilliSeconds(5000);

                    if (isInterrupted()){
                        Print.tco("我被秒了");
                        return;
                    }
                }
            }
        };
        thread.start();
        sleepSeconds(2);//等待2秒
        thread.interrupt();//中断线程
        sleepSeconds(2);//等待2秒
        thread.interrupt();
    }

这里@Test使用是Junit5版本,这个版本功能更丰富,也兼容其余检测引擎。

一块儿工做的线程俩

有时候一个线程要依赖于其余的线程来完成本身的工做,具体的依赖为:一个线程须要将另外一个线程的执行流程合并到本身的执行流程中,这就涉及到了线程的合并。在Thread类中提供了join方法来让线程之间进行合并,咱们进入Thread类来看一下其方法的定义:

  • public final synchronized void join(final long millis) throw InterruptedException;
  • public final synchronized void join(long millis, int nanos) throw InterruptedException;
  • public final void join() throws InterruptedException;

咱们能够看到有三种重载的join()方法。

  • 调用第一种方法,调用线程会进入TIME_WAITING状态,直到合并的线程完成了本身执行的任务或者等待合并的线程执行了mills(毫秒)的时间,调用线程才会回到就绪状态,直到被分配到CPU时间片,才会执行本身后面的用户代码逻辑块。
  • 调用第二种方法,调用线程会进入TIME_WAITING状态,直到合并的线程完成了本身执行的任务或者等待合并的线程执行了mills(毫秒)+nanos(纳秒)的时间,调用线程才会回到就绪状态,直到被分配到CPU时间片,才会执行本身后面的用户代码逻辑块。
  • 调用第三种方法,调用线程会进入WAITING状态,直到合并的线程完成了本身执行的任务,调用线程才会回到就绪状态,直到被分配到CPU时间片,才会执行本身后面的用户代码逻辑块。

join()方法是实例方法,须要使用被合并的线程的句柄(引用对象)去调用。

咱们写一个简单的代码去演示其功能。

public class Join {

    public static final int SLEEP_GAP = 5000;//睡眠时长

    public static final int MAX_TURN = 50;//睡眠次数

    static class SleepThread extends Thread {

        static int threadNo = 0;

        public SleepThread() {
            super("sleepThread-" + threadNo++);
        }

        @Override
        public void run() {
            Print.tco(getName() + " 进行睡眠.");
            sleepMilliSeconds(SLEEP_GAP);
            Print.tco(getName() + " 执行结束.")
    }

    public static void main(String[] args) {
        Thread thread1 = new SleepThread();
        Print.tco("启动tread1");
        thread1.start();
        try {
            thread1.join();//合并线程1,不限时
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Print.tco("启动thread2");
        Thread thread2 = new SleepThread();
        thread2.start();
        try {
            thread2.join(1000);//限时合并,限时1秒
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Print.tco("线程运行结束.");
    }
}

让出机会的线程

当前执行的线程舍己为人,认为本身的任务还没那么重要,或者说运行到某个阶段发现当前的任务不知足本身的理想抱负,因而主动让出CPU时间片,让本身的状态变为就绪状态,此时的调度程序就会从新从线程们中选一个线程去分配那位有着舍己为人精神的线程留下来的CPU时间片。请注意,那个舍己为人的线程并非丧失了被操做系统分配CPU时间片的权利,也就是说,即便它让出了机会,在操做系统从新调度线程时,也是会有必定概率继续分配给那位舍己为人线程CPU时间片的。

也就是说,yield()操做和sleep(),wait(),join()操做不同,它不会使得当前线程进入限时等待状态,而是变为就绪状态,依然拥有着被分配到CPU时间片的权力。

这个yield()方法在Thread类中只有一个版本,因此读者能够本身去尝试写程序来体验一下yield()方法的特性和功能,这里就不贴笔者写的代码了。

redis单线程架构尽可能在明后天写出来。