extern crate boolinator;
extern crate downcast_rs;
extern crate glob;
extern crate yaml_rust;
use crate::command::ExtCommand;
use crate::error::Error;
use crate::executor::*;
use crate::run::Run;
use crate::source::*;
use crate::*;
use boolinator::Boolinator;
use downcast_rs::Downcast;
use failure::ResultExt;
use glob::glob;
use log::error;
use std::collections::{HashMap, HashSet};
use std::convert::{From, Into};
use std::fmt::Debug;
use std::fs::read_to_string;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::str::FromStr;
use yaml_rust::{Yaml, YamlLoader};
mod yaml;
pub use yaml::FromYaml;
pub use yaml::ParseYaml;
/// Parsed collections keyed by their `name`, shared through `Rc` so the same
/// collection can be referenced from several places (e.g. `Config::collections`
/// and the runs built from this map).
pub type CollectionMap = HashMap<String, Rc<Collection>>;
/// Convenience accessors for YAML nodes.
pub trait YamlExt {
    /// Returns the string stored under `field`.
    ///
    /// # Errors
    ///
    /// Fails if `field` is missing or its value is not a string.
    fn require_string(&self, field: &str) -> Result<&str, Error>;
}
impl YamlExt for Yaml {
fn require_string(&self, field: &str) -> Result<&str, Error> {
self[field]
.as_str()
.ok_or_else(|| Error::from(format!("field {} missing or not string", field)))
}
}
/// Name of an index encoding, stored exactly as given (no validation is
/// performed by the constructors in this module).
#[derive(Debug, PartialEq, Clone)]
pub struct Encoding(String);
impl FromStr for Encoding {
type Err = Error;
fn from_str(name: &str) -> Result<Self, Error> {
Ok(Self(name.to_string()))
}
}
impl From<&str> for Encoding {
fn from(name: &str) -> Self {
Self(name.to_string())
}
}
impl AsRef<str> for Encoding {
    /// Borrows the encoding name.
    fn as_ref(&self) -> &str {
        &self.0
    }
}
impl std::fmt::Display for Encoding {
    /// Writes the bare encoding name.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(&self.0)
    }
}
impl FromYaml for Encoding {
fn from_yaml(yaml: &Yaml) -> Result<Self, Error> {
Ok(Self(String::from_yaml(yaml)?))
}
}
/// A kind of raw collection (`trecweb`, `warc`, `wapo`, ...) that knows how
/// to build the command parsing it into a forward index.
///
/// The implementations in this module use `Display` to print the same name
/// that `CollectionType::from` accepts.
pub trait CollectionType: Debug + Downcast + fmt::Display {
    /// Builds the external command that parses `collection`, using tools
    /// obtained from `executor`.
    fn parse_command(
        &self,
        executor: &dyn PisaExecutor,
        collection: &Collection,
    ) -> Result<ExtCommand, Error>;
}
// Lets a `dyn CollectionType` be downcast back to its concrete type.
impl_downcast!(CollectionType);
impl FromYaml for Box<CollectionType> {
fn from_yaml(yaml: &Yaml) -> Result<Self, Error> {
CollectionType::from(yaml.parse::<String>()?)
}
}
impl CollectionType {
pub fn from<S>(name: S) -> Result<Box<dyn CollectionType>, Error>
where
S: AsRef<str>,
{
match name.as_ref() {
"wapo" => Ok(WashingtonPostCollection::boxed()),
"trecweb" => Ok(TrecWebCollection::boxed()),
"warc" => Ok(WarcCollection::boxed()),
_ => Err(Error::from(format!(
"Unknown collection type: {}",
name.as_ref()
))),
}
}
}
pub(crate) fn resolve_files<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>, Error> {
let pattern = path.as_ref().to_str().unwrap();
let files: Vec<_> = glob(pattern).unwrap().filter_map(Result::ok).collect();
(!files.is_empty()).ok_or(format!(
"could not resolve any files for pattern: {}",
pattern
))?;
Ok(files)
}
/// Collection type for TREC Web (`trecweb`) collections.
#[derive(Debug, Default, PartialEq)]
pub struct TrecWebCollection;
impl TrecWebCollection {
    /// Returns a heap-allocated instance, suitable for `Box<dyn CollectionType>`.
    pub fn boxed() -> Box<Self> {
        let collection = TrecWebCollection;
        Box::new(collection)
    }
}
impl fmt::Display for TrecWebCollection {
    /// Prints the configuration name of this collection type.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("trecweb")
    }
}
impl CollectionType for TrecWebCollection {
    /// Builds `zcat <files> | parse_collection ...` over every `GX*/*.gz` file
    /// under `collection.collection_dir`. The forward index is written to
    /// `collection.forward_index`.
    ///
    /// # Errors
    ///
    /// Fails if no input files match, or if the forward index path is not
    /// valid UTF-8.
    fn parse_command(
        &self,
        executor: &dyn PisaExecutor,
        collection: &Collection,
    ) -> Result<ExtCommand, Error> {
        let input_files = resolve_files(collection.collection_dir.join("GX*/*.gz"))?;
        // `fwd()` turns a non-UTF-8 path into an error instead of a panic.
        let forward_index = collection.fwd()?;
        Ok(ExtCommand::new("zcat")
            .args(&input_files)
            .pipe_command(executor.command("parse_collection"))
            .args(&[
                "-o",
                forward_index,
                "-f",
                "trecweb",
                "--stemmer",
                "porter2",
                "--content-parser",
                "html",
                "--batch-size",
                "1000",
            ]))
    }
}
/// Collection type for WARC (`warc`) collections.
#[derive(Debug)]
pub struct WarcCollection;
impl WarcCollection {
    /// Returns a heap-allocated instance, suitable for `Box<dyn CollectionType>`.
    pub fn boxed() -> Box<Self> {
        let collection = WarcCollection;
        Box::new(collection)
    }
}
impl fmt::Display for WarcCollection {
    /// Prints the configuration name of this collection type.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("warc")
    }
}
impl CollectionType for WarcCollection {
    /// Builds `zcat <files> | parse_collection ...` over every `en*/*.gz` file
    /// under `collection.collection_dir`. The forward index is written to
    /// `collection.forward_index`.
    ///
    /// # Errors
    ///
    /// Fails if no input files match, or if the forward index path is not
    /// valid UTF-8.
    fn parse_command(
        &self,
        executor: &dyn PisaExecutor,
        collection: &Collection,
    ) -> Result<ExtCommand, Error> {
        let input_files = resolve_files(collection.collection_dir.join("en*/*.gz"))?;
        // `fwd()` turns a non-UTF-8 path into an error instead of a panic.
        let forward_index = collection.fwd()?;
        Ok(ExtCommand::new("zcat")
            .args(&input_files)
            .pipe_command(executor.command("parse_collection"))
            .args(&["-o", forward_index])
            .args(&["-f", "warc"])
            .args(&["--stemmer", "porter2"])
            .args(&["--content-parser", "html"])
            .args(&["--batch-size", "1000"]))
    }
}
/// Collection type for Washington Post (`wapo`) collections.
#[derive(Debug)]
pub struct WashingtonPostCollection;
impl WashingtonPostCollection {
    /// Returns a heap-allocated instance, suitable for `Box<dyn CollectionType>`.
    pub fn boxed() -> Box<Self> {
        let collection = WashingtonPostCollection;
        Box::new(collection)
    }
}
impl fmt::Display for WashingtonPostCollection {
    /// Prints the configuration name of this collection type.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("wapo")
    }
}
impl CollectionType for WashingtonPostCollection {
    /// Builds `cat <files> | parse_collection ...` over every `data/*.jl` file
    /// under `collection.collection_dir`. The forward index is written to
    /// `collection.forward_index`.
    ///
    /// # Errors
    ///
    /// Fails if no input files match, or if the forward index path is not
    /// valid UTF-8.
    fn parse_command(
        &self,
        executor: &dyn PisaExecutor,
        collection: &Collection,
    ) -> Result<ExtCommand, Error> {
        let input_files = resolve_files(collection.collection_dir.join("data/*.jl"))?;
        // `fwd()` turns a non-UTF-8 path into an error instead of a panic.
        let forward_index = collection.fwd()?;
        Ok(ExtCommand::new("cat")
            .args(&input_files)
            .pipe_command(executor.command("parse_collection"))
            .args(&[
                "-o",
                forward_index,
                "-f",
                "wapo",
                "--stemmer",
                "porter2",
                "--content-parser",
                "html",
                "--batch-size",
                "1000",
            ]))
    }
}
/// A single collection entry from the configuration.
#[derive(Debug)]
pub struct Collection {
    /// Identifier; used as the key in `CollectionMap` and in the default
    /// `fwd/<name>` forward index path.
    pub name: String,
    /// Raw collection format; its `parse_command` builds the parsing pipeline.
    pub kind: Box<dyn CollectionType>,
    /// Directory containing the raw collection files (searched with glob patterns).
    pub collection_dir: PathBuf,
    /// Forward index path, passed to `parse_collection -o`.
    pub forward_index: PathBuf,
    /// Inverted index path.
    pub inverted_index: PathBuf,
    /// Encodings listed under `encodings` in the YAML entry.
    pub encodings: Vec<Encoding>,
}
impl PartialEq for Collection {
    /// Two collections are equal when their kinds have the same `Display`
    /// output and their directory, index paths, and encodings all match.
    /// `name` does not take part in the comparison.
    fn eq(&self, other: &Self) -> bool {
        let same_kind = self.kind.to_string() == other.kind.to_string();
        same_kind
            && self.collection_dir == other.collection_dir
            && self.forward_index == other.forward_index
            && self.inverted_index == other.inverted_index
            && self.encodings == other.encodings
    }
}
impl FromYaml for Collection {
    /// Parses a collection entry from its YAML mapping.
    ///
    /// Required fields: `name`, `kind`, `collection_dir`, `encodings`.
    /// Optional fields: `forward_index` (default `fwd/<name>`) and
    /// `inverted_index` (default `inv/<name>`). Relative paths are returned
    /// unchanged; they are not resolved here.
    ///
    /// # Errors
    ///
    /// Fails if a required field is missing or malformed. Encoding errors are
    /// wrapped with the collection name.
    fn from_yaml(yaml: &Yaml) -> Result<Self, Error> {
        let name: String = yaml.parse_field("name")?;
        let forward_index: PathBuf = yaml
            .parse_optional_field("forward_index")?
            .unwrap_or_else(|| format!("fwd/{}", &name).into());
        // The default lives under `inv/`; it used to be a copy-paste of the
        // forward index default (`fwd/<name>`).
        let inverted_index: PathBuf = yaml
            .parse_optional_field("inverted_index")?
            .unwrap_or_else(|| format!("inv/{}", &name).into());
        // Format the context message only if parsing actually failed.
        let encodings: Vec<Encoding> = yaml
            .parse_field("encodings")
            .with_context(|_| format!("Failed to parse encodings for collection {}", name))?;
        Ok(Self {
            name,
            kind: yaml.parse_field("kind")?,
            collection_dir: yaml.parse_field("collection_dir")?,
            forward_index,
            inverted_index,
            encodings,
        })
    }
}
impl Collection {
    /// Returns the forward index path as a string slice.
    ///
    /// # Errors
    ///
    /// Fails if the path is not valid UTF-8.
    #[cfg_attr(tarpaulin, skip)]
    pub fn fwd(&self) -> Result<&str, Error> {
        match self.forward_index.to_str() {
            Some(path) => Ok(path),
            None => Err("Failed to parse forward index path".into()),
        }
    }

    /// Returns the inverted index path as a string slice.
    ///
    /// # Errors
    ///
    /// Fails if the path is not valid UTF-8.
    #[cfg_attr(tarpaulin, skip)]
    pub fn inv(&self) -> Result<&str, Error> {
        match self.inverted_index.to_str() {
            Some(path) => Ok(path),
            None => Err("Failed to parse inverted index path".into()),
        }
    }
}
/// Top-level configuration, normally loaded from a YAML file.
#[derive(Debug)]
pub struct Config {
    /// Working directory; relative collection index paths are joined onto it.
    pub workdir: PathBuf,
    /// Configured source; `Config::executor` delegates to it.
    pub source: Box<dyn PisaSource>,
    /// Stages marked via `suppress_stage`; presumably skipped when running — confirm with callers.
    suppressed: HashSet<Stage>,
    /// Successfully parsed collections, in the order they appear in the YAML.
    pub collections: Vec<Rc<Collection>>,
    /// Runs parsed from the `runs` array.
    pub runs: Vec<Run>,
}
impl FromYaml for Config {
    /// Builds a configuration from the top-level YAML mapping.
    ///
    /// `workdir` and `source` are read first, then `collections`. `runs` is
    /// read last because it is resolved against the parsed collections.
    fn from_yaml(yaml: &Yaml) -> Result<Self, Error> {
        let work_dir: PathBuf = yaml.parse_field("workdir")?;
        let pisa_source: Box<dyn PisaSource> = yaml.parse_field("source")?;
        let mut config = Self::new(work_dir, pisa_source);
        let collection_map = config.parse_collections(&yaml["collections"])?;
        config.parse_runs(&yaml["runs"], &collection_map)?;
        Ok(config)
    }
}
impl Config {
    /// Creates a configuration with the given working directory and source.
    /// It starts with no collections, runs, or suppressed stages.
    pub fn new<P>(workdir: P, source: Box<dyn PisaSource>) -> Self
    where
        P: AsRef<Path>,
    {
        Self {
            workdir: workdir.as_ref().to_path_buf(),
            source,
            suppressed: HashSet::new(),
            collections: Vec::new(),
            runs: Vec::new(),
        }
    }

    /// Builds an executor by delegating to the configured source.
    pub fn executor(&self) -> Result<Box<dyn PisaExecutor>, Error> {
        self.source.executor(self)
    }

    /// Loads a configuration from a YAML file. Only the first YAML document
    /// in the file is used.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be read, is not valid YAML, contains no YAML
    /// document, or does not describe a valid configuration.
    pub fn from_file<P>(file: P) -> Result<Self, Error>
    where
        P: AsRef<Path>,
    {
        let content = read_to_string(&file).context("Failed to read config file")?;
        match YamlLoader::load_from_str(&content) {
            // An empty file yields zero documents; indexing `[0]` would panic.
            Ok(documents) => match documents.first() {
                Some(yaml) => Self::from_yaml(yaml),
                None => Err("YAML file contains no documents".into()),
            },
            Err(_) => Err("could not parse YAML file".into()),
        }
    }

    /// Parses the `runs` array and appends each run to `self.runs`.
    ///
    /// A missing or non-array `runs` node is treated as "no runs".
    ///
    /// # Errors
    ///
    /// Returns the first error from `Run::parse`. Runs parsed before the
    /// failing one remain in `self.runs`.
    fn parse_runs(&mut self, runs: &Yaml, collections: &CollectionMap) -> Result<(), Error> {
        match runs {
            Yaml::Array(runs) => {
                for run in runs {
                    self.runs
                        .push(Run::parse(&run, collections, &self.workdir)?);
                }
                Ok(())
            }
            _ => Ok(()),
        }
    }

    /// Parses the `collections` array. Each valid entry is pushed to
    /// `self.collections` and inserted into the returned map under its name.
    ///
    /// Entries that fail to parse are logged and skipped.
    ///
    /// # Errors
    ///
    /// Fails if `collections` is not an array or if no entry parsed successfully.
    fn parse_collections(&mut self, collections: &Yaml) -> Result<CollectionMap, Error> {
        match collections {
            Yaml::Array(collections) => {
                let mut collection_map: CollectionMap = HashMap::new();
                for collection in collections {
                    match self.parse_collection(collection) {
                        Ok(coll_config) => {
                            let name = coll_config.name.clone();
                            let collrc = Rc::new(coll_config);
                            self.collections.push(Rc::clone(&collrc));
                            collection_map.insert(name, collrc);
                        }
                        // Best-effort on purpose: log and continue with the other entries.
                        Err(err) => error!("Unable to parse collection config: {}", err),
                    }
                }
                if self.collections.is_empty() {
                    Err("no correct collection configurations found".into())
                } else {
                    Ok(collection_map)
                }
            }
            _ => Err("missing or corrupted collections config".into()),
        }
    }

    /// Marks `stage` as suppressed.
    pub fn suppress_stage(&mut self, stage: Stage) {
        self.suppressed.insert(stage);
    }

    /// Returns `true` if `stage` was previously passed to `suppress_stage`.
    pub fn is_suppressed(&self, stage: Stage) -> bool {
        self.suppressed.contains(&stage)
    }

    /// Parses one collection entry and resolves relative forward and inverted
    /// index paths against `workdir`. `collection_dir` is left unchanged.
    fn parse_collection(&self, yaml: &Yaml) -> Result<Collection, Error> {
        let mut collconf: Collection = yaml.parse()?;
        if !collconf.forward_index.is_absolute() {
            collconf.forward_index = self.workdir.join(collconf.forward_index);
        }
        if !collconf.inverted_index.is_absolute() {
            collconf.inverted_index = self.workdir.join(collconf.inverted_index);
        }
        Ok(collconf)
    }
}
#[cfg(test)]
mod tests;