View Javadoc
1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    * http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package de.softwareforge.testing.postgres.embedded;
16  
17  import static com.google.common.base.Preconditions.checkNotNull;
18  import static com.google.common.base.Preconditions.checkState;
19  import static java.lang.String.format;
20  import static java.nio.file.StandardOpenOption.CREATE;
21  import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
22  import static java.nio.file.StandardOpenOption.WRITE;
23  
24  import jakarta.annotation.Nonnull;
25  import java.io.File;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.nio.ByteBuffer;
29  import java.nio.channels.AsynchronousFileChannel;
30  import java.nio.channels.Channel;
31  import java.nio.channels.CompletionHandler;
32  import java.nio.channels.FileChannel;
33  import java.nio.channels.FileLock;
34  import java.nio.file.FileSystems;
35  import java.nio.file.Files;
36  import java.nio.file.LinkOption;
37  import java.nio.file.Path;
38  import java.util.Map;
39  import java.util.concurrent.ConcurrentHashMap;
40  import java.util.concurrent.Phaser;
41  import java.util.concurrent.locks.Lock;
42  import java.util.concurrent.locks.ReentrantLock;
43  
44  import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
45  import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
46  import org.slf4j.Logger;
47  import org.slf4j.LoggerFactory;
48  import org.tukaani.xz.XZInputStream;
49  
50  /**
51   * Loads a native binary installation and returns the location of it.
52   *
53   * @since 3.0
54   */
55  public final class TarXzCompressedBinaryManager implements NativeBinaryManager {
56  
57      private static final Logger LOG = LoggerFactory.getLogger(TarXzCompressedBinaryManager.class);
58  
59      private static final Map<NativeBinaryLocator, File> KNOWN_INSTALLATIONS = new ConcurrentHashMap<>();
60  
61      private final Lock prepareBinariesLock = new ReentrantLock();
62  
63      private File installationBaseDirectory = EmbeddedUtil.getWorkingDirectory();
64      private String lockFileName = EmbeddedPostgres.LOCK_FILE_NAME;
65  
66      private final NativeBinaryLocator nativeBinaryLocator;
67  
68      /**
69       * Creates a new binary manager for tar-xz compressed archives.
70       * <p>
71       * The implementation of {@link NativeBinaryLocator} to locate the stream that gets unpacked must satisfy the following criteria:
72       * <ul>
73       *     <li>It must override {@link Object#equals(Object)} and {@link Object#hashCode()}.</li>
74       *     <li>It should implement {@link Object#toString()} to return meaningful information about the locator.</li>
75       *     <li>It must allow multiple calls to {@link NativeBinaryLocator#getInputStream()} which all return the same, byte-identical contents.
76       *     The operation should be cheap as it may be called multiple times.</li>
77       * </ul>
78       *
79       * @param nativeBinaryLocator An implementation of {@link NativeBinaryLocator} that satisfies the conditions above. Must not be null.
80       */
81      public TarXzCompressedBinaryManager(@Nonnull NativeBinaryLocator nativeBinaryLocator) {
82          this.nativeBinaryLocator = checkNotNull(nativeBinaryLocator, "nativeBinaryLocator is null");
83      }
84  
85      @Override
86      public void setInstallationBaseDirectory(File installationBaseDirectory) {
87          this.installationBaseDirectory = checkNotNull(installationBaseDirectory, "installationBaseDirectory is null");
88      }
89  
90      /**
91       * Sets the lock file name. This method must be called before the first call to {@link TarXzCompressedBinaryManager#getLocation()}.
92       *
93       * @param lockFileName Name of a file to use as file lock when unpacking the distribution.
94       */
95      public void setLockFileName(String lockFileName) {
96          this.lockFileName = checkNotNull(lockFileName, "lockFileName is null");
97      }
98  
99      @Override
100     @Nonnull
101     public File getLocation() throws IOException {
102 
103         // the installation cache saves ~ 1% CPU according to the profiler
104         File installationDirectory = KNOWN_INSTALLATIONS.get(nativeBinaryLocator);
105         if (installationDirectory != null && installationDirectory.exists()) {
106             return installationDirectory;
107         }
108 
109         prepareBinariesLock.lock();
110         try {
111             String installationIdentifier = nativeBinaryLocator.getIdentifier();
112             installationDirectory = new File(installationBaseDirectory, installationIdentifier);
113             EmbeddedUtil.ensureDirectory(installationDirectory);
114 
115             final File unpackLockFile = new File(installationDirectory, lockFileName);
116             final File installationExistsFile = new File(installationDirectory, ".exists");
117 
118             if (!installationExistsFile.exists()) {
119                 try (FileChannel lockChannel = FileChannel.open(unpackLockFile.toPath(), CREATE, WRITE, TRUNCATE_EXISTING);
120                         FileLock unpackLock = lockChannel.tryLock()) {
121                     if (unpackLock != null) {
122                         checkState(!installationExistsFile.exists(), "unpack lock acquired but .exists file is present " + installationExistsFile);
123                         LOG.info("extracting archive...");
124                         try (InputStream archiveStream = nativeBinaryLocator.getInputStream()) {
125                             extractTxz(archiveStream, installationDirectory.getPath());
126                             checkState(installationExistsFile.createNewFile(), "couldn't create %s file!", installationExistsFile);
127                         }
128                     } else {
129                         // the other guy is unpacking for us.
130                         int maxAttempts = 60;
131                         while (!installationExistsFile.exists() && --maxAttempts > 0) { // NOPMD
132                             Thread.sleep(1000L);
133                         }
134                         checkState(installationExistsFile.exists(), "Waited 60 seconds for archive to be unpacked but it never finished!");
135                     }
136                 } finally {
137                     Files.deleteIfExists(unpackLockFile.toPath());
138                 }
139             }
140 
141             KNOWN_INSTALLATIONS.putIfAbsent(nativeBinaryLocator, installationDirectory);
142             LOG.debug(format("Unpacked archive at %s", installationDirectory));
143             return installationDirectory;
144 
145         } catch (final InterruptedException e) {
146             Thread.currentThread().interrupt();
147             throw new IOException(e);
148         } finally {
149             prepareBinariesLock.lock();
150         }
151     }
152 
153     /**
154      * Unpack archive compressed by tar with xz compression.
155      *
156      * @param stream    A tar-xz compressed data stream.
157      * @param targetDir The directory to extract the content to.
158      */
159     private static void extractTxz(InputStream stream, String targetDir) throws IOException {
160         try (XZInputStream xzIn = new XZInputStream(stream);
161                 TarArchiveInputStream tarIn = new TarArchiveInputStream(xzIn)) {
162             final Phaser phaser = new Phaser(1);
163             TarArchiveEntry entry;
164 
165             while ((entry = tarIn.getNextTarEntry()) != null) { //NOPMD
166                 final String individualFile = entry.getName();
167                 final File fsObject = new File(targetDir, individualFile);
168                 final Path fsPath = fsObject.toPath();
169                 if (Files.exists(fsPath, LinkOption.NOFOLLOW_LINKS) && !Files.isDirectory(fsPath, LinkOption.NOFOLLOW_LINKS)) {
170                     Files.delete(fsPath);
171                     LOG.debug(format("Deleting existing entry %s", fsPath));
172                 }
173 
174                 if (entry.isSymbolicLink() || entry.isLink()) {
175                     Path target = FileSystems.getDefault().getPath(entry.getLinkName());
176                     Files.createSymbolicLink(fsPath, target);
177                 } else if (entry.isFile()) {
178                     byte[] content = new byte[(int) entry.getSize()];
179                     int read = tarIn.read(content, 0, content.length);
180                     checkState(read != -1, "could not read %s", individualFile);
181                     EmbeddedUtil.ensureDirectory(fsObject.getParentFile());
182 
183                     final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(fsPath, CREATE, WRITE); //NOPMD
184                     final ByteBuffer buffer = ByteBuffer.wrap(content); //NOPMD
185 
186                     phaser.register();
187                     fileChannel.write(buffer, 0, fileChannel, new CompletionHandler<Integer, Channel>() {
188                         @Override
189                         public void completed(Integer written, Channel channel) {
190                             closeChannel(channel);
191                         }
192 
193                         @Override
194                         public void failed(Throwable error, Channel channel) {
195                             LOG.error(format("could not write file %s", fsObject.getAbsolutePath()), error);
196                             closeChannel(channel);
197                         }
198 
199                         private void closeChannel(Channel channel) {
200                             try {
201                                 channel.close();
202                             } catch (IOException e) {
203                                 LOG.error("While closing channel:", e);
204                             } finally {
205                                 phaser.arriveAndDeregister();
206                             }
207                         }
208                     });
209                 } else if (entry.isDirectory()) {
210                     EmbeddedUtil.ensureDirectory(fsObject);
211                 } else {
212                     throw new IOException(format("Unsupported entry in tar file found: %s", individualFile));
213                 }
214 
215                 if (individualFile.startsWith("bin/") || individualFile.startsWith("./bin/")) {
216                     if (!fsObject.setExecutable(true, false)) {
217                         throw new IOException(format("Could not make %s executable!", individualFile));
218                     }
219                 }
220             }
221 
222             phaser.arriveAndAwaitAdvance();
223         }
224     }
225 }