/*
 * Archives Unleashed Toolkit (AUT):
 * An open-source platform for analyzing web archives.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.archivesunleashed.mapreduce;

import io.archivesunleashed.io.ArcRecordWritable;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.archive.io.ArchiveRecord;
import org.archive.io.arc.ARCReader;
import org.archive.io.arc.ARCReaderFactory.CompressedARCReader;
import org.archive.io.arc.ARCReaderFactory;
import org.archive.io.arc.ARCRecord;

/**
 * Extends FileInputFormat for Web Archive Commons ARC InputFormat.
 *
 * <p>Each ARC file is handed to a single {@link ArcRecordReader}; files are
 * never split (see {@link #isSplitable(JobContext, Path)}).
 *
 * @deprecated as of 0.12.0 and will be removed
 *     in a future release. Use {@code WacGenericArchiveInputFormat} instead.
 */
@Deprecated
public class WacArcInputFormat extends FileInputFormat<LongWritable,
       ArcRecordWritable> {

  /**
   * Creates the record reader for a split.
   *
   * @param split the split to be read (unused here; the reader receives it
   *     again in {@code initialize})
   * @param context the task attempt context (unused here)
   * @return a new, uninitialized {@link ArcRecordReader}
   * @throws IOException declared by the overridden method
   * @throws InterruptedException declared by the overridden method
   */
  @Override
  public final RecordReader<LongWritable, ArcRecordWritable>
  createRecordReader(
          final InputSplit split,
          final TaskAttemptContext context) throws IOException,
  InterruptedException {
    return new ArcRecordReader();
  }

  /**
   * Reports that ARC files must not be split.
   *
   * @param context the job context (unused)
   * @param filename the file being considered (unused)
   * @return always {@code false}
   */
  @Override
  protected final boolean isSplitable(final JobContext context,
          final Path filename) {
    return false;
  }

  /**
   * Extends RecordReader for ARC Record Reader.
   *
   * <p>Keys are the offsets reported by each record's header; values wrap
   * the {@link ARCRecord} itself.
   *
   * @deprecated as of 0.12.0, together with the enclosing input format.
   */
  @Deprecated
  public class ArcRecordReader extends RecordReader<LongWritable,
         ArcRecordWritable> {

    /**
     * ARC reader.
     */
    private ARCReader reader;

    /**
     * Start position of the ARC being read.
     */
    private long start;

    /**
     * Offset of the most recently returned record of the ARC being read.
     */
    private long pos;

    /**
     * End position of the ARC being read.
     */
    private long end;

    /**
     * LongWritable key.
     */
    private LongWritable key = null;

    /**
     * ArcRecordWritable value.
     */
    private ArcRecordWritable value = null;

    /**
     * Seekable file position of the underlying input stream.
     */
    private Seekable filePosition;

    /**
     * Iterator for ArchiveRecord.
     */
    private Iterator<ArchiveRecord> iter;

    /**
     * Opens the ARC file named by the split and prepares to iterate over
     * its records.
     *
     * @param genericSplit the split to read; must be a {@link FileSplit}
     * @param context the task attempt context supplying the configuration
     * @throws IOException if the file cannot be opened or the ARC reader
     *     cannot be created
     */
    @Override
    public final void initialize(final InputSplit genericSplit,
            final TaskAttemptContext context)
    throws IOException {
      FileSplit split = (FileSplit) genericSplit;
      Configuration job = context.getConfiguration();
      start = split.getStart();
      end = start + split.getLength();
      final Path file = split.getPath();

      FileSystem fs = file.getFileSystem(job);
      FSDataInputStream fileIn = fs.open(file);

      try {
        reader = (ARCReader) ARCReaderFactory.get(file.toString(),
            new BufferedInputStream(fileIn), true);
        iter = reader.iterator();
      } catch (IOException | RuntimeException e) {
        // Do not leak the opened stream when the reader cannot be set up.
        try {
          fileIn.close();
        } catch (IOException closeFailure) {
          e.addSuppressed(closeFailure);
        }
        throw e;
      }

      // Keep the raw stream so progress can be derived from its position.
      filePosition = fileIn;
      this.pos = start;
    }

    /**
     * Determines if the ARC is compressed.
     *
     * @return true if the reader is a {@link CompressedARCReader}
     */
    private boolean isCompressedInput() {
      return reader instanceof CompressedARCReader;
    }

    /**
     * Get the file position of the ARC.
     *
     * @return the position of the underlying stream for compressed input
     *     when it is available, otherwise the offset of the most recently
     *     returned record
     * @throws IOException if there is an issue querying the stream position
     */
    private long getFilePosition() throws IOException {
      long retVal;
      if (isCompressedInput() && null != filePosition) {
        retVal = filePosition.getPos();
      } else {
        retVal = pos;
      }
      return retVal;
    }

    /**
     * Advances to the next ARC record.
     *
     * @return true if a record was read, false when the archive is exhausted
     * @throws IOException declared by the overridden method
     */
    @Override
    public final boolean nextKeyValue() throws IOException {
      if (!iter.hasNext()) {
        return false;
      }

      ARCRecord record = (ARCRecord) iter.next();
      if (record == null) {
        return false;
      }

      // Track the offset reported by the record header so that keys and
      // progress actually move as records are consumed.
      pos = record.getHeader().getOffset();

      if (key == null) {
        key = new LongWritable();
      }
      key.set(pos);

      if (value == null) {
        value = new ArcRecordWritable();
      }
      value.setRecord(record);

      return true;
    }

    /**
     * Returns the key of the current record.
     *
     * @return the current key, or null before the first record is read
     */
    @Override
    public final LongWritable getCurrentKey() {
      return key;
    }

    /**
     * Returns the current record.
     *
     * @return the current value, or null before the first record is read
     */
    @Override
    public final ArcRecordWritable getCurrentValue() {
      return value;
    }

    /**
     * Reports how much of the split has been consumed.
     *
     * @return a fraction capped at 1.0; 0.0 for an empty split
     * @throws IOException if the file position cannot be determined
     */
    @Override
    public final float getProgress() throws IOException {
      if (start == end) {
        return 0.0f;
      } else {
        return Math.min(1.0f, (getFilePosition() - start) / (float)
            (end - start));
      }
    }

    /**
     * Closes the ARC reader, if one was opened.
     *
     * @throws IOException if closing the reader fails
     */
    @Override
    public final synchronized void close() throws IOException {
      // reader is still null when initialize() did not complete.
      if (reader != null) {
        reader.close();
      }
    }
  }
}