【高并发程序设计】 ThreadLocal

mac2024-01-31  42

上一篇【高并发程序设计】锁优化  探讨的是通过控制资源访问来保证线程安全。这一篇我们换个思路,通过增加资源来保证线程的安全: TreadLocal。

1. 简单使用

顾名思义, ThreadLocal是线程的局部变量, 被线程独享, 那么自然而然其是线程安全的。首先我们看一个简单的例子, 看一下下面的代码。 

public class ThreadLocalDemo { private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static class ParseDate implements Runnable { private int i; public ParseDate(int i) { this.i = i; } @Override public void run() { try { Date date = sdf.parse("2019-10-31 00:10:" + i % 60); System.out.println(i + ":" + date); } catch (ParseException e) { System.out.println(e.toString()); e.printStackTrace(); } } } public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(10); for (int i = 0; i < 50000; i++) { executor.submit(new ParseDate(i)); } } }

上面这段代码在执行过程中很可能会得到一些异常(如NumberFormatException), 其原因在于SimpleDateFormat类的parse方法并不是线程安全的, 因此多线程情况下共享这个对象是有问题的, 故可以使用ThreadLocal对其进行如下的改写。

public class ThreadLocalDemo { private static final ThreadLocal<SimpleDateFormat> tlSdf = new ThreadLocal<>(); public static class ParseDate implements Runnable { private int i; public ParseDate(int i) { this.i = i; } @Override public void run() { try { if (Objects.isNull(tlSdf.get())) { tlSdf.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); } Date date = tlSdf.get().parse("2019-10-31 00:10:" + i % 60); System.out.println(i + ":" + date); } catch (ParseException e) { System.out.println(e.toString()); e.printStackTrace(); } } } public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(10); for (int i = 0; i < 50000; i++) { executor.submit(new ParseDate(i)); } } }

可以看到,为每个线程分配一个对象的工作并非由ThreadLocal来完成的, 而是需要应用测代码来保证。若应用测为ThreadLocal设置的是同一个对象, 那么依然无法保证线程安全,ThreadLocal仅仅起到了简单容器的作用。

2. 底层原理

本小节从源码入手来探讨ThreadLocal的底层实现。

先来看看set()方法的源码:

ThreadLocalMap getMap(Thread t) { return t.threadLocals; } void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); } public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); }

set()方法先获取到了当前线程, 然后调用getMap()方法获取当前线程的threadLocals对象(其是个ThreadLocalMap类型对象),该map存在, 则将threadlocal对象和value分别做为key/value设置到map中; 不存在, 则new一个新ThreadLocalMap对象, 并将threadlocal对象和value作出初始值设置到map中。

再来看看get方法:

protected T initialValue() { return null; } private T setInitialValue() { T value = initialValue(); Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); return value; } public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }

get()方法依然是先从当前线程中获取ThreadLocalMap对象, 若map为null, 则对其进行初始化操作: 新创建一个map并赋值给当前线程, 并将当前threadLocal对象作为key, null作为value放入map, 并返回null;  若存在, 则使用当前theadLocal对象从map中获取Entry, 并将Entiry的value作为返回值return回去。 

由此上面的源码可见, 每个线程都维护着自己的ThreadLocalMap对象, 用以存储自己线程内使用到的相关对象(value), 自己线程中的ThreadLocal, 别的线程是无法访问到的。 但若ThreadLocal中的对象是一个对象, 就没办法保证线程安全了。 另一个要注意点是, 若我们使用线程池来进行线程管理, 线程是不会被释放掉的, 这也就意味着ThreadLocal对象不会被释放掉, 那么若 其内部存储了大对象, 很有可能会引起内存泄漏。

 

下面我们来看看上文源码中一直提到的TreadLocalMap类, 可以将其理解为一个类似HashMap的容器, 我们来看看其内部处理, 其内部使用了一个Entry数组来存储ThreadLocal以及value:

static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } } private Entry[] table;

Entry类继承了WeakReference类, 其对于ThreadLocal的引用是个弱引用, 这就意味着, 当线程中存储的threadLocal对象失去了强引用时, 即便线程中仍然保留着其引用(实际上是弱引用), 该ThreadLocal对象仍然可能会被回收, 如下图:

 

 

3. 实际应用--动态数据源/表的选择

数据库开发过程中, 经常会有分库分表的需求,常用方案无外乎以下几种:

1. 使用三方插件/服务(如当当的sharding-jdbc、阿里的mycat等);

2. 借助spring.jdbc动态数据源以及mybatis拦截器纯手写实现, 此方法经常会用到ThreadLocal, 下文将详细介绍。 

spring.jdbc中的AbstractRoutingDataSource类提供了动态配置/使用数据源的方案, 其可设置的字段如下: 

public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean { private Map<Object, Object> targetDataSources; private Object defaultTargetDataSource; private boolean lenientFallback = true; private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup(); private Map<Object, DataSource> resolvedDataSources; private DataSource resolvedDefaultDataSource;

 其中通过设置targetDataSources, 可以指定应用程序所连接的所有数据源, 并通过覆写determineCurrentLookupKey()方法, 由应用程序外部根据具体场景决定使用的数据源Key(前面写到targetDataSources中的key), 从而动态确定数据源:

protected DataSource determineTargetDataSource() { Assert.notNull(this.resolvedDataSources, "DataSource router not initialized"); Object lookupKey = determineCurrentLookupKey(); DataSource dataSource = this.resolvedDataSources.get(lookupKey); if (dataSource == null && (this.lenientFallback || lookupKey == null)) { dataSource = this.resolvedDefaultDataSource; } if (dataSource == null) { throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]"); } return dataSource; } /** * Determine the current lookup key. This will typically be * implemented to check a thread-bound transaction context. * <p>Allows for arbitrary keys. The returned key needs * to match the stored lookup key type, as resolved by the * {@link #resolveSpecifiedLookupKey} method. */ protected abstract Object determineCurrentLookupKey();

下面的是继承AbstractRoutingDataSource来实现动态数据源的具体代码, 供参考:

public class DynamicDataSource extends AbstractRoutingDataSource { public static final ThreadLocal<String> contextHolder = new ThreadLocal<>(); /** * 主从数据源的对应关系,注意:添加数据源必须在此配置对应主从关系 */ private static Map<String, String> masterSlaveDataSourceMap = Maps.newHashMap(); static { //主从数据源的对应关系,注意:添加数据源必须在此配置对应主从关系 masterSlaveDataSourceMap.put("dataSourceM0", "dataSourceS0"); masterSlaveDataSourceMap.put("dataSourceM1", "dataSourceS1"); } public static <T> T executeSurroundDs(String dataSourceName, boolean useMaster, DynamicDsCallback<T> callback) throws Throwable { // 判断上层是否没有设置数据源 boolean isPreNotSetDataSource = isNotSetDataSource(); try { // 如果上层没有设置数据源, 此处根据主从进行数据源的设置 // 如果上层设置了数据源, 此处略过数据源的设置 if (isPreNotSetDataSource) { if (useMaster) { useMaster(dataSourceName); } else { useSlave(dataSourceName); } } // 进行业务逻辑的执行后返回结果 return callback.execute(); } catch(Throwable th) { throw th; } finally { // 上层没有设置数据源, 退出时就需要清除数据源了. if (isPreNotSetDataSource) { clearUsed(); } } } /** * 使用主数据库 * @param masterDataSourceName 主数据库的数据源名称 */ private static void useMaster(String masterDataSourceName) { contextHolder.set(masterDataSourceName); } /** * 使用从数据库 * @param masterDataSourceName 主数据库的数据源名称 */ private static void useSlave(String masterDataSourceName) { contextHolder.set(masterSlaveDataSourceMap.get(masterDataSourceName)); } public static Object getContextHolderContent() { return contextHolder.get(); } /** * 是否沒有set了dataSource * @return */ private static boolean isNotSetDataSource() { return getContextHolderContent() == null; } /** * 清除数据源设置 */ private static void clearUsed() { contextHolder.set(null); } @Override protected Object determineCurrentLookupKey() { //从threadlocal中获取当前线程正在使用的数据源 return contextHolder.get(); } }

若使用的是spring.mvc, 在配置文件中设置动态数据源:

<bean id="dataSource" class="com.guomingr.course.sharding.datasource.DynamicDataSource"> <property name="targetDataSources"> <map> <entry key="dataSourceM0" value-ref="dataSourceM0"/> <entry key="dataSourceS0" value-ref="dataSourceS0"/> <entry key="dataSourceM1" value-ref="dataSourceM1"/> <entry key="dataSourceS1" value-ref="dataSourceS1"/> </map> </property> <property name="defaultTargetDataSource" ref="dataSourceM0"/> </bean>

应用程序的逻辑代码中, 根据不同情况不同需求, 来动态设置数据源名称。


参考:

《实战Java高并发程序设计》

最新回复(0)