1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package de.softwareforge.testing.postgres.embedded;
16
17 import java.io.BufferedReader;
18 import java.io.Closeable;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.InputStreamReader;
22 import java.io.UncheckedIOException;
23 import java.nio.charset.StandardCharsets;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.Future;
26 import java.util.function.BiConsumer;
27 import java.util.function.Consumer;
28
29 import com.google.common.util.concurrent.ListeningExecutorService;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.ThreadFactoryBuilder;
32 import org.slf4j.Logger;
33
34
35
36
37
38
39 class ProcessOutputLogger implements Closeable {
40
41 private final Logger logger;
42 private final ListeningExecutorService executorService;
43
44 ProcessOutputLogger(final Logger errorLogger) {
45 this.logger = errorLogger;
46 this.executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new ThreadFactoryBuilder()
47 .setDaemon(true)
48 .setNameFormat("logger-thread-%d")
49 .build()));
50 }
51
52 @Override
53 public void close() {
54 this.executorService.shutdownNow();
55 }
56
57 StreamCapture captureStreamAsLog() {
58 return new StreamCapture(logger::debug);
59 }
60
61 StreamCapture captureStreamAsConsumer(Consumer<String> consumer) {
62 return new StreamCapture(consumer);
63 }
64
65
66 class StreamCapture implements BiConsumer<String, InputStream> {
67
68 private final Consumer<String> consumer;
69 private volatile Future<?> completionFuture = null;
70
71 private StreamCapture(Consumer<String> consumer) {
72 this.consumer = consumer;
73 }
74
75 @Override
76 public void accept(String name, InputStream inputStream) {
77 this.completionFuture = executorService.submit(new LogRunnable(name, inputStream, consumer));
78 }
79
80 public Future<?> getCompletion() {
81 return completionFuture;
82 }
83 }
84
85 private class LogRunnable implements Runnable {
86
87 private final InputStream inputStream;
88 private final String name;
89 private final Consumer<String> consumer;
90
91 private LogRunnable(String name, InputStream inputStream, Consumer<String> consumer) {
92 this.name = name;
93 this.inputStream = inputStream;
94 this.consumer = consumer;
95 }
96
97 @Override
98 public void run() {
99 String oldName = Thread.currentThread().getName();
100 Thread.currentThread().setName(name);
101 try (InputStreamReader isr = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
102 BufferedReader reader = new BufferedReader(isr)) {
103 try {
104 reader.lines().forEach(consumer::accept);
105 } catch (final UncheckedIOException e) {
106 logger.error("while reading output:", e);
107 }
108 } catch (IOException e) {
109 logger.error("while opening log stream", e);
110 } finally {
111 Thread.currentThread().setName(oldName + " (" + name + ")");
112 }
113 }
114 }
115 }