001/*
002 * Archives Unleashed Toolkit (AUT):
003 * An open-source platform for analyzing web archives.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package io.archivesunleashed.mapreduce;
018
019import io.archivesunleashed.io.ArcRecordWritable;
020import java.io.BufferedInputStream;
021import java.io.IOException;
022import java.util.Iterator;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.fs.FSDataInputStream;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.fs.Seekable;
028import org.apache.hadoop.io.LongWritable;
029import org.apache.hadoop.mapreduce.InputSplit;
030import org.apache.hadoop.mapreduce.JobContext;
031import org.apache.hadoop.mapreduce.RecordReader;
032import org.apache.hadoop.mapreduce.TaskAttemptContext;
033import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
034import org.apache.hadoop.mapreduce.lib.input.FileSplit;
035import org.archive.io.ArchiveRecord;
036import org.archive.io.arc.ARCReader;
037import org.archive.io.arc.ARCReaderFactory.CompressedARCReader;
038import org.archive.io.arc.ARCReaderFactory;
039import org.archive.io.arc.ARCRecord;
040
041/**
042 * Extends FileInputFormat for Web Archive Commons ARC InputFormat.
043 */
044public class WacArcInputFormat extends FileInputFormat<LongWritable,
045       ArcRecordWritable> {
046  @Override
047  public final RecordReader<LongWritable, ArcRecordWritable>
048  createRecordReader(
049          final InputSplit split,
050      final TaskAttemptContext context) throws IOException,
051  InterruptedException {
052    return new ArcRecordReader();
053  }
054
055  @Override
056  protected final boolean isSplitable(final JobContext context,
057          final Path filename) {
058    return false;
059  }
060
061  /**
062   * Extends RecordReader for ARC Record Reader.
063   */
064  public class ArcRecordReader extends RecordReader<LongWritable,
065         ArcRecordWritable> {
066
067    /**
068     * ARC reader.
069     */
070    private ARCReader reader;
071
072    /**
073     * Start position of ARC being read.
074     */
075    private long start;
076
077    /**
078     * A given position of a ARC being read.
079     */
080    private long pos;
081
082    /**
083     * End position of a ARC being read.
084     */
085    private long end;
086
087    /**
088     * LongWritable key.
089     */
090    private LongWritable key = null;
091
092    /**
093     * ArcRecordWritable value.
094     */
095    private ArcRecordWritable value = null;
096
097    /**
098     * Seekable file position.
099     */
100    private Seekable filePosition;
101
102    /**
103     * Iterator for ArchiveRecord.
104     */
105    private Iterator<ArchiveRecord> iter;
106
107    @Override
108    public final void initialize(final InputSplit genericSplit,
109            final TaskAttemptContext context)
110    throws IOException {
111      FileSplit split = (FileSplit) genericSplit;
112      Configuration job = context.getConfiguration();
113      start = split.getStart();
114      end = start + split.getLength();
115      final Path file = split.getPath();
116
117      FileSystem fs = file.getFileSystem(job);
118      FSDataInputStream fileIn = fs.open(split.getPath());
119
120      reader = (ARCReader) ARCReaderFactory.get(split.getPath().toString(),
121          new BufferedInputStream(fileIn), true);
122
123      iter = reader.iterator();
124      //reader = (ARCReader) ARCReaderFactory.get(split.getPath().toString(),
125      //fileIn, true);
126
127      this.pos = start;
128    }
129
130    /**
131     * Determins if ARC is compressed.
132     *
133     * @return reader true/false
134     */
135    private boolean isCompressedInput() {
136      return reader instanceof CompressedARCReader;
137    }
138
139    /**
140     * Get file postion of ARC.
141     *
142     * @return retVal position of ARC
143     * @throws IOException is there is an issue
144     */
145    private long getFilePosition() throws IOException {
146      long retVal;
147      if (isCompressedInput() && null != filePosition) {
148        retVal = filePosition.getPos();
149      } else {
150        retVal = pos;
151      }
152      return retVal;
153    }
154
155    @Override
156    public final boolean nextKeyValue() throws IOException {
157      if (!iter.hasNext()) {
158        return false;
159      }
160
161      if (key == null) {
162        key = new LongWritable();
163      }
164      key.set(pos);
165
166      ARCRecord record = (ARCRecord) iter.next();
167      if (record == null) {
168        return false;
169      }
170
171      if (value == null) {
172        value = new ArcRecordWritable();
173      }
174      value.setRecord(record);
175
176      return true;
177    }
178
179    @Override
180    public final LongWritable getCurrentKey() {
181      return key;
182    }
183
184    @Override
185    public final ArcRecordWritable getCurrentValue() {
186      return value;
187    }
188
189    @Override
190    public final float getProgress() throws IOException {
191      if (start == end) {
192        return 0.0f;
193      } else {
194        return Math.min(1.0f, (getFilePosition() - start) / (float)
195                (end - start));
196      }
197    }
198
199    @Override
200    public final synchronized void close() throws IOException {
201      reader.close();
202    }
203  }
204}