首页 > 开发 > Java > 正文

Spring Boot整合FTPClient线程池的实现示例

2019-10-21 18:42:26
字体:
来源:转载
供稿:网友

最近在写一个FTP上传工具,用到了Apache的FTPClient,但是每个线程频繁的创建和销毁FTPClient对象对服务器的压力很大,因此,此处最好使用一个FTPClient连接池。仔细翻了一下Apache的api,发现它并没有一个FTPClientPool的实现,所以,不得不自己写一个FTPClientPool。下面就大体介绍一下开发连接池的整个过程,供大家参考。

我们可以利用Apache提供的common-pool包来协助我们开发连接池。而开发一个简单的对象池,仅需要实现common-pool 包中的ObjectPool和PoolableObjectFactory两个接口即可。

线程池的意义

为了减少频繁创建、销毁对象带来的性能消耗,我们可以利用对象池的技术来实现对象的复用。对象池提供了一种机制,它可以管理对象池中对象的生命周期,提供了获取和释放对象的方法,可以让客户端很方便的使用对象池中的对象。

pom引入依赖

 <!-- FtpClient依赖包-->    <dependency>      <groupId>commons-net</groupId>      <artifactId>commons-net</artifactId>      <version>3.5</version>    </dependency>    <!-- 线程池-->    <dependency>      <groupId>commons-pool</groupId>      <artifactId>commons-pool</artifactId>      <version>1.6</version>    </dependency>    <dependency>      <groupId>org.apache.commons</groupId>      <artifactId>commons-pool2</artifactId>      <version>2.0</version>    </dependency>

创建ftp配置信息

在resources目录下创建ftp.properties配置文件,目录结构如下:

Spring,Boot,FTPClient,线程池

添加如下的配置信息:

########### FTP用户名称 ###########ftp.userName=hrabbit########### FTP用户密码 ###########ftp.passWord=123456########### FTP主机IP ###########ftp.host=127.0.0.1########### FTP主机端口号 ###########ftp.port=21########### 保存根路径 ###########ftp.baseUrl=/

创建FTPProperties.java配置文件

加载配置内容到Spring中,配置信息基本延用我的就可以。

/** * FTP的配置信息 * @Auther: hrabbit * @Date: 2018-12-03 2:06 PM * @Description: */@Data@Component@PropertySource("classpath:ftp.properties")@ConfigurationProperties(prefix = "ftp")public class FTPProperties {  private String username;  private String password;  private String host;  private Integer port;  private String baseUrl;  private Integer passiveMode = FTP.BINARY_FILE_TYPE;  private String encoding="UTF-8";  private int clientTimeout=120000;  private int bufferSize;  private int transferFileType=FTP.BINARY_FILE_TYPE;  private boolean renameUploaded;  private int retryTime;}

创建FTPClientPool线程池

/** * 自定义实现ftp连接池 * @Auther: hrabbit * @Date: 2018-12-03 3:40 PM * @Description: */@Slf4j@SuppressWarnings("all")public class FTPClientPool implements ObjectPool<FTPClient> {  private static final int DEFAULT_POOL_SIZE = 10;  public BlockingQueue<FTPClient> blockingQueue;  private FTPClientFactory factory;  public FTPClientPool(FTPClientFactory factory) throws Exception {    this(DEFAULT_POOL_SIZE, factory);  }  public FTPClientPool(int poolSize, FTPClientFactory factory) throws Exception {    this.factory = factory;    this.blockingQueue = new ArrayBlockingQueue<FTPClient>(poolSize);    initPool(poolSize);  }  /**   * 初始化连接池   * @param maxPoolSize   *         最大连接数   * @throws Exception   */  private void initPool(int maxPoolSize) throws Exception {    int count = 0;    while(count < maxPoolSize) {      this.addObject();      count++;    }  }  /**   * 从连接池中获取对象   */  @Override  public FTPClient borrowObject() throws Exception {    FTPClient client = blockingQueue.take();    if(client == null) {      client = factory.makeObject();    } else if(!factory.validateObject(client)) {      invalidateObject(client);      client = factory.makeObject();    }    return client;  }  /**   * 返还一个对象(链接)   */  @Override  public void returnObject(FTPClient client) throws Exception {    if ((client != null) && !blockingQueue.offer(client,2,TimeUnit.MINUTES)) {      try {        factory.destroyObject(client);      } catch (Exception e) {        throw e;      }    }  }  /**   * 移除无效的对象(FTP客户端)   */  @Override  public void invalidateObject(FTPClient client) throws Exception {    blockingQueue.remove(client);  }  /**   * 增加一个新的链接,超时失效   */  @Override  public void addObject() throws Exception {    blockingQueue.offer(factory.makeObject(), 2, TimeUnit.MINUTES);  }  /**   * 重新连接   */  public FTPClient reconnect() throws Exception {    return factory.makeObject();  }  /**   * 获取空闲链接数(这里暂不实现)   */  @Override  public int getNumIdle() {    return blockingQueue.size();  }  /**   * 获取正在被使用的链接数   */  @Override  public int getNumActive() {    return DEFAULT_POOL_SIZE - getNumIdle();  }  @Override  public void clear() throws Exception {  }  /**   * 关闭连接池   */  @Override  public void close() {    try {      while(blockingQueue.iterator().hasNext()) {        FTPClient client = blockingQueue.take();        factory.destroyObject(client);      }    } catch(Exception e) {      log.error("close ftp client pool failed...{}", e);    }  }  /**   * 增加一个新的链接,超时失效   */  public void addObject(FTPClient ftpClient) throws Exception {    blockingQueue.put(ftpClient);  }}

创建一个FTPClientFactory工厂类

创建FTPClientFactory实现PoolableObjectFactory的接口,FTPClient工厂类,通过FTPClient工厂提供FTPClient实例的创建和销毁

/** * FTPClient 工厂 * @Auther: hrabbit * @Date: 2018-12-03 3:41 PM * @Description: */@Slf4j@SuppressWarnings("all")public class FTPClientFactory implements PoolableObjectFactory<FTPClient> {  private FTPProperties ftpProperties;  public FTPClientFactory(FTPProperties ftpProperties) {    this.ftpProperties = ftpProperties;  }  @Override  public FTPClient makeObject() throws Exception {    FTPClient ftpClient = new FTPClient();    ftpClient.setControlEncoding(ftpProperties.getEncoding());    ftpClient.setConnectTimeout(ftpProperties.getClientTimeout());    try {      ftpClient.connect(ftpProperties.getHost(), ftpProperties.getPort());      int reply = ftpClient.getReplyCode();      if (!FTPReply.isPositiveCompletion(reply)) {        ftpClient.disconnect();        log.warn("FTPServer refused connection");        return null;      }      boolean result = ftpClient.login(ftpProperties.getUsername(), ftpProperties.getPassword());      ftpClient.setFileType(ftpProperties.getTransferFileType());      if (!result) {        log.warn("ftpClient login failed... username is {}", ftpProperties.getUsername());      }    } catch (Exception e) {      log.error("create ftp connection failed...{}", e);      throw e;    }    return ftpClient;  }  @Override  public void destroyObject(FTPClient ftpClient) throws Exception {    try {      if(ftpClient != null && ftpClient.isConnected()) {        ftpClient.logout();      }    } catch (Exception e) {      log.error("ftp client logout failed...{}", e);      throw e;    } finally {      if(ftpClient != null) {        ftpClient.disconnect();      }    }  }  @Override  public boolean validateObject(FTPClient ftpClient) {    try {      return ftpClient.sendNoOp();    } catch (Exception e) {      log.error("Failed to validate client: {}");    }    return false;  }  @Override  public void activateObject(FTPClient obj) throws Exception {    //Do nothing  }  @Override  public void passivateObject(FTPClient obj) throws Exception {    //Do nothing  }}

创建FTPUtils.java的工具类

FTPUtils.java中封装了上传、下载等方法,在项目启动的时候,在@PostConstruct注解的作用下通过执行init()的方法,创建FTPClientFactory工厂中,并初始化了FTPClientPool线程池,这样每次调用方法的时候,都直接从FTPClientPool中取出一个FTPClient对象

/** * @Auther: hrabbit * @Date: 2018-12-03 3:47 PM * @Description: */@Slf4j@Componentpublic class FTPUtils {  /**   * FTP的连接池   */  @Autowired  public static FTPClientPool ftpClientPool;  /**   * FTPClient对象   */  public static FTPClient ftpClient;  private static FTPUtils ftpUtils;  @Autowired  private FTPProperties ftpProperties;  /**   * 初始化设置   * @return   */  @PostConstruct  public boolean init() {    FTPClientFactory factory = new FTPClientFactory(ftpProperties);    ftpUtils = this;    try {      ftpClientPool = new FTPClientPool(factory);    } catch (Exception e) {      e.printStackTrace();      return false;    }    return true;  }  /**   * 获取连接对象   * @return   * @throws Exception   */  public static FTPClient getFTPClient() throws Exception {    //初始化的时候从队列中取出一个连接    if (ftpClient==null) {      synchronized (ftpClientPool) {        ftpClient = ftpClientPool.borrowObject();      }    }    return ftpClient;  }  /**   * 当前命令执行完成命令完成   * @throws IOException   */  public void complete() throws IOException {    ftpClient.completePendingCommand();  }  /**   * 当前线程任务处理完成,加入到队列的最后   * @return   */  public void disconnect() throws Exception {    ftpClientPool.addObject(ftpClient);  }  /**   * Description: 向FTP服务器上传文件   *   * @Version1.0   * @param remoteFile   *      上传到FTP服务器上的文件名   * @param input   *      本地文件流   * @return 成功返回true,否则返回false   */  public static boolean uploadFile(String remoteFile, InputStream input) {    boolean result = false;    try {      getFTPClient();      ftpClient.enterLocalPassiveMode();      result = ftpClient.storeFile(remoteFile, input);      input.close();      ftpClient.disconnect();    } catch (Exception e) {      e.printStackTrace();    }    return result;  }  /**   * Description: 向FTP服务器上传文件   *   * @Version1.0   * @param remoteFile   *      上传到FTP服务器上的文件名   * @param localFile   *      本地文件   * @return 成功返回true,否则返回false   */  public static boolean uploadFile(String remoteFile, String localFile){    FileInputStream input = null;    try {      input = new FileInputStream(new File(localFile));    } catch (FileNotFoundException e) {      e.printStackTrace();    }    return uploadFile(remoteFile, input);  }  /**   * 拷贝文件   * @param fromFile   * @param toFile   * @return   * @throws Exception   */  public boolean copyFile(String fromFile, String toFile) throws Exception {    InputStream in=getFileInputStream(fromFile);    getFTPClient();    boolean flag = ftpClient.storeFile(toFile, in);    in.close();    return flag;  }  /**   * 获取文件输入流   * @param fileName   * @return   * @throws IOException   */  public static InputStream getFileInputStream(String fileName) throws Exception {    ByteArrayOutputStream fos=new ByteArrayOutputStream();    getFTPClient();    ftpClient.retrieveFile(fileName, fos);    ByteArrayInputStream in=new ByteArrayInputStream(fos.toByteArray());    fos.close();    return in;  }  /**   * Description: 从FTP服务器下载文件   *   * @Version1.0   * @return   */  public static boolean downFile(String remoteFile, String localFile){    boolean result = false;    try {      getFTPClient();      OutputStream os = new FileOutputStream(localFile);      ftpClient.retrieveFile(remoteFile, os);      ftpClient.logout();      ftpClient.disconnect();      result = true;    } catch (Exception e) {      e.printStackTrace();    } finally {      try {      } catch (Exception e) {        e.printStackTrace();      }    }    return result;  }  /**   * 从ftp中获取文件流   * @param filePath   * @return   * @throws Exception   */  public static InputStream getInputStream(String filePath) throws Exception {    getFTPClient();    InputStream inputStream = ftpClient.retrieveFileStream(filePath);    return inputStream;  }  /**   * ftp中文件重命名   * @param fromFile   * @param toFile   * @return   * @throws Exception   */  public boolean rename(String fromFile,String toFile) throws Exception {    getFTPClient();    boolean result = ftpClient.rename(fromFile,toFile);    return result;  }  /**   * 获取ftp目录下的所有文件   * @param dir   * @return   */  public FTPFile[] getFiles(String dir) throws Exception {    getFTPClient();    FTPFile[] files = new FTPFile[0];    try {      files = ftpClient.listFiles(dir);    }catch (Throwable thr){      thr.printStackTrace();    }    return files;  }  /**   * 获取ftp目录下的某种类型的文件   * @param dir   * @param filter   * @return   */  public FTPFile[] getFiles(String dir, FTPFileFilter filter) throws Exception {    getFTPClient();    FTPFile[] files = new FTPFile[0];    try {      files = ftpClient.listFiles(dir, filter);    }catch (Throwable thr){      thr.printStackTrace();    }    return files;  }  /**   * 创建文件夹   * @param remoteDir   * @return 如果已经有这个文件夹返回false   */  public boolean makeDirectory(String remoteDir) throws Exception {    getFTPClient();    boolean result = false;    try {      result = ftpClient.makeDirectory(remoteDir);    } catch (IOException e) {      e.printStackTrace();    }    return result;  }  public boolean mkdirs(String dir) throws Exception {    boolean result = false;    if (null == dir) {      return result;    }    getFTPClient();    ftpClient.changeWorkingDirectory("/");    StringTokenizer dirs = new StringTokenizer(dir, "/");    String temp = null;    while (dirs.hasMoreElements()) {      temp = dirs.nextElement().toString();      //创建目录      ftpClient.makeDirectory(temp);      //进入目录      ftpClient.changeWorkingDirectory(temp);      result = true;    }    ftpClient.changeWorkingDirectory("/");    return result;  }}

创建FtpClientTest.java测试类

上传一张图片到FTP服务器,并将文件重新命名为hrabbit.jpg,代码如下:

/** * FtpClient测试 * @Auther: hrabbit * @Date: 2018-12-21 9:14 PM * @Description: */@RunWith(SpringRunner.class)@SpringBootTestpublic class FtpClientTest {  /**   * 测试上传   */  @Test  public void uploadFile(){    boolean flag = FTPUtils.uploadFile("hrabbit.jpg", "/Users/mrotaku/Downloads/klklklkl_4x.jpg");    Assert.assertEquals(true, flag);  }}

程序完美运行,这时候我们查看我们的FTP服务器,http://localhost:8866/hrabbit.jpg

Spring,Boot,FTPClient,线程池

码云地址:https://gitee.com/hrabbit/hrabbit-admin

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持CuoXin错新网。


注:相关教程知识阅读请移步到JAVA教程频道。
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表