001/* 002 * Archives Unleashed Toolkit (AUT): 003 * An open-source platform for analyzing web archives. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package io.archivesunleashed.mapreduce; 018 019import io.archivesunleashed.io.ArcRecordWritable; 020import java.io.BufferedInputStream; 021import java.io.IOException; 022import java.util.Iterator; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.fs.FSDataInputStream; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.fs.Seekable; 028import org.apache.hadoop.io.LongWritable; 029import org.apache.hadoop.mapreduce.InputSplit; 030import org.apache.hadoop.mapreduce.JobContext; 031import org.apache.hadoop.mapreduce.RecordReader; 032import org.apache.hadoop.mapreduce.TaskAttemptContext; 033import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 034import org.apache.hadoop.mapreduce.lib.input.FileSplit; 035import org.archive.io.ArchiveRecord; 036import org.archive.io.arc.ARCReader; 037import org.archive.io.arc.ARCReaderFactory.CompressedARCReader; 038import org.archive.io.arc.ARCReaderFactory; 039import org.archive.io.arc.ARCRecord; 040 041/** 042 * Extends FileInputFormat for Web Archive Commons ARC InputFormat. 043 */ 044public class WacArcInputFormat extends FileInputFormat<LongWritable, 045 ArcRecordWritable> { 046 @Override 047 public final RecordReader<LongWritable, ArcRecordWritable> 048 createRecordReader( 049 final InputSplit split, 050 final TaskAttemptContext context) throws IOException, 051 InterruptedException { 052 return new ArcRecordReader(); 053 } 054 055 @Override 056 protected final boolean isSplitable(final JobContext context, 057 final Path filename) { 058 return false; 059 } 060 061 /** 062 * Extends RecordReader for ARC Record Reader. 063 */ 064 public class ArcRecordReader extends RecordReader<LongWritable, 065 ArcRecordWritable> { 066 067 /** 068 * ARC reader. 069 */ 070 private ARCReader reader; 071 072 /** 073 * Start position of ARC being read. 074 */ 075 private long start; 076 077 /** 078 * A given position of a ARC being read. 079 */ 080 private long pos; 081 082 /** 083 * End position of a ARC being read. 084 */ 085 private long end; 086 087 /** 088 * LongWritable key. 089 */ 090 private LongWritable key = null; 091 092 /** 093 * ArcRecordWritable value. 094 */ 095 private ArcRecordWritable value = null; 096 097 /** 098 * Seekable file position. 099 */ 100 private Seekable filePosition; 101 102 /** 103 * Iterator for ArchiveRecord. 104 */ 105 private Iterator<ArchiveRecord> iter; 106 107 @Override 108 public final void initialize(final InputSplit genericSplit, 109 final TaskAttemptContext context) 110 throws IOException { 111 FileSplit split = (FileSplit) genericSplit; 112 Configuration job = context.getConfiguration(); 113 start = split.getStart(); 114 end = start + split.getLength(); 115 final Path file = split.getPath(); 116 117 FileSystem fs = file.getFileSystem(job); 118 FSDataInputStream fileIn = fs.open(split.getPath()); 119 120 reader = (ARCReader) ARCReaderFactory.get(split.getPath().toString(), 121 new BufferedInputStream(fileIn), true); 122 123 iter = reader.iterator(); 124 //reader = (ARCReader) ARCReaderFactory.get(split.getPath().toString(), 125 //fileIn, true); 126 127 this.pos = start; 128 } 129 130 /** 131 * Determins if ARC is compressed. 132 * 133 * @return reader true/false 134 */ 135 private boolean isCompressedInput() { 136 return reader instanceof CompressedARCReader; 137 } 138 139 /** 140 * Get file postion of ARC. 141 * 142 * @return retVal position of ARC 143 * @throws IOException is there is an issue 144 */ 145 private long getFilePosition() throws IOException { 146 long retVal; 147 if (isCompressedInput() && null != filePosition) { 148 retVal = filePosition.getPos(); 149 } else { 150 retVal = pos; 151 } 152 return retVal; 153 } 154 155 @Override 156 public final boolean nextKeyValue() throws IOException { 157 if (!iter.hasNext()) { 158 return false; 159 } 160 161 if (key == null) { 162 key = new LongWritable(); 163 } 164 key.set(pos); 165 166 ARCRecord record = (ARCRecord) iter.next(); 167 if (record == null) { 168 return false; 169 } 170 171 if (value == null) { 172 value = new ArcRecordWritable(); 173 } 174 value.setRecord(record); 175 176 return true; 177 } 178 179 @Override 180 public final LongWritable getCurrentKey() { 181 return key; 182 } 183 184 @Override 185 public final ArcRecordWritable getCurrentValue() { 186 return value; 187 } 188 189 @Override 190 public final float getProgress() throws IOException { 191 if (start == end) { 192 return 0.0f; 193 } else { 194 return Math.min(1.0f, (getFilePosition() - start) / (float) 195 (end - start)); 196 } 197 } 198 199 @Override 200 public final synchronized void close() throws IOException { 201 reader.close(); 202 } 203 } 204}