View Javadoc
1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    * http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package de.softwareforge.testing.postgres.embedded;
16  
17  import java.io.BufferedReader;
18  import java.io.Closeable;
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.io.InputStreamReader;
22  import java.io.UncheckedIOException;
23  import java.nio.charset.StandardCharsets;
24  import java.util.concurrent.Executors;
25  import java.util.concurrent.Future;
26  import java.util.function.BiConsumer;
27  import java.util.function.Consumer;
28  
29  import com.google.common.util.concurrent.ListeningExecutorService;
30  import com.google.common.util.concurrent.MoreExecutors;
31  import com.google.common.util.concurrent.ThreadFactoryBuilder;
32  import org.slf4j.Logger;
33  
34  /**
35   * Read from an {@link InputStream} and send the information to the logger supplied.
36   * <p>
37   * The use of the input stream is thread safe since it's used only in a single thread&mdash;the one launched by this code.
38   */
39  class ProcessOutputLogger implements Closeable {
40  
41      private final Logger logger;
42      private final ListeningExecutorService executorService;
43  
44      ProcessOutputLogger(final Logger errorLogger) {
45          this.logger = errorLogger;
46          this.executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new ThreadFactoryBuilder()
47                  .setDaemon(true)
48                  .setNameFormat("logger-thread-%d")
49                  .build()));
50      }
51  
52      @Override
53      public void close() {
54          this.executorService.shutdownNow();
55      }
56  
57      StreamCapture captureStreamAsLog() {
58          return new StreamCapture(logger::debug);
59      }
60  
61      StreamCapture captureStreamAsConsumer(Consumer<String> consumer) {
62          return new StreamCapture(consumer);
63      }
64  
65  
66      class StreamCapture implements BiConsumer<String, InputStream> {
67  
68          private final Consumer<String> consumer;
69          private volatile Future<?> completionFuture = null;
70  
71          private StreamCapture(Consumer<String> consumer) {
72              this.consumer = consumer;
73          }
74  
75          @Override
76          public void accept(String name, InputStream inputStream) {
77              this.completionFuture = executorService.submit(new LogRunnable(name, inputStream, consumer));
78          }
79  
80          public Future<?> getCompletion() {
81              return completionFuture;
82          }
83      }
84  
85      private class LogRunnable implements Runnable {
86  
87          private final InputStream inputStream;
88          private final String name;
89          private final Consumer<String> consumer;
90  
91          private LogRunnable(String name, InputStream inputStream, Consumer<String> consumer) {
92              this.name = name;
93              this.inputStream = inputStream;
94              this.consumer = consumer;
95          }
96  
97          @Override
98          public void run() {
99              String oldName = Thread.currentThread().getName();
100             Thread.currentThread().setName(name);
101             try (InputStreamReader isr = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
102                     BufferedReader reader = new BufferedReader(isr)) {
103                 try {
104                     reader.lines().forEach(consumer::accept);
105                 } catch (final UncheckedIOException e) {
106                     logger.error("while reading output:", e);
107                 }
108             } catch (IOException e) {
109                 logger.error("while opening log stream", e);
110             } finally {
111                 Thread.currentThread().setName(oldName + " (" + name + ")");
112             }
113         }
114     }
115 }