001/* 002 * Licensed under the Apache License, Version 2.0 (the "License"); 003 * you may not use this file except in compliance with the License. 004 * You may obtain a copy of the License at 005 * 006 * http://www.apache.org/licenses/LICENSE-2.0 007 * 008 * Unless required by applicable law or agreed to in writing, software 009 * distributed under the License is distributed on an "AS IS" BASIS, 010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 011 * See the License for the specific language governing permissions and 012 * limitations under the License. 013 */ 014package de.softwareforge.testing.postgres.embedded; 015 016import static com.google.common.base.Preconditions.checkNotNull; 017import static com.google.common.base.Preconditions.checkState; 018import static java.lang.String.format; 019import static java.nio.file.StandardOpenOption.CREATE; 020import static java.nio.file.StandardOpenOption.WRITE; 021 022import java.io.File; 023import java.io.FileOutputStream; 024import java.io.IOException; 025import java.io.InputStream; 026import java.nio.ByteBuffer; 027import java.nio.channels.AsynchronousFileChannel; 028import java.nio.channels.Channel; 029import java.nio.channels.CompletionHandler; 030import java.nio.channels.FileLock; 031import java.nio.file.FileSystems; 032import java.nio.file.Files; 033import java.nio.file.LinkOption; 034import java.nio.file.Path; 035import java.util.Map; 036import java.util.concurrent.ConcurrentHashMap; 037import java.util.concurrent.Phaser; 038import java.util.concurrent.locks.Lock; 039import java.util.concurrent.locks.ReentrantLock; 040 041import edu.umd.cs.findbugs.annotations.NonNull; 042import org.apache.commons.compress.archivers.tar.TarArchiveEntry; 043import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046import org.tukaani.xz.XZInputStream; 047 048/** 049 * Loads a native binary installation and returns the location of it. 050 */ 051public final class TarXzCompressedBinaryManager implements NativeBinaryManager { 052 053 private static final Logger LOG = LoggerFactory.getLogger(TarXzCompressedBinaryManager.class); 054 055 private static final Map<NativeBinaryLocator, File> KNOWN_INSTALLATIONS = new ConcurrentHashMap<>(); 056 057 private final Lock prepareBinariesLock = new ReentrantLock(); 058 059 private File installationBaseDirectory = EmbeddedUtil.getWorkingDirectory(); 060 private String lockFileName = EmbeddedPostgres.LOCK_FILE_NAME; 061 062 private final NativeBinaryLocator nativeBinaryLocator; 063 064 /** 065 * Creates a new binary manager for tar-xz compressed archives. 066 * <p> 067 * The implementation of {@link NativeBinaryLocator} to locate the stream that gets unpacked must satisfy the following criteria: 068 * <ul> 069 * <li>It must implement {@link #equals(Object)} and {@link #hashCode()}.</li> 070 * <li>It should implement {@link #toString()} to return meaningful information about the locator.</li> 071 * <li>It must allow multiple calls to {@link NativeBinaryLocator#getInputStream()} which all return the same, byte-identical contents. 072 * The operation should be cheap as it may be called multiple times.</li> 073 * </ul> 074 * 075 * @param nativeBinaryLocator An implementation of {@link NativeBinaryLocator} that satisfies the conditions above. Must not be null. 076 */ 077 public TarXzCompressedBinaryManager(@NonNull NativeBinaryLocator nativeBinaryLocator) { 078 this.nativeBinaryLocator = checkNotNull(nativeBinaryLocator, "nativeBinaryLocator is null"); 079 080 checkState(this.installationBaseDirectory.setWritable(true, false), 081 "Could not make install base directory %s writable!", this.installationBaseDirectory); 082 } 083 084 @Override 085 public void setInstallationBaseDirectory(File installationBaseDirectory) { 086 this.installationBaseDirectory = checkNotNull(installationBaseDirectory, "installationBaseDirectory is null"); 087 088 checkState(this.installationBaseDirectory.setWritable(true, false), 089 "Could not make install base directory %s writable!", this.installationBaseDirectory); 090 } 091 092 /** 093 * Sets the lock file name. This method must be called before the first call to {@link TarXzCompressedBinaryManager#getLocation()}. 094 * 095 * @param lockFileName Name of a file to use as file lock when unpacking the distribution. 096 */ 097 public void setLockFileName(String lockFileName) { 098 this.lockFileName = checkNotNull(lockFileName, "lockFileName is null"); 099 } 100 101 @Override 102 @NonNull 103 public File getLocation() throws IOException { 104 105 // the installation cache saves ~ 1% CPU according to the profiler 106 File installationDirectory = KNOWN_INSTALLATIONS.get(nativeBinaryLocator); 107 if (installationDirectory != null && installationDirectory.exists()) { 108 return installationDirectory; 109 } 110 111 prepareBinariesLock.lock(); 112 try { 113 String installationIdentifier = nativeBinaryLocator.getIdentifier(); 114 installationDirectory = new File(installationBaseDirectory, installationIdentifier); 115 EmbeddedUtil.mkdirs(installationDirectory); 116 117 final File unpackLockFile = new File(installationDirectory, lockFileName); 118 final File installationExistsFile = new File(installationDirectory, ".exists"); 119 120 if (!installationExistsFile.exists()) { 121 try (FileOutputStream lockStream = new FileOutputStream(unpackLockFile); 122 FileLock unpackLock = lockStream.getChannel().tryLock()) { 123 if (unpackLock != null) { 124 checkState(!installationExistsFile.exists(), "unpack lock acquired but .exists file is present " + installationExistsFile); 125 LOG.info("extracting archive..."); 126 try (InputStream archiveStream = nativeBinaryLocator.getInputStream()) { 127 extractTxz(archiveStream, installationDirectory.getPath()); 128 checkState(installationExistsFile.createNewFile(), "couldn't create %s file!", installationExistsFile); 129 } 130 } else { 131 // the other guy is unpacking for us. 132 int maxAttempts = 60; 133 while (!installationExistsFile.exists() && --maxAttempts > 0) { // NOPMD 134 Thread.sleep(1000L); 135 } 136 checkState(installationExistsFile.exists(), "Waited 60 seconds for archive to be unpacked but it never finished!"); 137 } 138 } finally { 139 if (unpackLockFile.exists() && !unpackLockFile.delete()) { 140 LOG.error(format("could not remove lock file %s", unpackLockFile.getAbsolutePath())); 141 } 142 } 143 } 144 145 KNOWN_INSTALLATIONS.putIfAbsent(nativeBinaryLocator, installationDirectory); 146 LOG.debug(format("Unpacked archive at %s", installationDirectory)); 147 return installationDirectory; 148 149 } catch (final InterruptedException e) { 150 Thread.currentThread().interrupt(); 151 throw new IOException(e); 152 } finally { 153 prepareBinariesLock.lock(); 154 } 155 } 156 157 /** 158 * Unpack archive compressed by tar with xz compression. 159 * 160 * @param stream A tar-xz compressed data stream. 161 * @param targetDir The directory to extract the content to. 162 */ 163 private static void extractTxz(InputStream stream, String targetDir) throws IOException { 164 try (XZInputStream xzIn = new XZInputStream(stream); 165 TarArchiveInputStream tarIn = new TarArchiveInputStream(xzIn)) { 166 final Phaser phaser = new Phaser(1); 167 TarArchiveEntry entry; 168 169 while ((entry = tarIn.getNextTarEntry()) != null) { //NOPMD 170 final String individualFile = entry.getName(); 171 final File fsObject = new File(targetDir, individualFile); 172 final Path fsPath = fsObject.toPath(); 173 if (Files.exists(fsPath, LinkOption.NOFOLLOW_LINKS) && !Files.isDirectory(fsPath, LinkOption.NOFOLLOW_LINKS)) { 174 Files.delete(fsPath); 175 LOG.debug(format("Deleting existing entry %s", fsPath)); 176 } 177 178 if (entry.isSymbolicLink() || entry.isLink()) { 179 Path target = FileSystems.getDefault().getPath(entry.getLinkName()); 180 Files.createSymbolicLink(fsPath, target); 181 } else if (entry.isFile()) { 182 byte[] content = new byte[(int) entry.getSize()]; 183 int read = tarIn.read(content, 0, content.length); 184 checkState(read != -1, "could not read %s", individualFile); 185 EmbeddedUtil.mkdirs(fsObject.getParentFile()); 186 187 final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(fsPath, CREATE, WRITE); //NOPMD 188 final ByteBuffer buffer = ByteBuffer.wrap(content); //NOPMD 189 190 phaser.register(); 191 fileChannel.write(buffer, 0, fileChannel, new CompletionHandler<Integer, Channel>() { 192 @Override 193 public void completed(Integer written, Channel channel) { 194 closeChannel(channel); 195 } 196 197 @Override 198 public void failed(Throwable error, Channel channel) { 199 LOG.error(format("could not write file %s", fsObject.getAbsolutePath()), error); 200 closeChannel(channel); 201 } 202 203 private void closeChannel(Channel channel) { 204 try { 205 channel.close(); 206 } catch (IOException e) { 207 LOG.error("While closing channel:", e); 208 } finally { 209 phaser.arriveAndDeregister(); 210 } 211 } 212 }); 213 } else if (entry.isDirectory()) { 214 EmbeddedUtil.mkdirs(fsObject); 215 } else { 216 throw new IOException(format("Unsupported entry in tar file found: %s", individualFile)); 217 } 218 219 if (individualFile.startsWith("bin/") || individualFile.startsWith("./bin/")) { 220 if (!fsObject.setExecutable(true, false)) { 221 throw new IOException(format("Could not make %s executable!", individualFile)); 222 } 223 } 224 } 225 226 phaser.arriveAndAwaitAdvance(); 227 } 228 } 229}