001/*
002 * Archives Unleashed Toolkit (AUT):
003 * An open-source platform for analyzing web archives.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package io.archivesunleashed.mapreduce;
018
019import io.archivesunleashed.io.GenericArchiveRecordWritable.ArchiveFormat;
020import io.archivesunleashed.io.GenericArchiveRecordWritable;
021import java.io.BufferedInputStream;
022import java.io.IOException;
023import java.util.Iterator;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FSDataInputStream;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.fs.Seekable;
029import org.apache.hadoop.io.LongWritable;
030import org.apache.hadoop.mapreduce.InputSplit;
031import org.apache.hadoop.mapreduce.JobContext;
032import org.apache.hadoop.mapreduce.RecordReader;
033import org.apache.hadoop.mapreduce.TaskAttemptContext;
034import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
035import org.apache.hadoop.mapreduce.lib.input.FileSplit;
036import org.archive.io.ArchiveReader;
037import org.archive.io.ArchiveReaderFactory;
038import org.archive.io.ArchiveRecord;
039import org.archive.io.arc.ARCReader;
040import org.archive.io.arc.ARCReaderFactory.CompressedARCReader;
041import org.archive.io.warc.WARCReader;
042import org.archive.io.warc.WARCReaderFactory.CompressedWARCReader;
043
044/**
045 * Extends FileInputFormat for Web Archive Commons Generic InputFormat.
046 */
047public class WacGenericInputFormat extends FileInputFormat<LongWritable,
048       GenericArchiveRecordWritable> {
049  @Override
050  public final RecordReader<LongWritable,
051  GenericArchiveRecordWritable> createRecordReader(final InputSplit split,
052      final TaskAttemptContext context) throws IOException,
053  InterruptedException {
054    return new GenericArchiveRecordReader();
055  }
056
057  @Override
058  protected final boolean isSplitable(final JobContext context,
059          final Path filename) {
060    return false;
061  }
062
063  /**
064   * Extends RecordReader for Generic Record Reader.
065   */
066  public class GenericArchiveRecordReader extends RecordReader<LongWritable,
067         GenericArchiveRecordWritable> {
068
069    /**
070     * Generic archive reader.
071     */
072    private ArchiveReader reader;
073
074    /**
075     * Generic archive format.
076     */
077    private ArchiveFormat format;
078
079    /**
080     * Start position of generic archive being read.
081     */
082    private long start;
083
084    /**
085     * A given position of a generic archive being read.
086     */
087    private long pos;
088
089    /**
090     * End position of a generic archive being read.
091     */
092    private long end;
093
094    /**
095     * LongWritable key.
096     */
097    private LongWritable key = null;
098
099    /**
100     * GenericArchiveRecordWritable value.
101     */
102    private GenericArchiveRecordWritable value = null;
103
104    /**
105     * Seekable file position.
106     */
107    private Seekable filePosition;
108
109    /**
110     * Iterator for generic archive record.
111     */
112    private Iterator<ArchiveRecord> iter;
113
114    @Override
115    public final void initialize(final InputSplit genericSplit,
116            final TaskAttemptContext context)
117    throws IOException {
118      FileSplit split = (FileSplit) genericSplit;
119      Configuration job = context.getConfiguration();
120      start = split.getStart();
121      end = start + split.getLength();
122      final Path file = split.getPath();
123
124      FileSystem fs = file.getFileSystem(job);
125      FSDataInputStream fileIn = fs.open(split.getPath());
126
127      reader = ArchiveReaderFactory.get(split.getPath().toString(),
128          new BufferedInputStream(fileIn), true);
129
130      if (reader instanceof ARCReader) {
131        format = ArchiveFormat.ARC;
132        iter = reader.iterator();
133      }
134
135      if (reader instanceof WARCReader) {
136        format = ArchiveFormat.WARC;
137        iter = reader.iterator();
138      }
139
140      this.pos = start;
141    }
142
143    /**
144     * Determins if generic archive is compressed.
145     *
146     * @return instanceof if ARC/WARC
147     */
148    private boolean isCompressedInput() {
149      if (format == ArchiveFormat.ARC) {
150        return reader instanceof CompressedARCReader;
151      } else {
152        return reader instanceof CompressedWARCReader;
153      }
154    }
155
156    /**
157     * Get file position of generic archive.
158     *
159     * @return retVal position of generic archive
160     * @throws IOException if there is an issue
161     */
162    private long getFilePosition() throws IOException {
163      long retVal;
164      if (isCompressedInput() && null != filePosition) {
165        retVal = filePosition.getPos();
166      } else {
167        retVal = pos;
168      }
169      return retVal;
170    }
171
172    @Override
173    public final boolean nextKeyValue() throws IOException {
174      if (!iter.hasNext()) {
175        return false;
176      }
177
178      if (key == null) {
179        key = new LongWritable();
180      }
181      key.set(pos);
182
183      ArchiveRecord record = null;
184      try {
185        record = iter.next();
186      } catch (Exception e) {
187        return false;
188      }
189
190      if (record == null) {
191        return false;
192      }
193
194      if (value == null) {
195        value = new GenericArchiveRecordWritable();
196      }
197      value.setRecord(record);
198
199      return true;
200    }
201
202    @Override
203    public final LongWritable getCurrentKey() {
204      return key;
205    }
206
207    @Override
208    public final GenericArchiveRecordWritable getCurrentValue() {
209      return value;
210    }
211
212    @Override
213    public final float getProgress() throws IOException {
214      if (start == end) {
215        return 0.0f;
216      } else {
217        return Math.min(1.0f, (getFilePosition() - start) / (float)
218                (end - start));
219      }
220    }
221
222    @Override
223    public final synchronized void close() throws IOException {
224      reader.close();
225    }
226  }
227}