001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 * http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package de.softwareforge.testing.postgres.embedded;
015
016import static com.google.common.base.Preconditions.checkNotNull;
017import static com.google.common.base.Preconditions.checkState;
018import static java.lang.String.format;
019import static java.nio.file.StandardOpenOption.CREATE;
020import static java.nio.file.StandardOpenOption.WRITE;
021
022import java.io.File;
023import java.io.FileOutputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.nio.ByteBuffer;
027import java.nio.channels.AsynchronousFileChannel;
028import java.nio.channels.Channel;
029import java.nio.channels.CompletionHandler;
030import java.nio.channels.FileLock;
031import java.nio.file.FileSystems;
032import java.nio.file.Files;
033import java.nio.file.LinkOption;
034import java.nio.file.Path;
035import java.util.Map;
036import java.util.concurrent.ConcurrentHashMap;
037import java.util.concurrent.Phaser;
038import java.util.concurrent.locks.Lock;
039import java.util.concurrent.locks.ReentrantLock;
040
041import edu.umd.cs.findbugs.annotations.NonNull;
042import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
043import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046import org.tukaani.xz.XZInputStream;
047
048/**
049 * Loads a native binary installation and returns the location of it.
050 *
051 * @since 3.0
052 */
053public final class TarXzCompressedBinaryManager implements NativeBinaryManager {
054
055    private static final Logger LOG = LoggerFactory.getLogger(TarXzCompressedBinaryManager.class);
056
057    private static final Map<NativeBinaryLocator, File> KNOWN_INSTALLATIONS = new ConcurrentHashMap<>();
058
059    private final Lock prepareBinariesLock = new ReentrantLock();
060
061    private File installationBaseDirectory = EmbeddedUtil.getWorkingDirectory();
062    private String lockFileName = EmbeddedPostgres.LOCK_FILE_NAME;
063
064    private final NativeBinaryLocator nativeBinaryLocator;
065
066    /**
067     * Creates a new binary manager for tar-xz compressed archives.
068     * <p>
069     * The implementation of {@link NativeBinaryLocator} to locate the stream that gets unpacked must satisfy the following criteria:
070     * <ul>
071     *     <li>It must override {@link Object#equals(Object)} and {@link Object#hashCode()}.</li>
072     *     <li>It should implement {@link Object#toString()} to return meaningful information about the locator.</li>
073     *     <li>It must allow multiple calls to {@link NativeBinaryLocator#getInputStream()} which all return the same, byte-identical contents.
074     *     The operation should be cheap as it may be called multiple times.</li>
075     * </ul>
076     *
077     * @param nativeBinaryLocator An implementation of {@link NativeBinaryLocator} that satisfies the conditions above. Must not be null.
078     */
079    public TarXzCompressedBinaryManager(@NonNull NativeBinaryLocator nativeBinaryLocator) {
080        this.nativeBinaryLocator = checkNotNull(nativeBinaryLocator, "nativeBinaryLocator is null");
081    }
082
083    @Override
084    public void setInstallationBaseDirectory(File installationBaseDirectory) {
085        this.installationBaseDirectory = checkNotNull(installationBaseDirectory, "installationBaseDirectory is null");
086    }
087
088    /**
089     * Sets the lock file name. This method must be called before the first call to {@link TarXzCompressedBinaryManager#getLocation()}.
090     *
091     * @param lockFileName Name of a file to use as file lock when unpacking the distribution.
092     */
093    public void setLockFileName(String lockFileName) {
094        this.lockFileName = checkNotNull(lockFileName, "lockFileName is null");
095    }
096
097    @Override
098    @NonNull
099    public File getLocation() throws IOException {
100
101        // the installation cache saves ~ 1% CPU according to the profiler
102        File installationDirectory = KNOWN_INSTALLATIONS.get(nativeBinaryLocator);
103        if (installationDirectory != null && installationDirectory.exists()) {
104            return installationDirectory;
105        }
106
107        prepareBinariesLock.lock();
108        try {
109            String installationIdentifier = nativeBinaryLocator.getIdentifier();
110            installationDirectory = new File(installationBaseDirectory, installationIdentifier);
111            EmbeddedUtil.ensureDirectory(installationDirectory);
112
113            final File unpackLockFile = new File(installationDirectory, lockFileName);
114            final File installationExistsFile = new File(installationDirectory, ".exists");
115
116            if (!installationExistsFile.exists()) {
117                try (FileOutputStream lockStream = new FileOutputStream(unpackLockFile);
118                        FileLock unpackLock = lockStream.getChannel().tryLock()) {
119                    if (unpackLock != null) {
120                        checkState(!installationExistsFile.exists(), "unpack lock acquired but .exists file is present " + installationExistsFile);
121                        LOG.info("extracting archive...");
122                        try (InputStream archiveStream = nativeBinaryLocator.getInputStream()) {
123                            extractTxz(archiveStream, installationDirectory.getPath());
124                            checkState(installationExistsFile.createNewFile(), "couldn't create %s file!", installationExistsFile);
125                        }
126                    } else {
127                        // the other guy is unpacking for us.
128                        int maxAttempts = 60;
129                        while (!installationExistsFile.exists() && --maxAttempts > 0) { // NOPMD
130                            Thread.sleep(1000L);
131                        }
132                        checkState(installationExistsFile.exists(), "Waited 60 seconds for archive to be unpacked but it never finished!");
133                    }
134                } finally {
135                    if (unpackLockFile.exists() && !unpackLockFile.delete()) {
136                        LOG.error(format("could not remove lock file %s", unpackLockFile.getAbsolutePath()));
137                    }
138                }
139            }
140
141            KNOWN_INSTALLATIONS.putIfAbsent(nativeBinaryLocator, installationDirectory);
142            LOG.debug(format("Unpacked archive at %s", installationDirectory));
143            return installationDirectory;
144
145        } catch (final InterruptedException e) {
146            Thread.currentThread().interrupt();
147            throw new IOException(e);
148        } finally {
149            prepareBinariesLock.lock();
150        }
151    }
152
153    /**
154     * Unpack archive compressed by tar with xz compression.
155     *
156     * @param stream    A tar-xz compressed data stream.
157     * @param targetDir The directory to extract the content to.
158     */
159    private static void extractTxz(InputStream stream, String targetDir) throws IOException {
160        try (XZInputStream xzIn = new XZInputStream(stream);
161                TarArchiveInputStream tarIn = new TarArchiveInputStream(xzIn)) {
162            final Phaser phaser = new Phaser(1);
163            TarArchiveEntry entry;
164
165            while ((entry = tarIn.getNextTarEntry()) != null) { //NOPMD
166                final String individualFile = entry.getName();
167                final File fsObject = new File(targetDir, individualFile);
168                final Path fsPath = fsObject.toPath();
169                if (Files.exists(fsPath, LinkOption.NOFOLLOW_LINKS) && !Files.isDirectory(fsPath, LinkOption.NOFOLLOW_LINKS)) {
170                    Files.delete(fsPath);
171                    LOG.debug(format("Deleting existing entry %s", fsPath));
172                }
173
174                if (entry.isSymbolicLink() || entry.isLink()) {
175                    Path target = FileSystems.getDefault().getPath(entry.getLinkName());
176                    Files.createSymbolicLink(fsPath, target);
177                } else if (entry.isFile()) {
178                    byte[] content = new byte[(int) entry.getSize()];
179                    int read = tarIn.read(content, 0, content.length);
180                    checkState(read != -1, "could not read %s", individualFile);
181                    EmbeddedUtil.ensureDirectory(fsObject.getParentFile());
182
183                    final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(fsPath, CREATE, WRITE); //NOPMD
184                    final ByteBuffer buffer = ByteBuffer.wrap(content); //NOPMD
185
186                    phaser.register();
187                    fileChannel.write(buffer, 0, fileChannel, new CompletionHandler<Integer, Channel>() {
188                        @Override
189                        public void completed(Integer written, Channel channel) {
190                            closeChannel(channel);
191                        }
192
193                        @Override
194                        public void failed(Throwable error, Channel channel) {
195                            LOG.error(format("could not write file %s", fsObject.getAbsolutePath()), error);
196                            closeChannel(channel);
197                        }
198
199                        private void closeChannel(Channel channel) {
200                            try {
201                                channel.close();
202                            } catch (IOException e) {
203                                LOG.error("While closing channel:", e);
204                            } finally {
205                                phaser.arriveAndDeregister();
206                            }
207                        }
208                    });
209                } else if (entry.isDirectory()) {
210                    EmbeddedUtil.ensureDirectory(fsObject);
211                } else {
212                    throw new IOException(format("Unsupported entry in tar file found: %s", individualFile));
213                }
214
215                if (individualFile.startsWith("bin/") || individualFile.startsWith("./bin/")) {
216                    if (!fsObject.setExecutable(true, false)) {
217                        throw new IOException(format("Could not make %s executable!", individualFile));
218                    }
219                }
220            }
221
222            phaser.arriveAndAwaitAdvance();
223        }
224    }
225}