Steps

  • 导入依赖
  • 重写Apache Commons Pools2 的 方法
  • 测试

导入依赖

我还用了 Hutool 的包,和SpringBoot的基础jar包

1
2
3
4
5
6
7
8
9
10
11
12
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.55</version>
</dependency>
</dependencies>

重写Apache Commons Pools2 的 方法

所有的类

  1. 继承BasePooledObjectFactory 这个类,实现我们自己的sftp 工厂 (SftpPooledObjectFactory)

主要重写下面四个方法

  • create :创建我们需要的连接对象
  • warp :将我们创建的对象,包装成PooledObject对象
  • 另外两个顾名思义

总之这个 Factory 是 抽象工厂方法的实现,对 sftp 连接对象的生命周期进行管理

用了 ConfigurationProperties 去配置 sftp 连接的参数

  1. 继承ObjectPool对象

实现 连接池的配置,具体配置代码,还有一个拒绝策略

  1. 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
	@Slf4j
@Component
public class SftpPooledObjectFactory extends BasePooledObjectFactory<ChannelSftp> {


public SftpPooledObjectFactory() {}

@Override
public ChannelSftp create() throws Exception {
return sftpCreate();
}

@Override
public PooledObject<ChannelSftp> wrap(ChannelSftp obj) {
return new DefaultPooledObject<>(obj);
}


@Override
public void destroyObject(PooledObject<ChannelSftp> p) throws Exception {
ChannelSftp object = p.getObject();
if (ObjectUtil.isNotNull(object)||(object.isConnected())) {
object.disconnect();
super.destroyObject(p);
}
}

@Override
public boolean validateObject(PooledObject<ChannelSftp> p) {
ChannelSftp object = p.getObject();
return object.isConnected();
}

private ChannelSftp sftpCreate() throws JSchException {
SftpContractProperties properties = SpringUtil.getBean("sftpContractProperties");
JSch jsch = new JSch();
Session session = jsch.getSession(properties.getUsername(), properties.getHostname(), properties.getPort());
session.setPassword(properties.getPassword());

Properties config = new Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config);
int sftpTimeout = 5000;
session.setTimeout(sftpTimeout);
session.connect();
log.debug("sftp session connected");

log.debug("opening channel");
ChannelSftp channel = (ChannelSftp) session.openChannel("sftp");
channel.connect();
log.debug("connected successfully");
return channel;

}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@Slf4j
@Component
public class SftpPool implements ObjectPool<ChannelSftp> {
private GenericObjectPool<ChannelSftp> sftpConnectPool;

private final SftpPooledObjectFactory factory;


@Autowired
public SftpPool(SftpPooledObjectFactory factory) {
this.factory = factory;

AbandonedConfig abandonedConfig = new AbandonedConfig();
abandonedConfig.setRemoveAbandonedOnMaintenance(true);
abandonedConfig.setRemoveAbandonedOnBorrow(true);
this.sftpConnectPool = new GenericObjectPool<>(factory, getPoolConfiguration(), abandonedConfig);
}


@Override
public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
sftpConnectPool.addObject();
}

@Override
public ChannelSftp borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
return sftpConnectPool.borrowObject();
}


@Override
public void clear() throws Exception, UnsupportedOperationException {
sftpConnectPool.clear();
}

@Override
public void close() {
sftpConnectPool.close();
}

@Override
public int getNumActive() {
return sftpConnectPool.getNumActive();
}

@Override
public int getNumIdle() {
return sftpConnectPool.getNumIdle();
}

@Override
public void invalidateObject(ChannelSftp obj) throws Exception {
sftpConnectPool.invalidateObject(obj);
}

@Override
public void returnObject(ChannelSftp obj) throws Exception {
sftpConnectPool.returnObject(obj);
}

private GenericObjectPoolConfig<ChannelSftp> getPoolConfiguration() {

SftpPoolProperties poolProperties = SpringUtil.getBean("sftpPoolProperties");

GenericObjectPoolConfig<ChannelSftp> config = new GenericObjectPoolConfig<>();
config.setMinIdle(poolProperties.getMinIdle());
config.setMaxIdle(poolProperties.getMaxIdle());
config.setMaxTotal(poolProperties.getMaxActive());
config.setMaxWait(Duration.ofMillis(poolProperties.getMaxWait()));
config.setTestOnBorrow(poolProperties.isTestOnBorrow());
config.setTestOnReturn(poolProperties.isTestOnReturn());
config.setTestWhileIdle(poolProperties.isTestWhileIdle());

config.setEvictionPolicyClassName(SftpEvictionPolicy.class.getName());


return config;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
@Component("sftpEvictionPolicy")
public class SftpEvictionPolicy implements EvictionPolicy<ChannelSftp> {
@Override
public boolean evict(EvictionConfig config, PooledObject<ChannelSftp> underTest, int idleCount) {
try {
// 连接失效时进行驱逐
if (!underTest.getObject().isConnected()) {
log.warn("connect time out, evict the connection. time={}",System.currentTimeMillis() - underTest.getLastReturnTime());
return true;
}
}catch (Exception e){
return true;
}
return false;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
@Data
@ConfigurationProperties(prefix = SftpContractProperties.SFTP_PREFIX)
@Component
public class SftpContractProperties {
final static String SFTP_PREFIX = "zxh.sftp";

private String username;
private String password;
private String hostname;
private int port;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@ConfigurationProperties(SftpPoolProperties.POOL_CONFIG_PREFIX)
@Component
@Data
public class SftpPoolProperties {
public static final String POOL_CONFIG_PREFIX = "zxh.sftp.pool";

private int minIdle;
private int maxIdle;
private int maxActive;
private long maxWait;
private boolean testOnBorrow;
private boolean testOnReturn;
private boolean testWhileIdle;
private long timeBetweenEvictionRuns;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
@Component
public class SftpUtil {

@Autowired
private SftpPool sftpPool;

private ChannelSftp sftp;

public SftpUtil() {
}

public void showPwd() throws Exception {
String pwd = getClient().pwd();
log.info("pwd:{}", pwd);
if (null != sftp) {
disConnected();
}
}
private ChannelSftp getClient() throws Exception {
log.info("拿到sftp");
sftp = sftpPool.borrowObject();
return sftp;
}
public void disConnected() throws Exception {
sftpPool.returnObject(sftp);
log.info("归还sftp");
}


}

Junit 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@RunWith(SpringRunner.class)
@SpringBootTest(classes = LearningApplication.class)
@Slf4j
public class SftpTest {


@Autowired
private SftpPool sftpPool;

@Test
public void testSftpConnection() throws Exception {
SftpUtil sftpUtil = SpringUtil.getBean("sftpUtil");
log.info("================================================================");
isConnected();
for (int i = 0; i < 100; i++) {
sftpUtil.showPwd();
isConnected();
}
int numActive2 = sftpPool.getNumActive();
log.info("连接数量 numActive:{}", numActive2);
// sftpUtil.returnObject();
isConnected();
int numActive1 = sftpPool.getNumActive();
log.info("连接数量 numActive:{}", numActive1);
log.info("================================================================");

}
private void isConnected() {
SftpUtil sftpUtil = SpringUtil.getBean("sftpUtil");
int numActive = sftpPool.getNumActive();
log.info("连接数量 numActive:{}", numActive);
if (sftpUtil.isConnect()) {
log.info("Sftp connection established");
} else {
log.info("Sftp connection closed");

}
}

}

测试结果

结果错乱,因为多线程没有加锁,不过能跑成功即可

Q:遇到的坑

  • 多线程不能运行 @Autowired 中注入的Bean,需要 用SpringUitl 从容器中拿,再运行。

  • Junit 不能多线程运行bean ,我会的方式是不行的,需要增加配置,或者让Junit 等待。

  • Spring中的 注入问题,我有的用 @Autowired ,有的用构造方法注入,并不是随意的,因为@autowired 注入失败,才用构造方法的。失败原因,了解但是不深入 。

TODO

  1. 多线程使用 自定义sftp线程池 的优化。用 ThreadLocal
  2. 让 Junit 多线程执行 Bean
  3. 深入研究bean的 注入