/*
 * Copyright © 2017 The Archives Unleashed Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.archivesunleashed.data;

import io.archivesunleashed.data.ArchiveRecordWritable.ArchiveFormat;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.log4j.Logger;
import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveReaderFactory;
import org.archive.io.ArchiveRecord;
import org.archive.io.arc.ARCReader;
import org.archive.io.arc.ARCReaderFactory.CompressedARCReader;
import org.archive.io.warc.WARCReader;
import org.archive.io.warc.WARCReaderFactory.CompressedWARCReader;

/**
 * Hadoop {@link FileInputFormat} that reads ARC and WARC archive files and emits each archive
 * record as an {@link ArchiveRecordWritable} keyed by a {@link LongWritable}.
 */
public class ArchiveRecordInputFormat extends FileInputFormat<LongWritable, ArchiveRecordWritable> {
  /** Setup logger. */
  private static final Logger LOG = Logger.getLogger(ArchiveRecordInputFormat.class);

  /**
   * Creates a new, uninitialized {@link ArchiveRecordReader}.
   *
   * @param split the input split (not inspected here; it is consumed by the reader's initialize)
   * @param context the task attempt context (not inspected here)
   * @return a fresh record reader
   * @throws IOException declared by the overridden method
   * @throws InterruptedException declared by the overridden method
   */
  @Override
  public final RecordReader<LongWritable, ArchiveRecordWritable> createRecordReader(
      final InputSplit split, final TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new ArchiveRecordReader();
  }

  /**
   * Reports that archive files must never be split.
   *
   * @param context the job context (unused)
   * @param filename the file being considered (unused)
   * @return always {@code false}
   */
  @Override
  protected final boolean isSplitable(final JobContext context, final Path filename) {
    return false;
  }

  /** Record reader that iterates over the records of a single ARC or WARC file. */
  public class ArchiveRecordReader extends RecordReader<LongWritable, ArchiveRecordWritable> {

    /** Archive reader. */
    private ArchiveReader reader;

    /** Archive format. */
    private ArchiveFormat format;

    /** Start position of archive being read. */
    private long start;

    /**
     * A given position of an archive being read.
     *
     * <p>NOTE(review): this is set to {@link #start} in initialize and never advanced, so every
     * emitted key carries the same value and progress for uncompressed input stays at zero.
     * Confirm whether callers rely on the key before changing it.
     */
    private long pos;

    /** End position of an archive being read. */
    private long end;

    /** LongWritable key. */
    private LongWritable key = null;

    /** ArchiveRecordWritable value. */
    private ArchiveRecordWritable value = null;

    /** Archive file name. */
    private String fileName;

    /** Seekable view of the underlying file stream, used to report progress. */
    private Seekable filePosition;

    /** Iterator for an archive record. */
    private Iterator<ArchiveRecord> iter;

    /**
     * Opens the file named by the split and prepares an iterator over its records.
     *
     * @param archiveRecordSplit the split to read; must be a {@link FileSplit}
     * @param context the task attempt context supplying the job configuration
     * @throws IOException if the file cannot be opened, the archive reader cannot be created, or
     *     the reader is of an unsupported type
     */
    @Override
    public final void initialize(
        final InputSplit archiveRecordSplit, final TaskAttemptContext context) throws IOException {
      FileSplit split = (FileSplit) archiveRecordSplit;
      Configuration job = context.getConfiguration();
      start = split.getStart();
      end = start + split.getLength();
      final Path file = split.getPath();

      FileSystem fs = file.getFileSystem(job);
      FSDataInputStream fileIn = fs.open(file);
      fileName = file.toString();

      try {
        reader = ArchiveReaderFactory.get(fileName, new BufferedInputStream(fileIn), true);
      } catch (IOException | RuntimeException e) {
        // The reader never took ownership of the stream, so release it here.
        try {
          fileIn.close();
        } catch (IOException closeFailure) {
          e.addSuppressed(closeFailure);
        }
        throw e;
      }

      if (reader instanceof ARCReader) {
        format = ArchiveFormat.ARC;
      } else if (reader instanceof WARCReader) {
        format = ArchiveFormat.WARC;
      } else {
        // Fail here with a clear message instead of a NullPointerException in nextKeyValue.
        String type = reader == null ? "null" : reader.getClass().getName();
        if (reader != null) {
          reader.close();
          reader = null;
        } else {
          fileIn.close();
        }
        throw new IOException("Unsupported archive reader " + type + " for " + fileName);
      }
      iter = reader.iterator();

      // Keep a handle on the raw stream so getFilePosition() can report real progress.
      filePosition = fileIn;
      this.pos = start;
    }

    /**
     * Determines if archive is compressed.
     *
     * @return whether the reader is the compressed reader type for the detected format
     */
    private boolean isCompressedInput() {
      if (format == ArchiveFormat.ARC) {
        return reader instanceof CompressedARCReader;
      } else {
        return reader instanceof CompressedWARCReader;
      }
    }

    /**
     * Get file position of archive.
     *
     * @return the underlying stream position for compressed input when the stream is known,
     *     otherwise {@link #pos}
     * @throws IOException if the stream position cannot be read
     */
    private long getFilePosition() throws IOException {
      if (isCompressedInput() && filePosition != null) {
        return filePosition.getPos();
      }
      return pos;
    }

    /**
     * Advances to the next archive record.
     *
     * @return {@code true} if a record was read into the current key and value; {@code false} if
     *     the reader is uninitialized, the archive is exhausted, or the next record could not be
     *     read
     * @throws IOException declared by the overridden method
     */
    @Override
    public final boolean nextKeyValue() throws IOException {
      if (iter == null || !iter.hasNext()) {
        return false;
      }

      if (key == null) {
        key = new LongWritable();
      }
      key.set(pos);

      ArchiveRecord record;
      try {
        record = iter.next();
      } catch (Exception e) {
        // Best effort: an unreadable record ends the file early, but leave a trace of why.
        LOG.warn("Stopping read of archive file " + fileName + " after unreadable record", e);
        return false;
      }

      if (record == null) {
        return false;
      }

      if (value == null) {
        value = new ArchiveRecordWritable();
      }
      value.setRecord(record);

      return true;
    }

    /**
     * Returns the current key.
     *
     * @return the key set by the last call to nextKeyValue, or {@code null} before the first call
     */
    @Override
    public final LongWritable getCurrentKey() {
      return key;
    }

    /**
     * Returns the current value.
     *
     * @return the value set by the last successful call to nextKeyValue, or {@code null} if none
     */
    @Override
    public final ArchiveRecordWritable getCurrentValue() {
      return value;
    }

    /**
     * Reports how far through the split the reader is.
     *
     * @return {@code 0.0f} for an empty split, otherwise the fraction of the split consumed,
     *     capped at {@code 1.0f}
     * @throws IOException if the file position cannot be read
     */
    @Override
    public final float getProgress() throws IOException {
      if (start == end) {
        return 0.0f;
      } else {
        return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start));
      }
    }

    /**
     * Closes the archive reader, if one was opened.
     *
     * @throws IOException if closing the reader fails
     */
    @Override
    public final synchronized void close() throws IOException {
      if (reader == null) {
        // initialize() was never called or failed before a reader was created.
        return;
      }
      reader.close();
      LOG.info("Closed archive file " + fileName);
    }
  }
}