001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 * http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package de.softwareforge.testing.postgres.embedded;
015
016import static com.google.common.base.Preconditions.checkNotNull;
017import static com.google.common.base.Preconditions.checkState;
018import static java.lang.String.format;
019import static java.nio.file.StandardOpenOption.CREATE;
020import static java.nio.file.StandardOpenOption.WRITE;
021
022import java.io.File;
023import java.io.FileOutputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.nio.ByteBuffer;
027import java.nio.channels.AsynchronousFileChannel;
028import java.nio.channels.Channel;
029import java.nio.channels.CompletionHandler;
030import java.nio.channels.FileLock;
031import java.nio.file.FileSystems;
032import java.nio.file.Files;
033import java.nio.file.LinkOption;
034import java.nio.file.Path;
035import java.util.Map;
036import java.util.concurrent.ConcurrentHashMap;
037import java.util.concurrent.Phaser;
038import java.util.concurrent.locks.Lock;
039import java.util.concurrent.locks.ReentrantLock;
040
041import edu.umd.cs.findbugs.annotations.NonNull;
042import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
043import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046import org.tukaani.xz.XZInputStream;
047
048/**
049 * Loads a native binary installation and returns the location of it.
050 */
051public final class TarXzCompressedBinaryManager implements NativeBinaryManager {
052
053    private static final Logger LOG = LoggerFactory.getLogger(TarXzCompressedBinaryManager.class);
054
055    private static final Map<NativeBinaryLocator, File> KNOWN_INSTALLATIONS = new ConcurrentHashMap<>();
056
057    private final Lock prepareBinariesLock = new ReentrantLock();
058
059    private File installationBaseDirectory = EmbeddedUtil.getWorkingDirectory();
060    private String lockFileName = EmbeddedPostgres.LOCK_FILE_NAME;
061
062    private final NativeBinaryLocator nativeBinaryLocator;
063
064    /**
065     * Creates a new binary manager for tar-xz compressed archives.
066     * <p>
067     * The implementation of {@link NativeBinaryLocator} to locate the stream that gets unpacked must satisfy the following criteria:
068     * <ul>
069     *     <li>It must implement {@link #equals(Object)} and {@link #hashCode()}.</li>
070     *     <li>It should implement {@link #toString()} to return meaningful information about the locator.</li>
071     *     <li>It must allow multiple calls to {@link NativeBinaryLocator#getInputStream()} which all return the same, byte-identical contents.
072     *     The operation should be cheap as it may be called multiple times.</li>
073     * </ul>
074     *
075     * @param nativeBinaryLocator An implementation of {@link NativeBinaryLocator} that satisfies the conditions above. Must not be null.
076     */
077    public TarXzCompressedBinaryManager(@NonNull NativeBinaryLocator nativeBinaryLocator) {
078        this.nativeBinaryLocator = checkNotNull(nativeBinaryLocator, "nativeBinaryLocator is null");
079
080        checkState(this.installationBaseDirectory.setWritable(true, false),
081                "Could not make install base directory %s writable!", this.installationBaseDirectory);
082    }
083
084    @Override
085    public void setInstallationBaseDirectory(File installationBaseDirectory) {
086        this.installationBaseDirectory = checkNotNull(installationBaseDirectory, "installationBaseDirectory is null");
087
088        checkState(this.installationBaseDirectory.setWritable(true, false),
089                "Could not make install base directory %s writable!", this.installationBaseDirectory);
090    }
091
092    /**
093     * Sets the lock file name. This method must be called before the first call to {@link TarXzCompressedBinaryManager#getLocation()}.
094     *
095     * @param lockFileName Name of a file to use as file lock when unpacking the distribution.
096     */
097    public void setLockFileName(String lockFileName) {
098        this.lockFileName = checkNotNull(lockFileName, "lockFileName is null");
099    }
100
101    @Override
102    @NonNull
103    public File getLocation() throws IOException {
104
105        // the installation cache saves ~ 1% CPU according to the profiler
106        File installationDirectory = KNOWN_INSTALLATIONS.get(nativeBinaryLocator);
107        if (installationDirectory != null && installationDirectory.exists()) {
108            return installationDirectory;
109        }
110
111        prepareBinariesLock.lock();
112        try {
113            String installationIdentifier = nativeBinaryLocator.getIdentifier();
114            installationDirectory = new File(installationBaseDirectory, installationIdentifier);
115            EmbeddedUtil.mkdirs(installationDirectory);
116
117            final File unpackLockFile = new File(installationDirectory, lockFileName);
118            final File installationExistsFile = new File(installationDirectory, ".exists");
119
120            if (!installationExistsFile.exists()) {
121                try (FileOutputStream lockStream = new FileOutputStream(unpackLockFile);
122                        FileLock unpackLock = lockStream.getChannel().tryLock()) {
123                    if (unpackLock != null) {
124                        checkState(!installationExistsFile.exists(), "unpack lock acquired but .exists file is present " + installationExistsFile);
125                        LOG.info("extracting archive...");
126                        try (InputStream archiveStream = nativeBinaryLocator.getInputStream()) {
127                            extractTxz(archiveStream, installationDirectory.getPath());
128                            checkState(installationExistsFile.createNewFile(), "couldn't create %s file!", installationExistsFile);
129                        }
130                    } else {
131                        // the other guy is unpacking for us.
132                        int maxAttempts = 60;
133                        while (!installationExistsFile.exists() && --maxAttempts > 0) { // NOPMD
134                            Thread.sleep(1000L);
135                        }
136                        checkState(installationExistsFile.exists(), "Waited 60 seconds for archive to be unpacked but it never finished!");
137                    }
138                } finally {
139                    if (unpackLockFile.exists() && !unpackLockFile.delete()) {
140                        LOG.error(format("could not remove lock file %s", unpackLockFile.getAbsolutePath()));
141                    }
142                }
143            }
144
145            KNOWN_INSTALLATIONS.putIfAbsent(nativeBinaryLocator, installationDirectory);
146            LOG.debug(format("Unpacked archive at %s", installationDirectory));
147            return installationDirectory;
148
149        } catch (final InterruptedException e) {
150            Thread.currentThread().interrupt();
151            throw new IOException(e);
152        } finally {
153            prepareBinariesLock.lock();
154        }
155    }
156
157    /**
158     * Unpack archive compressed by tar with xz compression.
159     *
160     * @param stream    A tar-xz compressed data stream.
161     * @param targetDir The directory to extract the content to.
162     */
163    private static void extractTxz(InputStream stream, String targetDir) throws IOException {
164        try (XZInputStream xzIn = new XZInputStream(stream);
165                TarArchiveInputStream tarIn = new TarArchiveInputStream(xzIn)) {
166            final Phaser phaser = new Phaser(1);
167            TarArchiveEntry entry;
168
169            while ((entry = tarIn.getNextTarEntry()) != null) { //NOPMD
170                final String individualFile = entry.getName();
171                final File fsObject = new File(targetDir, individualFile);
172                final Path fsPath = fsObject.toPath();
173                if (Files.exists(fsPath, LinkOption.NOFOLLOW_LINKS) && !Files.isDirectory(fsPath, LinkOption.NOFOLLOW_LINKS)) {
174                    Files.delete(fsPath);
175                    LOG.debug(format("Deleting existing entry %s", fsPath));
176                }
177
178                if (entry.isSymbolicLink() || entry.isLink()) {
179                    Path target = FileSystems.getDefault().getPath(entry.getLinkName());
180                    Files.createSymbolicLink(fsPath, target);
181                } else if (entry.isFile()) {
182                    byte[] content = new byte[(int) entry.getSize()];
183                    int read = tarIn.read(content, 0, content.length);
184                    checkState(read != -1, "could not read %s", individualFile);
185                    EmbeddedUtil.mkdirs(fsObject.getParentFile());
186
187                    final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(fsPath, CREATE, WRITE); //NOPMD
188                    final ByteBuffer buffer = ByteBuffer.wrap(content); //NOPMD
189
190                    phaser.register();
191                    fileChannel.write(buffer, 0, fileChannel, new CompletionHandler<Integer, Channel>() {
192                        @Override
193                        public void completed(Integer written, Channel channel) {
194                            closeChannel(channel);
195                        }
196
197                        @Override
198                        public void failed(Throwable error, Channel channel) {
199                            LOG.error(format("could not write file %s", fsObject.getAbsolutePath()), error);
200                            closeChannel(channel);
201                        }
202
203                        private void closeChannel(Channel channel) {
204                            try {
205                                channel.close();
206                            } catch (IOException e) {
207                                LOG.error("While closing channel:", e);
208                            } finally {
209                                phaser.arriveAndDeregister();
210                            }
211                        }
212                    });
213                } else if (entry.isDirectory()) {
214                    EmbeddedUtil.mkdirs(fsObject);
215                } else {
216                    throw new IOException(format("Unsupported entry in tar file found: %s", individualFile));
217                }
218
219                if (individualFile.startsWith("bin/") || individualFile.startsWith("./bin/")) {
220                    if (!fsObject.setExecutable(true, false)) {
221                        throw new IOException(format("Could not make %s executable!", individualFile));
222                    }
223                }
224            }
225
226            phaser.arriveAndAwaitAdvance();
227        }
228    }
229}