IT虾米网

Hbase(四) 过滤器查询详解

admin 2018年06月05日 大数据 473 0

引言:过滤器的类型很多,但是可以分为两大类——比较过滤器,专用过滤器
过滤器的作用是在服务端判断数据是否满足条件,然后只将满足条件的数据返回给客户端;

一、hbase过滤器的分类

   1、比较过滤器

      行键过滤器 RowFilter

Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-22")));
scan.setFilter(filter1);

      列族过滤器 FamilyFilter

Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("colfam3")));
scan.setFilter(filter1);

     列过滤器 QualifierFilter

Filter filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new  BinaryComparator(Bytes.toBytes("col-2")));
scan.setFilter(filter1);

    值过滤器 ValueFilter

Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(".4") );
scan.setFilter(filter1);

   2、专用过滤器

单列值过滤器 SingleColumnValueFilter ----会返回满足条件的整行

SingleColumnValueFilter filter = new SingleColumnValueFilter(
Bytes.toBytes("colfam1"),
Bytes.toBytes("col-5"),
CompareFilter.CompareOp.NOT_EQUAL,
new SubstringComparator("val-5"));
filter.setFilterIfMissing(true); //如果不设置为 true,则那些不包含指定 column 的行也会返回
scan.setFilter(filter1);

单列值排除器 SingleColumnValueExcludeFilter -----返回排除了该列的结果 与上面的结果相反

前缀过滤器 PrefixFilter----针对行键

Filter filter = new PrefixFilter(Bytes.toBytes("row1"));
scan.setFilter(filter1);

列前缀过滤器 ColumnPrefixFilter

Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2"));
scan.setFilter(filter1);

分页过滤器 PageFilter

代码实现:

package com.ghgj.hbase; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.client.HTable; 
import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.client.ResultScanner; 
import org.apache.hadoop.hbase.client.Scan; 
import org.apache.hadoop.hbase.filter.BinaryComparator; 
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; 
import org.apache.hadoop.hbase.filter.ByteArrayComparable; 
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; 
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 
import org.apache.hadoop.hbase.filter.FamilyFilter; 
import org.apache.hadoop.hbase.filter.Filter; 
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter; 
import org.apache.hadoop.hbase.filter.PageFilter; 
import org.apache.hadoop.hbase.filter.PrefixFilter; 
import org.apache.hadoop.hbase.filter.QualifierFilter; 
import org.apache.hadoop.hbase.filter.RegexStringComparator; 
import org.apache.hadoop.hbase.filter.RowFilter; 
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 
import org.apache.hadoop.hbase.filter.SubstringComparator; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.junit.Test; 
public class HbasePageDemo { 
// 声明静态配置 
static Configuration conf = null; 
private static final String ZK_CONNECT_STR = 
"hadoop01:2181,hadoop02:2181,hadoop03:2181,hadoop04:2181,hadoop05:2181"; 
static { 
conf = HBaseConfiguration.create(); 
conf.set("hbase.zookeeper.quorum", ZK_CONNECT_STR); 
} 
public static void main(String[] args) throws Exception { 
String tableName = "testfilter"; 
String cfName = "f1"; 
final byte[] POSTFIX = new byte[] { 0x00 }; 
HTable table = new HTable(conf, tableName); 
Filter filter = new PageFilter(3); 
byte[] lastRow = null; 
int totalRows = 0; 
while (true) { 
Scan scan = new Scan(); 
scan.setFilter(filter); 
if(lastRow != null){ 
//注意这里添加了 POSTFIX 操作,用来重置扫描边界 
byte[] startRow = Bytes.add(lastRow,POSTFIX); 
scan.setStartRow(startRow); 
} 
ResultScanner scanner = table.getScanner(scan); 
int localRows = 0; 
Result result; 
while((result = scanner.next()) != null){ 
System.out.println(localRows++ + ":" + result); 
totalRows ++; 
lastRow = result.getRow(); 
} 
scanner.close(); 
if(localRows == 0) break; 
} 
System.out.println("total rows:" + totalRows); 
} / 
** 
* 多种过滤条件的使用方法 
* @throws Exception 
*/ 
@Test 
public void testScan() throws Exception{ 
HTable table = new HTable(conf, "person".getBytes()); 
Scan scan = new Scan(Bytes.toBytes("person_zhang_000001"), 
Bytes.toBytes("person_zhang_000002")); 
//前缀过滤器----针对行键 
Filter filter = new PrefixFilter(Bytes.toBytes("person")); 
//行过滤器 ---针对行键 
ByteArrayComparable rowComparator = new 
BinaryComparator(Bytes.toBytes("person_zhang_000001")); 
RowFilter rf = new RowFilter(CompareOp.LESS_OR_EQUAL, rowComparator); 
rf = new RowFilter(CompareOp.EQUAL , new 
SubstringComparator("_2016-12-31_")); 
//单值过滤器 1 完整匹配字节数组 
new SingleColumnValueFilter("base_info".getBytes(), "name".getBytes(), 
CompareOp.EQUAL, "zhangsan".getBytes()); 
//单值过滤器 2 匹配正则表达式 
ByteArrayComparable comparator = new RegexStringComparator("zhang."); 
new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(), 
CompareOp.EQUAL, comparator); 
//单值过滤器 3 匹配是否包含子串,大小写不敏感 
comparator = new SubstringComparator("wu"); 
new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(), 
CompareOp.EQUAL, comparator); 
//键值对元数据过滤-----family 过滤----字节数组完整匹配 
FamilyFilter ff = new FamilyFilter(CompareOp.EQUAL , 
new BinaryComparator(Bytes.toBytes("base_info")) //表中不存 
在 inf 列族,过滤结果为空 
); 
//键值对元数据过滤-----family 过滤----字节数组前缀匹配 
ff = new FamilyFilter( 
CompareOp.EQUAL , 
new BinaryPrefixComparator(Bytes.toBytes("inf")) //表中存在以 
inf 打头的列族 info,过滤结果为该列族所有行 
); 
//键值对元数据过滤-----qualifier 过滤----字节数组完整匹配 
filter = new QualifierFilter( 
CompareOp.EQUAL , 
new BinaryComparator(Bytes.toBytes("na")) //表中不存在 na 
列,过滤结果为空 
); 
filter = new QualifierFilter( 
CompareOp.EQUAL , 
new BinaryPrefixComparator(Bytes.toBytes("na")) //表中存在以 
na 打头的列 name,过滤结果为所有行的该列数据 
); 
//基于列名(即 Qualifier)前缀过滤数据的 ColumnPrefixFilter 
filter = new ColumnPrefixFilter("na".getBytes()); 
//基于列名(即 Qualifier)多个前缀过滤数据的 MultipleColumnPrefixFilter 
byte[][] prefixes = new byte[][] {Bytes.toBytes("na"), Bytes.toBytes("me")}; 
filter = new MultipleColumnPrefixFilter(prefixes); 
//为查询设置过滤条件 
scan.setFilter(filter); 
scan.addFamily(Bytes.toBytes("base_info")); 
//一行 
// Result result = table.get(get); 
//多行的数据 
ResultScanner scanner = table.getScanner(scan); 
for(Result r : scanner){ 
/** 
for(KeyValue kv : r.list()){ 
String family = new String(kv.getFamily()); 
System.out.println(family); 
String qualifier = new String(kv.getQualifier()); 
System.out.println(qualifier); 
System.out.println(new String(kv.getValue())); 
} 
*/ 
//直接从 result 中取到某个特定的 value 
byte[] value = r.getValue(Bytes.toBytes("base_info"), 
Bytes.toBytes("name")); 
System.out.println(new String(value)); 
} 
table.close(); 
} 
}

分页过滤器  代码实现:

package com.ghgj.hbase.test1610; 
 
import org.apache.commons.lang.StringUtils; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.client.HTable; 
import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.client.ResultScanner; 
import org.apache.hadoop.hbase.client.Scan; 
import org.apache.hadoop.hbase.filter.Filter; 
import org.apache.hadoop.hbase.filter.PageFilter; 
import org.apache.hadoop.hbase.util.Bytes; 
 
/** 
 * 501条 
 *  
 * 每页100条,求第四页 : 301 - 400 
 *  
 * pageIndex:第几页   
 * pageNumber:每页几条 
 * 
 * 
 * 在hbase当中取一部分数据的取法: 
 * scan 'user_info',{COLUMNS => 'base_info:name',  
 * LIMIT => 4, STARTROW => 'zhangsan_20150701_0001'} 
 *  
 * mysqL:从第几条开始,取多少条 
 *  
 * 从mysql的分页规则引申到hbase的分页:把startRow转换成mysql的第几条 
 */ 
public class HBasePageFilterDemo { 
 
	private static final String ZK_CONNECT_STR = "hadoop03:2181,hadoop04:2181,hadoop05:2181"; 
 
	private static final String TABLE_NAME = "user_info"; 
	private static final String FAMILY_BASIC = "base_info"; 
	private static final String FAMILY_EXTRA = "extra_info"; 
	private static final String COLUMN_NAME = "name"; 
	private static final String COLUMN_AGE = "age"; 
	private static final String ROW_KEY = "rk0001"; 
 
	private static Configuration config = null; 
	private static HTable table = null; 
	static { 
		config = HBaseConfiguration.create(); 
		config.set("hbase.zookeeper.quorum", ZK_CONNECT_STR); 
		try { 
			table = new HTable(config, TABLE_NAME); 
		} catch (Exception e) { 
			e.printStackTrace(); 
		} 
	} 
	 
	public static void main(String[] args) throws Exception { 
		 
//		ResultScanner pageData = getPageData("zhangsan_20150701_0001", 4); 
		ResultScanner pageData = getPageData(2, 4); 
		HBasePrintUtil.printResultScanner(pageData); 
		 
//		String lastRowkey = getLastRowkey(pageData); 
//		System.out.println(lastRowkey); 
		 
	} 
	 
	public static ResultScanner getPageData(int pageIndex, int pageNumber) throws Exception{ 
		// 怎么把pageIndex 转换成 startRow 
		String startRow = null; 
		 
		if(pageIndex == 1){	// 当客户方法只取第一页的分页数据时, 
			ResultScanner pageData = getPageData(startRow, pageNumber); 
			return pageData; 
			 
		}else{ 
			ResultScanner newPageData = null; 
			for(int i=0; i<pageIndex - 1; i++){	// 总共循环次数是比你取的页数少1 
				newPageData = getPageData(startRow, pageNumber); 
				startRow = getLastRowkey(newPageData); 
				byte[] add = Bytes.add(Bytes.toBytes(startRow), new byte[]{ 0X00 }); 
				startRow = Bytes.toString(add); 
			} 
			newPageData = getPageData(startRow, pageNumber); 
			return newPageData; 
		} 
	} 
	 
	/** 
	 * @param startRow 
	 * @param pageNumber 
	 * @return 
	 *  * scan 'user_info',{COLUMNS => 'base_info:name',  
     * LIMIT => 4, STARTROW => 'zhangsan_20150701_0001'} 
	 * @throws Exception  
	 */ 
	public static ResultScanner getPageData(String startRow, int pageNumber) throws Exception{ 
		Scan scan  = new Scan(); 
		scan.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("name")); 
		// 設置當前查询的其实位置 
		if(!StringUtils.isBlank(startRow)){ 
			scan.setStartRow(Bytes.toBytes(startRow)); 
		} 
		// 第二个参数 
		Filter pageFilter = new PageFilter(pageNumber); 
		scan.setFilter(pageFilter); 
		 
		ResultScanner rs = table.getScanner(scan); 
		return rs; 
	} 
	 
	public static String getLastRowkey(ResultScanner rs){ 
		String lastRowkey = null; 
		for(Result result : rs){ 
//			System.out.println(result.getRow()); 
			lastRowkey = Bytes.toString(result.getRow()); 
		} 
		return lastRowkey; 
//		return null; 
	} 
}

多条件过滤时,可以使用FilterList

List<Filter> filters = new ArrayList<Filter>(); 
		SingleColumnValueFilter filter =new  SingleColumnValueFilter( 
				Bytes.toBytes("info"), 
				Bytes.toBytes("age"), 
				CompareOp.LESS_OR_EQUAL, 
				new BinaryComparator(Bytes.toBytes("20"))); 
		filters.add(filter); 
		 
		SingleColumnValueFilter filter1 =new  SingleColumnValueFilter( 
				Bytes.toBytes("info"), 
				Bytes.toBytes("age"), 
				CompareOp.GREATER, 
				new BinaryComparator(Bytes.toBytes("18"))); 
		 
		filters.add(filter1); 
		 
		Filter filter2 = new ValueFilter(CompareOp.EQUAL, new SubstringComparator("lisi") ); 
		filters.add(filter2); 
		 
		 
		FilterList f=new FilterList(filters); 
                scan.setFilter(f);
发布评论

分享到:

IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

Hbase(五) hbase内部原理详解
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。