【Nutch2.2.1源代码分析之5】索引的基本流程

mac2022-06-30  24

一、各个主要类之间的关系 SolrIndexerJob extends IndexerJob 1、 IndexerJob:主要完成索引作业的创建(createIndexJob())以及map阶段的处理(IndexerMapper)。 2、SolrIndexerJob:主要完成命令行参数的解析、启动索引作业,并在作业结束后向solr提交(commit)。 3、 IndexUtil:主要只有一个方法public NutchDocument index(String key, WebPage page),用于根据网页信息,返回一个solr的Document对象。 二、程序调用流程 查看Nutch中的执行脚本--nutch,得到以下信息: elif [ "$COMMAND" = "solrindex" ] ; then CLASS=org.apache.nutch.indexer.solr.SolrIndexerJob 因此程序入口位于 SolrIndexerJob类中。 (一)org.apache.nutch.indexer.solr.SolrIndexerJob 1、程序入口 public static void main(String[] args) throws Exception { final int res = ToolRunner.run(NutchConfiguration.create(), new SolrIndexerJob(), args); System.exit(res); } 使用了ToolRunner.run()来执行程序,可参考: 使用ToolRunner运行Hadoop程序基本原理分析。 其中第一个参数主要是加载了nutch相关的参数,主要包括hadoop的core-default.xml、core-site.xml以及nutch的 nutch-default.xml、nutch-site.xml。 第二个参数指明了运行 SolrIndexerJob的run(String[])方法。 2、执行SolrIndexerJob类的run(String[])方法    public int run(String[] args) throws Exception { if (args.length < 2) { System.err.println("Usage: SolrIndexerJob <solr url> (<batchId> | -all | -reindex) [-crawlId <id>]"); return -1; } if (args.length == 4 && "-crawlId".equals(args[2])) { getConf().set(Nutch.CRAWL_ID_KEY, args[3]); } try { indexSolr(args[0], args[1]); return 0; } catch (final Exception e) { LOG.error("SolrIndexerJob: " + StringUtils.stringifyException(e)); return -1; } } 先判断参数的合理性,然后执行indexSolr(String,String)方法。 3、执行indexSolr(String,String)方法    public void indexSolr(String solrUrl, String batchId) throws Exception { LOG.info("SolrIndexerJob: starting"); run(ToolUtil.toArgMap( Nutch.ARG_SOLR, solrUrl, Nutch.ARG_BATCH, batchId)); // do the commits once and for all the reducers in one go getConf().set(SolrConstants.SERVER_URL,solrUrl); SolrServer solr = SolrUtils.getCommonsHttpSolrServer(getConf()); if (getConf().getBoolean(SolrConstants.COMMIT_INDEX, true)) { solr.commit(); } LOG.info("SolrIndexerJob: done."); } 4、执行run(Map<...>)方法   @Override public Map<String,Object> run(Map<String,Object> args) throws Exception { String solrUrl = (String)args.get(Nutch.ARG_SOLR); String batchId = 
(String)args.get(Nutch.ARG_BATCH); NutchIndexWriterFactory.addClassToConf(getConf(), SolrWriter.class); getConf().set(SolrConstants.SERVER_URL, solrUrl); currentJob = createIndexJob(getConf(), "solr-index", batchId); currentJob.waitForCompletion(true); ToolUtil.recordJobStatus(null, currentJob, results); return results; } (二)org.apache.nutch.indexer.IndexerJob 1、执行createIndexJob()方法。 protected Job createIndexJob(Configuration conf, String jobName, String batchId) throws IOException, ClassNotFoundException { conf.set(GeneratorJob.BATCH_ID, batchId); Job job = new NutchJob(conf, jobName); // TODO: Figure out why this needs to be here job.getConfiguration().setClass("mapred.output.key.comparator.class", StringComparator.class, RawComparator.class); Collection<WebPage.Field> fields = getFields(job); StorageUtils.initMapperJob(job, fields, String.class, NutchDocument.class, IndexerMapper.class); job.setNumReduceTasks(0); job.setOutputFormatClass(IndexerOutputFormat.class); return job; } } 2、执行map相关的方法,包括setup(),map(),cleanup() public static class IndexerMapper extends GoraMapper<String, WebPage, String, NutchDocument> { public IndexUtil indexUtil; public DataStore<String, WebPage> store; protected Utf8 batchId; @Override public void setup(Context context) throws IOException { Configuration conf = context.getConfiguration(); batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID, Nutch.ALL_BATCH_ID_STR)); indexUtil = new IndexUtil(conf); try { store = StorageUtils.createWebStore(conf, String.class, WebPage.class); } catch (ClassNotFoundException e) { throw new IOException(e); } } protected void cleanup(Context context) throws IOException ,InterruptedException { store.close(); }; @Override public void map(String key, WebPage page, Context context) throws IOException, InterruptedException { ParseStatus pstatus = page.getParseStatus(); if (pstatus == null || !ParseStatusUtils.isSuccess(pstatus) || pstatus.getMinorCode() == ParseStatusCodes.SUCCESS_REDIRECT) { return; // filter 
urls not parsed } Utf8 mark = Mark.UPDATEDB_MARK.checkMark(page); if (!batchId.equals(REINDEX)) { if (!NutchJob.shouldProcess(mark, batchId)) { if (LOG.isDebugEnabled()) { LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id (" + mark + ")"); } return; } } NutchDocument doc = indexUtil.index(key, page); if (doc == null) { return; } if (mark != null) { Mark.INDEX_MARK.putMark(page, Mark.UPDATEDB_MARK.checkMark(page)); store.put(key, page); } context.write(key, doc); } } 3、调用context.write() 由于  job.setOutputFormatClass(IndexerOutputFormat.class);  且reduce任务数被设置为0,所以context.write()输出的NutchDocument会直接交给IndexerOutputFormat处理,进而写入索引(具体的写入过程有待进一步分析)。 (三)public class IndexUtil  1、调用index()方法 public NutchDocument index(String key, WebPage page) { NutchDocument doc = new NutchDocument(); doc.add("id", key); doc.add("digest", StringUtil.toHexString(page.getSignature())); if (page.getBatchId() != null) { doc.add("batchId", page.getBatchId().toString()); } String url = TableUtil.unreverseUrl(key); if (LOG.isDebugEnabled()) { LOG.debug("Indexing URL: " + url); } try { doc = filters.filter(doc, url, page); } catch (IndexingException e) { LOG.warn("Error indexing "+key+": "+e); return null; } // skip documents discarded by indexing filters if (doc == null) return null; float boost = 1.0f; // run scoring filters try { boost = scoringFilters.indexerScore(url, doc, page, boost); } catch (final ScoringFilterException e) { LOG.warn("Error calculating score " + key + ": " + e); return null; } doc.setScore(boost); // store boost for use by explain and dedup doc.add("boost", Float.toString(boost)); return doc; } 三、plugin中的字段索引 1、关于basic字段的索引在public class BasicIndexingFilter implements IndexingFilter 中实现。

转载于:https://www.cnblogs.com/jinhong-lu/p/4559382.html

相关资源:JAVA上百实例源码以及开源项目
最新回复(0)