爬虫实战 - 使用线程池抓取

本贴最后更新于 1945 天前,其中的信息可能已经斗转星移

前言

以前在写爬虫的时候,要么是直接使用 HTTPClient 循环去抓取,这样有个弊端,就是人物是阻塞的,后一个任务必须等待上一个任务结束。后面使用了 Thread 线程去执行任务,有多少个任务就开多少条线程,但是频繁线程的创建与销毁又造成了不必要的资源浪费。于是我们用到了线程池,同时使用到 Guava 的并发类 ListenableFuture。

单线程

直接开一条线程去爬虫

new Thread(() -> {
  //爬虫任务
});

多线程

当然想爬多个页面也可以使用这样的方式开多条线程。。。

while (条件) {
  new Thread(() -> {
  //爬虫任务
  });
}

那你就 out 太多了,new Thread 的弊端如下:

a. 每次 new Thread 新建对象性能差。
b. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或 oom。
c. 缺乏更多功能,如定时执行、定期执行、线程中断。

相比 new Thread,Java 提供的四种线程池的好处在于:

a. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
b. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
c. 提供定时执行、定期执行、单线程、并发数控制等功能。

线程池

无执行结果

无返回结果是实现了 Runable 接口,重写的 run 方法返回值为 void,线程执行完毕无返回结果。同时线程池 exeute 方法也是无返回值的。

//创建条三个线程的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
//提交3个任务
for (int i = 0; i < 3; i++) {
  fixedThreadPool.execute(new Thread(() -> {
  //爬虫任务
  })
	);
}

有执行结果

线程实现 Calable 接口,重写 cal 方法,并且在类上定义返回值泛型。将结果再 call 方法中返回即可。同时使用线程池有返回值的 Submit 方法。

public static void main(String[] args) throws ExecutionException, InterruptedException {
	//创建条三个线程的线程池
	ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
	//提交3个任务
	for (int i = 0; i < 3; i++) {
	//无返回值
	fixedThreadPool.execute(new Thread(() -> {
	//爬虫任务
	})
	);
	//有返回值
	Future<String> future = fixedThreadPool.submit(new Crawler());
	System.out.println(future.get());
	}
}

static class Crawler implements Callable<String> {

  @Override
  public String call() {
  //执行爬虫任务
  return "成功";
  }
}

输出

上面的线程池创建方式是有很多弊端的,一般使用都会根据自己的场景配置线程池。

这里就不细写了,网上很多文章讲都非常不错。下面贴两篇

Java 并发编程:线程池的使用

Java 线程池

Guava 线程池实战

google Guava 包的 ListenableFuture 解析

接口

传统 JDK 中的 Future 通过异步的方式计算返回结果:在多线程运算中可能或者可能在没有结束返回结果,Future 是运行中的多线程的一个引用句柄,确保在服务执行返回一个 Result。

ListenableFuture 可以允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用, 或者在运算(多线程执行)完成后立即执行。这样简单的改进,使得可以明显的支持更多的操作,这样的功能在 JDK concurrent 中的 Future 是不支持的。

ListenableFuture 中的基础方法是 addListener(Runnable, Executor), 该方法会在多线程运算完的时候,指定的 Runnable 参数传入的对象会被指定的 Executor 执行。

添加回调(Callbacks)

多数用户喜欢使用 Futures.addCallback(ListenableFuture, FutureCallback, Executor)的方式, 或者 另外一个版本 version(译者注:addCallback(ListenableFuture future,FutureCallback callback)),默认是采用 MoreExecutors.sameThreadExecutor()线程池, 为了简化使用,Callback 采用轻量级的设计. FutureCallback 中实现了两个方法:

  • onSuccess(V),在 Future 成功的时候执行,根据 Future 结果来判断。
  • onFailure(Throwable), 在 Future 失败的时候执行,根据 Future 结果来判断。

爬虫妹子图全站图片

public class meizitu {

  //保存路径
  static String path = "E:\\\\meizitu\\\\";
  //图片地址
  static String page = "http://meizitu.com/a/more_%d.html";
  //起始页
  static int pageCouont = 1;
  //当前可用CPU数
  static final int PROCESSORS = Runtime.getRuntime().availableProcessors();
  //线程名
  static ThreadFactory threadName = new ThreadFactoryBuilder().setNameFormat("当前线程-%d").build();
  /**
 * 合理配置线程池核心数:
  *  如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
 *  如果是IO密集型任务,参考值可以设置为2*NCPU || 3*NCPU
 * 
  * 最大线程池为CPU*5
 * 
  * 最大线程池-核心线程的线程,空闲200毫秒,没有任务则收回。
  * 
  * 工作队列最大1000任务
  * 
  * 超过则采用丢弃策略:任务并抛出RejectedExecutionException异常。
  * 
  * 线程池基本知识:https://www.cnblogs.com/dolphin0520/p/3932921.html
 */  static ThreadPoolExecutor executor = new ThreadPoolExecutor(PROCESSORS * 3, PROCESSORS * 5, 200L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1000), threadName, new ThreadPoolExecutor.AbortPolicy());
  //guava并发
  static ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(executor);

  public static void main(String[] args) throws IOException {
  while (true) {
  //初始化httpClient
  CloseableHttpClient httpClient = HttpClients.createDefault();
  //格式化请求的图片页面地址
  String url = String.format(page, pageCouont);
  //请求地址
  CloseableHttpResponse httpResponse = httpClient.execute(new HttpGet(url));
  //如果该页不是404则表示页面有数据
  if (404 == httpResponse.getStatusLine().getStatusCode()) {
  break;
  }
  String html = EntityUtils.toString(httpResponse.getEntity());
  //解析每个图片的URL
  Elements item = Jsoup.parse(html).getElementsByClass("wp-item");
  for (Element element : item) {
  String href = element.getElementsByTag("a").attr("href");
  //任务结果回调 把每个图片的URL提交到线程池
  Futures.addCallback(guavaExecutor.submit(new Crawler(href, pageCouont)), new FutureCallback<String>() {
  @Override
  public void onSuccess(@Nullable String result) {
  //任务执行成功
  System.out.println(result);
  }

  @Override
  public void onFailure(Throwable t) {
  //执行失败
  System.out.println(t.getMessage());
  }
 }, executor);
  }
  pageCouont++;
  }
  System.out.println("任务页数:" + pageCouont);
  }

  static class Crawler implements Callable<String> {
  private String url;
  private int pageCount;

  public Crawler(String url, int pageCount) {
  this.url = url;
  this.pageCount = pageCount;
  }

  @Override
  public String call() {
  FileOutputStream output = null;
  ArrayList<String> list = Lists.newArrayList();
  try {
  /**
 * 以下是分析图片地址并下载代码
  */
  String body = HttpRequest.get(this.url).body();
  Document parse = Jsoup.parse(body);
  String title = parse.title();
  Element picture = parse.getElementById("picture");
  Elements imgs = picture.getElementsByTag("img");
  for (Element e : imgs) {
  String src = e.attr("src");
  String alt = e.attr("alt");
  byte[] bytes = HttpRequest.get(src).bytes();
  //本来是打算获取到文章标题和图片的介绍保存为图片名字的,获取到的页面是乱码,暂时未解决。
  //String imgPath = path.concat("第" + this.pageCount + "页").concat("\\").concat(title).concat("\\").concat(alt);
  String fileDirs = path.concat("第" + this.pageCount + "页");
  String imgPath = fileDirs.concat("\\\\").concat(System.currentTimeMillis() + ".jpg");
  File file = new File(imgPath);
  if (!new File(fileDirs).exists()) {
  file.mkdirs();
  }
  File storeFile = new File(imgPath);
  output = new FileOutputStream(storeFile);
  //得到网络资源的字节数组,并写入文件
  output.write(bytes);
  list.add(imgPath);
  String s = "CPU数:" + PROCESSORS + ", 当前线程:" + Thread.currentThread().getName() + ", 线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
  executor.getQueue().size() + ",已执行任务数目:" + executor.getCompletedTaskCount() + ",请求页面:" + url + ",地址:" + imgPath;
  System.out.println(s);
  }
  return "任务执行成功:" + url + ":" + JSON.toJSONString(list);
  } catch (IOException e) {
  throw new RuntimeException("任务异常:" + this.url);
  } finally {
  if (Objects.equals(output, null)) {
  try {
  output.close();
  } catch (IOException e) {
  throw new RuntimeException("任务异常:" + this.url);
  }
 } }
 } }
}

结果

这里设置的工作队列有点小了,差点满了。最大 36 线程在执行,满了会抛异常,一定要合理设置该大小!!!不然任务会丢弃

  • 线程
    120 引用 • 111 回帖 • 3 关注
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3165 引用 • 8206 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
  • 88250

    皮?让你盗链....

  • nopsad

  • scmod

    皮一下很开心..😄

  • someone

    哦哦不好意思,我当时防盗链没有把主站加入白名单里面!现在加进去

  • someone

    请问 HttpRequest.get(this.url).body()这句,HttpRequest 引用的是哪个包的?