001/*
002 * Archives Unleashed Toolkit (AUT):
003 * An open-source platform for analyzing web archives.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
/*
 * Deprecated as of 0.12.0 and will be removed in a future release.
 * Use WacGenericArchiveInputFormat instead.
 */
022
023package io.archivesunleashed.mapreduce;
024
025import io.archivesunleashed.io.WarcRecordWritable;
026import java.io.BufferedInputStream;
027import java.io.IOException;
028import java.util.Iterator;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FSDataInputStream;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.fs.Seekable;
034import org.apache.hadoop.io.LongWritable;
035import org.apache.hadoop.mapreduce.InputSplit;
036import org.apache.hadoop.mapreduce.JobContext;
037import org.apache.hadoop.mapreduce.RecordReader;
038import org.apache.hadoop.mapreduce.TaskAttemptContext;
039import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
040import org.apache.hadoop.mapreduce.lib.input.FileSplit;
041import org.archive.io.ArchiveRecord;
042import org.archive.io.warc.WARCReader;
043import org.archive.io.warc.WARCReaderFactory.CompressedWARCReader;
044import org.archive.io.warc.WARCReaderFactory;
045import org.archive.io.warc.WARCRecord;
046
047/**
048 * Extends FileInputFormat for Web Archive Commons WARC InputFormat.
049 */
050@Deprecated
051public class WacWarcInputFormat extends FileInputFormat<LongWritable,
052       WarcRecordWritable> {
053  @Override
054  public final RecordReader<LongWritable, WarcRecordWritable>
055  createRecordReader(
056          final InputSplit split,
057      final TaskAttemptContext context) throws IOException,
058  InterruptedException {
059    return new WarcRecordReader();
060  }
061
062  @Override
063  protected final boolean isSplitable(final JobContext context,
064          final Path filename) {
065    return false;
066  }
067
068  /**
069   * Extends RecordReader for WARC Record Reader.
070   */
071  public class WarcRecordReader extends RecordReader<LongWritable,
072         WarcRecordWritable> {
073
074    /**
075     * WARC reader.
076     */
077    private WARCReader reader;
078
079    /**
080     * Start position of WARC being read.
081     */
082    private long start;
083
084    /**
085     * A given position of a WARC being read.
086     */
087    private long pos;
088
089    /**
090     * End position of a WARC being read.
091     */
092    private long end;
093
094    /**
095     * LongWritable key.
096     */
097    private LongWritable key = null;
098
099    /**
100     * WarcRecordWritable value.
101     */
102    private WarcRecordWritable value = null;
103
104    /**
105     * Seekable file position.
106     */
107    private Seekable filePosition;
108
109    /**
110     * Iterator for ArchiveRecord.
111     */
112    private Iterator<ArchiveRecord> iter;
113
114    @Override
115    public final void initialize(final InputSplit genericSplit,
116            final TaskAttemptContext context)
117    throws IOException {
118      FileSplit split = (FileSplit) genericSplit;
119      Configuration job = context.getConfiguration();
120      start = split.getStart();
121      end = start + split.getLength();
122      final Path file = split.getPath();
123
124      FileSystem fs = file.getFileSystem(job);
125      FSDataInputStream fileIn = fs.open(split.getPath());
126
127      reader = (WARCReader) WARCReaderFactory.get(split.getPath().toString(),
128          new BufferedInputStream(fileIn), true);
129
130      iter = reader.iterator();
131      //reader = (ARCReader) ARCReaderFactory.get(split.getPath().toString(),
132      //fileIn, true);
133
134      this.pos = start;
135    }
136
137    /**
138     * Determins if WARC is compressed.
139     *
140     * @return reader true/false
141     */
142    private boolean isCompressedInput() {
143      return reader instanceof CompressedWARCReader;
144    }
145
146    /**
147     * Get file position of WARC.
148     *
149     * @return retVal position of WARC
150     * @throws IOException is there is an issue
151     */
152    private long getFilePosition() throws IOException {
153      long retVal;
154      if (isCompressedInput() && null != filePosition) {
155        retVal = filePosition.getPos();
156      } else {
157        retVal = pos;
158      }
159      return retVal;
160    }
161
162    @Override
163    public final boolean nextKeyValue() throws IOException {
164      if (!iter.hasNext()) {
165        return false;
166      }
167
168      if (key == null) {
169        key = new LongWritable();
170      }
171      key.set(pos);
172
173      WARCRecord record = (WARCRecord) iter.next();
174      if (record == null) {
175        return false;
176      }
177
178      if (value == null) {
179        value = new WarcRecordWritable();
180      }
181      value.setRecord(record);
182
183      return true;
184    }
185
186    @Override
187    public final LongWritable getCurrentKey() {
188      return key;
189    }
190
191    @Override
192    public final WarcRecordWritable getCurrentValue() {
193      return value;
194    }
195
196    @Override
197    public final float getProgress() throws IOException {
198      if (start == end) {
199        return 0.0f;
200      } else {
201        return Math.min(1.0f, (getFilePosition() - start) / (float)
202                (end - start));
203      }
204    }
205
206    @Override
207    public final synchronized void close() throws IOException {
208      reader.close();
209    }
210  }
211}