001/*
002 * Archives Unleashed Toolkit (AUT):
003 * An open-source platform for analyzing web archives.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
/*
 * NOTE: WacArcInputFormat is deprecated as of 0.12.0 and will be removed
 * in a future release. Use WacGenericArchiveInputFormat instead.
 */
022
023
024package io.archivesunleashed.mapreduce;
025
026import io.archivesunleashed.io.ArcRecordWritable;
027import java.io.BufferedInputStream;
028import java.io.IOException;
029import java.util.Iterator;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FSDataInputStream;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.fs.Seekable;
035import org.apache.hadoop.io.LongWritable;
036import org.apache.hadoop.mapreduce.InputSplit;
037import org.apache.hadoop.mapreduce.JobContext;
038import org.apache.hadoop.mapreduce.RecordReader;
039import org.apache.hadoop.mapreduce.TaskAttemptContext;
040import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
041import org.apache.hadoop.mapreduce.lib.input.FileSplit;
042import org.archive.io.ArchiveRecord;
043import org.archive.io.arc.ARCReader;
044import org.archive.io.arc.ARCReaderFactory.CompressedARCReader;
045import org.archive.io.arc.ARCReaderFactory;
046import org.archive.io.arc.ARCRecord;
047
048/**
049 * Extends FileInputFormat for Web Archive Commons ARC InputFormat.
050 */
051@Deprecated
052public class WacArcInputFormat extends FileInputFormat<LongWritable,
053       ArcRecordWritable> {
054  @Override
055  public final RecordReader<LongWritable, ArcRecordWritable>
056  createRecordReader(
057          final InputSplit split,
058      final TaskAttemptContext context) throws IOException,
059  InterruptedException {
060    return new ArcRecordReader();
061  }
062
063  @Override
064  protected final boolean isSplitable(final JobContext context,
065          final Path filename) {
066    return false;
067  }
068
069  /**
070   * Extends RecordReader for ARC Record Reader.
071   */
072  @Deprecated
073  public class ArcRecordReader extends RecordReader<LongWritable,
074         ArcRecordWritable> {
075
076    /**
077     * ARC reader.
078     */
079    private ARCReader reader;
080
081    /**
082     * Start position of ARC being read.
083     */
084    private long start;
085
086    /**
087     * A given position of a ARC being read.
088     */
089    private long pos;
090
091    /**
092     * End position of a ARC being read.
093     */
094    private long end;
095
096    /**
097     * LongWritable key.
098     */
099    private LongWritable key = null;
100
101    /**
102     * ArcRecordWritable value.
103     */
104    private ArcRecordWritable value = null;
105
106    /**
107     * Seekable file position.
108     */
109    private Seekable filePosition;
110
111    /**
112     * Iterator for ArchiveRecord.
113     */
114    private Iterator<ArchiveRecord> iter;
115
116    @Override
117    public final void initialize(final InputSplit genericSplit,
118            final TaskAttemptContext context)
119    throws IOException {
120      FileSplit split = (FileSplit) genericSplit;
121      Configuration job = context.getConfiguration();
122      start = split.getStart();
123      end = start + split.getLength();
124      final Path file = split.getPath();
125
126      FileSystem fs = file.getFileSystem(job);
127      FSDataInputStream fileIn = fs.open(split.getPath());
128
129      reader = (ARCReader) ARCReaderFactory.get(split.getPath().toString(),
130          new BufferedInputStream(fileIn), true);
131
132      iter = reader.iterator();
133      //reader = (ARCReader) ARCReaderFactory.get(split.getPath().toString(),
134      //fileIn, true);
135
136      this.pos = start;
137    }
138
139    /**
140     * Determins if ARC is compressed.
141     *
142     * @return reader true/false
143     */
144    private boolean isCompressedInput() {
145      return reader instanceof CompressedARCReader;
146    }
147
148    /**
149     * Get file postion of ARC.
150     *
151     * @return retVal position of ARC
152     * @throws IOException is there is an issue
153     */
154    private long getFilePosition() throws IOException {
155      long retVal;
156      if (isCompressedInput() && null != filePosition) {
157        retVal = filePosition.getPos();
158      } else {
159        retVal = pos;
160      }
161      return retVal;
162    }
163
164    @Override
165    public final boolean nextKeyValue() throws IOException {
166      if (!iter.hasNext()) {
167        return false;
168      }
169
170      if (key == null) {
171        key = new LongWritable();
172      }
173      key.set(pos);
174
175      ARCRecord record = (ARCRecord) iter.next();
176      if (record == null) {
177        return false;
178      }
179
180      if (value == null) {
181        value = new ArcRecordWritable();
182      }
183      value.setRecord(record);
184
185      return true;
186    }
187
188    @Override
189    public final LongWritable getCurrentKey() {
190      return key;
191    }
192
193    @Override
194    public final ArcRecordWritable getCurrentValue() {
195      return value;
196    }
197
198    @Override
199    public final float getProgress() throws IOException {
200      if (start == end) {
201        return 0.0f;
202      } else {
203        return Math.min(1.0f, (getFilePosition() - start) / (float)
204                (end - start));
205      }
206    }
207
208    @Override
209    public final synchronized void close() throws IOException {
210      reader.close();
211    }
212  }
213}