/*
 * Archives Unleashed Toolkit (AUT):
 * An open-source platform for analyzing web archives.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.archivesunleashed.data;

import io.archivesunleashed.data.ArchiveRecordWritable.ArchiveFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveReaderFactory;
import org.archive.io.ArchiveRecord;
import org.archive.io.arc.ARCReader;
import org.archive.io.arc.ARCReaderFactory.CompressedARCReader;
import org.archive.io.warc.WARCReader;
import org.archive.io.warc.WARCReaderFactory.CompressedWARCReader;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.Iterator;

/**
 * Extends FileInputFormat for Web Archive Commons InputFormat.
047 */ 048public class ArchiveRecordInputFormat extends FileInputFormat<LongWritable, 049 ArchiveRecordWritable> { 050 @Override 051 public final RecordReader<LongWritable, 052 ArchiveRecordWritable> createRecordReader(final InputSplit split, 053 final TaskAttemptContext context) throws IOException, 054 InterruptedException { 055 return new ArchiveRecordReader(); 056 } 057 058 @Override 059 protected final boolean isSplitable(final JobContext context, 060 final Path filename) { 061 return false; 062 } 063 064 /** 065 * Extends RecordReader for Record Reader. 066 */ 067 public class ArchiveRecordReader extends RecordReader<LongWritable, 068 ArchiveRecordWritable> { 069 070 /** 071 * Archive reader. 072 */ 073 private ArchiveReader reader; 074 075 /** 076 * Archive format. 077 */ 078 private ArchiveFormat format; 079 080 /** 081 * Start position of archive being read. 082 */ 083 private long start; 084 085 /** 086 * A given position of an archive being read. 087 */ 088 private long pos; 089 090 /** 091 * End position of an archive being read. 092 */ 093 private long end; 094 095 /** 096 * LongWritable key. 097 */ 098 private LongWritable key = null; 099 100 /** 101 * ArchiveRecordWritable value. 102 */ 103 private ArchiveRecordWritable value = null; 104 105 /** 106 * Seekable file position. 107 */ 108 private Seekable filePosition; 109 110 /** 111 * Iterator for an archive record. 
112 */ 113 private Iterator<ArchiveRecord> iter; 114 115 @Override 116 public final void initialize(final InputSplit archiveRecordSplit, 117 final TaskAttemptContext context) 118 throws IOException { 119 FileSplit split = (FileSplit) archiveRecordSplit; 120 Configuration job = context.getConfiguration(); 121 start = split.getStart(); 122 end = start + split.getLength(); 123 final Path file = split.getPath(); 124 125 FileSystem fs = file.getFileSystem(job); 126 FSDataInputStream fileIn = fs.open(split.getPath()); 127 128 reader = ArchiveReaderFactory.get(split.getPath().toString(), 129 new BufferedInputStream(fileIn), true); 130 131 if (reader instanceof ARCReader) { 132 format = ArchiveFormat.ARC; 133 iter = reader.iterator(); 134 } 135 136 if (reader instanceof WARCReader) { 137 format = ArchiveFormat.WARC; 138 iter = reader.iterator(); 139 } 140 141 this.pos = start; 142 } 143 144 /** 145 * Determines if archive is compressed. 146 * 147 * @return instanceof if ARC/WARC 148 */ 149 private boolean isCompressedInput() { 150 if (format == ArchiveFormat.ARC) { 151 return reader instanceof CompressedARCReader; 152 } else { 153 return reader instanceof CompressedWARCReader; 154 } 155 } 156 157 /** 158 * Get file position of archive. 
159 * 160 * @return retVal position of archive 161 * @throws IOException if there is an issue 162 */ 163 private long getFilePosition() throws IOException { 164 long retVal; 165 if (isCompressedInput() && null != filePosition) { 166 retVal = filePosition.getPos(); 167 } else { 168 retVal = pos; 169 } 170 return retVal; 171 } 172 173 @Override 174 public final boolean nextKeyValue() throws IOException { 175 if (!iter.hasNext()) { 176 return false; 177 } 178 179 if (key == null) { 180 key = new LongWritable(); 181 } 182 key.set(pos); 183 184 ArchiveRecord record = null; 185 try { 186 record = iter.next(); 187 } catch (Exception e) { 188 return false; 189 } 190 191 if (record == null) { 192 return false; 193 } 194 195 if (value == null) { 196 value = new ArchiveRecordWritable(); 197 } 198 value.setRecord(record); 199 200 return true; 201 } 202 203 @Override 204 public final LongWritable getCurrentKey() { 205 return key; 206 } 207 208 @Override 209 public final ArchiveRecordWritable getCurrentValue() { 210 return value; 211 } 212 213 @Override 214 public final float getProgress() throws IOException { 215 if (start == end) { 216 return 0.0f; 217 } else { 218 return Math.min(1.0f, (getFilePosition() - start) / (float) 219 (end - start)); 220 } 221 } 222 223 @Override 224 public final synchronized void close() throws IOException { 225 reader.close(); 226 } 227 } 228}