/*
 * Archives Unleashed Toolkit (AUT):
 * An open-source platform for analyzing web archives.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.archivesunleashed.mapreduce;

import io.archivesunleashed.io.GenericArchiveRecordWritable;
import io.archivesunleashed.io.GenericArchiveRecordWritable.ArchiveFormat;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveReaderFactory;
import org.archive.io.ArchiveRecord;
import org.archive.io.arc.ARCReader;
import org.archive.io.arc.ARCReaderFactory.CompressedARCReader;
import org.archive.io.warc.WARCReader;
import org.archive.io.warc.WARCReaderFactory.CompressedWARCReader;

/**
 * Extends FileInputFormat for Web Archive Commons Generic InputFormat.
 *
 * <p>Reads ARC and WARC files through the webarchive-commons
 * {@link ArchiveReaderFactory}, emitting one {@link ArchiveRecord} per call
 * to {@code nextKeyValue()}. Splits are disabled because archive files are
 * sequential record streams that cannot be parsed from an arbitrary byte
 * offset.</p>
 */
public class WacGenericInputFormat extends FileInputFormat<LongWritable,
        GenericArchiveRecordWritable> {
  @Override
  public final RecordReader<LongWritable,
          GenericArchiveRecordWritable> createRecordReader(
          final InputSplit split,
          final TaskAttemptContext context) throws IOException,
          InterruptedException {
    return new GenericArchiveRecordReader();
  }

  @Override
  protected final boolean isSplitable(final JobContext context,
          final Path filename) {
    // (W)ARC files are (optionally gzipped) sequential record streams;
    // a split starting mid-file could not be decoded, so never split.
    return false;
  }

  /**
   * Extends RecordReader for Generic Record Reader.
   */
  public class GenericArchiveRecordReader extends RecordReader<LongWritable,
          GenericArchiveRecordWritable> {

    /**
     * Generic archive reader.
     */
    private ArchiveReader reader;

    /**
     * Generic archive format (ARC or WARC), detected from the reader type.
     */
    private ArchiveFormat format;

    /**
     * Start position of the split being read.
     */
    private long start;

    /**
     * Current position within the split.
     */
    private long pos;

    /**
     * End position of the split being read.
     */
    private long end;

    /**
     * LongWritable key handed to the mapper.
     */
    private LongWritable key = null;

    /**
     * GenericArchiveRecordWritable value handed to the mapper.
     */
    private GenericArchiveRecordWritable value = null;

    /**
     * Seekable view of the raw input stream, used to report progress.
     */
    private Seekable filePosition;

    /**
     * Iterator over the records of the archive.
     */
    private Iterator<ArchiveRecord> iter;

    @Override
    public final void initialize(final InputSplit genericSplit,
            final TaskAttemptContext context)
            throws IOException {
      FileSplit split = (FileSplit) genericSplit;
      Configuration job = context.getConfiguration();
      start = split.getStart();
      end = start + split.getLength();
      final Path file = split.getPath();

      FileSystem fs = file.getFileSystem(job);
      FSDataInputStream fileIn = fs.open(split.getPath());

      // BUGFIX: remember the seekable raw stream. Previously filePosition
      // was never assigned, so getFilePosition() could never observe the
      // stream advancing and getProgress() was stuck at zero.
      filePosition = fileIn;

      reader = ArchiveReaderFactory.get(split.getPath().toString(),
          new BufferedInputStream(fileIn), true);

      if (reader instanceof ARCReader) {
        format = ArchiveFormat.ARC;
        iter = reader.iterator();
      } else if (reader instanceof WARCReader) {
        format = ArchiveFormat.WARC;
        iter = reader.iterator();
      } else {
        // BUGFIX: fail fast with a descriptive error instead of leaving
        // iter null and letting nextKeyValue() throw a bare
        // NullPointerException later.
        throw new IOException("Unsupported archive format for file: "
            + file);
      }

      this.pos = start;
    }

    /**
     * Determines if the generic archive is compressed.
     *
     * @return true if the underlying reader is a compressed ARC/WARC reader
     */
    private boolean isCompressedInput() {
      if (format == ArchiveFormat.ARC) {
        return reader instanceof CompressedARCReader;
      } else {
        return reader instanceof CompressedWARCReader;
      }
    }

    /**
     * Get file position of the generic archive.
     *
     * @return current byte offset in the raw stream when compressed,
     *         otherwise the last recorded split position
     * @throws IOException if the underlying stream cannot report its position
     */
    private long getFilePosition() throws IOException {
      long retVal;
      if (isCompressedInput() && null != filePosition) {
        retVal = filePosition.getPos();
      } else {
        retVal = pos;
      }
      return retVal;
    }

    @Override
    public final boolean nextKeyValue() throws IOException {
      if (!iter.hasNext()) {
        return false;
      }

      if (key == null) {
        key = new LongWritable();
      }
      key.set(pos);

      ArchiveRecord record = null;
      try {
        record = iter.next();
      } catch (Exception ignored) {
        // Deliberate best-effort behavior: a malformed or truncated
        // record ends this split early rather than failing the task.
        return false;
      }

      if (record == null) {
        return false;
      }

      if (value == null) {
        value = new GenericArchiveRecordWritable();
      }
      value.setRecord(record);

      return true;
    }

    @Override
    public final LongWritable getCurrentKey() {
      return key;
    }

    @Override
    public final GenericArchiveRecordWritable getCurrentValue() {
      return value;
    }

    @Override
    public final float getProgress() throws IOException {
      // Guard against division by zero on an empty split.
      if (start == end) {
        return 0.0f;
      } else {
        return Math.min(1.0f, (getFilePosition() - start) / (float)
            (end - start));
      }
    }

    @Override
    public final synchronized void close() throws IOException {
      // Null-guard: close() may be invoked even when initialize() failed
      // before the reader was constructed.
      if (reader != null) {
        reader.close();
      }
    }
  }
}