/**
 * Provides the {@code couchbaseClient} bean.
 *
 * <p>When {@code couchbaseClientEnabled} is false no client is built and
 * {@code null} is returned. Otherwise a connection factory is built with the
 * configured operation timeout for the servers returned by
 * {@code getServersList()} and the configured bucket. A {@code null} password
 * is replaced by the empty string, and the password is passed through
 * {@code StringUtils.trimAllWhitespace} before use.
 *
 * @return a new {@link CouchbaseClient}, or {@code null} when the client is disabled
 * @throws IOException declared by this method; propagated from the calls it makes
 * @throws URISyntaxException declared by this method; propagated from the calls it makes
 */
@Bean(name = "couchbaseClient")
public CouchbaseClient couchbaseClient() throws IOException, URISyntaxException {
    // Guard clause: nothing to build when the client is switched off.
    if (!couchbaseClientEnabled) {
        return null;
    }

    logger.info("Creating CouchbaseClient for servers: {}", couchbaseServers);

    CouchbaseConnectionFactoryBuilder factoryBuilder = new CouchbaseConnectionFactoryBuilder();
    factoryBuilder.setOpTimeout(opTimeout);

    // Normalise the password: null becomes "", then whitespace is trimmed out.
    String rawPassword = Optional.ofNullable(couchbasePassword).orElse("");
    String cleanedPassword = StringUtils.trimAllWhitespace(rawPassword);

    CouchbaseConnectionFactory connectionFactory =
            factoryBuilder.buildCouchbaseConnection(getServersList(), couchbaseBucket, cleanedPassword);
    return new CouchbaseClient(connectionFactory);
}
/**
 * Partitions the vBuckets of the configured Couchbase bucket into input splits.
 *
 * <p>The comma-separated cluster URIs are read from
 * {@code CouchbaseConfig.CB_INPUT_CLUSTER}; {@code /pools} is resolved against
 * each one and the resulting list is used to obtain the bucket's vBucket
 * configuration. The vBucket ids {@code 0..size-1} are then cut into
 * consecutive, equally sized chunks (the last one may be shorter), one
 * {@code CouchbaseSplit} per chunk. The number of chunks never exceeds the
 * value of {@code com.b5m.couchbase.num.map.tasks} (default 120).
 *
 * @param context job context whose configuration supplies the cluster URIs,
 *                bucket name, password and desired number of map tasks
 * @return the splits, in ascending vBucket-id order; empty when the bucket
 *         reports no vBuckets
 * @throws IOException if the cluster URI list is missing or empty, or if one
 *                     of the URIs is malformed
 * @throws InterruptedException declared to match the overridden method's signature
 */
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException,
        InterruptedException {
    final Configuration conf = context.getConfiguration();

    // A zero or negative task count used to blow up with an array-size /
    // array-index exception further down; fall back to a single split instead.
    final int requestedMapTasks = conf.getInt("com.b5m.couchbase.num.map.tasks", 120);
    final int numMapTasks = Math.max(1, requestedMapTasks);
    if (numMapTasks != requestedMapTasks) {
        Log.info("Ignoring non-positive com.b5m.couchbase.num.map.tasks = {}, using 1",
                requestedMapTasks);
    }

    // Fail with a descriptive error instead of a bare NullPointerException
    // when the cluster list was never configured.
    final String clusterSpec = conf.get(CouchbaseConfig.CB_INPUT_CLUSTER);
    if (clusterSpec == null) {
        throw new IOException("Missing Couchbase cluster URI list: configuration key "
                + CouchbaseConfig.CB_INPUT_CLUSTER + " is not set");
    }

    final List<URI> clientUriList = new ArrayList<URI>();
    try {
        for (String rawUri : clusterSpec.split(",")) {
            // Tolerate "a, b" style lists and stray commas.
            final String uri = rawUri.trim();
            if (uri.isEmpty()) {
                continue;
            }
            clientUriList.add(new URI(uri).resolve("/pools"));
        }
    } catch (URISyntaxException e) {
        throw new IOException(e);
    }
    if (clientUriList.isEmpty()) {
        throw new IOException("Empty Couchbase cluster URI list: configuration key "
                + CouchbaseConfig.CB_INPUT_CLUSTER + " contains no URIs");
    }

    final String bucket = conf.get(CouchbaseConfig.CB_INPUT_BUCKET, "");
    final String password = conf.get(CouchbaseConfig.CB_INPUT_PASSWORD, "");

    // NOTE(review): the factory is never shut down here; confirm whether it
    // holds connections or threads after getVBucketConfig() and release them if so.
    final CouchbaseConnectionFactory fact = new CouchbaseConnectionFactory(
            clientUriList, bucket, password);
    final List<VBucket> allVBuckets = fact.getVBucketConfig().getVbuckets();

    final int totalVBuckets = allVBuckets.size();
    Log.info("VBuckets size = {}", totalVBuckets);

    final List<InputSplit> splits = new ArrayList<InputSplit>();
    if (totalVBuckets == 0) {
        // Nothing to read; also avoids dividing by a zero split count below.
        return splits;
    }

    final int numSplits = Math.min(numMapTasks, totalVBuckets);
    // Integer ceiling of totalVBuckets / numSplits.
    final int numVBucketsPerSplit = (totalVBuckets + numSplits - 1) / numSplits;

    // Cut the id range into consecutive chunks of numVBucketsPerSplit ids.
    for (int start = 0; start < totalVBuckets; start += numVBucketsPerSplit) {
        final int end = Math.min(start + numVBucketsPerSplit, totalVBuckets);
        final ArrayList<Integer> vblist = new ArrayList<Integer>(end - start);
        for (int vbid = start; vbid < end; vbid++) {
            vblist.add(vbid);
        }
        splits.add(new CouchbaseSplit(vblist));
    }
    return splits;
}