001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 * http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014
015package de.softwareforge.testing.postgres.embedded;
016
017import static com.google.common.base.Preconditions.checkNotNull;
018import static com.google.common.base.Preconditions.checkState;
019import static java.lang.String.format;
020import static java.nio.file.StandardOpenOption.CREATE;
021import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
022import static java.nio.file.StandardOpenOption.WRITE;
023
024import jakarta.annotation.Nonnull;
025import java.io.File;
026import java.io.IOException;
027import java.io.InputStream;
028import java.nio.ByteBuffer;
029import java.nio.channels.AsynchronousFileChannel;
030import java.nio.channels.Channel;
031import java.nio.channels.CompletionHandler;
032import java.nio.channels.FileChannel;
033import java.nio.channels.FileLock;
034import java.nio.file.FileSystems;
035import java.nio.file.Files;
036import java.nio.file.LinkOption;
037import java.nio.file.Path;
038import java.util.Map;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.Phaser;
041import java.util.concurrent.locks.Lock;
042import java.util.concurrent.locks.ReentrantLock;
043
044import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
045import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048import org.tukaani.xz.XZInputStream;
049
050/**
051 * Loads a native binary installation and returns the location of it.
052 *
053 * @since 3.0
054 */
055public final class TarXzCompressedBinaryManager implements NativeBinaryManager {
056
057    private static final Logger LOG = LoggerFactory.getLogger(TarXzCompressedBinaryManager.class);
058
059    private static final Map<NativeBinaryLocator, File> KNOWN_INSTALLATIONS = new ConcurrentHashMap<>();
060
061    private final Lock prepareBinariesLock = new ReentrantLock();
062
063    private File installationBaseDirectory = EmbeddedUtil.getWorkingDirectory();
064    private String lockFileName = EmbeddedPostgres.LOCK_FILE_NAME;
065
066    private final NativeBinaryLocator nativeBinaryLocator;
067
068    /**
069     * Creates a new binary manager for tar-xz compressed archives.
070     * <p>
071     * The implementation of {@link NativeBinaryLocator} to locate the stream that gets unpacked must satisfy the following criteria:
072     * <ul>
073     *     <li>It must override {@link Object#equals(Object)} and {@link Object#hashCode()}.</li>
074     *     <li>It should implement {@link Object#toString()} to return meaningful information about the locator.</li>
075     *     <li>It must allow multiple calls to {@link NativeBinaryLocator#getInputStream()} which all return the same, byte-identical contents.
076     *     The operation should be cheap as it may be called multiple times.</li>
077     * </ul>
078     *
079     * @param nativeBinaryLocator An implementation of {@link NativeBinaryLocator} that satisfies the conditions above. Must not be null.
080     */
081    public TarXzCompressedBinaryManager(@Nonnull NativeBinaryLocator nativeBinaryLocator) {
082        this.nativeBinaryLocator = checkNotNull(nativeBinaryLocator, "nativeBinaryLocator is null");
083    }
084
085    @Override
086    public void setInstallationBaseDirectory(File installationBaseDirectory) {
087        this.installationBaseDirectory = checkNotNull(installationBaseDirectory, "installationBaseDirectory is null");
088    }
089
090    /**
091     * Sets the lock file name. This method must be called before the first call to {@link TarXzCompressedBinaryManager#getLocation()}.
092     *
093     * @param lockFileName Name of a file to use as file lock when unpacking the distribution.
094     */
095    public void setLockFileName(String lockFileName) {
096        this.lockFileName = checkNotNull(lockFileName, "lockFileName is null");
097    }
098
099    @Override
100    @Nonnull
101    public File getLocation() throws IOException {
102
103        // the installation cache saves ~ 1% CPU according to the profiler
104        File installationDirectory = KNOWN_INSTALLATIONS.get(nativeBinaryLocator);
105        if (installationDirectory != null && installationDirectory.exists()) {
106            return installationDirectory;
107        }
108
109        prepareBinariesLock.lock();
110        try {
111            String installationIdentifier = nativeBinaryLocator.getIdentifier();
112            installationDirectory = new File(installationBaseDirectory, installationIdentifier);
113            EmbeddedUtil.ensureDirectory(installationDirectory);
114
115            final File unpackLockFile = new File(installationDirectory, lockFileName);
116            final File installationExistsFile = new File(installationDirectory, ".exists");
117
118            if (!installationExistsFile.exists()) {
119                try (FileChannel lockChannel = FileChannel.open(unpackLockFile.toPath(), CREATE, WRITE, TRUNCATE_EXISTING);
120                        FileLock unpackLock = lockChannel.tryLock()) {
121                    if (unpackLock != null) {
122                        checkState(!installationExistsFile.exists(), "unpack lock acquired but .exists file is present " + installationExistsFile);
123                        LOG.info("extracting archive...");
124                        try (InputStream archiveStream = nativeBinaryLocator.getInputStream()) {
125                            extractTxz(archiveStream, installationDirectory.getPath());
126                            checkState(installationExistsFile.createNewFile(), "couldn't create %s file!", installationExistsFile);
127                        }
128                    } else {
129                        // the other guy is unpacking for us.
130                        int maxAttempts = 60;
131                        while (!installationExistsFile.exists() && --maxAttempts > 0) { // NOPMD
132                            Thread.sleep(1000L);
133                        }
134                        checkState(installationExistsFile.exists(), "Waited 60 seconds for archive to be unpacked but it never finished!");
135                    }
136                } finally {
137                    Files.deleteIfExists(unpackLockFile.toPath());
138                }
139            }
140
141            KNOWN_INSTALLATIONS.putIfAbsent(nativeBinaryLocator, installationDirectory);
142            LOG.debug(format("Unpacked archive at %s", installationDirectory));
143            return installationDirectory;
144
145        } catch (final InterruptedException e) {
146            Thread.currentThread().interrupt();
147            throw new IOException(e);
148        } finally {
149            prepareBinariesLock.lock();
150        }
151    }
152
153    /**
154     * Unpack archive compressed by tar with xz compression.
155     *
156     * @param stream    A tar-xz compressed data stream.
157     * @param targetDir The directory to extract the content to.
158     */
159    private static void extractTxz(InputStream stream, String targetDir) throws IOException {
160        try (XZInputStream xzIn = new XZInputStream(stream);
161                TarArchiveInputStream tarIn = new TarArchiveInputStream(xzIn)) {
162            final Phaser phaser = new Phaser(1);
163            TarArchiveEntry entry;
164
165            while ((entry = tarIn.getNextTarEntry()) != null) { //NOPMD
166                final String individualFile = entry.getName();
167                final File fsObject = new File(targetDir, individualFile);
168                final Path fsPath = fsObject.toPath();
169                if (Files.exists(fsPath, LinkOption.NOFOLLOW_LINKS) && !Files.isDirectory(fsPath, LinkOption.NOFOLLOW_LINKS)) {
170                    Files.delete(fsPath);
171                    LOG.debug(format("Deleting existing entry %s", fsPath));
172                }
173
174                if (entry.isSymbolicLink() || entry.isLink()) {
175                    Path target = FileSystems.getDefault().getPath(entry.getLinkName());
176                    Files.createSymbolicLink(fsPath, target);
177                } else if (entry.isFile()) {
178                    byte[] content = new byte[(int) entry.getSize()];
179                    int read = tarIn.read(content, 0, content.length);
180                    checkState(read != -1, "could not read %s", individualFile);
181                    EmbeddedUtil.ensureDirectory(fsObject.getParentFile());
182
183                    final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(fsPath, CREATE, WRITE); //NOPMD
184                    final ByteBuffer buffer = ByteBuffer.wrap(content); //NOPMD
185
186                    phaser.register();
187                    fileChannel.write(buffer, 0, fileChannel, new CompletionHandler<Integer, Channel>() {
188                        @Override
189                        public void completed(Integer written, Channel channel) {
190                            closeChannel(channel);
191                        }
192
193                        @Override
194                        public void failed(Throwable error, Channel channel) {
195                            LOG.error(format("could not write file %s", fsObject.getAbsolutePath()), error);
196                            closeChannel(channel);
197                        }
198
199                        private void closeChannel(Channel channel) {
200                            try {
201                                channel.close();
202                            } catch (IOException e) {
203                                LOG.error("While closing channel:", e);
204                            } finally {
205                                phaser.arriveAndDeregister();
206                            }
207                        }
208                    });
209                } else if (entry.isDirectory()) {
210                    EmbeddedUtil.ensureDirectory(fsObject);
211                } else {
212                    throw new IOException(format("Unsupported entry in tar file found: %s", individualFile));
213                }
214
215                if (individualFile.startsWith("bin/") || individualFile.startsWith("./bin/")) {
216                    if (!fsObject.setExecutable(true, false)) {
217                        throw new IOException(format("Could not make %s executable!", individualFile));
218                    }
219                }
220            }
221
222            phaser.arriveAndAwaitAdvance();
223        }
224    }
225}