|
| 1 | +package com.kttdevelopment.simplehttpserver.handler; |
| 2 | + |
| 3 | +import com.kttdevelopment.simplehttpserver.SimpleHttpExchange; |
| 4 | +import com.kttdevelopment.simplehttpserver.SimpleHttpHandler; |
| 5 | +import com.kttdevelopment.simplehttpserver.var.HttpCode; |
| 6 | +import com.kttdevelopment.simplehttpserver.var.RequestMethod; |
| 7 | + |
| 8 | +import java.io.IOException; |
| 9 | +import java.io.OutputStream; |
| 10 | +import java.nio.charset.StandardCharsets; |
| 11 | +import java.util.*; |
| 12 | +import java.util.concurrent.TimeUnit; |
| 13 | +import java.util.concurrent.atomic.AtomicInteger; |
| 14 | + |
| 15 | +public class SSEHandler extends SimpleHttpHandler { |
| 16 | + |
| 17 | + private final List<OutputStream> listeners = new ArrayList<>(); |
| 18 | + private final AtomicInteger eventId = new AtomicInteger(-1); |
| 19 | + private final LinkedList<EventStreamRecord> queue = new LinkedList<>(); |
| 20 | + |
| 21 | + @Override |
| 22 | + public final void handle(final SimpleHttpExchange exchange) throws IOException{ |
| 23 | + exchange.getResponseHeaders().add("Access-Control-Allow-Headers", "Content-Type"); |
| 24 | + if(exchange.getRequestHeaders().getFirst("origin") != null) |
| 25 | + exchange.getResponseHeaders().add("Access-Control-Allow-Origin", exchange.getRequestHeaders().getFirst("origin")); |
| 26 | + exchange.getResponseHeaders().add("Access-Control-Allow-Methods","GET, HEAD, POST, PUT, DELETE"); |
| 27 | + exchange.getResponseHeaders().add("Access-Control-Max-Age", String.valueOf(TimeUnit.HOURS.toSeconds(1))); |
| 28 | + |
| 29 | + if(exchange.getRequestMethod() == RequestMethod.OPTIONS){ |
| 30 | + exchange.sendResponseHeaders(HttpCode.HTTP_OK,0); |
| 31 | + return; |
| 32 | + } |
| 33 | + |
| 34 | + exchange.getResponseHeaders().put("content-type", Collections.singletonList("text/event-stream")); |
| 35 | + |
| 36 | + int latest = 0; |
| 37 | + try{ |
| 38 | + latest = Integer.parseInt(exchange.getRequestHeaders().getFirst("Last_Event-ID")); |
| 39 | + }catch(final NumberFormatException | NullPointerException ignored){ } |
| 40 | + |
| 41 | + exchange.sendResponseHeaders(200,0); |
| 42 | + for(int index = latest; index < queue.size(); index++){ |
| 43 | + exchange.getOutputStream().write(queue.get(index).toString(eventId.get()).getBytes(StandardCharsets.UTF_8)); |
| 44 | + exchange.getOutputStream().flush(); |
| 45 | + } |
| 46 | + |
| 47 | + listeners.add(exchange.getOutputStream()); |
| 48 | + } |
| 49 | + |
| 50 | + public synchronized final void push(final String data){ |
| 51 | + push(data,0,""); |
| 52 | + } |
| 53 | + |
| 54 | + public synchronized final void push(final String data, final int retry, final String event){ |
| 55 | + eventId.addAndGet(1); |
| 56 | + final EventStreamRecord record = new EventStreamRecord(retry,event,data); |
| 57 | + queue.add(record); |
| 58 | + listeners.forEach(stream -> { |
| 59 | + try{ |
| 60 | + stream.write(record.toString(eventId.get()).getBytes(StandardCharsets.UTF_8)); |
| 61 | + stream.flush(); |
| 62 | + }catch(final IOException ignored){ } |
| 63 | + }); |
| 64 | + } |
| 65 | + |
| 66 | + private static class EventStreamRecord { |
| 67 | + |
| 68 | + private final int retry; |
| 69 | + private final String event; |
| 70 | + private final String data; |
| 71 | + |
| 72 | + public EventStreamRecord(final int retry, final String event, final String data){ |
| 73 | + this.retry = retry; |
| 74 | + this.event = event; |
| 75 | + this.data = data; |
| 76 | + } |
| 77 | + |
| 78 | + public final String toString(final int id){ |
| 79 | + return |
| 80 | + "id: " + id + '\n' + |
| 81 | + (retry > 0 ? "retry: " + retry + '\n' : "") + |
| 82 | + (!event.isBlank() ? "event: " + event + '\n' : "") + |
| 83 | + (!data.isBlank() ? "data: " + data + '\n' : "") + |
| 84 | + '\n'; |
| 85 | + } |
| 86 | + |
| 87 | + } |
| 88 | + |
| 89 | +} |
0 commit comments