1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  package de.softwareforge.testing.postgres.embedded;
16  
17  import java.io.BufferedReader;
18  import java.io.Closeable;
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.io.InputStreamReader;
22  import java.io.UncheckedIOException;
23  import java.nio.charset.StandardCharsets;
24  import java.util.concurrent.Executors;
25  import java.util.concurrent.Future;
26  import java.util.function.BiConsumer;
27  import java.util.function.Consumer;
28  
29  import com.google.common.util.concurrent.ListeningExecutorService;
30  import com.google.common.util.concurrent.MoreExecutors;
31  import com.google.common.util.concurrent.ThreadFactoryBuilder;
32  import org.slf4j.Logger;
33  
34  
35  
36  
37  
38  
39  final class ProcessOutputLogger implements Closeable {
40  
41      private final Logger logger;
42      private final ListeningExecutorService executorService;
43  
44      ProcessOutputLogger(final Logger errorLogger) {
45          this.logger = errorLogger;
46          this.executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new ThreadFactoryBuilder()
47                  .setDaemon(true)
48                  .setNameFormat("logger-thread-%d")
49                  .build()));
50      }
51  
52      @Override
53      public void close() {
54          this.executorService.shutdownNow();
55      }
56  
57      StreamCapture captureStreamAsLog() {
58          return new StreamCapture(logger::debug);
59      }
60  
61      StreamCapture captureStreamAsConsumer(Consumer<String> consumer) {
62          return new StreamCapture(consumer);
63      }
64  
65  
66      final class StreamCapture implements BiConsumer<String, InputStream> {
67  
68          private final Consumer<String> consumer;
69          private volatile Future<?> completionFuture = null;
70  
71          private StreamCapture(Consumer<String> consumer) {
72              this.consumer = consumer;
73          }
74  
75          @Override
76          public void accept(String name, InputStream inputStream) {
77              this.completionFuture = executorService.submit(new LogRunnable(name, inputStream, consumer));
78          }
79  
80          public Future<?> getCompletion() {
81              return completionFuture;
82          }
83      }
84  
85      private final class LogRunnable implements Runnable {
86  
87          private final InputStream inputStream;
88          private final String name;
89          private final Consumer<String> consumer;
90  
91          private LogRunnable(String name, InputStream inputStream, Consumer<String> consumer) {
92              this.name = name;
93              this.inputStream = inputStream;
94              this.consumer = consumer;
95          }
96  
97          @Override
98          public void run() {
99              String oldName = Thread.currentThread().getName();
100             Thread.currentThread().setName(name);
101             try (InputStreamReader isr = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
102                     BufferedReader reader = new BufferedReader(isr)) {
103                 try {
104                     reader.lines().forEach(consumer);
105                 } catch (final UncheckedIOException e) {
106                     logger.error("while reading output:", e);
107                 }
108             } catch (IOException e) {
109                 logger.error("while opening log stream", e);
110             } finally {
111                 Thread.currentThread().setName(oldName + " (" + name + ")");
112             }
113         }
114     }
115 }