001/*
002 * Archives Unleashed Toolkit (AUT):
003 * An open-source platform for analyzing web archives.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 /**
019   *  @deprecated as of 0.12.0 and will be replaced
020   *  with WacInputFormat in a future release.
021   */
022
023package io.archivesunleashed.mapreduce;
024
025import io.archivesunleashed.io.GenericArchiveRecordWritable.ArchiveFormat;
026import io.archivesunleashed.io.GenericArchiveRecordWritable;
027import java.io.BufferedInputStream;
028import java.io.IOException;
029import java.util.Iterator;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FSDataInputStream;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.fs.Seekable;
035import org.apache.hadoop.io.LongWritable;
036import org.apache.hadoop.mapreduce.InputSplit;
037import org.apache.hadoop.mapreduce.JobContext;
038import org.apache.hadoop.mapreduce.RecordReader;
039import org.apache.hadoop.mapreduce.TaskAttemptContext;
040import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
041import org.apache.hadoop.mapreduce.lib.input.FileSplit;
042import org.archive.io.ArchiveReader;
043import org.archive.io.ArchiveReaderFactory;
044import org.archive.io.ArchiveRecord;
045import org.archive.io.arc.ARCReader;
046import org.archive.io.arc.ARCReaderFactory.CompressedARCReader;
047import org.archive.io.warc.WARCReader;
048import org.archive.io.warc.WARCReaderFactory.CompressedWARCReader;
049
050/**
051 * Extends FileInputFormat for Web Archive Commons Generic InputFormat.
052 */
053public class WacGenericInputFormat extends FileInputFormat<LongWritable,
054       GenericArchiveRecordWritable> {
055  @Override
056  public final RecordReader<LongWritable,
057  GenericArchiveRecordWritable> createRecordReader(final InputSplit split,
058      final TaskAttemptContext context) throws IOException,
059  InterruptedException {
060    return new GenericArchiveRecordReader();
061  }
062
063  @Override
064  protected final boolean isSplitable(final JobContext context,
065          final Path filename) {
066    return false;
067  }
068
069  /**
070   * Extends RecordReader for Generic Record Reader.
071   */
072  public class GenericArchiveRecordReader extends RecordReader<LongWritable,
073         GenericArchiveRecordWritable> {
074
075    /**
076     * Generic archive reader.
077     */
078    private ArchiveReader reader;
079
080    /**
081     * Generic archive format.
082     */
083    private ArchiveFormat format;
084
085    /**
086     * Start position of generic archive being read.
087     */
088    private long start;
089
090    /**
091     * A given position of a generic archive being read.
092     */
093    private long pos;
094
095    /**
096     * End position of a generic archive being read.
097     */
098    private long end;
099
100    /**
101     * LongWritable key.
102     */
103    private LongWritable key = null;
104
105    /**
106     * GenericArchiveRecordWritable value.
107     */
108    private GenericArchiveRecordWritable value = null;
109
110    /**
111     * Seekable file position.
112     */
113    private Seekable filePosition;
114
115    /**
116     * Iterator for generic archive record.
117     */
118    private Iterator<ArchiveRecord> iter;
119
120    @Override
121    public final void initialize(final InputSplit genericSplit,
122            final TaskAttemptContext context)
123    throws IOException {
124      FileSplit split = (FileSplit) genericSplit;
125      Configuration job = context.getConfiguration();
126      start = split.getStart();
127      end = start + split.getLength();
128      final Path file = split.getPath();
129
130      FileSystem fs = file.getFileSystem(job);
131      FSDataInputStream fileIn = fs.open(split.getPath());
132
133      reader = ArchiveReaderFactory.get(split.getPath().toString(),
134          new BufferedInputStream(fileIn), true);
135
136      if (reader instanceof ARCReader) {
137        format = ArchiveFormat.ARC;
138        iter = reader.iterator();
139      }
140
141      if (reader instanceof WARCReader) {
142        format = ArchiveFormat.WARC;
143        iter = reader.iterator();
144      }
145
146      this.pos = start;
147    }
148
149    /**
150     * Determins if generic archive is compressed.
151     *
152     * @return instanceof if ARC/WARC
153     */
154    private boolean isCompressedInput() {
155      if (format == ArchiveFormat.ARC) {
156        return reader instanceof CompressedARCReader;
157      } else {
158        return reader instanceof CompressedWARCReader;
159      }
160    }
161
162    /**
163     * Get file position of generic archive.
164     *
165     * @return retVal position of generic archive
166     * @throws IOException if there is an issue
167     */
168    private long getFilePosition() throws IOException {
169      long retVal;
170      if (isCompressedInput() && null != filePosition) {
171        retVal = filePosition.getPos();
172      } else {
173        retVal = pos;
174      }
175      return retVal;
176    }
177
178    @Override
179    public final boolean nextKeyValue() throws IOException {
180      if (!iter.hasNext()) {
181        return false;
182      }
183
184      if (key == null) {
185        key = new LongWritable();
186      }
187      key.set(pos);
188
189      ArchiveRecord record = null;
190      try {
191        record = iter.next();
192      } catch (Exception e) {
193        return false;
194      }
195
196      if (record == null) {
197        return false;
198      }
199
200      if (value == null) {
201        value = new GenericArchiveRecordWritable();
202      }
203      value.setRecord(record);
204
205      return true;
206    }
207
208    @Override
209    public final LongWritable getCurrentKey() {
210      return key;
211    }
212
213    @Override
214    public final GenericArchiveRecordWritable getCurrentValue() {
215      return value;
216    }
217
218    @Override
219    public final float getProgress() throws IOException {
220      if (start == end) {
221        return 0.0f;
222      } else {
223        return Math.min(1.0f, (getFilePosition() - start) / (float)
224                (end - start));
225      }
226    }
227
228    @Override
229    public final synchronized void close() throws IOException {
230      reader.close();
231    }
232  }
233}