asynchronous - niubods/playframework-notes GitHub Wiki

用HTTP进行异步编程

这一章阐述了如何在一个Play应用中处理异步问题,从而实现典型的长轮询、流还有其他可以负担千人同时在线的 长连接类型 应用。

挂起HTTP请求

Play旨在处理非常短的请求。它使用一个固定的线程池来处理由HTTP连接器列队的请求。为了得到最优的结果,线程池应该尽可能小。我们一般使用 number of processors + 1 这个优化值来设置缺省线程池的大小。

也就是说如果一个请求的处理时间非常长(例如在等待一个很耗时的计算),它会阻塞线程池并严重影响你应用的响应能力。当然你可以对线程池扩容,但这会导致资源浪费,而且无论如何线程池也不可能是无限大的。

假如有一个聊天应用,浏览器发出了一个阻塞性的HTTP请求,等待一个要展示的的新消息。这些请求可能会非常非常的长(一般需要好几秒钟)而且会阻塞线程池。如果你计划允许100个用户同时连接你的聊天应用,你需要至少提供100个线程。好吧,这是可行的。但是如果是1000个用户呢?10000个呢?

为了处理这类情况,Play允许你暂时挂起一个请求。HTTP请求会保持连接,但是请求的执行会弹出线程池并稍后再试。你既可以告诉Play在一个固定的时间延迟后尝试处理这个请求,也可以等到 Promise 值可用的时候。

小贴士 。你可以在 samples-and-tests/chat 中看到真实的例子。

例如,这个action会启动一个非常长的作业并等待HTTP响应返回结果。

public static void generatePDF(Long reportId) {
    Promise<InputStream> pdf = new ReportAsPDFJob(report).now();
    InputStream pdfStream = await(pdf);
    renderBinary(pdfStream);
}

这里我们使用了 await(…) 来要求Play挂起请求直到 Promise<InputStream> 值返回。

延续(Continuations)

因为框架需要收回你正在使用的线程来服务其他请求,所以它需要挂起你的代码。在以前版本的Play中 await(...) 等价于 waitFor(...), 它会挂起你的action,过一会再从头调用它。

为了可以更容易地处理异步代码,我们引入了延续。延续可以让你的代码透明地挂起和恢复。如此一来你可以以非常指令化的方式写代码,比如:

public static void computeSomething() {
    Promise<String> delayedResult = veryLongComputation(…);
    String result = await(delayedResult);
    render(result);
}

事实上在这里,你的代码会在两个不同的线程中分两步执行。但是正如你所看到的,这对你的应用代码来说是透明的。

Using await(…) and continuations, you could write a loop:

使用 await(…) 和延续,你可以写一个循环:

public static void loopWithoutBlocking() {
    for(int i=0; i<=10; i++) { 
         Logger.info(i);
         await("1s");
    }
    renderText("Loop finished");
}

当只使用单线程来处理请求时,这也是开发模式下的默认设置,Play可以同时并发地针对好几个请求执行这些循环。

一个更加现实的例子是异步地从远程URL中提取内容。下面的例子并行执行了三个远程请求:每一个都调用 play.libs.WS.WSRequest.getAsync() 方法来异步执行一个GET请求并返回一个 play.libs.F.Promise 。控制器方法联合使用三个 Promise 实例,通过调用 await(…) 挂起收到的HTTP请求。当三个远程调用都收到响应后,会恢复一个线程来处理和渲染响应内容。

public class AsyncTest extends Controller {
 
  public static void remoteData() {
    F.Promise<WS.HttpResponse> r1 = WS.url("http://example.org/1").getAsync();
    F.Promise<WS.HttpResponse> r2 = WS.url("http://example.org/2").getAsync();
    F.Promise<WS.HttpResponse> r3 = WS.url("http://example.org/3").getAsync();
  
    F.Promise<List<WS.HttpResponse>> promises = F.Promise.waitAll(r1, r2, r3);
 
    // Suspend processing here, until all three remote calls are complete.
    List<WS.HttpResponse> httpResponses = await(promises);
 
    render(httpResponses);
  }
}

回调函数

A different way to implement the previous example of three asynchronous remote requests is to use a callback. This time, the call to await(…) includes a play.libs.F.Action implementation, which is a callback that is executed when the promises are done.

另一种实现上面那种三个异步远程请求的方法是使用回调函数。这一次,对 await(…) 的调用中包含了一个 play.libs.F.Action 的实现类,这是一个当 promises 完成时就会执行的的回调函数。

public class AsyncTest extends Controller {
 
  public static void remoteData() {
    F.Promise<WS.HttpResponse> r1 = WS.url("http://example.org/1").getAsync();
    F.Promise<WS.HttpResponse> r2 = WS.url("http://example.org/2").getAsync();
    F.Promise<WS.HttpResponse> r3 = WS.url("http://example.org/3").getAsync();
 
    F.Promise<List<WS.HttpResponse>> promises = F.Promise.waitAll(r1, r2, r3);
 
    // Suspend processing here, until all three remote calls are complete.
    await(promises, new F.Action<List<WS.HttpResponse>>() {
      public void invoke(List<WS.HttpResponse> httpResponses) {
        render(httpResponses);
      }
    });
  }
}

HTTP响应流

目前你已经会执行一个不会阻塞请求的循环了,下面你可能希望当得到部分结果的时候就立马发送数据给浏览器。这正是 Content-type:Chunked HTTP响应类型的要点。它允许你用多个块分几次发送你的HTTP响应。浏览器在他们发布之后马上就能收到。

使用 await(…) 和延续,你可以通过这样做到:

public static void generateLargeCSV() {
    CSVGenerator generator = new CSVGenerator();
    response.contentType = "text/csv";
    while(generator.hasMoreData()) {
          String someCsvData = await(generator.nextDataChunk());
          response.writeChunk(someCsvData);
    }
}

Even if the CSV generation takes one hour, Play is able to simultaneously process several requests using a single thread, sending back the generated data to the client as soon as they are available.

就算CSV的生成花上一个小时,Play也可以用单线程同时处理好几个请求,当数据生成完毕后马上就能发回去。

使用WebSockets

WebSocket是一种在浏览器和你的应用间开启双向通信的方法。在浏览器端你要使用一个"ws://"url打开一个socket。

new Socket("ws://localhost:9000/helloSocket?name=Guillaume")

在Play端你要这样声明一条WS路由:

WS   /helloSocket            MyWebSocket.hello

MyWebSocket 是一个 @WebSocketController@。 一个WebSocket控制器和标准HTTP控制器类似,但是运用了一些不同的概念。

  • 它有一个request对象,但是没有response对象。
  • 它可以访问session但是只读的。
  • 他没有 renderArgsrouteArgs 和flash作用域。
  • 它只能从路由模式或请求字符串中读取参数。
  • 它有两条通信通道:inbound和outbound。

当客户端链接到 ws://localhost:9000/helloSocket socket时,Play会调用 MyWebSocket.hello 动作方法。当 MyWebSocket.hello 动作方法退出时,socket也就关闭了。

那么一个基本的socket例子应该是这样:

public class MyWebSocket extends WebSocketController {
 
    public static void hello(String name) {
        outbound.send("Hello %s!", name);
    }
}

当客户端连接到这个socket时,它会收到'Hello Guillaume'这条消息,之后Play就关闭了socket。

当然通常来说你不会马上就关闭socket。但是这个用 @await(…)@ 和延续也可以很容易办到。

举个例子,一个基本的Echo服务器:
public class MyWebSocket extends WebSocketController {
 
    public static void echo() {
        while(inbound.isOpen()) {
             WebSocketEvent e = await(inbound.nextEvent());
             if(e instanceof WebSocketFrame) {
                  WebSocketFrame frame = (WebSocketFrame)e;
                  if(!e.isBinary) {
                      if(frame.textData.equals("quit")) {
                          outbound.send("Bye!");
                          disconnect();
                      } else {
                          outbound.send("Echo: %s", frame.textData);
                      }
                  }
             }
             if(e instanceof WebSocketClose) {
                 Logger.info("Socket closed!");
             }
        }
    }
 
}

在上面的例子中,嵌套的’if’和’cast’不但冗长乏味而且容易出错。在这里Java真是烂透了。就连这种简单的情况它也不能简单地处理。而在更复杂的情况下,在你联合使用了好几个流,甚至还有多种事件类型的时候,它就变成了一场噩梦。

这也是在 Play的库 ’函数式编程’中我们为什么向Java引入基本模式匹配的原因。

那么我们可以把上面的echo的例子重写成这样:

public static void echo() {
    while(inbound.isOpen()) {
         WebSocketEvent e = await(inbound.nextEvent());
         
         for(String quit: TextFrame.and(Equals("quit")).match(e)) {
             outbound.send("Bye!");
             disconnect();
         }
 
         for(String msg: TextFrame.match(e)) {
             outbound.send("Echo: %s", frame.textData);
         }
         
         for(WebSocketClose closed: SocketClosed.match(e)) {
             Logger.info("Socket closed!");
         }
    }
}
⚠️ **GitHub.com Fallback** ⚠️