In Ceph, all data (images, video, audio, text, strings, and so on) is stored as objects: the binary stream is what gets stored, with no distinction made by format.
Architecture:
RADOS: the underlying storage system
LIBRADOS: client libraries for various languages
RADOSGW, RBD, CephFS: built on top of RADOS and LIBRADOS — a RESTful object-storage gateway (web API), block storage, and a file system, respectively
Other concepts:
pool (logical: an object storage pool)
pg (logical: placement group)
osd (physical: object storage device, roughly one disk; a host may contain several)
pool and pg are logical concepts that act as namespaces, partitioning the data; the number of PGs is specified when a pool is created.
Computing where data is stored:
When storing an object you must of course specify its name (e.g. objectKey), and its pool is also specified; what remains is determining which OSD the object should ultimately land on.
This computation could be done on the server side, but then every single object write would require the server to compute a placement, putting heavy load on it. A well-designed distributed storage system hands this computation to the client.
The computation has two main parts (aside: the data-placement principle is simple, even though official material tends to mystify it; at its core it is much the same as the spatio-temporal data index system I built during my master's):
1. Determine the PG from the object's pool and its objectKey; Ceph implements this with a hash. (What about rehashing when the PG count changes? Since pg_num is fixed when the pool is created, does rehashing simply never need to be considered?)
2. Determine which OSD or OSDs the PG maps to; mapping one PG to several OSDs provides redundant copies for reliability. Ceph uses the CRUSH algorithm, which is essentially a pseudo-random selection process:
For a given PG:
a. CRUSH_HASH(PG_ID, OSD_ID, r) ===> draw: pair the PG with each OSD and compute a pseudo-random number, its draw.
b. (draw & 0xffff) * osd_weight ===> osd_straw: multiply each OSD's draw by its weight (the OSD's storage capacity in TB) to get its straw.
c. pick up high_osd_straw: place the PG on the OSD with the largest straw.
The r in step a is a replica-rank parameter: running the selection with r = 0, 1, 2, ... maps the same PG onto multiple OSDs.
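The two steps above can be sketched as follows. This is a minimal illustration, not Ceph's actual code: the object hash and the mixing function below are stand-ins for Ceph's rjenkins-based hashing and CRUSH_HASH, and the straw selection is the simple form described above.

```java
// Sketch of object -> PG -> OSD placement (illustrative stand-in functions).
class PlacementSketch {

    // Step 1: object -> PG. Hash the object key and take it modulo the
    // pool's PG count (pg_num is fixed per pool at creation time).
    static int pgForObject(String objectKey, int pgNum) {
        return Math.floorMod(objectKey.hashCode(), pgNum); // stand-in for rjenkins hash
    }

    // Step 2: PG -> OSD via straw selection. For each OSD, derive a
    // deterministic pseudo-random draw from (pgId, osdId, r), scale it by the
    // OSD's weight, and pick the OSD with the largest straw. Varying r
    // (0, 1, 2, ...) yields the OSD for each replica.
    static int pickOsd(int pgId, int[] osdIds, double[] osdWeights, int r) {
        int best = -1;
        double bestStraw = -1;
        for (int i = 0; i < osdIds.length; i++) {
            long draw = crushHash(pgId, osdIds[i], r) & 0xffff;
            double straw = draw * osdWeights[i];
            if (straw > bestStraw) {
                bestStraw = straw;
                best = osdIds[i];
            }
        }
        return best;
    }

    // Stand-in for CRUSH_HASH: any deterministic mixing of the three inputs.
    static long crushHash(int pgId, int osdId, int r) {
        long h = 1125899906842597L;
        h = 31 * h + pgId;
        h = 31 * h + osdId;
        h = 31 * h + r;
        h ^= (h >>> 33);
        h *= 0xff51afd7ed558ccdL;
        h ^= (h >>> 33);
        return h;
    }
}
```

Because both steps are deterministic functions of their inputs, any client holding the cluster map computes the same placement with no server round trip.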
Schematically: (pool, objectKey) --hash--> PG --CRUSH--> [OSD for each replica rank r].
Implementing a cloud drive (network disk) with Ceph:
```java
package com.marchon.sensestudy.web;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.marchon.sensestudy.common.config.ConfigParam;
import com.marchon.sensestudy.common.utils.CephClientUtils;
import com.marchon.sensestudy.common.utils.ControllerUtils;
import com.marchon.sensestudy.responsewrapper.ApiCustomException;
import com.marchon.sensestudy.responsewrapper.ApiErrorCode;

/**
 * User storage-space management (the cloud-drive feature). Data lives on Ceph, which is itself a
 * key-value store; to simulate filesystem directories, keys take a hierarchical path form:
 * ${userId} + separator + ${absolutePath}. The filesystem hierarchy can be maintained either at
 * read time or at write time: read-time maintenance makes create/delete/list fast and rename/move
 * slow, write-time maintenance the opposite. Here it is maintained at read time.
 *
 * Notes:
 * 1. Create and rename must validate file/folder names (names must not contain "/" etc.);
 *    list and delete need not.
 * 2. Ceph has no notion of directories and stores none; all data is key-value. Keys are named as
 *    hierarchical paths such as /a/b/1.jpg, /a/b/2, /a/b/3/, and the folder/file listing shown to
 *    the user is obtained by parsing these paths. To tell whether a key represents a user's file
 *    or folder, a trailing "/" on the last path segment marks a folder; no trailing "/" marks a
 *    file. As can be seen:
 *
```
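The "hierarchical keys, hierarchy parsed at read time" idea in the comment above can be illustrated with a small self-contained helper. This is a hypothetical sketch, not part of the project: it collapses everything below the first "/" after the prefix into a single folder entry, mimicking what S3's prefix/delimiter listing returns.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: derive the immediate children of a "directory" from a
// flat list of hierarchical keys, using "/" as the delimiter.
class KeyHierarchy {

    // Returns the immediate children of `prefix`: entries ending in "/" are
    // folders, the rest are files.
    static Set<String> listChildren(List<String> keys, String prefix) {
        Set<String> children = new LinkedHashSet<>();
        for (String key : keys) {
            if (!key.startsWith(prefix) || key.equals(prefix)) {
                continue;
            }
            String rest = key.substring(prefix.length());
            int slash = rest.indexOf('/');
            if (slash >= 0) {
                // Everything under the same first-level subfolder collapses
                // into one folder entry, e.g. "u1/a/b/1.jpg" -> "u1/a/b/".
                children.add(prefix + rest.substring(0, slash + 1));
            } else {
                children.add(key); // a file directly under prefix
            }
        }
        return children;
    }
}
```

For example, the keys "u1/a/b/1.jpg", "u1/a/b/2" and "u1/a/c.txt" listed under prefix "u1/a/" yield one folder entry "u1/a/b/" and one file entry "u1/a/c.txt".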
```java
package com.marchon.sensestudy.common.utils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

import org.springframework.web.multipart.MultipartFile;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CopyObjectResult;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
import com.marchon.sensestudy.common.config.ConfigParam;

public class CephClientUtils {

    private static AmazonS3 amazonS3Client = null;
    private static Protocol protocol = Protocol.HTTP;
    public static String CEPH_URL_STR = protocol.name() + "://" + ConfigParam.cephHost + ":" + ConfigParam.cephPort;

    static {
        AWSCredentials credentials = new BasicAWSCredentials(ConfigParam.cephAccessKey, ConfigParam.cephSecretKey);
        ClientConfiguration clientConfig = new ClientConfiguration();
        clientConfig.setProtocol(protocol);
        amazonS3Client = new AmazonS3Client(credentials, clientConfig);
        amazonS3Client.setEndpoint(ConfigParam.cephHost + ":" + ConfigParam.cephPort);
    }

    /** Batch-delete all objects whose keys start with the given prefix. */
    public static DeleteObjectsResult deleteObjects(String bucketName, String keyPrefix) {
        List<String> keyStrs = amazonS3Client.listObjects(bucketName, keyPrefix).getObjectSummaries().stream()
                .map(e -> e.getKey()).collect(Collectors.toList());
        List<KeyVersion> keys = keyStrs.stream().map(e -> new KeyVersion(e)).collect(Collectors.toList());
        DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName);
        deleteObjectsRequest.setKeys(keys);
        return amazonS3Client.deleteObjects(deleteObjectsRequest);
    }

    /** Delete the object with exactly the given key. */
    public static void deleteObject(String bucketName, String objKey) {
        amazonS3Client.deleteObject(bucketName, objKey);
    }

    /**
     * List stored objects. With prefix and delimiter set appropriately, this lists resources in a
     * filesystem-like way: multiple files under the same subfolder of the current directory are
     * collapsed into a single subfolder entry. The merge rule is "${prefix}\w*${first_delimiter}",
     * i.e. the sub-key running from prefix up to the first occurrence of delimiter (both
     * inclusive) is treated as a shared folder. Drawback: such a "folder" carries no metadata
     * such as last-modified time or size.
     * Typical uses:
     * - With an empty delimiter, all keys prefixed by prefix are returned.
     * - With keys separated by "/" (e.g. "xx/a/b") and delimiter "/": if prefix ends with "/",
     *   the result is all folders/files directly under that directory; otherwise it is all
     *   folders/files under that directory whose names start with prefix.
     *
     * @return the returned List of ObjectListing has at least one element
     */
    public static List<ObjectListing> listObjects(String bucketName, String keyPrefix, String delimiter) {
        ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
        listObjectsRequest.setBucketName(bucketName);
        listObjectsRequest.setPrefix(keyPrefix);
        listObjectsRequest.setDelimiter(delimiter);
        // listObjectsRequest.setMaxKeys(2); // 1000 by default
        // listObjectsRequest.setMarker(null);

        // The listing may be truncated several times.
        List<ObjectListing> objectListings = new ArrayList<>();
        ObjectListing res = amazonS3Client.listObjects(listObjectsRequest);
        objectListings.add(res);
        while (res.isTruncated()) {
            res = amazonS3Client.listNextBatchOfObjects(res);
            objectListings.add(res);
        }
        return objectListings;
    }

    public static List<ObjectListing> listObjects(String bucketName, String keyPrefix) {
        return listObjects(bucketName, keyPrefix, null);
    }

    /** Store a file. */
    public static PutObjectResult writeObject(String bucketName, String objKey, MultipartFile file,
            CannedAccessControlList cannedAccessControlList) {
        PutObjectResult res;
        if (!amazonS3Client.doesBucketExist(bucketName)) {
            amazonS3Client.createBucket(bucketName);
        }
        ObjectMetadata objectMetadata = new ObjectMetadata();
        objectMetadata.setContentLength(file.getSize()); // must set, otherwise stream contents will be buffered in
                                                         // memory and could result in out-of-memory errors
        // objectMetadata.getUserMetadata().put("type", "pdf"); // added as HTTP request headers,
        // which can only contain the ISO-8859-1 charset
        InputStream inputStream = null;
        try {
            inputStream = file.getInputStream();
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        res = amazonS3Client.putObject(new PutObjectRequest(bucketName, objKey, inputStream, objectMetadata));
        if (null != cannedAccessControlList) {
            amazonS3Client.setObjectAcl(bucketName, objKey, cannedAccessControlList);
        }
        try {
            inputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return res;
    }

    /** Fetch a file. */
    public static S3Object getObject(String bucketName, String objKey) {
        return amazonS3Client.getObject(bucketName, objKey);
    }

    public static PutObjectResult writeObject(String bucketName, String objKey, MultipartFile file) {
        return writeObject(bucketName, objKey, file, null);
    }

    /** Generate a permanent direct-access URL; the resource's ACL must be public for access to succeed. */
    public static URL generatePublicUrl(String bucketName, String objKey) {
        return amazonS3Client.getUrl(bucketName, objKey);
    }

    /** Generate a temporary direct-access URL; expires after 15 min by default. */
    // URL format returned: scheme://host[:port]/bucketName/objKey?{Query}
    public static URL generatePresignedUrl(String bucketName, String key, Date expiration) {
        // The expiration must be set; it defaults to 15 min and may be at most 7 days.
        return amazonS3Client.generatePresignedUrl(bucketName, key, expiration);
    }

    /**
     * 5 default metadata entries are set, for example:
     * {Accept-Ranges=bytes, Content-Length=5, Content-Type=text/plain, ETag=5d41402abc4b2a76b9719d911017c592,
     * Last-Modified=Sun Nov 04 15:35:17 CST 2018}
     */
    public static ObjectMetadata getObjectMetaData(String bucketName, String objKey) {
        return amazonS3Client.getObjectMetadata(bucketName, objKey);
    }

    public static boolean isObjExist(String bucketName, String objKey) {
        return amazonS3Client.doesObjectExist(bucketName, objKey);
    }

    public static CopyObjectResult copyObject(String sourceBucketName, String sourceKey, String destinationBucketName,
            String destinationKey) {
        return amazonS3Client.copyObject(sourceBucketName, sourceKey, destinationBucketName, destinationKey);
    }

    /** Store text; the S3 SDK encodes it to a byte stream as UTF-8. */
    public static PutObjectResult writeObject(String bucketName, String objKey, String objContent) {
        if (null == objContent) { // putObject below fails when objContent is null
            objContent = "";
        }
        if (!amazonS3Client.doesBucketExist(bucketName)) {
            amazonS3Client.createBucket(bucketName);
        }
        return amazonS3Client.putObject(bucketName, objKey, objContent); // the String is encoded to bytes as UTF-8
    }

    /** Fetch text. */
    public static String getObjectStr(String bucketName, String objKey) {
        if (!amazonS3Client.doesObjectExist(bucketName, objKey)) { // fetching a nonexistent key would throw
            return null;
        }

        S3Object s3Object = amazonS3Client.getObject(bucketName, objKey);
        BufferedReader bufferedReader = null;
        try {
            // decode with UTF-8 to match how the text was encoded on write
            bufferedReader = new BufferedReader(new InputStreamReader(s3Object.getObjectContent(), "UTF-8"));
        } catch (UnsupportedEncodingException e1) {
            e1.printStackTrace();
        }

        String res = null;
        try {
            res = bufferedReader.readLine();
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            bufferedReader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return res;
    }

    private static byte[] getObjectBytes(String bucketName, String objKey) throws IOException {
        S3Object s3Object = amazonS3Client.getObject(bucketName, objKey);
        InputStream in = s3Object.getObjectContent();

        byte[] bytes = new byte[(int) s3Object.getObjectMetadata().getInstanceLength()];
        // read() may return fewer bytes than requested, so loop until the buffer is full
        int off = 0;
        while (off < bytes.length) {
            int n = in.read(bytes, off, bytes.length - off);
            if (n < 0) {
                break;
            }
            off += n;
        }

        in.close();
        return bytes;
    }
}
```