Steps
- 导入依赖
- 重写Apache Commons Pools2 的 方法
- 测试
导入依赖
我还用了 Hutool 的包,和SpringBoot的基础jar包
1 2 3 4 5 6 7 8 9 10 11 12
| <dependencies> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.11.1</version> </dependency> <dependency> <groupId>com.jcraft</groupId> <artifactId>jsch</artifactId> <version>0.1.55</version> </dependency> </dependencies>
|
重写Apache Commons Pools2 的 方法
所有的类

- 继承BasePooledObjectFactory 这个类,实现我们自己的sftp 工厂 (SftpPooledObjectFactory)
主要重写下面四个方法
- create :创建我们需要的连接对象
- warp :将我们创建的对象,包装成PooledObject对象
- 另外两个顾名思义
总之这个 Factory 是 抽象工厂方法的实现,对 sftp 连接对象的生命周期进行管理
用了 ConfigurationProperties 去配置 sftp 连接的参数
- 继承ObjectPool对象
实现 连接池的配置,具体配置代码,还有一个拒绝策略
- 配置文件

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| @Slf4j @Component public class SftpPooledObjectFactory extends BasePooledObjectFactory<ChannelSftp> {
public SftpPooledObjectFactory() {}
@Override public ChannelSftp create() throws Exception { return sftpCreate(); }
@Override public PooledObject<ChannelSftp> wrap(ChannelSftp obj) { return new DefaultPooledObject<>(obj); }
@Override public void destroyObject(PooledObject<ChannelSftp> p) throws Exception { ChannelSftp object = p.getObject(); if (ObjectUtil.isNotNull(object)||(object.isConnected())) { object.disconnect(); super.destroyObject(p); } }
@Override public boolean validateObject(PooledObject<ChannelSftp> p) { ChannelSftp object = p.getObject(); return object.isConnected(); }
private ChannelSftp sftpCreate() throws JSchException { SftpContractProperties properties = SpringUtil.getBean("sftpContractProperties"); JSch jsch = new JSch(); Session session = jsch.getSession(properties.getUsername(), properties.getHostname(), properties.getPort()); session.setPassword(properties.getPassword());
Properties config = new Properties(); config.put("StrictHostKeyChecking", "no"); session.setConfig(config); int sftpTimeout = 5000; session.setTimeout(sftpTimeout); session.connect(); log.debug("sftp session connected");
log.debug("opening channel"); ChannelSftp channel = (ChannelSftp) session.openChannel("sftp"); channel.connect(); log.debug("connected successfully"); return channel;
}
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| @Slf4j @Component public class SftpPool implements ObjectPool<ChannelSftp> { private GenericObjectPool<ChannelSftp> sftpConnectPool;
private final SftpPooledObjectFactory factory;
@Autowired public SftpPool(SftpPooledObjectFactory factory) { this.factory = factory;
AbandonedConfig abandonedConfig = new AbandonedConfig(); abandonedConfig.setRemoveAbandonedOnMaintenance(true); abandonedConfig.setRemoveAbandonedOnBorrow(true); this.sftpConnectPool = new GenericObjectPool<>(factory, getPoolConfiguration(), abandonedConfig); }
@Override public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException { sftpConnectPool.addObject(); }
@Override public ChannelSftp borrowObject() throws Exception, NoSuchElementException, IllegalStateException { return sftpConnectPool.borrowObject(); }
@Override public void clear() throws Exception, UnsupportedOperationException { sftpConnectPool.clear(); }
@Override public void close() { sftpConnectPool.close(); }
@Override public int getNumActive() { return sftpConnectPool.getNumActive(); }
@Override public int getNumIdle() { return sftpConnectPool.getNumIdle(); }
@Override public void invalidateObject(ChannelSftp obj) throws Exception { sftpConnectPool.invalidateObject(obj); }
@Override public void returnObject(ChannelSftp obj) throws Exception { sftpConnectPool.returnObject(obj); }
private GenericObjectPoolConfig<ChannelSftp> getPoolConfiguration() {
SftpPoolProperties poolProperties = SpringUtil.getBean("sftpPoolProperties");
GenericObjectPoolConfig<ChannelSftp> config = new GenericObjectPoolConfig<>(); config.setMinIdle(poolProperties.getMinIdle()); config.setMaxIdle(poolProperties.getMaxIdle()); config.setMaxTotal(poolProperties.getMaxActive()); config.setMaxWait(Duration.ofMillis(poolProperties.getMaxWait())); config.setTestOnBorrow(poolProperties.isTestOnBorrow()); config.setTestOnReturn(poolProperties.isTestOnReturn()); config.setTestWhileIdle(poolProperties.isTestWhileIdle());
config.setEvictionPolicyClassName(SftpEvictionPolicy.class.getName());
return config; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Slf4j @Component("sftpEvictionPolicy") public class SftpEvictionPolicy implements EvictionPolicy<ChannelSftp> { @Override public boolean evict(EvictionConfig config, PooledObject<ChannelSftp> underTest, int idleCount) { try { if (!underTest.getObject().isConnected()) { log.warn("connect time out, evict the connection. time={}",System.currentTimeMillis() - underTest.getLastReturnTime()); return true; } }catch (Exception e){ return true; } return false; } }
|
1 2 3 4 5 6 7 8 9 10 11 12
| @Data @ConfigurationProperties(prefix = SftpContractProperties.SFTP_PREFIX) @Component public class SftpContractProperties { final static String SFTP_PREFIX = "zxh.sftp";
private String username; private String password; private String hostname; private int port; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @ConfigurationProperties(SftpPoolProperties.POOL_CONFIG_PREFIX) @Component @Data public class SftpPoolProperties { public static final String POOL_CONFIG_PREFIX = "zxh.sftp.pool";
private int minIdle; private int maxIdle; private int maxActive; private long maxWait; private boolean testOnBorrow; private boolean testOnReturn; private boolean testWhileIdle; private long timeBetweenEvictionRuns; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Slf4j @Component public class SftpUtil {
@Autowired private SftpPool sftpPool;
private ChannelSftp sftp;
public SftpUtil() { }
public void showPwd() throws Exception { String pwd = getClient().pwd(); log.info("pwd:{}", pwd); if (null != sftp) { disConnected(); } } private ChannelSftp getClient() throws Exception { log.info("拿到sftp"); sftp = sftpPool.borrowObject(); return sftp; } public void disConnected() throws Exception { sftpPool.returnObject(sftp); log.info("归还sftp"); }
}
|
Junit 测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| @RunWith(SpringRunner.class) @SpringBootTest(classes = LearningApplication.class) @Slf4j public class SftpTest {
@Autowired private SftpPool sftpPool;
@Test public void testSftpConnection() throws Exception { SftpUtil sftpUtil = SpringUtil.getBean("sftpUtil"); log.info("================================================================"); isConnected(); for (int i = 0; i < 100; i++) { sftpUtil.showPwd(); isConnected(); } int numActive2 = sftpPool.getNumActive(); log.info("连接数量 numActive:{}", numActive2);
isConnected(); int numActive1 = sftpPool.getNumActive(); log.info("连接数量 numActive:{}", numActive1); log.info("================================================================"); } private void isConnected() { SftpUtil sftpUtil = SpringUtil.getBean("sftpUtil"); int numActive = sftpPool.getNumActive(); log.info("连接数量 numActive:{}", numActive); if (sftpUtil.isConnect()) { log.info("Sftp connection established"); } else { log.info("Sftp connection closed");
} }
}
|
测试结果
结果错乱,因为多线程没有加锁,不过能跑成功即可

Q:遇到的坑
多线程不能运行 @Autowired 中注入的Bean,需要 用SpringUitl 从容器中拿,再运行。
Junit 不能多线程运行bean ,我会的方式是不行的,需要增加配置,或者让Junit 等待。
Spring中的 注入问题,我有的用 @Autowired ,有的用构造方法注入,并不是随意的,因为@autowired 注入失败,才用构造方法的。失败原因,了解但是不深入 。
TODO
- 多线程使用 自定义sftp线程池 的优化。用 ThreadLocal
- 让 Junit 多线程执行 Bean
- 深入研究bean的 注入