001/*
002 * Copyright © 2017 The Archives Unleashed Project
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package io.archivesunleashed.data;
018
019import io.archivesunleashed.data.ArchiveRecordWritable.ArchiveFormat;
020import java.io.BufferedInputStream;
021import java.io.IOException;
022import java.util.Iterator;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.fs.FSDataInputStream;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.fs.Seekable;
028import org.apache.hadoop.io.LongWritable;
029import org.apache.hadoop.mapreduce.InputSplit;
030import org.apache.hadoop.mapreduce.JobContext;
031import org.apache.hadoop.mapreduce.RecordReader;
032import org.apache.hadoop.mapreduce.TaskAttemptContext;
033import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
034import org.apache.hadoop.mapreduce.lib.input.FileSplit;
035import org.apache.log4j.Logger;
036import org.archive.io.ArchiveReader;
037import org.archive.io.ArchiveReaderFactory;
038import org.archive.io.ArchiveRecord;
039import org.archive.io.arc.ARCReader;
040import org.archive.io.arc.ARCReaderFactory.CompressedARCReader;
041import org.archive.io.warc.WARCReader;
042import org.archive.io.warc.WARCReaderFactory.CompressedWARCReader;
043
044/** Extends FileInputFormat for Web Archive Commons InputFormat. */
045public class ArchiveRecordInputFormat extends FileInputFormat<LongWritable, ArchiveRecordWritable> {
046  /** Setup logger. */
047  private static final Logger LOG = Logger.getLogger(ArchiveRecordInputFormat.class);
048
049  @Override
050  public final RecordReader<LongWritable, ArchiveRecordWritable> createRecordReader(
051      final InputSplit split, final TaskAttemptContext context)
052      throws IOException, InterruptedException {
053    return new ArchiveRecordReader();
054  }
055
056  @Override
057  protected final boolean isSplitable(final JobContext context, final Path filename) {
058    return false;
059  }
060
061  /** Extends RecordReader for Record Reader. */
062  public class ArchiveRecordReader extends RecordReader<LongWritable, ArchiveRecordWritable> {
063
064    /** Archive reader. */
065    private ArchiveReader reader;
066
067    /** Archive format. */
068    private ArchiveFormat format;
069
070    /** Start position of archive being read. */
071    private long start;
072
073    /** A given position of an archive being read. */
074    private long pos;
075
076    /** End position of an archive being read. */
077    private long end;
078
079    /** LongWritable key. */
080    private LongWritable key = null;
081
082    /** ArchiveRecordWritable value. */
083    private ArchiveRecordWritable value = null;
084
085    /** Archive file name. */
086    private String fileName;
087
088    /** Seekable file position. */
089    private Seekable filePosition;
090
091    /** Iterator for an archive record. */
092    private Iterator<ArchiveRecord> iter;
093
094    @Override
095    public final void initialize(
096        final InputSplit archiveRecordSplit, final TaskAttemptContext context) throws IOException {
097      FileSplit split = (FileSplit) archiveRecordSplit;
098      Configuration job = context.getConfiguration();
099      start = split.getStart();
100      end = start + split.getLength();
101      final Path file = split.getPath();
102
103      FileSystem fs = file.getFileSystem(job);
104      FSDataInputStream fileIn = fs.open(split.getPath());
105      fileName = split.getPath().toString();
106
107      reader = ArchiveReaderFactory.get(fileName, new BufferedInputStream(fileIn), true);
108
109      if (reader instanceof ARCReader) {
110        format = ArchiveFormat.ARC;
111        iter = reader.iterator();
112      }
113
114      if (reader instanceof WARCReader) {
115        format = ArchiveFormat.WARC;
116        iter = reader.iterator();
117      }
118
119      this.pos = start;
120    }
121
122    /**
123     * Determines if archive is compressed.
124     *
125     * @return instanceof if ARC/WARC
126     */
127    private boolean isCompressedInput() {
128      if (format == ArchiveFormat.ARC) {
129        return reader instanceof CompressedARCReader;
130      } else {
131        return reader instanceof CompressedWARCReader;
132      }
133    }
134
135    /**
136     * Get file position of archive.
137     *
138     * @return retVal position of archive
139     * @throws IOException if there is an issue
140     */
141    private long getFilePosition() throws IOException {
142      long retVal;
143      if (isCompressedInput() && null != filePosition) {
144        retVal = filePosition.getPos();
145      } else {
146        retVal = pos;
147      }
148      return retVal;
149    }
150
151    @Override
152    public final boolean nextKeyValue() throws IOException {
153      if (!iter.hasNext()) {
154        return false;
155      }
156
157      if (key == null) {
158        key = new LongWritable();
159      }
160      key.set(pos);
161
162      ArchiveRecord record = null;
163      try {
164        record = iter.next();
165      } catch (Exception e) {
166        return false;
167      }
168
169      if (record == null) {
170        return false;
171      }
172
173      if (value == null) {
174        value = new ArchiveRecordWritable();
175      }
176      value.setRecord(record);
177
178      return true;
179    }
180
181    @Override
182    public final LongWritable getCurrentKey() {
183      return key;
184    }
185
186    @Override
187    public final ArchiveRecordWritable getCurrentValue() {
188      return value;
189    }
190
191    @Override
192    public final float getProgress() throws IOException {
193      if (start == end) {
194        return 0.0f;
195      } else {
196        return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start));
197      }
198    }
199
200    @Override
201    public final synchronized void close() throws IOException {
202      reader.close();
203      LOG.info("Closed archive file " + fileName);
204    }
205  }
206}