001/* 002 * Archives Unleashed Toolkit (AUT): 003 * An open-source platform for analyzing web archives. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 /** 019 * @deprecated as of 0.12.0 and will be removed 020 * in a future release. Use WacGenericArchiveInputFormat instead. 021 */ 022 023package io.archivesunleashed.mapreduce; 024 025import io.archivesunleashed.io.WarcRecordWritable; 026import java.io.BufferedInputStream; 027import java.io.IOException; 028import java.util.Iterator; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FSDataInputStream; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.fs.Seekable; 034import org.apache.hadoop.io.LongWritable; 035import org.apache.hadoop.mapreduce.InputSplit; 036import org.apache.hadoop.mapreduce.JobContext; 037import org.apache.hadoop.mapreduce.RecordReader; 038import org.apache.hadoop.mapreduce.TaskAttemptContext; 039import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 040import org.apache.hadoop.mapreduce.lib.input.FileSplit; 041import org.archive.io.ArchiveRecord; 042import org.archive.io.warc.WARCReader; 043import org.archive.io.warc.WARCReaderFactory.CompressedWARCReader; 044import org.archive.io.warc.WARCReaderFactory; 045import org.archive.io.warc.WARCRecord; 046 047/** 048 * Extends FileInputFormat for Web Archive Commons WARC InputFormat. 049 */ 050@Deprecated 051public class WacWarcInputFormat extends FileInputFormat<LongWritable, 052 WarcRecordWritable> { 053 @Override 054 public final RecordReader<LongWritable, WarcRecordWritable> 055 createRecordReader( 056 final InputSplit split, 057 final TaskAttemptContext context) throws IOException, 058 InterruptedException { 059 return new WarcRecordReader(); 060 } 061 062 @Override 063 protected final boolean isSplitable(final JobContext context, 064 final Path filename) { 065 return false; 066 } 067 068 /** 069 * Extends RecordReader for WARC Record Reader. 070 */ 071 public class WarcRecordReader extends RecordReader<LongWritable, 072 WarcRecordWritable> { 073 074 /** 075 * WARC reader. 076 */ 077 private WARCReader reader; 078 079 /** 080 * Start position of WARC being read. 081 */ 082 private long start; 083 084 /** 085 * A given position of a WARC being read. 086 */ 087 private long pos; 088 089 /** 090 * End position of a WARC being read. 091 */ 092 private long end; 093 094 /** 095 * LongWritable key. 096 */ 097 private LongWritable key = null; 098 099 /** 100 * WarcRecordWritable value. 101 */ 102 private WarcRecordWritable value = null; 103 104 /** 105 * Seekable file position. 106 */ 107 private Seekable filePosition; 108 109 /** 110 * Iterator for ArchiveRecord. 111 */ 112 private Iterator<ArchiveRecord> iter; 113 114 @Override 115 public final void initialize(final InputSplit genericSplit, 116 final TaskAttemptContext context) 117 throws IOException { 118 FileSplit split = (FileSplit) genericSplit; 119 Configuration job = context.getConfiguration(); 120 start = split.getStart(); 121 end = start + split.getLength(); 122 final Path file = split.getPath(); 123 124 FileSystem fs = file.getFileSystem(job); 125 FSDataInputStream fileIn = fs.open(split.getPath()); 126 127 reader = (WARCReader) WARCReaderFactory.get(split.getPath().toString(), 128 new BufferedInputStream(fileIn), true); 129 130 iter = reader.iterator(); 131 //reader = (ARCReader) ARCReaderFactory.get(split.getPath().toString(), 132 //fileIn, true); 133 134 this.pos = start; 135 } 136 137 /** 138 * Determins if WARC is compressed. 139 * 140 * @return reader true/false 141 */ 142 private boolean isCompressedInput() { 143 return reader instanceof CompressedWARCReader; 144 } 145 146 /** 147 * Get file position of WARC. 148 * 149 * @return retVal position of WARC 150 * @throws IOException is there is an issue 151 */ 152 private long getFilePosition() throws IOException { 153 long retVal; 154 if (isCompressedInput() && null != filePosition) { 155 retVal = filePosition.getPos(); 156 } else { 157 retVal = pos; 158 } 159 return retVal; 160 } 161 162 @Override 163 public final boolean nextKeyValue() throws IOException { 164 if (!iter.hasNext()) { 165 return false; 166 } 167 168 if (key == null) { 169 key = new LongWritable(); 170 } 171 key.set(pos); 172 173 WARCRecord record = (WARCRecord) iter.next(); 174 if (record == null) { 175 return false; 176 } 177 178 if (value == null) { 179 value = new WarcRecordWritable(); 180 } 181 value.setRecord(record); 182 183 return true; 184 } 185 186 @Override 187 public final LongWritable getCurrentKey() { 188 return key; 189 } 190 191 @Override 192 public final WarcRecordWritable getCurrentValue() { 193 return value; 194 } 195 196 @Override 197 public final float getProgress() throws IOException { 198 if (start == end) { 199 return 0.0f; 200 } else { 201 return Math.min(1.0f, (getFilePosition() - start) / (float) 202 (end - start)); 203 } 204 } 205 206 @Override 207 public final synchronized void close() throws IOException { 208 reader.close(); 209 } 210 } 211}