001/*
002 * Copyright © 2017 The Archives Unleashed Project
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package io.archivesunleashed.data;
018
019import io.archivesunleashed.data.ArchiveRecordWritable.ArchiveFormat;
020import org.apache.log4j.Logger;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.fs.FSDataInputStream;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.fs.Seekable;
026import org.apache.hadoop.io.LongWritable;
027import org.apache.hadoop.mapreduce.InputSplit;
028import org.apache.hadoop.mapreduce.JobContext;
029import org.apache.hadoop.mapreduce.RecordReader;
030import org.apache.hadoop.mapreduce.TaskAttemptContext;
031import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
032import org.apache.hadoop.mapreduce.lib.input.FileSplit;
033import org.archive.io.ArchiveReader;
034import org.archive.io.ArchiveReaderFactory;
035import org.archive.io.ArchiveRecord;
036import org.archive.io.arc.ARCReader;
037import org.archive.io.arc.ARCReaderFactory.CompressedARCReader;
038import org.archive.io.warc.WARCReader;
039import org.archive.io.warc.WARCReaderFactory.CompressedWARCReader;
040
041import java.io.BufferedInputStream;
042import java.io.IOException;
043import java.util.Iterator;
044
045/**
046 * Extends FileInputFormat for Web Archive Commons InputFormat.
047 */
048public class ArchiveRecordInputFormat extends FileInputFormat<LongWritable,
049  ArchiveRecordWritable> {
050  /**
051   * Setup logger.
052   */
053  private static final Logger LOG =
054    Logger.getLogger(ArchiveRecordInputFormat.class);
055
056  @Override
057  public final RecordReader<LongWritable,
058    ArchiveRecordWritable> createRecordReader(final InputSplit split,
059      final TaskAttemptContext context) throws IOException,
060  InterruptedException {
061    return new ArchiveRecordReader();
062  }
063
064  @Override
065  protected final boolean isSplitable(final JobContext context,
066          final Path filename) {
067    return false;
068  }
069
070  /**
071   * Extends RecordReader for Record Reader.
072   */
073  public class ArchiveRecordReader extends RecordReader<LongWritable,
074    ArchiveRecordWritable> {
075
076    /**
077     * Archive reader.
078     */
079    private ArchiveReader reader;
080
081    /**
082     * Archive format.
083     */
084    private ArchiveFormat format;
085
086    /**
087     * Start position of archive being read.
088     */
089    private long start;
090
091    /**
092     * A given position of an archive being read.
093     */
094    private long pos;
095
096    /**
097     * End position of an archive being read.
098     */
099    private long end;
100
101    /**
102     * LongWritable key.
103     */
104    private LongWritable key = null;
105
106    /**
107     * ArchiveRecordWritable value.
108     */
109    private ArchiveRecordWritable value = null;
110
111    /**
112     * Archive file name.
113     */
114    private String fileName;
115
116    /**
117     * Seekable file position.
118     */
119    private Seekable filePosition;
120
121    /**
122     * Iterator for an archive record.
123     */
124    private Iterator<ArchiveRecord> iter;
125
126    @Override
127    public final void initialize(final InputSplit archiveRecordSplit,
128            final TaskAttemptContext context)
129    throws IOException {
130      FileSplit split = (FileSplit) archiveRecordSplit;
131      Configuration job = context.getConfiguration();
132      start = split.getStart();
133      end = start + split.getLength();
134      final Path file = split.getPath();
135
136      FileSystem fs = file.getFileSystem(job);
137      FSDataInputStream fileIn = fs.open(split.getPath());
138      fileName = split.getPath().toString();
139
140      reader = ArchiveReaderFactory.get(fileName,
141          new BufferedInputStream(fileIn), true);
142
143      if (reader instanceof ARCReader) {
144        format = ArchiveFormat.ARC;
145        iter = reader.iterator();
146      }
147
148      if (reader instanceof WARCReader) {
149        format = ArchiveFormat.WARC;
150        iter = reader.iterator();
151      }
152
153      this.pos = start;
154    }
155
156    /**
157     * Determines if archive is compressed.
158     *
159     * @return instanceof if ARC/WARC
160     */
161    private boolean isCompressedInput() {
162      if (format == ArchiveFormat.ARC) {
163        return reader instanceof CompressedARCReader;
164      } else {
165        return reader instanceof CompressedWARCReader;
166      }
167    }
168
169    /**
170     * Get file position of archive.
171     *
172     * @return retVal position of archive
173     * @throws IOException if there is an issue
174     */
175    private long getFilePosition() throws IOException {
176      long retVal;
177      if (isCompressedInput() && null != filePosition) {
178        retVal = filePosition.getPos();
179      } else {
180        retVal = pos;
181      }
182      return retVal;
183    }
184
185    @Override
186    public final boolean nextKeyValue() throws IOException {
187      if (!iter.hasNext()) {
188        return false;
189      }
190
191      if (key == null) {
192        key = new LongWritable();
193      }
194      key.set(pos);
195
196      ArchiveRecord record = null;
197      try {
198        record = iter.next();
199      } catch (Exception e) {
200        return false;
201      }
202
203      if (record == null) {
204        return false;
205      }
206
207      if (value == null) {
208        value = new ArchiveRecordWritable();
209      }
210      value.setRecord(record);
211
212      return true;
213    }
214
215    @Override
216    public final LongWritable getCurrentKey() {
217      return key;
218    }
219
220    @Override
221    public final ArchiveRecordWritable getCurrentValue() {
222      return value;
223    }
224
225    @Override
226    public final float getProgress() throws IOException {
227      if (start == end) {
228        return 0.0f;
229      } else {
230        return Math.min(1.0f, (getFilePosition() - start) / (float)
231                (end - start));
232      }
233    }
234
235    @Override
236    public final synchronized void close() throws IOException {
237      reader.close();
238      LOG.info("Closed archive file " + fileName);
239    }
240  }
241}