/*
 * Archives Unleashed Toolkit (AUT):
 * An open-source platform for analyzing web archives.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

 /**
  * @deprecated as of 0.12.0 and will be replaced
  * with WacInputFormat in a future release.
  */

package io.archivesunleashed.mapreduce;

import io.archivesunleashed.io.GenericArchiveRecordWritable.ArchiveFormat;
import io.archivesunleashed.io.GenericArchiveRecordWritable;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveReaderFactory;
import org.archive.io.ArchiveRecord;
import org.archive.io.arc.ARCReader;
import org.archive.io.arc.ARCReaderFactory.CompressedARCReader;
import org.archive.io.warc.WARCReader;
import org.archive.io.warc.WARCReaderFactory.CompressedWARCReader;

/**
 * Extends FileInputFormat for Web Archive Commons Generic InputFormat.
 *
 * <p>Each input file is handed to a single {@link GenericArchiveRecordReader}
 * (files are never split), which emits one key/value pair per ARC or WARC
 * record.
 */
public class WacGenericInputFormat extends FileInputFormat<LongWritable,
       GenericArchiveRecordWritable> {
  /**
   * Creates the record reader for a split. The reader is returned
   * uninitialized; {@code initialize} must be called on it before use.
   *
   * @param split the split to read (not inspected here)
   * @param context the task attempt context (not inspected here)
   * @return a new, uninitialized GenericArchiveRecordReader
   * @throws IOException declared by the overridden method
   * @throws InterruptedException declared by the overridden method
   */
  @Override
  public final RecordReader<LongWritable,
         GenericArchiveRecordWritable> createRecordReader(final InputSplit split,
      final TaskAttemptContext context) throws IOException,
  InterruptedException {
    return new GenericArchiveRecordReader();
  }

  /**
   * Archive files are always read as a whole.
   *
   * @param context the job context (unused)
   * @param filename the input file (unused)
   * @return always false
   */
  @Override
  protected final boolean isSplitable(final JobContext context,
      final Path filename) {
    return false;
  }

  /**
   * Extends RecordReader for Generic Record Reader.
   *
   * <p>Keys are the offset reported by each record's header; values wrap the
   * record itself.
   */
  public class GenericArchiveRecordReader extends RecordReader<LongWritable,
         GenericArchiveRecordWritable> {

    /**
     * Generic archive reader.
     */
    private ArchiveReader reader;

    /**
     * Generic archive format.
     */
    private ArchiveFormat format;

    /**
     * Start position of generic archive being read.
     */
    private long start;

    /**
     * A given position of a generic archive being read.
     */
    private long pos;

    /**
     * End position of a generic archive being read.
     */
    private long end;

    /**
     * LongWritable key.
     */
    private LongWritable key = null;

    /**
     * GenericArchiveRecordWritable value.
     */
    private GenericArchiveRecordWritable value = null;

    /**
     * Seekable file position.
     */
    private Seekable filePosition;

    /**
     * Iterator for generic archive record.
     */
    private Iterator<ArchiveRecord> iter;

    /**
     * Opens the split's file and prepares to iterate over its records.
     *
     * @param genericSplit the split to read; must be a FileSplit
     * @param context the task attempt context supplying the configuration
     * @throws IOException if the file cannot be opened, or if the reader
     *     obtained for it is neither an ARC nor a WARC reader
     */
    @Override
    public final void initialize(final InputSplit genericSplit,
            final TaskAttemptContext context)
    throws IOException {
      FileSplit split = (FileSplit) genericSplit;
      Configuration job = context.getConfiguration();
      start = split.getStart();
      end = start + split.getLength();
      final Path file = split.getPath();

      FileSystem fs = file.getFileSystem(job);
      FSDataInputStream fileIn = fs.open(file);

      boolean initialized = false;
      try {
        reader = ArchiveReaderFactory.get(file.toString(),
            new BufferedInputStream(fileIn), true);

        if (reader instanceof ARCReader) {
          format = ArchiveFormat.ARC;
        } else if (reader instanceof WARCReader) {
          format = ArchiveFormat.WARC;
        } else {
          // Fail here with a clear message instead of leaving iter null and
          // hitting a NullPointerException on the first nextKeyValue() call.
          throw new IOException("Unsupported archive format: " + file);
        }
        iter = reader.iterator();

        // Remember the underlying stream so getFilePosition() can report
        // how much of a compressed file has been consumed.
        filePosition = fileIn;
        initialized = true;
      } finally {
        if (!initialized) {
          // Do not leak the open stream when setup fails.
          try {
            fileIn.close();
          } catch (IOException ignored) {
            // The failure that brought us here is the one worth reporting.
          }
        }
      }

      this.pos = start;
    }

    /**
     * Determines if generic archive is compressed.
     *
     * @return true if the reader is the compressed variant for the
     *     detected format (ARC or WARC)
     */
    private boolean isCompressedInput() {
      if (format == ArchiveFormat.ARC) {
        return reader instanceof CompressedARCReader;
      } else {
        return reader instanceof CompressedWARCReader;
      }
    }

    /**
     * Get file position of generic archive.
     *
     * <p>For compressed input this is the position of the underlying file
     * stream; because that stream is wrapped in a BufferedInputStream it may
     * be somewhat ahead of the record currently being returned. Otherwise
     * the offset of the most recently read record is used.
     *
     * @return retVal position of generic archive
     * @throws IOException if there is an issue
     */
    private long getFilePosition() throws IOException {
      long retVal;
      if (isCompressedInput() && null != filePosition) {
        retVal = filePosition.getPos();
      } else {
        retVal = pos;
      }
      return retVal;
    }

    /**
     * Advances to the next archive record.
     *
     * @return true if a record was read; false when the archive is
     *     exhausted, the reader was never initialized, or the next record
     *     could not be read
     * @throws IOException declared by the overridden method
     */
    @Override
    public final boolean nextKeyValue() throws IOException {
      if (iter == null || !iter.hasNext()) {
        return false;
      }

      final ArchiveRecord record;
      try {
        record = iter.next();
      } catch (Exception e) {
        // Deliberately best-effort: a record that cannot be read ends the
        // iteration for this file rather than failing the whole task.
        return false;
      }

      if (record == null) {
        return false;
      }

      // Track the offset reported by the record header so that keys
      // identify the record and progress advances for uncompressed input.
      if (record.getHeader() != null) {
        pos = record.getHeader().getOffset();
      }

      if (key == null) {
        key = new LongWritable();
      }
      key.set(pos);

      if (value == null) {
        value = new GenericArchiveRecordWritable();
      }
      value.setRecord(record);

      return true;
    }

    /**
     * @return the key of the current record, or null before the first
     *     successful nextKeyValue() call
     */
    @Override
    public final LongWritable getCurrentKey() {
      return key;
    }

    /**
     * @return the current record, or null before the first successful
     *     nextKeyValue() call
     */
    @Override
    public final GenericArchiveRecordWritable getCurrentValue() {
      return value;
    }

    /**
     * Reports how far through the file the reader is.
     *
     * @return a value between 0.0 and 1.0; 0.0 for an empty split
     * @throws IOException if the file position cannot be determined
     */
    @Override
    public final float getProgress() throws IOException {
      if (start == end) {
        return 0.0f;
      } else {
        return Math.min(1.0f, (getFilePosition() - start) / (float)
            (end - start));
      }
    }

    /**
     * Closes the archive reader, if one was opened.
     *
     * @throws IOException if closing the reader fails
     */
    @Override
    public final synchronized void close() throws IOException {
      // reader is null when initialize() was never called or failed early.
      if (reader != null) {
        reader.close();
      }
    }
  }
}