/*
 * Copyright © 2017 The Archives Unleashed Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.archivesunleashed.data;

import io.archivesunleashed.data.ArchiveRecordWritable.ArchiveFormat;
import org.apache.log4j.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveReaderFactory;
import org.archive.io.ArchiveRecord;
import org.archive.io.arc.ARCReader;
import org.archive.io.arc.ARCReaderFactory.CompressedARCReader;
import org.archive.io.warc.WARCReader;
import org.archive.io.warc.WARCReaderFactory.CompressedWARCReader;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.Iterator;

/**
 * Hadoop {@link FileInputFormat} for ARC and WARC web archive files.
 *
 * <p>Each input file is handed to a single {@link ArchiveRecordReader}
 * (files are never split), which emits one {@link ArchiveRecordWritable}
 * per archive record.</p>
 */
public class ArchiveRecordInputFormat extends FileInputFormat<LongWritable,
       ArchiveRecordWritable> {
  /**
   * Setup logger.
   */
  private static final Logger LOG =
    Logger.getLogger(ArchiveRecordInputFormat.class);

  /**
   * Creates an uninitialised reader; the caller is expected to invoke
   * {@link ArchiveRecordReader#initialize} before reading.
   *
   * @param split the split to be read (not inspected here)
   * @param context the task context (not inspected here)
   * @return a new {@link ArchiveRecordReader}
   * @throws IOException declared by the overridden method
   * @throws InterruptedException declared by the overridden method
   */
  @Override
  public final RecordReader<LongWritable,
         ArchiveRecordWritable> createRecordReader(final InputSplit split,
             final TaskAttemptContext context) throws IOException,
         InterruptedException {
    return new ArchiveRecordReader();
  }

  /**
   * Archive files are always processed as a single split.
   *
   * @param context the job context (ignored)
   * @param filename the file being considered (ignored)
   * @return always {@code false}
   */
  @Override
  protected final boolean isSplitable(final JobContext context,
      final Path filename) {
    return false;
  }

  /**
   * Record reader that iterates over the records of one ARC or WARC file.
   */
  public class ArchiveRecordReader extends RecordReader<LongWritable,
         ArchiveRecordWritable> {

    /**
     * Archive reader.
     */
    private ArchiveReader reader;

    /**
     * Archive format.
     */
    private ArchiveFormat format;

    /**
     * Start position of archive being read.
     */
    private long start;

    /**
     * Position used for record keys; set to the split start in
     * {@link #initialize} and not advanced afterwards.
     */
    private long pos;

    /**
     * End position of an archive being read.
     */
    private long end;

    /**
     * LongWritable key.
     */
    private LongWritable key = null;

    /**
     * ArchiveRecordWritable value.
     */
    private ArchiveRecordWritable value = null;

    /**
     * Archive file name.
     */
    private String fileName;

    /**
     * Underlying file stream, used to measure how far into the file
     * reading has progressed. {@code null} until {@link #initialize}
     * succeeds.
     */
    private Seekable filePosition;

    /**
     * Iterator for an archive record.
     */
    private Iterator<ArchiveRecord> iter;

    /**
     * Opens the archive file behind the split and prepares the record
     * iterator.
     *
     * <p>If the archive cannot be opened, or the reader returned by
     * {@link ArchiveReaderFactory} is neither an ARC nor a WARC reader,
     * the underlying file stream is closed before the exception
     * propagates.</p>
     *
     * @param archiveRecordSplit the split to read; must be a
     *     {@link FileSplit}
     * @param context task context supplying the Hadoop configuration
     * @throws IOException if the file cannot be opened or is not a
     *     supported archive format
     */
    @Override
    public final void initialize(final InputSplit archiveRecordSplit,
        final TaskAttemptContext context)
    throws IOException {
      final FileSplit split = (FileSplit) archiveRecordSplit;
      final Configuration job = context.getConfiguration();
      start = split.getStart();
      end = start + split.getLength();

      final Path file = split.getPath();
      fileName = file.toString();

      final FileSystem fs = file.getFileSystem(job);
      final FSDataInputStream fileIn = fs.open(file);

      boolean initialized = false;
      try {
        reader = ArchiveReaderFactory.get(fileName,
            new BufferedInputStream(fileIn), true);

        if (reader instanceof ARCReader) {
          format = ArchiveFormat.ARC;
        } else if (reader instanceof WARCReader) {
          format = ArchiveFormat.WARC;
        } else {
          // Fail here rather than with a NullPointerException on the
          // first nextKeyValue() call.
          throw new IOException("Unsupported archive format: " + fileName);
        }

        iter = reader.iterator();
        // Keep a handle on the raw stream so getProgress() can report
        // how much of the file has been consumed.
        filePosition = fileIn;
        initialized = true;
      } finally {
        if (!initialized) {
          // Do not leak the stream, and leave close() a no-op.
          reader = null;
          fileIn.close();
        }
      }

      this.pos = start;

      if (LOG.isDebugEnabled()) {
        LOG.debug("Opened " + format + " archive file " + fileName
            + " (compressed: " + isCompressedInput() + ")");
      }
    }

    /**
     * Determines if archive is compressed.
     *
     * @return {@code true} if the reader is the compressed variant of the
     *     detected format's reader
     */
    private boolean isCompressedInput() {
      if (format == ArchiveFormat.ARC) {
        return reader instanceof CompressedARCReader;
      } else {
        return reader instanceof CompressedWARCReader;
      }
    }

    /**
     * Get file position of archive.
     *
     * <p>Reports the position of the underlying file stream when one is
     * available, and falls back to {@code pos} otherwise. The stream is
     * read through a {@link BufferedInputStream}, so the reported
     * position is approximate.</p>
     *
     * @return current position within the archive file
     * @throws IOException if the stream position cannot be obtained
     */
    private long getFilePosition() throws IOException {
      if (filePosition != null) {
        return filePosition.getPos();
      }
      return pos;
    }

    /**
     * Advances to the next archive record.
     *
     * <p>The key is set to {@code pos}, which remains at the split start
     * for the lifetime of this reader. If reading a record fails, the
     * failure is logged and iteration stops.</p>
     *
     * @return {@code true} if a record was read, {@code false} when the
     *     archive is exhausted or a record could not be read
     * @throws IOException declared by the overridden method
     */
    @Override
    public final boolean nextKeyValue() throws IOException {
      if (!iter.hasNext()) {
        return false;
      }

      if (key == null) {
        key = new LongWritable();
      }
      key.set(pos);

      final ArchiveRecord record;
      try {
        record = iter.next();
      } catch (Exception e) {
        // Best effort: stop reading this file, but leave a trace so the
        // truncation is not silent.
        LOG.warn("Failed to read next record from " + fileName
            + "; stopping iteration", e);
        return false;
      }

      if (record == null) {
        return false;
      }

      if (value == null) {
        value = new ArchiveRecordWritable();
      }
      value.setRecord(record);

      return true;
    }

    /**
     * @return the current key, or {@code null} before the first call to
     *     {@link #nextKeyValue}
     */
    @Override
    public final LongWritable getCurrentKey() {
      return key;
    }

    /**
     * @return the current value, or {@code null} before the first record
     *     has been read
     */
    @Override
    public final ArchiveRecordWritable getCurrentValue() {
      return value;
    }

    /**
     * Reports the fraction of the file consumed so far.
     *
     * @return a value between 0 and 1; 0 when the split is empty
     * @throws IOException if the file position cannot be obtained
     */
    @Override
    public final float getProgress() throws IOException {
      if (start == end) {
        return 0.0f;
      } else {
        return Math.min(1.0f, (getFilePosition() - start) / (float)
            (end - start));
      }
    }

    /**
     * Closes the archive reader, if one was opened.
     *
     * @throws IOException if closing the reader fails
     */
    @Override
    public final synchronized void close() throws IOException {
      if (reader == null) {
        // initialize() failed or was never called; nothing to release.
        return;
      }
      reader.close();
      LOG.info("Closed archive file " + fileName);
    }
  }
}