mahout-user mailing list archives
From Jake Mannix <jake.man...@gmail.com>
Subject Re: Questions about LDA CVB TopicModel class usage for inferring new docs to topics.
Date Fri, 26 Jul 2013 12:11:20 GMT
Hi Henry,

  You're right, this is not very well documented at all. Maybe we can use this
discussion to open up some JIRA tickets to document it better (both in the
javadocs and on the wiki).


On Fri, Jul 26, 2013 at 12:46 AM, Henry Lee <honesthenry@gmail.com> wrote:

> I like to build an app where I build an LDA model offline periodically by
> Amazon EMR/Hadoop, and I make a document/topic inference for a new document
> online.
>
> I read a post/a reply about using LDA CVB model to match a new doc to
> topics
> (http://tinyurl.com/mef4czr)
>
> I have some questions about using LDA CVB TopicModel class which isn't well
> documented:
>
> Q: how many iterations are good? and why?
>

This will depend heavily on your number of topics and vocabulary size, as
well as the size of your document.  BUT I've found that I've never seen
much gain after 40-50 iterations, and most of the time there's not much
after 20 (I typically default to somewhere between 20 and 30, independent
of the document size).

The way you *really* want to be sure is to verify that
p(doc | topic_distribution(doc)) has plateaued (equivalently, that the
perplexity has plateaued).  The way to check this is to periodically (maybe
every 5 iterations) call model.perplexity(doc, docTopics), and verify that
the *relative* decrease in that value is always greater than epsilon, for
some epsilon.  But of course, now instead of an arbitrary maxIters (20-30),
you have an arbitrary epsilon (0.001, say).
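As a rough sketch of that stopping rule (the every-5-iterations cadence and
the 0.001 epsilon are just the arbitrary choices mentioned above; the
perplexity values would really come from model.perplexity(doc, docTopics)
after each call to model.trainDocTopicModel - here they're simulated):

```java
// Sketch of the perplexity-plateau stopping rule, not Mahout code.
public class PerplexityStop {
    /** Returns the iteration at which the relative decrease in perplexity
     *  first falls below epsilon, checking every `stride` iterations,
     *  or the array length (i.e. maxIters) if it never plateaus. */
    public static int converges(double[] perplexities, int stride, double epsilon) {
        double prev = Double.NaN;
        for (int i = 0; i < perplexities.length; i++) {
            if (i % stride != 0) continue;            // only check every `stride` iters
            double curr = perplexities[i];
            if (!Double.isNaN(prev)) {
                double relDecrease = (prev - curr) / prev;
                if (relDecrease < epsilon) return i;  // plateaued: stop here
            }
            prev = curr;
        }
        return perplexities.length;                   // never plateaued: hit maxIters
    }

    public static void main(String[] args) {
        // Simulated per-iteration perplexities that flatten out:
        double[] p = {1000, 900, 820, 760, 720, 700,
                      699.9, 699.85, 699.82, 699.81, 699.8, 699.8};
        System.out.println(converges(p, 5, 0.001)); // stops at iteration 10
    }
}
```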

So if you're really still having trouble with the arbitrariness of this
decision, you can verify that the *ordering* of the topics (by probability)
for the document has stabilized.  This will probably happen before perplexity
has plateaued, and before complete convergence (the probabilities will still
be changing, but their relative sizes will have already settled into an
ordering).
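That weaker test can be sketched like this - compare the rank ordering of
topics between two successive docTopics vectors instead of waiting for the
values themselves to plateau (toy code, not from Mahout):

```java
import java.util.Arrays;
import java.util.Comparator;

// Sketch of the "topic ordering has stabilized" convergence test.
public class TopicOrdering {
    /** Topic indices sorted by descending probability. */
    public static Integer[] ranking(double[] docTopics) {
        Integer[] order = new Integer[docTopics.length];
        for (int i = 0; i < order.length; i++) order[i] = i;
        Arrays.sort(order, Comparator.comparingDouble((Integer i) -> -docTopics[i]));
        return order;
    }

    /** True when two iterations produce the same topic ordering, even
     *  though the probabilities themselves may still be moving. */
    public static boolean sameOrdering(double[] prev, double[] curr) {
        return Arrays.equals(ranking(prev), ranking(curr));
    }

    public static void main(String[] args) {
        // Probabilities changed, but topic 0 > topic 1 > topic 2 held:
        System.out.println(sameOrdering(new double[]{0.5, 0.3, 0.2},
                                        new double[]{0.6, 0.25, 0.15})); // true
    }
}
```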

> Q: do we get model mutated by training w/ new doc? why?
>

No, we do not.  If you want to, though, it's easy to do: you can
either update the current model you're using (which means you need to
be careful about concurrency if you're running this as a singleton model
in a webservice of some kind), or you can accumulate the model updates
onto a new model.  To see this in action, look at the ModelTrainer class
in the same package.  Basically it works like this: after converging the
docTopics vector, you take the SparseMatrix (docTopicModel) and just call
model.update(docTopicModel).  This is an asynchronous update which
updates the model in a multithreaded way.  I don't know how safe it is
to be simultaneously calling model.train() and model.update() on the same
model from different threads.

The way this is done in ModelTrainer is that all calls to ModelTrainer.train()
are async, and queue up incoming documents into a work queue, which
calls model.trainDocTopicModel() (a read-only operation) on many docs
in parallel (from different threads), while the calls to model.update() are
by their nature also queued and threaded (so no topic vector in the model
is getting written to by more than one thread at a time).  But even here,
the question of how entirely safe/accurate it is to do these simultaneous
many-reads and single-writes (per topic) remains open.  In practice, the
default operation in Mahout is to *not* read and write from the same model
while training, but to read from one model (which was averaged over all
mapper outputs after the previous MR iteration) and write updates to a
fresh empty model, which is what will be emitted at the end of the
iteration.  In theory, running this in "online learning mode" should work,
and converge way faster (see e.g. Hoffman, Blei, and Bach:
http://www.cs.princeton.edu/~blei/papers/HoffmanBleiBach2010b.pdf),
but warning: we're not doing what they do there to guarantee convergence,
so in theory this may not converge at all.
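The read-one-model/write-to-a-fresh-model pattern above is essentially
double buffering; here's a minimal single-process sketch with plain arrays
standing in for the topic-term matrix (the real thing happens across
Mahout's mappers between MR iterations, not in one JVM like this):

```java
// Sketch of the read-model / write-model separation: an iteration reads
// from `read`, accumulates updates into an initially empty `write`, and
// the two swap roles at the iteration boundary.
public class ModelBuffers {
    public double[][] read;   // averaged output of the previous iteration
    public double[][] write;  // starts empty, collects this iteration's updates

    public ModelBuffers(int numTopics, int numTerms) {
        read = new double[numTopics][numTerms];
        write = new double[numTopics][numTerms];
    }

    /** Accumulate one document's converged topic-term counts; readers
     *  of `read` are never disturbed by in-flight writes. */
    public void update(int topic, int term, double count) {
        write[topic][term] += count;
    }

    /** End of iteration: the freshly written model becomes the one
     *  everybody reads, and writing starts over from empty. */
    public void swap() {
        read = write;
        write = new double[read.length][read[0].length];
    }
}
```

Updating the model you're also reading from (the "online" variant) skips
the swap and pays for it with the safety/convergence caveats above.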


> Q: what is inferred in this program? how to use that infer() method?
>

Infer is a weirdly named method - it is basically the LDA version of what
you do in SVD/LSI, where you project onto your reduced-dimensional space,
then "re-embed" back in your original space.  Concretely, it calculates
the posterior probability of all of the original terms in your document,
given the model and the p(topic | doc) distribution which the model
converged to for the given document.  It's a vector over terms in your
original vocabulary, restricted to only those in the original input
document, where the value at term i is
p(i) = sum_(topic x) (p(i|x) * p(x|doc)).

It can be a way of seeing what the model thinks the most relevant terms in
a document are.
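That formula in a tiny worked form, for a single term (toy numbers, not
Mahout's actual implementation):

```java
// Toy computation of the infer() value for one term i:
// p(i) = sum over topics x of p(i|x) * p(x|doc).
public class InferTerm {
    public static double pTerm(double[] pTermGivenTopic, double[] pTopicGivenDoc) {
        double p = 0.0;
        for (int x = 0; x < pTopicGivenDoc.length; x++) {
            p += pTermGivenTopic[x] * pTopicGivenDoc[x];
        }
        return p;
    }

    public static void main(String[] args) {
        // Two topics: the doc is 60/40 over them, and term i has
        // probability 0.5 under topic 0 and 0.25 under topic 1.
        System.out.println(pTerm(new double[]{0.5, 0.25}, new double[]{0.6, 0.4}));
        // 0.6*0.5 + 0.4*0.25 = 0.4
    }
}
```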


> and Q: Has anyone seen good example usages/sample code of doing this kind
> of task?
>

Mahout clustering and classification really need more of a service
interface (collaborative filtering already has Taste), so that we can show
off this kind of thing properly.  I have code which does this at Twitter,
and while we could open source it fairly easily, it doesn't really have
"a place to live".


>
> Thanks,
> Henry Lee.
> See my code below:
>
> @Test
> public void testOfJakeMannixIdeaAndQuestions() { // jake.mannix@gmail.com
>     val conf = new Configuration();
>     val dictionary = readDictionary(new Path("/tmp/dictionary.file-0"), conf);
>     assertThat(dictionary.length, equalTo(41807));
>
>     // tfidf_vector represents a document as a RandomAccessSparseVector.
>     val tfidf_vector = readTFVectorsInRange(new Path("/tmp/tfidf-vectors"), conf, 0, 1)[0].getSecond();
>     assertThat(tfidf_vector.size(), equalTo(41807));
>
>     // reads 'model' dense matrix (20 x 41K), and a 'topicSum' dense vector.
>     TopicModel model = readModel(dictionary, new Path("/tmp/reuters-lda-model-splits"), conf);
>     assertThat(model.getNumTopics(), equalTo(20));
>     assertThat(model.getNumTerms(), equalTo(41807));
>
>     val doc = tfidf_vector;
>     Vector docTopics = new DenseVector(new double[model.getNumTopics()]).assign(1.0 / model.getNumTopics());
>     Matrix docTopicModel = new SparseRowMatrix(model.getNumTopics(), doc.size());
>
>     // Q: How many iterations are good? Why?
>     for (int i = 0; i < 100 /* maxIters */; i++) {
>         model.trainDocTopicModel(doc, docTopics, docTopicModel);
>         System.out.println(docTopics.toString());
>         // Q: Do you think that 'model' got mutated, or not? why?
>     }
>
>     Vector inferred = model.infer(doc, docTopics);
>     System.out.println(inferred); // Q: What is this inferred? How can I use it?
> }
>
> @SneakyThrows({ IOException.class })
> private static Pair<String, Vector>[] readTFVectorsInRange(Path path, Configuration conf, int offset, int length) {
>     val seq = new SequenceFile.Reader(FileSystem.get(conf), path, conf);
>     val documentName = new Text();
>     @SuppressWarnings("unchecked")
>     Pair<String, Vector>[] vectors = new Pair[length];
>     VectorWritable vector = new VectorWritable();
>     for (int i = 0; i < offset + length && seq.next(documentName, vector); i++) {
>         if (i >= offset) {
>             vectors[i - offset] = Pair.of(documentName.toString(), vector.get());
>         }
>     }
>     return vectors;
> }
>
> @SneakyThrows({ IOException.class })
> private static TopicModel readModel(String[] dictionary, Path path, Configuration conf) {
>     double alpha = 0.0001; // default: doc-topic smoothing
>     double eta = 0.0001; // default: term-topic smoothing
>     double modelWeight = 1f;
>     return new TopicModel(conf, eta, alpha, dictionary, 1, modelWeight, listModelPath(path, conf));
> }
>
> @SneakyThrows({ IOException.class })
> private static Path[] listModelPath(Path path, Configuration conf) {
>     if (FileSystem.get(conf).isFile(path)) {
>         return new Path[] { path };
>     } else {
>         val statuses = FileSystem.get(conf).listStatus(path, PathFilters.partFilter());
>         val modelPaths = new Path[statuses.length];
>         for (int i = 0; i < statuses.length; i++) {
>             modelPaths[i] = new Path(statuses[i].getPath().toUri().toString());
>         }
>         return modelPaths;
>     }
> }
>
> @SneakyThrows({ IOException.class })
> private static String[] readDictionary(Path path, Configuration conf) {
>     val term = new Text();
>     val id = new IntWritable();
>     val reader = new SequenceFile.Reader(FileSystem.get(conf), path, conf);
>     val termIds = ImmutableList.<Pair<String, Integer>>builder();
>     int maxId = 0;
>     while (reader.next(term, id)) {
>         termIds.add(Pair.of(term.toString(), id.get()));
>         maxId = max(maxId, id.get());
>     }
>     String[] terms = new String[maxId + 1];
>     for (val termId : termIds.build()) {
>         terms[termId.getSecond().intValue()] = termId.getFirst().toString();
>     }
>     return terms;
> }
>



-- 

  -jake
