001/*
002 * Archives Unleashed Toolkit (AUT):
003 * An open-source platform for analyzing web archives.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package io.archivesunleashed.data;
019
020import io.archivesunleashed.data.ArchiveRecordWritable.ArchiveFormat;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.fs.FSDataInputStream;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.fs.Seekable;
026import org.apache.hadoop.io.LongWritable;
027import org.apache.hadoop.mapreduce.InputSplit;
028import org.apache.hadoop.mapreduce.JobContext;
029import org.apache.hadoop.mapreduce.RecordReader;
030import org.apache.hadoop.mapreduce.TaskAttemptContext;
031import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
032import org.apache.hadoop.mapreduce.lib.input.FileSplit;
033import org.archive.io.ArchiveReader;
034import org.archive.io.ArchiveReaderFactory;
035import org.archive.io.ArchiveRecord;
036import org.archive.io.arc.ARCReader;
037import org.archive.io.arc.ARCReaderFactory.CompressedARCReader;
038import org.archive.io.warc.WARCReader;
039import org.archive.io.warc.WARCReaderFactory.CompressedWARCReader;
040
041import java.io.BufferedInputStream;
042import java.io.IOException;
043import java.util.Iterator;
044
045/**
046 * Extends FileInputFormat for Web Archive Commons InputFormat.
047 */
048public class ArchiveRecordInputFormat extends FileInputFormat<LongWritable,
049  ArchiveRecordWritable> {
050  @Override
051  public final RecordReader<LongWritable,
052    ArchiveRecordWritable> createRecordReader(final InputSplit split,
053      final TaskAttemptContext context) throws IOException,
054  InterruptedException {
055    return new ArchiveRecordReader();
056  }
057
058  @Override
059  protected final boolean isSplitable(final JobContext context,
060          final Path filename) {
061    return false;
062  }
063
064  /**
065   * Extends RecordReader for Record Reader.
066   */
067  public class ArchiveRecordReader extends RecordReader<LongWritable,
068    ArchiveRecordWritable> {
069
070    /**
071     * Archive reader.
072     */
073    private ArchiveReader reader;
074
075    /**
076     * Archive format.
077     */
078    private ArchiveFormat format;
079
080    /**
081     * Start position of archive being read.
082     */
083    private long start;
084
085    /**
086     * A given position of an archive being read.
087     */
088    private long pos;
089
090    /**
091     * End position of an archive being read.
092     */
093    private long end;
094
095    /**
096     * LongWritable key.
097     */
098    private LongWritable key = null;
099
100    /**
101     * ArchiveRecordWritable value.
102     */
103    private ArchiveRecordWritable value = null;
104
105    /**
106     * Seekable file position.
107     */
108    private Seekable filePosition;
109
110    /**
111     * Iterator for an archive record.
112     */
113    private Iterator<ArchiveRecord> iter;
114
115    @Override
116    public final void initialize(final InputSplit archiveRecordSplit,
117            final TaskAttemptContext context)
118    throws IOException {
119      FileSplit split = (FileSplit) archiveRecordSplit;
120      Configuration job = context.getConfiguration();
121      start = split.getStart();
122      end = start + split.getLength();
123      final Path file = split.getPath();
124
125      FileSystem fs = file.getFileSystem(job);
126      FSDataInputStream fileIn = fs.open(split.getPath());
127
128      reader = ArchiveReaderFactory.get(split.getPath().toString(),
129          new BufferedInputStream(fileIn), true);
130
131      if (reader instanceof ARCReader) {
132        format = ArchiveFormat.ARC;
133        iter = reader.iterator();
134      }
135
136      if (reader instanceof WARCReader) {
137        format = ArchiveFormat.WARC;
138        iter = reader.iterator();
139      }
140
141      this.pos = start;
142    }
143
144    /**
145     * Determines if archive is compressed.
146     *
147     * @return instanceof if ARC/WARC
148     */
149    private boolean isCompressedInput() {
150      if (format == ArchiveFormat.ARC) {
151        return reader instanceof CompressedARCReader;
152      } else {
153        return reader instanceof CompressedWARCReader;
154      }
155    }
156
157    /**
158     * Get file position of archive.
159     *
160     * @return retVal position of archive
161     * @throws IOException if there is an issue
162     */
163    private long getFilePosition() throws IOException {
164      long retVal;
165      if (isCompressedInput() && null != filePosition) {
166        retVal = filePosition.getPos();
167      } else {
168        retVal = pos;
169      }
170      return retVal;
171    }
172
173    @Override
174    public final boolean nextKeyValue() throws IOException {
175      if (!iter.hasNext()) {
176        return false;
177      }
178
179      if (key == null) {
180        key = new LongWritable();
181      }
182      key.set(pos);
183
184      ArchiveRecord record = null;
185      try {
186        record = iter.next();
187      } catch (Exception e) {
188        return false;
189      }
190
191      if (record == null) {
192        return false;
193      }
194
195      if (value == null) {
196        value = new ArchiveRecordWritable();
197      }
198      value.setRecord(record);
199
200      return true;
201    }
202
203    @Override
204    public final LongWritable getCurrentKey() {
205      return key;
206    }
207
208    @Override
209    public final ArchiveRecordWritable getCurrentValue() {
210      return value;
211    }
212
213    @Override
214    public final float getProgress() throws IOException {
215      if (start == end) {
216        return 0.0f;
217      } else {
218        return Math.min(1.0f, (getFilePosition() - start) / (float)
219                (end - start));
220      }
221    }
222
223    @Override
224    public final synchronized void close() throws IOException {
225      reader.close();
226    }
227  }
228}