TarXzCompressedBinaryManager.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.softwareforge.testing.postgres.embedded;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.Channel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileLock;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import edu.umd.cs.findbugs.annotations.NonNull;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tukaani.xz.XZInputStream;
/**
 * Loads a native binary installation and returns the location of it.
 * <p>
 * The archive provided by the {@link NativeBinaryLocator} is unpacked once into a sub-directory of the installation base directory
 * (named after {@link NativeBinaryLocator#getIdentifier()}). A {@code .exists} marker file in that directory records that the unpack
 * finished; a lock file guards the unpack against other processes doing the same work. Resolved locations are remembered in a
 * static, per-locator cache.
 */
public final class TarXzCompressedBinaryManager implements NativeBinaryManager {

    private static final Logger LOG = LoggerFactory.getLogger(TarXzCompressedBinaryManager.class);

    /** Cache of already resolved installation directories, shared by all instances of this class. */
    private static final Map<NativeBinaryLocator, File> KNOWN_INSTALLATIONS = new ConcurrentHashMap<>();

    /** Serializes the slow path of {@link #getLocation()} between threads using the same manager instance. */
    private final Lock prepareBinariesLock = new ReentrantLock();

    private File installationBaseDirectory = EmbeddedUtil.getWorkingDirectory();
    private String lockFileName = EmbeddedPostgres.LOCK_FILE_NAME;

    private final NativeBinaryLocator nativeBinaryLocator;

    /**
     * Creates a new binary manager for tar-xz compressed archives.
     * <p>
     * The implementation of {@link NativeBinaryLocator} to locate the stream that gets unpacked must satisfy the following criteria:
     * <ul>
     *     <li>It must implement {@link #equals(Object)} and {@link #hashCode()}.</li>
     *     <li>It should implement {@link #toString()} to return meaningful information about the locator.</li>
     *     <li>It must allow multiple calls to {@link NativeBinaryLocator#getInputStream()} which all return the same, byte-identical contents.
     *     The operation should be cheap as it may be called multiple times.</li>
     * </ul>
     *
     * @param nativeBinaryLocator An implementation of {@link NativeBinaryLocator} that satisfies the conditions above. Must not be null.
     * @throws IllegalStateException If the default installation base directory can not be made writable.
     */
    public TarXzCompressedBinaryManager(@NonNull NativeBinaryLocator nativeBinaryLocator) {
        this.nativeBinaryLocator = checkNotNull(nativeBinaryLocator, "nativeBinaryLocator is null");

        checkState(this.installationBaseDirectory.setWritable(true, false),
                "Could not make install base directory %s writable!", this.installationBaseDirectory);
    }

    /**
     * Sets the directory below which the archive gets unpacked.
     *
     * @param installationBaseDirectory The new base directory. Must not be null.
     * @throws IllegalStateException If the directory can not be made writable.
     */
    @Override
    public void setInstallationBaseDirectory(File installationBaseDirectory) {
        this.installationBaseDirectory = checkNotNull(installationBaseDirectory, "installationBaseDirectory is null");

        checkState(this.installationBaseDirectory.setWritable(true, false),
                "Could not make install base directory %s writable!", this.installationBaseDirectory);
    }

    /**
     * Sets the lock file name. This method must be called before the first call to {@link TarXzCompressedBinaryManager#getLocation()}.
     *
     * @param lockFileName Name of a file to use as file lock when unpacking the distribution.
     */
    public void setLockFileName(String lockFileName) {
        this.lockFileName = checkNotNull(lockFileName, "lockFileName is null");
    }

    /**
     * Returns the directory that contains the unpacked archive, unpacking it first if necessary.
     * <p>
     * If the unpack lock is held elsewhere, this method polls once per second (up to 60 times) for the {@code .exists} marker.
     *
     * @return The installation directory. Never null.
     * @throws IOException           If unpacking fails or the calling thread is interrupted while waiting.
     * @throws IllegalStateException If the marker file can not be created, or the wait for a concurrent unpack times out.
     */
    @Override
    @NonNull
    public File getLocation() throws IOException {

        // the installation cache saves ~ 1% CPU according to the profiler
        File installationDirectory = KNOWN_INSTALLATIONS.get(nativeBinaryLocator);
        if (installationDirectory != null && installationDirectory.exists()) {
            return installationDirectory;
        }

        prepareBinariesLock.lock();
        try {
            String installationIdentifier = nativeBinaryLocator.getIdentifier();
            installationDirectory = new File(installationBaseDirectory, installationIdentifier);
            EmbeddedUtil.mkdirs(installationDirectory);

            final File unpackLockFile = new File(installationDirectory, lockFileName);
            final File installationExistsFile = new File(installationDirectory, ".exists");

            if (!installationExistsFile.exists()) {
                // tryLock() returns null when the lock is held elsewhere; try-with-resources skips null resources.
                try (FileOutputStream lockStream = new FileOutputStream(unpackLockFile);
                        FileLock unpackLock = lockStream.getChannel().tryLock()) {
                    if (unpackLock != null) {
                        checkState(!installationExistsFile.exists(), "unpack lock acquired but .exists file is present " + installationExistsFile);
                        LOG.info("extracting archive...");
                        try (InputStream archiveStream = nativeBinaryLocator.getInputStream()) {
                            extractTxz(archiveStream, installationDirectory.getPath());
                            // the marker is only written after the extraction finished without error
                            checkState(installationExistsFile.createNewFile(), "couldn't create %s file!", installationExistsFile);
                        }
                    } else {
                        // the other guy is unpacking for us.
                        for (int attempt = 0; attempt < 60 && !installationExistsFile.exists(); attempt++) { // NOPMD
                            Thread.sleep(1000L);
                        }
                        checkState(installationExistsFile.exists(), "Waited 60 seconds for archive to be unpacked but it never finished!");
                    }
                } finally {
                    // runs after the lock and the stream were closed, whether or not this call held the lock
                    if (unpackLockFile.exists() && !unpackLockFile.delete()) {
                        LOG.error(format("could not remove lock file %s", unpackLockFile.getAbsolutePath()));
                    }
                }
            }

            KNOWN_INSTALLATIONS.putIfAbsent(nativeBinaryLocator, installationDirectory);
            LOG.debug("Unpacked archive at {}", installationDirectory);
            return installationDirectory;

        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } finally {
            // must release (not re-acquire) the lock, otherwise every other thread blocks forever
            prepareBinariesLock.unlock();
        }
    }

    /**
     * Unpack archive compressed by tar with xz compression.
     * <p>
     * Regular files are read fully into memory and written with asynchronous file channels; the method only returns after all
     * pending writes have finished. Entries below {@code bin/} are made executable.
     *
     * @param stream    A tar-xz compressed data stream.
     * @param targetDir The directory to extract the content to.
     * @throws IOException If the archive can not be read, contains an unsupported entry, or a file can not be written.
     */
    private static void extractTxz(InputStream stream, String targetDir) throws IOException {
        try (XZInputStream xzIn = new XZInputStream(stream);
                TarArchiveInputStream tarIn = new TarArchiveInputStream(xzIn)) {
            // one party for this thread, plus one per pending asynchronous write
            final Phaser phaser = new Phaser(1);
            final Map<File, Throwable> writeFailures = new ConcurrentHashMap<>();
            TarArchiveEntry entry;

            while ((entry = tarIn.getNextTarEntry()) != null) { //NOPMD
                final String individualFile = entry.getName();
                final File fsObject = new File(targetDir, individualFile);
                final Path fsPath = fsObject.toPath();
                if (Files.exists(fsPath, LinkOption.NOFOLLOW_LINKS) && !Files.isDirectory(fsPath, LinkOption.NOFOLLOW_LINKS)) {
                    Files.delete(fsPath);
                    LOG.debug("Deleting existing entry {}", fsPath);
                }

                if (entry.isSymbolicLink() || entry.isLink()) {
                    // hard links are materialized as symbolic links as well
                    Path target = FileSystems.getDefault().getPath(entry.getLinkName());
                    Files.createSymbolicLink(fsPath, target);
                } else if (entry.isFile()) {
                    final byte[] content = readEntryContent(tarIn, entry);
                    EmbeddedUtil.mkdirs(fsObject.getParentFile());

                    final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(fsPath, CREATE, WRITE); //NOPMD
                    final ByteBuffer buffer = ByteBuffer.wrap(content); //NOPMD

                    phaser.register();
                    new EntryWriteHandler(fileChannel, buffer, fsObject, phaser, writeFailures).start();
                } else if (entry.isDirectory()) {
                    EmbeddedUtil.mkdirs(fsObject);
                } else {
                    throw new IOException(format("Unsupported entry in tar file found: %s", individualFile));
                }

                if (individualFile.startsWith("bin/") || individualFile.startsWith("./bin/")) {
                    if (!fsObject.setExecutable(true, false)) {
                        throw new IOException(format("Could not make %s executable!", individualFile));
                    }
                }
            }

            // wait for all outstanding asynchronous writes
            phaser.arriveAndAwaitAdvance();

            // a failed write leaves a broken installation behind; report it instead of pretending success
            if (!writeFailures.isEmpty()) {
                final Map.Entry<File, Throwable> firstFailure = writeFailures.entrySet().iterator().next();
                throw new IOException(format("could not write %d extracted file(s), first failure: %s",
                        writeFailures.size(), firstFailure.getKey().getAbsolutePath()), firstFailure.getValue());
            }
        }
    }

    /**
     * Reads the complete content of the current tar entry into memory.
     *
     * @param tarIn The tar stream, positioned at the start of the entry data.
     * @param entry The entry whose content is read.
     * @return The entry content; an empty array for a zero-length entry.
     * @throws IOException           If the entry size can not be held in a byte array or reading fails.
     * @throws IllegalStateException If the stream ends before the whole entry was read.
     */
    private static byte[] readEntryContent(TarArchiveInputStream tarIn, TarArchiveEntry entry) throws IOException {
        final long size = entry.getSize();
        if (size < 0 || size > Integer.MAX_VALUE) {
            throw new IOException(format("Unsupported size %d for tar entry %s", size, entry.getName()));
        }

        final byte[] content = new byte[(int) size];
        int offset = 0;
        // a single read() may return fewer bytes than requested, so loop until the buffer is full
        while (offset < content.length) {
            final int read = tarIn.read(content, offset, content.length - offset);
            checkState(read != -1, "could not read %s", entry.getName());
            offset += read;
        }
        return content;
    }

    /**
     * Completion handler that writes a buffer to an asynchronous file channel, continues after short writes,
     * records failures and finally closes the channel and signals the phaser.
     */
    private static final class EntryWriteHandler implements CompletionHandler<Integer, Long> {

        private final AsynchronousFileChannel channel;
        private final ByteBuffer buffer;
        private final File target;
        private final Phaser phaser;
        private final Map<File, Throwable> failures;

        EntryWriteHandler(AsynchronousFileChannel channel, ByteBuffer buffer, File target, Phaser phaser, Map<File, Throwable> failures) {
            this.channel = channel;
            this.buffer = buffer;
            this.target = target;
            this.phaser = phaser;
            this.failures = failures;
        }

        /** Issues the first write at file position 0. The caller must have registered a party with the phaser. */
        void start() {
            write(0L);
        }

        @Override
        public void completed(Integer written, Long position) {
            if (buffer.hasRemaining()) {
                // short write: continue behind the bytes that were written
                write(position + written);
            } else {
                finish();
            }
        }

        @Override
        public void failed(Throwable error, Long position) {
            LOG.error(format("could not write file %s", target.getAbsolutePath()), error);
            failures.put(target, error);
            finish();
        }

        private void write(long position) {
            try {
                channel.write(buffer, position, position, this);
            } catch (RuntimeException e) {
                // the write could not even be initiated; treat it like an asynchronous failure
                failed(e, position);
            }
        }

        private void finish() {
            try {
                channel.close();
            } catch (IOException e) {
                LOG.error("While closing channel:", e);
            } finally {
                phaser.arriveAndDeregister();
            }
        }
    }
}